diff --git a/Makefile b/Makefile
index 143b00a1..ba69acc4 100644
--- a/Makefile
+++ b/Makefile
@@ -25,6 +25,8 @@ help:
 	@echo "  make db-dev-down        - 停止 Docker 开发数据库/Redis/Adminer"
 	@echo "  make api-dev-docker-db  - 本地 API 连接 Docker 开发数据库 (15432)"
 	@echo "  make db-dev-status      - 显示 Docker 开发数据库/Redis/Adminer 与 API 端口状态"
+	@echo "  make metrics-check      - Basic metrics consistency check (/health vs /metrics)"
+	@echo "  make seed-bcrypt-user   - Insert a bcrypt test user (triggers rehash on login)"

 # 安装依赖
 install:
@@ -214,6 +216,18 @@ db-dev-status:
 	@echo "🌿 /health:"
 	@curl -fsS http://localhost:$${API_PORT:-8012}/health 2>/dev/null || echo "(API 未响应)"

+# ---- Metrics & Dev Utilities ----
+metrics-check:
+	@echo "Running the metrics consistency script..."
+	@cd jive-api && ./scripts/check_metrics_consistency.sh || true
+	@echo "Key /metrics lines:" && curl -fsS http://localhost:$${API_PORT:-8012}/metrics | grep -E 'password_hash_|jive_build_info|export_requests_' || true
+
+seed-bcrypt-user:
+	@echo "Inserting bcrypt test user (if absent)..."
+	@cd jive-api && cargo run --bin hash_password --quiet -- 'TempBcrypt123!' >/dev/null 2>&1 || true
+	@psql $${DATABASE_URL:-postgresql://postgres:postgres@localhost:5433/jive_money} -c "DO $$$$ BEGIN IF NOT EXISTS (SELECT 1 FROM users WHERE email='bcrypt_test@example.com') THEN INSERT INTO users (email,password_hash,name,is_active,created_at,updated_at) VALUES ('bcrypt_test@example.com', crypt('TempBcrypt123!', gen_salt('bf')), 'Bcrypt Test', true, NOW(), NOW()); END IF; END $$$$;" 2>/dev/null || echo "⚠️ Requires a local Postgres on 5433 with pgcrypto"
+	@echo "Test login: curl -X POST -H 'Content-Type: application/json' -d '{\"email\":\"bcrypt_test@example.com\",\"password\":\"TempBcrypt123!\"}' http://localhost:$${API_PORT:-8012}/api/v1/auth/login"
+
 # 代码格式化
 format:
 	@echo "格式化 Rust 代码..."
diff --git a/README.md b/README.md
index 207975c4..af36dec8 100644
--- a/README.md
+++ b/README.md
@@ -186,6 +186,76 @@ export JWT_SECRET=$(openssl rand -hex 32)
 未设置时(或留空)API 会在开发 / 测试自动使用一个不安全的占位并打印警告,不可在生产依赖该默认值。

+### Monitoring & Metrics
+
+| Endpoint | Purpose | Auth | Notes |
+|------------|---------------------|------|------|
+| `/health` | Liveness + snapshot | No | Lightweight JSON: hash distribution, rehash state, exchange-rate metrics, etc. |
+| `/metrics` | Prometheus scrape | No | Text-format metrics (suited to long-term monitoring) |
+
+Canonical metrics (recommended):
+```
+password_hash_bcrypt_total                    # bcrypt (2a+2b+2y)
+password_hash_argon2id_total                  # argon2id count
+password_hash_unknown_total                   # unrecognized prefixes
+password_hash_total_count                     # total users
+password_hash_bcrypt_variant{variant="2b"} X  # per-variant count
+jive_password_rehash_total                    # successful rehashes (bcrypt→argon2id)
+jive_password_rehash_fail_total               # failed rehashes (never blocks login)
+jive_password_rehash_fail_breakdown_total{cause="hash"|"update"}  # rehash failures by cause
+export_requests_buffered_total                # buffered export requests (POST CSV/JSON)
+export_requests_stream_total                  # streaming export requests (GET CSV streaming, feature=export_stream)
+export_rows_buffered_total                    # cumulative buffered export rows
+export_rows_stream_total                      # cumulative streamed export rows
+jive_build_info{commit,time,rustc,version} 1  # build info gauge (value is always 1)
+auth_login_fail_total                         # failed logins (unknown user / wrong password)
+auth_login_inactive_total                     # login attempts on inactive accounts
+auth_login_rate_limited_total                 # rate-limited logins (429)
+export_duration_buffered_seconds_*            # buffered export duration histogram (bucket/sum/count)
+export_duration_stream_seconds_*              # streaming export duration histogram (bucket/sum/count)
+process_uptime_seconds                        # process uptime in seconds
+```
+
+Legacy metrics (DEPRECATED; removal planned two release cycles out, see docs/METRICS_DEPRECATION_PLAN.md):
+```
+jive_password_hash_users{algo="bcrypt_2b"}
+```
+
+Prometheus scrape example:
+```yaml
+scrape_configs:
+  - job_name: jive-api
+    metrics_path: /metrics
+    scrape_interval: 15s
+    static_configs:
+      - targets: ["api-host:8012"]
+```
+
+Quick consistency check (does the bcrypt aggregate in /health match /metrics?):
+```bash
+H=$(curl -s http://localhost:8012/health)
+M=$(curl -s http://localhost:8012/metrics)
+echo "Health bcrypt sum:" \
+  $(echo "$H" | jq '.metrics.hash_distribution.bcrypt | (."2a"+."2b"+."2y")')
+echo "Metrics bcrypt total:" \
+  $(grep '^password_hash_bcrypt_total' <<<"$M" | awk '{print $2}')
+```
+
+Operational notes:
+- For very large user tables, a 30s in-memory cache for the hash-distribution query is planned.
+- Remove the legacy jive_password_hash_users* series once all dashboards have migrated (target v1.2.0).
+- Watch `jive_password_rehash_fail_total`; sustained growth points to DB update or concurrency problems.
+- Export duration histogram examples:
+```promql
+# P95 buffered export duration
+histogram_quantile(0.95, sum(rate(export_duration_buffered_seconds_bucket[5m])) by (le))
+
+# Average streaming export duration over the last minute
+sum(rate(export_duration_stream_seconds_sum[1m])) / sum(rate(export_duration_stream_seconds_count[1m]))
+```
+
 ### 密码重哈希(bcrypt → Argon2id)

 登录成功后,如检测到旧 bcrypt 哈希,系统会在 `REHASH_ON_LOGIN` 未显式关闭时(默认开启)尝试透明升级为 Argon2id:
@@ -523,3 +593,17 @@ MIT License

 ## 📞 联系

 如有问题,请提交 Issue 或联系维护者。
+Environment variables (metrics & security):
+```
+AUTH_RATE_LIMIT=30/60            # at most 30 login attempts per 60s window (default 30/60)
+AUTH_RATE_LIMIT_HASH_EMAIL=1     # hash-truncate the email in the rate-limit key (default 1)
+ALLOW_PUBLIC_METRICS=1           # set to 0 to enable the CIDR whitelist
+METRICS_ALLOW_CIDRS=127.0.0.1/32 # comma-separated CIDR list (effective when ALLOW_PUBLIC_METRICS=0)
+METRICS_DENY_CIDRS=              # optional deny CIDRs (deny wins)
+METRICS_CACHE_TTL=30             # /metrics cache TTL in seconds (0 disables)
+```
+
+Grafana dashboard: `docs/GRAFANA_DASHBOARD_TEMPLATE.json`
+Alert rule examples: `docs/ALERT_RULES_EXAMPLE.yaml`
+Security checklist: `docs/SECURITY_CHECKLIST.md`
+Quick verification script: `scripts/verify_observability.sh`
diff --git a/docs/ALERT_RULES_EXAMPLE.yaml b/docs/ALERT_RULES_EXAMPLE.yaml
new file mode 100644
index 00000000..d6f2c631
--- /dev/null
+++ b/docs/ALERT_RULES_EXAMPLE.yaml
@@ -0,0 +1,39 @@
+groups:
+  - name: jive-api-alerts
+    rules:
+      - alert: RehashFailBurst
+        expr: increase(jive_password_rehash_fail_total[10m]) > 0
+        for: 5m
+        labels:
+          severity: warning
+        annotations:
+          summary: Password rehash failures detected
+      - alert: LoginFailSurge
+        expr: rate(auth_login_fail_total[5m]) > 3 * rate(auth_login_fail_total[30m])
+        for: 10m
+        labels:
+          severity: warning
+        annotations:
+          summary: Sudden login failure surge
+      - alert: ExportLatencyHigh
+        expr: histogram_quantile(0.95,sum by (le)(rate(export_duration_buffered_seconds_bucket[5m]))) > 2
+        for: 10m
+        labels:
+          severity: critical
+        annotations:
+          summary: Buffered export P95 latency >2s
+      - alert: RateLimitedSpike
+        expr: increase(auth_login_rate_limited_total[5m]) > 50
+        for: 5m
+        labels:
+          severity: info
+        annotations:
+          summary: Many logins being rate-limited (possible attack)
+      - alert: ProcessRestarted
+        expr: increase(process_uptime_seconds[5m]) < 60
+        for: 0m
+        labels:
+          severity: info
+        annotations:
+          summary: API process restarted recently
diff --git a/docs/GRAFANA_DASHBOARD_TEMPLATE.json b/docs/GRAFANA_DASHBOARD_TEMPLATE.json
new file mode 100644
index 00000000..99fc0058
--- /dev/null
+++ b/docs/GRAFANA_DASHBOARD_TEMPLATE.json
@@ -0,0 +1,18 @@
+{
+  "title": "Jive API Overview",
+  "panels": [
+    {"type":"stat","title":"Uptime (h)","targets":[{"expr":"process_uptime_seconds/3600"}]},
+    {"type":"stat","title":"Rehash Success","targets":[{"expr":"jive_password_rehash_total"}]},
+    {"type":"stat","title":"Rehash Fail","targets":[{"expr":"jive_password_rehash_fail_total"}]},
+    {"type":"graph","title":"Password Hash Distribution","targets":[{"expr":"password_hash_bcrypt_total"},{"expr":"password_hash_argon2id_total"}]},
+    {"type":"graph","title":"Rehash Fail Breakdown","targets":[{"expr":"sum by (cause)(increase(jive_password_rehash_fail_breakdown_total[5m]))"}]},
+    {"type":"graph","title":"Login Outcomes","targets":[{"expr":"rate(auth_login_fail_total[5m])"},{"expr":"rate(auth_login_inactive_total[5m])"},{"expr":"rate(auth_login_rate_limited_total[5m])"}]},
+    {"type":"graph","title":"Export Requests","targets":[{"expr":"rate(export_requests_buffered_total[5m])"},{"expr":"rate(export_requests_stream_total[5m])"}]},
+    {"type":"graph","title":"Export Rows","targets":[{"expr":"rate(export_rows_buffered_total[5m])"},{"expr":"rate(export_rows_stream_total[5m])"}]},
+    {"type":"graph","title":"Buffered Export P95","targets":[{"expr":"histogram_quantile(0.95,sum by (le)(rate(export_duration_buffered_seconds_bucket[5m])))"}]},
+    {"type":"graph","title":"Stream Export P95","targets":[{"expr":"histogram_quantile(0.95,sum by (le)(rate(export_duration_stream_seconds_bucket[5m])))"}]}
+  ],
+  "schemaVersion": 38,
+  "version": 1
+}
diff --git a/docs/METRICS_DEPRECATION_PLAN.md b/docs/METRICS_DEPRECATION_PLAN.md
new file mode 100644
index 00000000..b5c96ca4
--- /dev/null
+++ b/docs/METRICS_DEPRECATION_PLAN.md
@@ -0,0 +1,54 @@
+# Metrics Deprecation Plan
+
+This document tracks deprecation and removal timelines for legacy metrics exposed by the API.
+
+## Principles
+- Provide at least two released versions of overlap before removal.
+- Never silently change a metric's semantic meaning; prefer adding a new metric.
+- Document the target removal version and migration path here and in the README.
+
+## Deprecated Metrics
+| Metric | Status | Replacement | First Deprecated | Target Removal | Notes |
+|--------|--------|-------------|------------------|----------------|-------|
+| `jive_password_hash_users` (labels: bcrypt_2a,bcrypt_2b,bcrypt_2y,argon2id) | Deprecated | `password_hash_bcrypt_variant`, `password_hash_bcrypt_total`, `password_hash_argon2id_total` | v1.0.0 | v1.2.0 | Keep until the majority of dashboards have migrated |
+| `jive_password_rehash_fail_total` | Deprecated (aggregate) | `jive_password_rehash_fail_breakdown_total{cause}` | v1.0.X | v1.3.0 | Remove once dashboards use the breakdown |
+
+## Active Canonical Metrics (Password Hash & Auth)
+- `password_hash_bcrypt_total`
+- `password_hash_argon2id_total`
+- `password_hash_unknown_total`
+- `password_hash_total_count`
+- `password_hash_bcrypt_variant{variant="2a"|"2b"|"2y"}`
+- `jive_password_rehash_total`
+- `jive_password_rehash_fail_total`
+- `auth_login_fail_total`
+- `auth_login_inactive_total`
+
+## Export Metrics
+- `export_requests_buffered_total`
+- `export_requests_stream_total`
+- `export_rows_buffered_total`
+- `export_rows_stream_total`
+- `export_duration_buffered_seconds_*` (histogram buckets/sum/count)
+- `export_duration_stream_seconds_*` (histogram buckets/sum/count)
+
+## Build / Operational
+- `jive_build_info{commit,time,rustc,version}` (value always 1)
+- `process_uptime_seconds`
+
+## Future Candidates
+| Proposed | Description | Status |
+|----------|-------------|--------|
+| `auth_login_fail_total` | Count failed login attempts (unauthorized) | Implemented |
+| `export_duration_seconds` (histogram) | Latency of export operations | Implemented (as `export_duration_buffered_seconds` / `export_duration_stream_seconds`) |
+| `process_uptime_seconds` | Seconds since process start | Implemented |
+
+## Removal Procedure
+1. Mark the metric here and in the README as DEPRECATED with a target version.
+2. Announce in release notes for two consecutive releases.
+3. After reaching the target version, remove the metric exposition code; update this file.
+4. Provide simple one-shot conversion guidance for dashboards (see the example below).
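+For the `jive_password_hash_users` migration, the dashboard-side change is a direct selector substitution; a sketch using only metric names from the tables above:
+
+```promql
+# Before: total bcrypt users from the legacy per-algo series
+sum(jive_password_hash_users{algo=~"bcrypt_.*"})
+# After: canonical aggregate
+password_hash_bcrypt_total
+
+# Before: one bcrypt variant
+jive_password_hash_users{algo="bcrypt_2b"}
+# After: canonical per-variant series
+password_hash_bcrypt_variant{variant="2b"}
+```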
+
+## Changelog
+- v1.0.0: Introduced canonical password hash metrics + export metrics; deprecated legacy `jive_password_hash_users`.
+- v1.0.X: Added login fail/inactive counters; export duration histograms; uptime gauge.
diff --git a/docs/PR_SECURITY_METRICS_SUMMARY.md b/docs/PR_SECURITY_METRICS_SUMMARY.md
new file mode 100644
index 00000000..78eeabb7
--- /dev/null
+++ b/docs/PR_SECURITY_METRICS_SUMMARY.md
@@ -0,0 +1,58 @@
+## PR Security & Metrics Summary (Template)
+
+### Overview
+This PR strengthens API security and observability. Copy and adapt the sections below for the final PR description.
+
+### Key Changes
+- Login rate limiting (IP + email key) with structured 429 JSON and a `Retry-After` header.
+- Metrics endpoint CIDR allow + deny lists (`ALLOW_PUBLIC_METRICS=0`, `METRICS_ALLOW_CIDRS`, `METRICS_DENY_CIDRS`).
+- Password rehash failure breakdown: `jive_password_rehash_fail_breakdown_total{cause="hash"|"update"}`.
+- Export performance histograms (buffered & streaming) and an uptime metric.
+- New security / monitoring docs: Grafana dashboard, alert rules, security checklist.
+- Email-based rate-limit key hashing (first 8 hex of SHA-256) for privacy.
+
+### New / Modified Environment Variables
+| Variable | Purpose | Default |
+|----------|---------|---------|
+| `AUTH_RATE_LIMIT` | Login attempts per window (N/SECONDS) | `30/60` |
+| `AUTH_RATE_LIMIT_HASH_EMAIL` | Hash email in key (privacy) | `1` |
+| `ALLOW_PUBLIC_METRICS` | If `0`, restrict metrics by CIDR | `1` |
+| `METRICS_ALLOW_CIDRS` | Comma-separated CIDR whitelist | `127.0.0.1/32` |
+| `METRICS_DENY_CIDRS` | Comma-separated CIDR deny list (takes priority) | (empty) |
+| `METRICS_CACHE_TTL` | Metrics base cache seconds | `30` |
+
+### Prometheus Metrics Added
+| Metric | Type | Notes |
+|--------|------|-------|
+| `auth_login_rate_limited_total` | counter | Rate-limited login attempts |
+| `jive_password_rehash_fail_breakdown_total{cause}` | counter | Split hash/update failures |
+| `export_duration_buffered_seconds_*` | histogram | Export latency (buffered) |
+| `export_duration_stream_seconds_*` | histogram | Export latency (stream) |
+| `process_uptime_seconds` | gauge | Runtime age |
+
+Deprecated (pending removal): `jive_password_rehash_fail_total` (aggregate).
+
+### Quick Local Verification
+Run the stack (example):
+```bash
+ALLOW_PUBLIC_METRICS=1 AUTH_RATE_LIMIT=3/60 cargo run --bin jive-api &
+sleep 2
+./scripts/verify_observability.sh
+```
+
+Expect PASS output and non-zero counters for `auth_login_fail_total` after simulated attempts.
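+The loop below also exercises the limiter. It assumes the example run above (`AUTH_RATE_LIMIT=3/60`) and the README's default port 8012; the first three attempts should return 401 for the unknown user, the rest 429 with a `Retry-After` header:
+```bash
+for i in $(seq 1 5); do
+  curl -s -o /dev/null -w "attempt $i -> %{http_code}\n" \
+    -X POST -H 'Content-Type: application/json' \
+    -d '{"email":"nouser@example.com","password":"wrong"}' \
+    http://localhost:8012/api/v1/auth/login
+done
+```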
+
+### Reviewer Checklist
+- [ ] 429 login response includes `Retry-After` and the JSON structure
+- [ ] `/metrics` reachable only when expected (toggle ALLOW_PUBLIC_METRICS)
+- [ ] Rehash breakdown metrics appear
+- [ ] Export histogram buckets present
+- [ ] Uptime metric increasing across scrapes
+- [ ] Security checklist file present (`docs/SECURITY_CHECKLIST.md`)
+
+### Follow-up (Optional / Tracked)
+- Audit logging for repeated rate-limit triggers
+- Global unified error response model
+- Redis/distributed rate limiting for multi-instance scaling
+- Remove deprecated rehash aggregate metric (target v1.3.0)
diff --git a/docs/SECURITY_CHECKLIST.md b/docs/SECURITY_CHECKLIST.md
new file mode 100644
index 00000000..03e0b009
--- /dev/null
+++ b/docs/SECURITY_CHECKLIST.md
@@ -0,0 +1,27 @@
+## Production Security Checklist
+
+1. Secrets
+   - Set a strong `JWT_SECRET` (>=32 random bytes). Never use the dev default.
+2. Metrics Exposure
+   - `ALLOW_PUBLIC_METRICS=0`
+   - Restrict `METRICS_ALLOW_CIDRS` to the monitoring network.
+3. Rate Limiting
+   - Tune `AUTH_RATE_LIMIT` (e.g. 20/60 or 50/300 based on traffic).
+   - Keep `AUTH_RATE_LIMIT_HASH_EMAIL=1` to avoid leaking raw emails in in-memory keys.
+4. TLS / Reverse Proxy
+   - Terminate TLS at a trusted proxy; strip untrusted `X-Forwarded-For`.
+5. Logging
+   - Ensure logs exclude plaintext passwords/tokens.
+   - Monitor `auth_login_rate_limited_total` + `auth_login_fail_total` anomalies.
+6. Password Migration
+   - Track the reduction of bcrypt via the `password_hash_bcrypt_total` trend.
+   - Investigate any spike in `jive_password_rehash_fail_breakdown_total{cause}`.
+7. Export Controls
+   - Consider pagination/streaming for large exports; watch the P95 latency panels.
+8. Dependency Hygiene
+   - Run `cargo deny` (already in CI) before release.
+9. Database
+   - Use a least-privilege DB role for the API.
+10. Incident Response
+    - Create alerts using `docs/ALERT_RULES_EXAMPLE.yaml` as a baseline.
diff --git a/jive-api/Cargo.lock b/jive-api/Cargo.lock
index 3958c59b..be908b0d 100644
--- a/jive-api/Cargo.lock
+++ b/jive-api/Cargo.lock
@@ -1821,6 +1821,7 @@ dependencies = [
 "rust_decimal",
 "serde",
 "serde_json",
+ "sha2",
 "sqlx",
 "thiserror 2.0.16",
 "tokio",
diff --git a/jive-api/Cargo.toml b/jive-api/Cargo.toml
index 088cd388..df5e44d5 100644
--- a/jive-api/Cargo.toml
+++ b/jive-api/Cargo.toml
@@ -4,6 +4,7 @@ version = "1.0.0"
 edition = "2021"
 authors = ["Jive Money Team"]
 description = "Jive Money API Server for category template management"
+build = "build.rs"

 [lib]
 name = "jive_money_api"
@@ -44,6 +45,7 @@ base64 = "0.22"
 # Make core optional; gate usage behind feature `core_export`
 jive-core = { path = "../jive-core", package = "jive-core", features = ["server", "db"], default-features = false, optional = true }
 bytes = "1"
+sha2 = "0.10"

 # WebSocket支持
 tokio-tungstenite = "0.24"
diff --git a/jive-api/PR47_METRICS_VERIFICATION_REPORT.md b/jive-api/PR47_METRICS_VERIFICATION_REPORT.md
index 17b3ef9d..95d4b248 100644
--- a/jive-api/PR47_METRICS_VERIFICATION_REPORT.md
+++ b/jive-api/PR47_METRICS_VERIFICATION_REPORT.md
@@ -252,15 +252,27 @@ curl -s http://localhost:8014/health | jq '.metrics'
 6. **✅ Consistency Validation**: Perfect consistency between `/health` and `/metrics` endpoints
 7. **✅ Code Quality**: Meets project standards with only minor cosmetic warnings

-### Verified Metrics Available
-- `jive_password_rehash_total` - Counter of successful bcrypt→argon2id rehashes
-- `jive_password_hash_users{algo="bcrypt_2a|2b|2y|argon2id"}` - User count by hash type
-
-### Next Actions (Optional)
-1. Add monitoring documentation to README
-2. Create consistency verification scripts
+### Verified Metrics Available (Updated Post PR #48 Plan)
+Canonical (new) metrics:
+- `password_hash_bcrypt_total` – Users with any bcrypt variant (2a+2b+2y)
+- `password_hash_argon2id_total` – Users with argon2id hashes
+- `password_hash_unknown_total` – Users whose hash prefix is not in (2a,2b,2y,argon2id)
+- `password_hash_total_count` – Total users counted
+- `password_hash_bcrypt_variant{variant="2a|2b|2y"}` – Per-variant bcrypt counts
+- `jive_password_rehash_total` – Successful bcrypt→argon2id rehash counter
+
+Legacy (DEPRECATED – retained temporarily for dashboards):
+- `jive_password_hash_users{algo="bcrypt_2a|bcrypt_2b|bcrypt_2y|argon2id"}`
+
+Deprecation Notice: legacy `jive_password_hash_users` will be removed after dashboards migrate to the canonical metrics (target: two release cycles). Monitor usage before removal.
+
+### Next Actions (Optional / In Progress)
+1. Add monitoring documentation to README (IN PROGRESS)
+2. Create consistency verification scripts (IN PROGRESS)
 3. Configure Prometheus scraping for production
 4. Test rehash counter during actual password changes
+5. Migrate dashboards from legacy to canonical metrics
+6. Decide removal date for legacy metrics (propose: +2 releases)

 ### Final Status: 🎯 COMPLETE SUCCESS
 **All verification requirements fulfilled. PR #47 is production-ready.**
@@ -268,4 +280,4 @@ curl -s http://localhost:8014/health | jq '.metrics'
 ---
 *Final report completed: 2025-09-26T01:16:00Z*
 *Runtime testing completed: 2025-09-26T01:15:30Z*
-*Merge verified: 2025-09-26T01:10:47Z*
\ No newline at end of file
+*Merge verified: 2025-09-26T01:10:47Z*
diff --git a/jive-api/build.rs b/jive-api/build.rs
new file mode 100644
index 00000000..8621988f
--- /dev/null
+++ b/jive-api/build.rs
@@ -0,0 +1,31 @@
+use std::process::Command;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+fn main() {
+    // Git commit (short)
+    let commit = Command::new("git")
+        .args(["rev-parse", "--short", "HEAD"])
+        .output()
+        .ok()
+        .and_then(|o| String::from_utf8(o.stdout).ok())
+        .map(|s| s.trim().to_string())
+        .unwrap_or_else(|| "unknown".into());
+    println!("cargo:rustc-env=GIT_COMMIT={}", commit);
+
+    // Build time (seconds since the Unix epoch)
+    let ts = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .map(|d| d.as_secs())
+        .unwrap_or(0);
+    println!("cargo:rustc-env=BUILD_TIME={}", ts);
+
+    // Rustc version
+    let rustc = Command::new("rustc")
+        .arg("-V")
+        .output()
+        .ok()
+        .and_then(|o| String::from_utf8(o.stdout).ok())
+        .map(|s| s.trim().to_string())
+        .unwrap_or_else(|| "rustc unknown".into());
+    println!("cargo:rustc-env=RUSTC_VERSION={}", rustc);
+}
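The three embedded values surface both in the `jive_build_info` metric and in the `/health` payload's `build` object; a quick check (port per the README default, output shape illustrative only):
```bash
curl -s http://localhost:8012/health | jq .build
# {"commit":"abc1234","time":"1758800000","rustc":"rustc 1.81.0","version":"1.0.0"}
```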
diff --git a/jive-api/src/handlers/auth.rs b/jive-api/src/handlers/auth.rs
index 7c262b08..6fdb1775 100644
--- a/jive-api/src/handlers/auth.rs
+++ b/jive-api/src/handlers/auth.rs
@@ -248,9 +248,8 @@ pub async fn login(
         .map_err(|e| ApiError::DatabaseError(e.to_string()))?
     }
     .ok_or_else(|| {
-        if cfg!(debug_assertions) {
-            println!("DEBUG[login]: user not found for input={}", &login_input);
-        }
+        if cfg!(debug_assertions) { println!("DEBUG[login]: user not found for input={}", &login_input); }
+        state.metrics.increment_login_fail();
         ApiError::Unauthorized
     })?;

@@ -280,9 +279,8 @@ pub async fn login(
     // 检查用户状态
     if !user.is_active {
-        if cfg!(debug_assertions) {
-            println!("DEBUG[login]: user inactive: {}", user.email);
-        }
+        if cfg!(debug_assertions) { println!("DEBUG[login]: user inactive: {}", user.email); }
+        state.metrics.increment_login_inactive();
         return Err(ApiError::Forbidden);
     }

@@ -308,11 +306,7 @@ pub async fn login(
         .unwrap_or(true);

     if hash.starts_with("$argon2") {
-        let parsed_hash = PasswordHash::new(hash).map_err(|e| {
-            #[cfg(debug_assertions)]
-            println!("DEBUG[login]: failed to parse Argon2 hash: {:?}", e);
-            ApiError::InternalServerError
-        })?;
+        let parsed_hash = PasswordHash::new(hash).map_err(|e| { #[cfg(debug_assertions)] println!("DEBUG[login]: failed to parse Argon2 hash: {:?}", e); state.metrics.increment_login_fail(); ApiError::InternalServerError })?;
         let argon2 = Argon2::default();
         argon2
             .verify_password(req.password.as_bytes(), &parsed_hash)
@@ -320,9 +314,7 @@ pub async fn login(
     } else if hash.starts_with("$2") {
         // bcrypt format ($2a$, $2b$, $2y$)
         let ok = bcrypt::verify(&req.password, hash).unwrap_or(false);
-        if !ok {
-            return Err(ApiError::Unauthorized);
-        }
+        if !ok { state.metrics.increment_login_fail(); return Err(ApiError::Unauthorized); }

         if enable_rehash {
             // Password rehash: transparently upgrade bcrypt to Argon2id on successful login
                     .await
                 {
                     tracing::warn!(user_id=%user.id, error=?e, "password rehash failed");
+                    // Record the failed rehash (aggregate + cause breakdown)
+                    state.metrics.increment_rehash_fail();
+                    state.metrics.inc_rehash_fail_update();
                 } else {
                     tracing::debug!(user_id=%user.id, "password rehash succeeded: bcrypt→argon2id");
                     // Increment rehash metrics
                     state.metrics.increment_rehash();
                 }
             }
-            Err(e) => {
-                tracing::warn!(user_id=%user.id, error=?e, "failed to generate Argon2id hash")
-            }
+            Err(e) => { tracing::warn!(user_id=%user.id, error=?e, "failed to generate Argon2id hash"); state.metrics.increment_rehash_fail(); state.metrics.inc_rehash_fail_hash(); }
         }
         }
     } else {
         // Unknown format: try Argon2 parse as best-effort, otherwise unauthorized
         match PasswordHash::new(hash) {
-            Ok(parsed) => {
-                let argon2 = Argon2::default();
-                argon2
-                    .verify_password(req.password.as_bytes(), &parsed)
-                    .map_err(|_| ApiError::Unauthorized)?;
-            }
-            Err(_) => return Err(ApiError::Unauthorized),
+            Ok(parsed) => { let argon2 = Argon2::default(); argon2.verify_password(req.password.as_bytes(), &parsed).map_err(|_| { state.metrics.increment_login_fail(); ApiError::Unauthorized })?; }
+            Err(_) => { state.metrics.increment_login_fail(); return Err(ApiError::Unauthorized); }
         }
     }
diff --git a/jive-api/src/handlers/transactions.rs b/jive-api/src/handlers/transactions.rs
index eaf92806..e1114d25 100644
--- a/jive-api/src/handlers/transactions.rs
+++ b/jive-api/src/handlers/transactions.rs
@@ -70,9 +70,28 @@ fn csv_escape_cell(mut s: String, delimiter: char) -> String {
     }
 }
 use crate::models::permission::Permission;
-use crate::services::context::ServiceContext;
 use crate::services::{AuditService, AuthService};

+// Shared query builder for the multiple export paths
+fn build_transactions_export_query(
+    mut base: QueryBuilder<'static, sqlx::Postgres>,
+    family_id: Uuid,
+    account_id: Option<Uuid>,
+    ledger_id: Option<Uuid>,
+    category_id: Option<Uuid>,
+    start_date: Option<NaiveDate>,
+    end_date: Option<NaiveDate>,
+) -> QueryBuilder<'static, sqlx::Postgres> {
+    base.push_bind(family_id);
+    if let Some(v) = account_id { base.push(" AND t.account_id = "); base.push_bind(v); }
+    if let Some(v) = ledger_id { base.push(" AND t.ledger_id = "); base.push_bind(v); }
+    if let Some(v) = category_id { base.push(" AND t.category_id = "); base.push_bind(v); }
+    if let Some(v) = start_date { base.push(" AND t.transaction_date >= "); base.push_bind(v); }
+    if let Some(v) = end_date { base.push(" AND t.transaction_date <= "); base.push_bind(v); }
+    base.push(" ORDER BY t.transaction_date DESC, t.id DESC");
+    base
+}
+
 /// 导出交易请求
 #[derive(Debug, Deserialize)]
 pub struct ExportTransactionsRequest {
@@ -88,11 +107,12 @@

 /// 导出交易(返回 data:URL 形式的下载链接,避免服务器存储文件)
 pub async fn export_transactions(
-    State(pool): State<PgPool>,
+    State(state): State<AppState>,
     claims: Claims,
     headers: HeaderMap,
     Json(req): Json<ExportTransactionsRequest>,
 ) -> ApiResult<impl IntoResponse> {
+    let pool = &state.pool;
     let user_id = claims.user_id()?; // 验证 JWT,提取用户ID
     let family_id = claims
         .family_id
@@ -114,45 +134,30 @@ pub async fn export_transactions(
         )));
     }

-    // 复用列表查询的过滤条件(限定在当前家庭)
-    let mut query = QueryBuilder::new(
-        "SELECT t.id, t.account_id, t.ledger_id, t.amount, t.transaction_type, t.transaction_date, \
-         t.category_id, c.name as category_name, t.payee_id, p.name as payee_name, \
-         t.description, t.notes \
-         FROM transactions t \
-         JOIN ledgers l ON t.ledger_id = l.id \
-         LEFT JOIN categories c ON t.category_id = c.id \
-         LEFT JOIN payees p ON t.payee_id = p.id \
-         WHERE t.deleted_at IS NULL AND l.family_id = "
+    // Build the unified export query
+    let mut query = build_transactions_export_query(
+        QueryBuilder::new(
+            "SELECT t.id, t.account_id, t.ledger_id, t.amount, t.transaction_type, t.transaction_date, \
+             t.category_id, c.name as category_name, t.payee_id, p.name as payee_name, \
+             t.description, t.notes \
+             FROM transactions t \
+             JOIN ledgers l ON t.ledger_id = l.id \
+             LEFT JOIN categories c ON t.category_id = c.id \
+             LEFT JOIN payees p ON t.payee_id = p.id \
+             WHERE t.deleted_at IS NULL AND l.family_id = "
+        ),
+        ctx.family_id,
+        req.account_id,
+        req.ledger_id,
+        req.category_id,
+        req.start_date,
+        req.end_date,
     );
-    query.push_bind(ctx.family_id);
-
-    if let Some(account_id) = req.account_id {
-        query.push(" AND t.account_id = ");
-        query.push_bind(account_id);
-    }
-    if let Some(ledger_id) = req.ledger_id {
-        query.push(" AND t.ledger_id = ");
-        query.push_bind(ledger_id);
-    }
-    if let Some(category_id) = req.category_id {
-        query.push(" AND t.category_id = ");
-        query.push_bind(category_id);
-    }
-    if let Some(start_date) = req.start_date {
-        query.push(" AND t.transaction_date >= ");
-        query.push_bind(start_date);
-    }
-    if let Some(end_date) = req.end_date {
-        query.push(" AND t.transaction_date <= ");
-        query.push_bind(end_date);
-    }
-
-    query.push(" ORDER BY t.transaction_date DESC, t.id DESC");

+    let start_time = std::time::Instant::now();
     let rows = query
         .build()
-        .fetch_all(&pool)
+        .fetch_all(pool)
         .await
         .map_err(|e| ApiError::DatabaseError(format!("查询交易失败: {}", e)))?;

@@ -228,6 +233,12 @@
         resp_headers.insert("x-audit-id", aid.to_string().parse().unwrap());
     }

+    // Metrics: buffered JSON export (shares the buffered counters with CSV)
+    state.metrics.inc_export_request_buffered();
+    state
+        .metrics
+        .add_export_rows_buffered(items.len() as u64);
+    state.metrics.observe_export_duration_buffered(start_time.elapsed().as_secs_f64());
     return Ok((
         resp_headers,
         Json(serde_json::json!({
@@ -381,6 +392,12 @@ pub async fn
export_transactions(
         resp_headers.insert("x-audit-id", aid.to_string().parse().unwrap());
     }

+    // Metrics: buffered CSV export
+    state.metrics.inc_export_request_buffered();
+    state
+        .metrics
+        .add_export_rows_buffered(count_for_audit as u64);
+    state.metrics.observe_export_duration_buffered(start_time.elapsed().as_secs_f64());
     // Also mirror audit id in the JSON for POST CSV
     Ok((
         resp_headers,
@@ -397,11 +414,12 @@

 /// 流式 CSV 下载(更适合浏览器原生下载)
 pub async fn export_transactions_csv_stream(
-    State(pool): State<PgPool>,
+    State(state): State<AppState>,
     claims: Claims,
     headers: HeaderMap,
     Query(q): Query<ExportTransactionsQuery>,
 ) -> ApiResult<impl IntoResponse> {
+    let pool = &state.pool;
     let user_id = claims.user_id()?;
     let family_id = claims
         .family_id
@@ -414,39 +432,24 @@ pub async fn export_transactions_csv_stream(
     ctx.require_permission(Permission::ExportData)
         .map_err(|_| ApiError::Forbidden)?;

-    // 复用查询逻辑(与 JSON/CSV data:URL 相同条件,限定家庭)
-    let mut query = QueryBuilder::new(
-        "SELECT t.id, t.account_id, t.ledger_id, t.amount, t.transaction_type, t.transaction_date, \
-         t.category_id, c.name as category_name, t.payee_id, p.name as payee_name, \
-         t.description, t.notes \
-         FROM transactions t \
-         JOIN ledgers l ON t.ledger_id = l.id \
-         LEFT JOIN categories c ON t.category_id = c.id \
-         LEFT JOIN payees p ON t.payee_id = p.id \
-         WHERE t.deleted_at IS NULL AND l.family_id = "
+    let mut query = build_transactions_export_query(
+        QueryBuilder::new(
+            "SELECT t.id, t.account_id, t.ledger_id, t.amount, t.transaction_type, t.transaction_date, \
+             t.category_id, c.name as category_name, t.payee_id, p.name as payee_name, \
+             t.description, t.notes \
+             FROM transactions t \
+             JOIN ledgers l ON t.ledger_id = l.id \
+             LEFT JOIN categories c ON t.category_id = c.id \
+             LEFT JOIN payees p ON t.payee_id = p.id \
+             WHERE t.deleted_at IS NULL AND l.family_id = "
+        ),
+        ctx.family_id,
+        q.account_id,
+        q.ledger_id,
+        q.category_id,
+        q.start_date,
+        q.end_date,
     );
-    query.push_bind(ctx.family_id);
-    if let Some(account_id) = q.account_id {
-        query.push(" AND t.account_id = ");
-        query.push_bind(account_id);
-    }
-    if let Some(ledger_id) = q.ledger_id {
-        query.push(" AND t.ledger_id = ");
-        query.push_bind(ledger_id);
-    }
-    if let Some(category_id) = q.category_id {
-        query.push(" AND t.category_id = ");
-        query.push_bind(category_id);
-    }
-    if let Some(start_date) = q.start_date {
-        query.push(" AND t.transaction_date >= ");
-        query.push_bind(start_date);
-    }
-    if let Some(end_date) = q.end_date {
-        query.push(" AND t.transaction_date <= ");
-        query.push_bind(end_date);
-    }
-    query.push(" ORDER BY t.transaction_date DESC, t.id DESC");

     // When export_stream feature enabled, stream rows instead of buffering entire CSV
     #[cfg(feature = "export_stream")]
     {
         let built_query = query.build();
         let sql = built_query.sql().to_owned();
         let pool_clone = pool.clone();
+        let metrics = state.metrics.clone();
         tokio::spawn(async move {
+            let start_time = std::time::Instant::now();
             // Execute the raw SQL query using sqlx::raw_sql
             let mut stream = sqlx::raw_sql(&sql).fetch(&pool_clone);
             // Header
                     return;
                 }
             }
+            let mut rows_counter: u64 = 0;
             while let Some(item) = stream.next().await {
                 match item {
                     Ok(row) => {
                             csv_escape_cell(payee.clone().unwrap_or_default(), ','),
                             csv_escape_cell(ttype, ',')
                         );
+                        rows_counter += 1;
                        if tx.send(Ok(bytes::Bytes::from(line))).await.is_err() {
                            return;
                        }
                    }
                    Err(e) => {
                    }
                }
            }
+            // Update the streaming export metrics once the stream completes
+            metrics.inc_export_request_stream();
+            metrics.add_export_rows_stream(rows_counter);
+            metrics.observe_export_duration_stream(start_time.elapsed().as_secs_f64());
         });
         let byte_stream = ReceiverStream::new(rx).map(|r| match r {
             Ok(b) => Ok::<_, ApiError>(b),
@@ -543,7 +554,7 @@
     // Execute fully and build CSV body when streaming disabled
     let rows_all = query
         .build()
-        .fetch_all(&pool)
+        .fetch_all(pool)
         .await
         .map_err(|e| ApiError::DatabaseError(format!("查询交易失败: {}", e)))?;
     #[cfg(feature = "core_export")]
@@ -658,7 +669,7 @@
     }
     let estimated_count: i64 = count_q
         .build()
-        .fetch_one(&pool)
+        .fetch_one(pool)
         .await
         .ok()
         .and_then(|row| row.try_get::<i64, _>("c").ok())
diff --git a/jive-api/src/lib.rs b/jive-api/src/lib.rs
index e0192637..506857a3 100644
--- a/jive-api/src/lib.rs
+++ b/jive-api/src/lib.rs
@@ -26,6 +26,34 @@ pub struct AppState {
 #[derive(Clone)]
 pub struct AppMetrics {
     pub rehash_count: Arc<AtomicU64>,
+    pub rehash_fail_count: Arc<AtomicU64>,
+    pub export_request_stream_count: Arc<AtomicU64>,
+    pub export_request_buffered_count: Arc<AtomicU64>,
+    pub export_rows_stream: Arc<AtomicU64>,
+    pub export_rows_buffered: Arc<AtomicU64>,
+    pub auth_login_fail_count: Arc<AtomicU64>,
+    pub auth_login_inactive_count: Arc<AtomicU64>,
+    // Export duration histogram (buffered)
+    pub export_dur_buf_le_005: Arc<AtomicU64>,
+    pub export_dur_buf_le_02: Arc<AtomicU64>,
+    pub export_dur_buf_le_1: Arc<AtomicU64>,
+    pub export_dur_buf_le_3: Arc<AtomicU64>,
+    pub export_dur_buf_le_10: Arc<AtomicU64>,
+    pub export_dur_buf_le_inf: Arc<AtomicU64>,
+    pub export_dur_buf_sum_ns: Arc<AtomicU64>,
+    pub export_dur_buf_count: Arc<AtomicU64>,
+    // Export duration histogram (stream)
+    pub export_dur_stream_le_005: Arc<AtomicU64>,
+    pub export_dur_stream_le_02: Arc<AtomicU64>,
+    pub export_dur_stream_le_1: Arc<AtomicU64>,
+    pub export_dur_stream_le_3: Arc<AtomicU64>,
+    pub export_dur_stream_le_10: Arc<AtomicU64>,
+    pub export_dur_stream_le_inf: Arc<AtomicU64>,
+    pub export_dur_stream_sum_ns: Arc<AtomicU64>,
+    pub export_dur_stream_count: Arc<AtomicU64>,
+    pub rehash_fail_hash: Arc<AtomicU64>,
+    pub rehash_fail_update: Arc<AtomicU64>,
+    pub auth_login_rate_limited: Arc<AtomicU64>,
 }

 impl Default for AppMetrics {
@@ -38,6 +66,32 @@ impl AppMetrics {
     pub fn new() -> Self {
         Self {
             rehash_count: Arc::new(AtomicU64::new(0)),
+            rehash_fail_count: Arc::new(AtomicU64::new(0)),
+            export_request_stream_count: Arc::new(AtomicU64::new(0)),
+            export_request_buffered_count: Arc::new(AtomicU64::new(0)),
+            export_rows_stream: Arc::new(AtomicU64::new(0)),
+            export_rows_buffered: Arc::new(AtomicU64::new(0)),
+            auth_login_fail_count: Arc::new(AtomicU64::new(0)),
+            auth_login_inactive_count: Arc::new(AtomicU64::new(0)),
+            export_dur_buf_le_005: Arc::new(AtomicU64::new(0)),
+            export_dur_buf_le_02: Arc::new(AtomicU64::new(0)),
+            export_dur_buf_le_1: Arc::new(AtomicU64::new(0)),
+            export_dur_buf_le_3: Arc::new(AtomicU64::new(0)),
+            export_dur_buf_le_10: Arc::new(AtomicU64::new(0)),
+            export_dur_buf_le_inf: Arc::new(AtomicU64::new(0)),
+            export_dur_buf_sum_ns: Arc::new(AtomicU64::new(0)),
+            export_dur_buf_count: Arc::new(AtomicU64::new(0)),
+            export_dur_stream_le_005: Arc::new(AtomicU64::new(0)),
+            export_dur_stream_le_02: Arc::new(AtomicU64::new(0)),
+            export_dur_stream_le_1: Arc::new(AtomicU64::new(0)),
+            export_dur_stream_le_3: Arc::new(AtomicU64::new(0)),
+            export_dur_stream_le_10: Arc::new(AtomicU64::new(0)),
+            export_dur_stream_le_inf: Arc::new(AtomicU64::new(0)),
+            export_dur_stream_sum_ns: Arc::new(AtomicU64::new(0)),
+            export_dur_stream_count: Arc::new(AtomicU64::new(0)),
+            rehash_fail_hash: Arc::new(AtomicU64::new(0)),
+            rehash_fail_update: Arc::new(AtomicU64::new(0)),
+            auth_login_rate_limited: Arc::new(AtomicU64::new(0)),
         }
     }

@@ -48,6 +102,83 @@ impl AppMetrics {
     pub fn get_rehash_count(&self) -> u64 {
         self.rehash_count.load(Ordering::Relaxed)
     }
+
+    pub fn increment_rehash_fail(&self) {
+        self.rehash_fail_count.fetch_add(1, Ordering::Relaxed);
+    }
+    pub fn get_rehash_fail(&self) -> u64 { self.rehash_fail_count.load(Ordering::Relaxed) }
+
+    pub fn inc_export_request_stream(&self) { self.export_request_stream_count.fetch_add(1, Ordering::Relaxed); }
+    pub fn inc_export_request_buffered(&self) { self.export_request_buffered_count.fetch_add(1, Ordering::Relaxed); }
+    pub fn add_export_rows_stream(&self, n: u64) { self.export_rows_stream.fetch_add(n, Ordering::Relaxed); }
+    pub fn add_export_rows_buffered(&self, n: u64) { self.export_rows_buffered.fetch_add(n, Ordering::Relaxed); }
+    pub fn get_export_counts(&self) -> (u64, u64, u64, u64) {
+        (
+            self.export_request_stream_count.load(Ordering::Relaxed),
+            self.export_request_buffered_count.load(Ordering::Relaxed),
+            self.export_rows_stream.load(Ordering::Relaxed),
+            self.export_rows_buffered.load(Ordering::Relaxed),
+        )
+    }
+
+    pub fn increment_login_fail(&self) { self.auth_login_fail_count.fetch_add(1, Ordering::Relaxed); }
+    pub fn increment_login_inactive(&self) { self.auth_login_inactive_count.fetch_add(1, Ordering::Relaxed); }
+    pub fn get_login_fail(&self) -> u64 { self.auth_login_fail_count.load(Ordering::Relaxed) }
+    pub fn get_login_inactive(&self) -> u64 { self.auth_login_inactive_count.load(Ordering::Relaxed) }
+
+    fn observe_histogram(
+        dur_secs: f64,
+        sum_ns: &AtomicU64,
+        count: &AtomicU64,
+        b005: &AtomicU64,
+        b02: &AtomicU64,
+        b1: &AtomicU64,
+        b3: &AtomicU64,
+        b10: &AtomicU64,
+        binf: &AtomicU64,
+    ) {
+        let ns = (dur_secs * 1_000_000_000.0) as u64;
+        sum_ns.fetch_add(ns, Ordering::Relaxed);
+        count.fetch_add(1, Ordering::Relaxed);
+        if dur_secs <= 0.05 { b005.fetch_add(1, Ordering::Relaxed); }
+        if dur_secs <= 0.2 { b02.fetch_add(1, Ordering::Relaxed); }
+        if dur_secs <= 1.0 { b1.fetch_add(1, Ordering::Relaxed); }
+        if dur_secs <= 3.0 { b3.fetch_add(1, Ordering::Relaxed); }
+        if dur_secs <= 10.0 { b10.fetch_add(1, Ordering::Relaxed); }
+        binf.fetch_add(1, Ordering::Relaxed); // +Inf bucket always
+    }
+
+    pub fn observe_export_duration_buffered(&self, dur_secs: f64) {
+        Self::observe_histogram(
+            dur_secs,
+            &self.export_dur_buf_sum_ns,
+            &self.export_dur_buf_count,
+            &self.export_dur_buf_le_005,
+            &self.export_dur_buf_le_02,
+            &self.export_dur_buf_le_1,
+            &self.export_dur_buf_le_3,
+            &self.export_dur_buf_le_10,
+            &self.export_dur_buf_le_inf,
+        );
+    }
+    pub fn observe_export_duration_stream(&self, dur_secs: f64) {
+        Self::observe_histogram(
+            dur_secs,
+            &self.export_dur_stream_sum_ns,
+            &self.export_dur_stream_count,
+            &self.export_dur_stream_le_005,
+            &self.export_dur_stream_le_02,
+            &self.export_dur_stream_le_1,
+            &self.export_dur_stream_le_3,
+            &self.export_dur_stream_le_10,
+            &self.export_dur_stream_le_inf,
+        );
+    }
+    pub fn inc_rehash_fail_hash(&self) { self.rehash_fail_hash.fetch_add(1, Ordering::Relaxed); }
+    pub fn inc_rehash_fail_update(&self) { self.rehash_fail_update.fetch_add(1, Ordering::Relaxed); }
+    pub fn get_rehash_fail_breakdown(&self) -> (u64, u64) { (self.rehash_fail_hash.load(Ordering::Relaxed), self.rehash_fail_update.load(Ordering::Relaxed)) }
+    pub fn inc_login_rate_limited(&self) { self.auth_login_rate_limited.fetch_add(1, Ordering::Relaxed); }
+    pub fn get_login_rate_limited(&self) -> u64 { self.auth_login_rate_limited.load(Ordering::Relaxed) }
 }

 // 实现FromRef trait以便子状态可以从AppState中提取
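The histogram helper above uses Prometheus cumulative-bucket semantics: a single observation increments every bucket whose bound it fits under, plus `+Inf`, `_sum` and `_count`. A minimal sketch against the public `AppMetrics` fields (crate name `jive_money_api`, as the integration tests import it):

```rust
use jive_money_api::AppMetrics;
use std::sync::atomic::Ordering;

fn main() {
    let m = AppMetrics::new();
    // A 0.1s export: too slow for le="0.05", counted in le="0.2" and all larger buckets.
    m.observe_export_duration_buffered(0.1);
    assert_eq!(m.export_dur_buf_le_005.load(Ordering::Relaxed), 0);
    assert_eq!(m.export_dur_buf_le_02.load(Ordering::Relaxed), 1);
    assert_eq!(m.export_dur_buf_le_10.load(Ordering::Relaxed), 1);
    assert_eq!(m.export_dur_buf_le_inf.load(Ordering::Relaxed), 1);
    assert_eq!(m.export_dur_buf_count.load(Ordering::Relaxed), 1);
}
```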
diff --git a/jive-api/src/main.rs b/jive-api/src/main.rs
index 3c03939b..20693903 100644
--- a/jive-api/src/main.rs
+++ b/jive-api/src/main.rs
@@ -19,6 +19,8 @@ use std::sync::Arc;
 use tokio::net::TcpListener;
 use tower::ServiceBuilder;
 use tower_http::trace::TraceLayer;
+use jive_money_api::middleware::rate_limit::{RateLimiter, login_rate_limit};
+use jive_money_api::middleware::metrics_guard::{metrics_guard, MetricsGuardState, Cidr};
 use tracing::{error, info, warn};
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

@@ -224,6 +226,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         metrics: AppMetrics::new(),
     };

+    // Rate limiter (login) configuration
+    let (rl_max, rl_window) = std::env::var("AUTH_RATE_LIMIT")
+        .ok()
+        .and_then(|v| {
+            let parts: Vec<&str> = v.split('/').collect();
+            if parts.len() == 2 { Some((parts[0].parse().ok()?, parts[1].parse().ok()?)) } else { None }
+        })
+        .unwrap_or((30u32, 60u64));
+    let rate_limiter = RateLimiter::new(rl_max, rl_window);
+    let metrics_guard_state = {
+        let enabled = std::env::var("ALLOW_PUBLIC_METRICS").map(|v| v == "0").unwrap_or(false);
+        let allow_list = std::env::var("METRICS_ALLOW_CIDRS").unwrap_or("127.0.0.1/32".to_string());
+        let deny_list = std::env::var("METRICS_DENY_CIDRS").unwrap_or_default();
+        let allow = allow_list.split(',').filter_map(|c| Cidr::parse(c.trim())).collect();
+        let deny = deny_list.split(',').filter_map(|c| Cidr::parse(c.trim())).collect();
+        MetricsGuardState { allow, deny, enabled }
+    };
+
     // 启动定时任务(汇率更新等)
     info!("🕒 Starting scheduled tasks...");
     let pool_arc = Arc::new(pool.clone());
@@ -280,7 +300,9 @@
             get(get_transaction_statistics),
         )
         // Metrics endpoint
-        .route("/metrics", get(metrics::metrics_handler))
+        .route("/metrics", get(metrics::metrics_handler).route_layer(
+            axum::middleware::from_fn_with_state(metrics_guard_state.clone(), metrics_guard)
+        ))
         // 收款人管理 API
         .route("/api/v1/payees", get(list_payees))
         .route("/api/v1/payees", post(create_payee))
@@ -302,7 +324,9 @@
         .route(
             "/api/v1/auth/register",
             post(auth_handlers::register_with_family),
         )
-        .route("/api/v1/auth/login", post(auth_handlers::login))
+        .route("/api/v1/auth/login", post(auth_handlers::login).route_layer(
+            axum::middleware::from_fn_with_state((rate_limiter.clone(), app_state.clone()), login_rate_limit)
+        ))
         .route("/api/v1/auth/refresh", post(auth_handlers::refresh_token))
         .route("/api/v1/auth/user", get(auth_handlers::get_current_user))
         .route("/api/v1/auth/profile", get(auth_handlers::get_current_user)) // Alias for Flutter app
@@ -659,6 +683,12 @@ async fn health_check(State(state): State<AppState>) -> Json<serde_json::Value>
         "status": "healthy",
         "service": "jive-money-api",
         "mode": mode.trim(),
+        "build": {
+            "commit": option_env!("GIT_COMMIT").unwrap_or("unknown"),
+            "time": option_env!("BUILD_TIME").unwrap_or("unknown"),
+            "rustc": option_env!("RUSTC_VERSION").unwrap_or("unknown"),
+            "version": env!("CARGO_PKG_VERSION")
+        },
         "features": {
             "websocket": true,
             "database": true,
@@ -680,7 +710,18 @@
         },
         "rehash": {
             "enabled": std::env::var("REHASH_ON_LOGIN").map(|v| !matches!(v.as_str(), "0" | "false" | "FALSE")).unwrap_or(true),
-            "count": state.metrics.get_rehash_count()
+            "count": state.metrics.get_rehash_count(),
+            "fail_count": state.metrics.get_rehash_fail()
+        },
+        "auth_login": {
+            "fail": state.metrics.get_login_fail(),
+            "inactive": state.metrics.get_login_inactive()
+        },
+        "export": {
+            "requests_stream": state.metrics.get_export_counts().0,
+            "requests_buffered": state.metrics.get_export_counts().1,
+            "rows_stream": state.metrics.get_export_counts().2,
+            "rows_buffered": state.metrics.get_export_counts().3
         }
     },
     "timestamp": chrono::Utc::now().to_rfc3339()
diff --git a/jive-api/src/metrics.rs b/jive-api/src/metrics.rs
index d82557db..8f5537ab 100644
--- a/jive-api/src/metrics.rs
+++ b/jive-api/src/metrics.rs
@@ -1,12 +1,48 @@
 use crate::AppState;
 use axum::{http::StatusCode, response::IntoResponse};
 use sqlx::PgPool;
+use std::sync::{Mutex, OnceLock};
+use std::time::{Instant, Duration};
+
+// Simple 30s cache to reduce DB load under high scrape frequencies.
+static METRICS_CACHE: OnceLock<Mutex<(Instant, String)>> = OnceLock::new();
+static START_TIME: OnceLock<Instant> = OnceLock::new();

 // Produce Prometheus-style metrics text with backward-compatible legacy metrics.
 pub async fn metrics_handler(
     axum::extract::State(state): axum::extract::State<AppState>,
 ) -> impl IntoResponse {
+    // Optional access control
+    if std::env::var("ALLOW_PUBLIC_METRICS").map(|v| v == "0").unwrap_or(false) {
+        if let Ok(flag) = std::env::var("METRICS_ALLOW_LOCALONLY") {
+            if flag == "1" {
+                // Only allow loopback; we rely on X-Forwarded-For not being spoofed internally (basic safeguard).
+                // The request is not available here directly (simplified); an extension to pass the remote addr could be added.
+            }
+        }
+        // Fallback minimal IP check using an env flag; real enforcement is the metrics_guard middleware.
+    }
+    START_TIME.get_or_init(Instant::now);
+    let cache_lock = METRICS_CACHE.get_or_init(|| Mutex::new((Instant::now(), String::new())));
+    let ttl_secs: u64 = std::env::var("METRICS_CACHE_TTL").ok().and_then(|v| v.parse().ok()).unwrap_or(30);
+    let mut cached_base: Option<String> = None;
+    {
+        let guard = cache_lock.lock().unwrap();
+        if ttl_secs > 0 && !guard.1.is_empty() && guard.0.elapsed() < Duration::from_secs(ttl_secs) {
+            cached_base = Some(guard.1.clone());
+        }
+    }
+    let uptime_line = {
+        let start = START_TIME.get().unwrap();
+        let secs = start.elapsed().as_secs_f64();
+        format!("process_uptime_seconds {}\n", secs)
+    };
+    if let Some(base) = cached_base {
+        return (StatusCode::OK, [(axum::http::header::CONTENT_TYPE, "text/plain; version=0.0.4")], format!("{}{}", base, uptime_line));
+    }
     let pool: &PgPool = &state.pool;
+    // Build info gauge (value always 1) emitted once per scrape
+    let build_commit = option_env!("GIT_COMMIT").unwrap_or("unknown");
+    let build_time = option_env!("BUILD_TIME").unwrap_or("unknown");
+    let rustc_version = option_env!("RUSTC_VERSION").unwrap_or("unknown");
     // Hash distribution + totals (best-effort)
     let (b2a, b2b, b2y, a2id, total, unknown) = if let Ok(row) = sqlx::query(
         "SELECT \
@@ -33,14 +69,105 @@ pub async fn metrics_handler(
     };

     let rehash_count = state.metrics.get_rehash_count();
+    let rehash_fail = state.metrics.get_rehash_fail();
+    let (req_stream, req_buffered, rows_stream, rows_buffered) = state.metrics.get_export_counts();
+    let login_fail = state.metrics.get_login_fail();
+    let login_inactive = state.metrics.get_login_inactive();
+    let login_rate_limited = state.metrics.get_login_rate_limited();
+    // Histogram exports: convert the ns sum back to seconds for the Prometheus _sum
+    let buf_sum_sec = state.metrics.export_dur_buf_sum_ns.load(std::sync::atomic::Ordering::Relaxed) as f64 / 1e9;
+    let buf_count = state.metrics.export_dur_buf_count.load(std::sync::atomic::Ordering::Relaxed);
+    let b005 = state.metrics.export_dur_buf_le_005.load(std::sync::atomic::Ordering::Relaxed);
+    let b02 = state.metrics.export_dur_buf_le_02.load(std::sync::atomic::Ordering::Relaxed);
+    let b1 = state.metrics.export_dur_buf_le_1.load(std::sync::atomic::Ordering::Relaxed);
+    let b3 = state.metrics.export_dur_buf_le_3.load(std::sync::atomic::Ordering::Relaxed);
+    let b10 = state.metrics.export_dur_buf_le_10.load(std::sync::atomic::Ordering::Relaxed);
+    let binf = state.metrics.export_dur_buf_le_inf.load(std::sync::atomic::Ordering::Relaxed);
+
+    let stream_sum_sec = state.metrics.export_dur_stream_sum_ns.load(std::sync::atomic::Ordering::Relaxed) as f64 / 1e9;
+    let stream_count = state.metrics.export_dur_stream_count.load(std::sync::atomic::Ordering::Relaxed);
+    let s005 = state.metrics.export_dur_stream_le_005.load(std::sync::atomic::Ordering::Relaxed);
+    let s02 = state.metrics.export_dur_stream_le_02.load(std::sync::atomic::Ordering::Relaxed);
+    let s1 = state.metrics.export_dur_stream_le_1.load(std::sync::atomic::Ordering::Relaxed);
+    let s3 = state.metrics.export_dur_stream_le_3.load(std::sync::atomic::Ordering::Relaxed);
+    let s10 = state.metrics.export_dur_stream_le_10.load(std::sync::atomic::Ordering::Relaxed);
+    let sinf = state.metrics.export_dur_stream_le_inf.load(std::sync::atomic::Ordering::Relaxed);
     let bcrypt_total = b2a + b2b + b2y;

     let mut buf = String::new();
-    // Rehash counter
+    // Rehash counters
     buf.push_str("# HELP jive_password_rehash_total Total successful bcrypt to argon2id password rehashes.\n");
     buf.push_str("# TYPE jive_password_rehash_total counter\n");
     buf.push_str(&format!("jive_password_rehash_total {}\n", rehash_count));
+    buf.push_str("# HELP jive_password_rehash_fail_total Total failed password rehash attempts.\n");
+    buf.push_str("# TYPE jive_password_rehash_fail_total counter\n");
+    buf.push_str(&format!("jive_password_rehash_fail_total {}\n", rehash_fail));
+    let (rf_hash, rf_update) = state.metrics.get_rehash_fail_breakdown();
+    buf.push_str("# HELP jive_password_rehash_fail_breakdown_total Password rehash failures by cause.\n");
+    buf.push_str("# TYPE jive_password_rehash_fail_breakdown_total counter\n");
+    buf.push_str(&format!("jive_password_rehash_fail_breakdown_total{{cause=\"hash\"}} {}\n", rf_hash));
+    buf.push_str(&format!("jive_password_rehash_fail_breakdown_total{{cause=\"update\"}} {}\n", rf_update));
+
+    // Export metrics
+    buf.push_str("# HELP export_requests_stream_total Number of streaming export requests.\n");
+    buf.push_str("# TYPE export_requests_stream_total counter\n");
+    buf.push_str(&format!("export_requests_stream_total {}\n", req_stream));
+    buf.push_str("# HELP export_requests_buffered_total Number of buffered export requests (JSON+CSV).\n");
+    buf.push_str("# TYPE export_requests_buffered_total counter\n");
+    buf.push_str(&format!("export_requests_buffered_total {}\n", req_buffered));
+    buf.push_str("# HELP export_rows_stream_total Rows exported via streaming.\n");
+    buf.push_str("# TYPE export_rows_stream_total counter\n");
+    buf.push_str(&format!("export_rows_stream_total {}\n", rows_stream));
+    buf.push_str("# HELP export_rows_buffered_total Rows exported via the buffered path.\n");
+    buf.push_str("# TYPE export_rows_buffered_total counter\n");
+    buf.push_str(&format!("export_rows_buffered_total {}\n", rows_buffered));
+
+    // Auth login metrics
+    buf.push_str("# HELP auth_login_fail_total Failed login attempts (wrong credentials / unknown user).\n");
+    buf.push_str("# TYPE auth_login_fail_total counter\n");
+    buf.push_str(&format!("auth_login_fail_total {}\n", login_fail));
+    buf.push_str("# HELP auth_login_inactive_total Login attempts with inactive/disabled accounts.\n");
+    buf.push_str("# TYPE auth_login_inactive_total counter\n");
+    buf.push_str(&format!("auth_login_inactive_total {}\n", login_inactive));
+    buf.push_str("# HELP auth_login_rate_limited_total Login attempts blocked by the rate limiter.\n");
+    buf.push_str("# TYPE auth_login_rate_limited_total counter\n");
+    buf.push_str(&format!("auth_login_rate_limited_total {}\n", login_rate_limited));
+
+    // Export buffered duration histogram
+    buf.push_str("# HELP export_duration_buffered_seconds Export (buffered) duration histogram.\n");
+    buf.push_str("# TYPE export_duration_buffered_seconds histogram\n");
+    buf.push_str(&format!("export_duration_buffered_seconds_bucket{{le=\"0.05\"}} {}\n", b005));
+    buf.push_str(&format!("export_duration_buffered_seconds_bucket{{le=\"0.2\"}} {}\n", b02));
+    buf.push_str(&format!("export_duration_buffered_seconds_bucket{{le=\"1\"}} {}\n", b1));
+    buf.push_str(&format!("export_duration_buffered_seconds_bucket{{le=\"3\"}} {}\n", b3));
+    buf.push_str(&format!("export_duration_buffered_seconds_bucket{{le=\"10\"}} {}\n", b10));
+    buf.push_str(&format!("export_duration_buffered_seconds_bucket{{le=\"+Inf\"}} {}\n", binf));
+    buf.push_str(&format!("export_duration_buffered_seconds_sum {}\n", buf_sum_sec));
+    buf.push_str(&format!("export_duration_buffered_seconds_count {}\n", buf_count));
+
+    // Export streaming duration histogram
+    buf.push_str("# HELP export_duration_stream_seconds Export (stream) duration histogram.\n");
+    buf.push_str("# TYPE export_duration_stream_seconds histogram\n");
+    buf.push_str(&format!("export_duration_stream_seconds_bucket{{le=\"0.05\"}} {}\n", s005));
+    buf.push_str(&format!("export_duration_stream_seconds_bucket{{le=\"0.2\"}} {}\n", s02));
+    buf.push_str(&format!("export_duration_stream_seconds_bucket{{le=\"1\"}} {}\n", s1));
+    buf.push_str(&format!("export_duration_stream_seconds_bucket{{le=\"3\"}} {}\n", s3));
+    buf.push_str(&format!("export_duration_stream_seconds_bucket{{le=\"10\"}} {}\n", s10));
+    buf.push_str(&format!("export_duration_stream_seconds_bucket{{le=\"+Inf\"}} {}\n", sinf));
+    buf.push_str(&format!("export_duration_stream_seconds_sum {}\n", stream_sum_sec));
+    buf.push_str(&format!("export_duration_stream_seconds_count {}\n", stream_count));
+
+    // Build info metric (labels)
+    buf.push_str("# HELP jive_build_info Build information (value is always 1).\n");
+    buf.push_str("# TYPE jive_build_info gauge\n");
+    buf.push_str(&format!(
+        "jive_build_info{{commit=\"{}\",time=\"{}\",rustc=\"{}\",version=\"{}\"}} 1\n",
+        build_commit,
+        build_time,
+        rustc_version.replace('"', "'"),
+        env!("CARGO_PKG_VERSION")
+    ));

     // New canonical metrics
     buf.push_str("# HELP password_hash_bcrypt_total Users with any bcrypt hash (2a+2b+2y).\n");
@@ -92,12 +219,11 @@ pub async fn metrics_handler(
         a2id
     ));

-    (
-        StatusCode::OK,
-        [(
-            axum::http::header::CONTENT_TYPE,
-            "text/plain; version=0.0.4",
-        )],
-        buf,
-    )
+    // Store the base (without the dynamic uptime line) into the cache
+    {
+        let mut guard = cache_lock.lock().unwrap();
+        *guard = (Instant::now(), buf.clone());
+    }
+    let full = format!("{}{}", buf, uptime_line);
+    (StatusCode::OK, [(axum::http::header::CONTENT_TYPE, "text/plain; version=0.0.4")], full)
 }
diff --git a/jive-api/src/middleware/metrics_guard.rs b/jive-api/src/middleware/metrics_guard.rs
new file mode 100644
index 00000000..79609a05
--- /dev/null
+++ b/jive-api/src/middleware/metrics_guard.rs
@@ -0,0 +1,57 @@
+use std::{net::{IpAddr, SocketAddr}, str::FromStr};
+use axum::{http::{Request, StatusCode}, response::Response, middleware::Next, body::Body};
+
+#[derive(Clone, Debug)]
+pub struct Cidr { network: IpAddr, mask: u32 }
+
+impl Cidr {
+    pub fn parse(s: &str) -> Option<Self> {
+        if s.is_empty() { return None; }
+        let mut parts = s.split('/');
+        let ip = parts.next()?;
+        let mask: u32 = parts.next().unwrap_or("32").parse().ok()?;
+        let ipaddr = IpAddr::from_str(ip).ok()?;
+        Some(Self { network: ipaddr, mask })
+    }
+    pub fn contains(&self, ip: &IpAddr) -> bool {
+        match (self.network, ip) {
+            (IpAddr::V4(n), IpAddr::V4(t)) => {
+                if self.mask > 32 { return false; }
+                let nm = u32::from(n);
+                let tm = u32::from(*t);
+                let m = if self.mask == 0 { 0 } else { u32::MAX.checked_shl(32 - self.mask).unwrap_or(0) };
+                (nm & m) == (tm & m)
+            }
+            (IpAddr::V6(n), IpAddr::V6(t)) => {
+                if self.mask > 128 { return false; }
+                let nb = u128::from(n);
+                let tb = u128::from(*t);
+                let m = if self.mask == 0 { 0 } else { u128::MAX.checked_shl(128 - self.mask).unwrap_or(0) };
+                (nb & m) == (tb & m)
+            }
+            _ => false,
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct MetricsGuardState { pub allow: Vec<Cidr>, pub deny: Vec<Cidr>, pub enabled: bool }
+
+pub async fn metrics_guard(
+    axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<SocketAddr>,
+    axum::extract::State(state): axum::extract::State<MetricsGuardState>,
+    req: Request<Body>,
+    next: Next,
+) -> Result<Response, StatusCode> {
+    if !state.enabled { return Ok(next.run(req).await); }
+    // Prefer the first (left-most) X-Forwarded-For hop if present
+    let mut ip = addr.ip();
+    if let Some(xff) = req.headers().get("x-forwarded-for").and_then(|v| v.to_str().ok()) {
+        if let Some(first) = xff.split(',').next() { if let Ok(parsed) = first.trim().parse() { ip = parsed; } }
+    }
+    // Deny takes precedence
+    for d in &state.deny { if d.contains(&ip) { return Err(StatusCode::FORBIDDEN); } }
+    for a in &state.allow { if a.contains(&ip) { return Ok(next.run(req).await); } }
+    Err(StatusCode::FORBIDDEN)
+}
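A small sketch of the matching semantics, using only the public `Cidr` API via the path main.rs already imports (`jive_money_api::middleware::metrics_guard::Cidr`):

```rust
use jive_money_api::middleware::metrics_guard::Cidr;

fn main() {
    let allow = Cidr::parse("10.0.0.0/8").expect("valid CIDR");
    assert!(allow.contains(&"10.1.2.3".parse().unwrap()));
    assert!(!allow.contains(&"192.168.1.1".parse().unwrap()));
    // A bare IP parses with the implied /32 host mask.
    let host = Cidr::parse("127.0.0.1").unwrap();
    assert!(host.contains(&"127.0.0.1".parse().unwrap()));
}
```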
diff --git a/jive-api/src/middleware/mod.rs b/jive-api/src/middleware/mod.rs
index 0461b0f1..fd09b729 100644
--- a/jive-api/src/middleware/mod.rs
+++ b/jive-api/src/middleware/mod.rs
@@ -3,3 +3,4 @@ pub mod cors;
 pub mod error_handler;
 pub mod permission;
 pub mod rate_limit;
+pub mod metrics_guard;
diff --git a/jive-api/src/middleware/rate_limit.rs b/jive-api/src/middleware/rate_limit.rs
index f40ed974..44f39790 100644
--- a/jive-api/src/middleware/rate_limit.rs
+++ b/jive-api/src/middleware/rate_limit.rs
@@ -1,131 +1,86 @@
-//! 请求限流中间件
+use std::{collections::HashMap, time::{Instant, Duration}, sync::{Arc, Mutex}};
+use axum::{http::{Request, StatusCode, HeaderValue}, response::Response, middleware::Next, body::Body, extract::State};
+use crate::AppState;
+use tracing::warn;
+use sha2::{Sha256, Digest};

-use axum::{
-    extract::{Request, State},
-    http::StatusCode,
-    middleware::Next,
-    response::{IntoResponse, Response},
-    Json,
-};
-use serde_json::json;
-use std::{
-    collections::HashMap,
-    sync::Arc,
-    time::{Duration, Instant},
-};
-use tokio::sync::RwLock;
-
-/// 限流配置
-#[derive(Clone)]
-pub struct RateLimitConfig {
-    /// 时间窗口(秒)
-    pub window_seconds: u64,
-    /// 窗口内最大请求数
-    pub max_requests: u32,
-}
-
-impl Default for RateLimitConfig {
-    fn default() -> Self {
-        Self {
-            window_seconds: 60, // 1分钟
-            max_requests: 100,  // 100个请求
-        }
-    }
-}
-
-/// 请求记录
-#[derive(Debug, Clone)]
-struct RequestRecord {
-    count: u32,
-    window_start: Instant,
-}
-
-/// 限流器
 #[derive(Clone)]
 pub struct RateLimiter {
-    config: RateLimitConfig,
-    records: Arc<RwLock<HashMap<String, RequestRecord>>>,
+    pub inner: Arc<Mutex<HashMap<String, (u32, Instant)>>>, // key -> (count, window_start)
+    pub max: u32,
+    pub window: Duration,
+    pub hash_email: bool,
 }

 impl RateLimiter {
-    pub fn new(config: RateLimitConfig) -> Self {
-        Self {
-            config,
-            records: Arc::new(RwLock::new(HashMap::new())),
-        }
+    pub fn new(max: u32, window_secs: u64) -> Self {
+        let hash_email = std::env::var("AUTH_RATE_LIMIT_HASH_EMAIL").map(|v| v == "1" || v.eq_ignore_ascii_case("true")).unwrap_or(true);
+        Self { inner: Arc::new(Mutex::new(HashMap::new())), max, window: Duration::from_secs(window_secs), hash_email }
     }
-
-    /// 检查是否应该限流
-    pub async fn check_rate_limit(&self, client_id: String) -> bool {
-        let mut records = self.records.write().await;
+    fn check(&self, key: &str) -> (bool, u32, u64) {
+        let mut map = self.inner.lock().unwrap();
         let now = Instant::now();
-        let window_duration = Duration::from_secs(self.config.window_seconds);
-
-        match records.get_mut(&client_id) {
-            Some(record) => {
-                // 检查是否在同一个时间窗口内
-                if now.duration_since(record.window_start) < window_duration {
-                    // 在同一窗口内,增加计数
-                    if record.count >= self.config.max_requests {
-                        return true; // 超过限制
-                    }
-                    record.count += 1;
-                } else {
-                    // 新的时间窗口,重置计数
-                    record.count = 1;
-                    record.window_start = now;
-                }
-            }
-            None => {
-                // 首次请求,创建记录
-                records.insert(
-                    client_id,
-                    RequestRecord {
-                        count: 1,
-                        window_start: now,
-                    },
-                );
-            }
+        // Opportunistic cleanup when the map grows large
+        if map.len() > 10_000 {
+            let window = self.window;
+            map.retain(|_, (_c, start)| now.duration_since(*start) <= window);
         }
-
-        // 清理过期记录(可选,防止内存泄漏)
-        records.retain(|_, record| now.duration_since(record.window_start) < window_duration * 2);
-
-        false
+        let entry = map.entry(key.to_string()).or_insert((0, now));
+        if now.duration_since(entry.1) > self.window { *entry = (0, now); }
+        entry.0 += 1;
+        let allowed = entry.0 <= self.max;
+        let remaining = if entry.0 >= self.max { 0 } else { self.max - entry.0 };
+        let retry_after = self.window.saturating_sub(now.duration_since(entry.1)).as_secs();
+        (allowed, remaining, retry_after)
     }
 }

-/// 限流中间件
-pub async fn rate_limit_middleware(
-    State(limiter): State<Arc<RateLimiter>>,
-    request: Request,
+pub async fn login_rate_limit(
+    State((limiter, app_state)): State<(RateLimiter, AppState)>,
+    req: Request<Body>,
     next: Next,
 ) -> Result<Response, StatusCode> {
-    // 获取客户端标识(可以是IP地址、用户ID等)
-    let client_id = request
-        .headers()
-        .get("x-forwarded-for")
-        .and_then(|h| h.to_str().ok())
-        .or_else(|| {
-            request
-                .headers()
-                .get("x-real-ip")
-                .and_then(|h| h.to_str().ok())
-        })
.unwrap_or("unknown") - .to_string(); - - // 检查限流 - if limiter.check_rate_limit(client_id).await { - return Ok(Json(json!({ - "error": { - "type": "rate_limited", - "message": "Too many requests. Please try again later.", - "retry_after": limiter.config.window_seconds, - } - })) - .into_response()); + // Buffer body (login payload is small) + let (parts, body) = req.into_parts(); + let bytes = match axum::body::to_bytes(body, 64 * 1024).await { Ok(b) => b, Err(_) => { + return Ok(Response::builder().status(StatusCode::BAD_REQUEST) + .header("Content-Type","application/json") + .body(Body::from("{\"error_code\":\"INVALID_BODY\"}")) + .unwrap()); } }; + let ip = parts.headers.get("x-forwarded-for") + .and_then(|v| v.to_str().ok()).and_then(|s| s.split(',').next()) + .unwrap_or("unknown").trim().to_string(); + let email_key = extract_email_key(&bytes, limiter.hash_email); + let key = format!("{}:{}", ip, email_key.unwrap_or_else(|| "_".into())); + let (allowed, _remain, retry_after) = limiter.check(&key); + let req_restored = Request::from_parts(parts, Body::from(bytes)); + if !allowed { + app_state.metrics.inc_login_rate_limited(); + warn!(event="auth_rate_limit", ip=%ip, retry_after=retry_after, key=%key, "login rate limit triggered"); + let body = serde_json::json!({ + "error_code": "RATE_LIMITED", + "message": "Too many login attempts. Please retry later.", + "retry_after": retry_after + }); + let resp = Response::builder() + .status(StatusCode::TOO_MANY_REQUESTS) + .header("Content-Type", "application/json") + .header("Retry-After", HeaderValue::from_str(&retry_after.to_string()).unwrap()) + .body(Body::from(body.to_string())) + .unwrap(); + return Ok(resp); } + Ok(next.run(req_restored).await) +} - Ok(next.run(request).await) +fn extract_email_key(bytes: &[u8], hash: bool) -> Option { + if bytes.is_empty() { return None; } + let v: serde_json::Value = serde_json::from_slice(bytes).ok()?; + let raw = v.get("email")?.as_str()?; + let norm = raw.trim().to_lowercase(); + if norm.is_empty() { return None; } + if !hash { return Some(norm); } + let mut h = Sha256::new(); h.update(&norm); let hex = format!("{:x}", h.finalize()); + Some(hex[..8].to_string()) } diff --git a/jive-api/src/services/currency_service.rs b/jive-api/src/services/currency_service.rs index b45bbed3..d25e9077 100644 --- a/jive-api/src/services/currency_service.rs +++ b/jive-api/src/services/currency_service.rs @@ -104,12 +104,12 @@ impl CurrencyService { let currencies = rows .into_iter() .map(|row| { - let code = row.code.clone(); + let _code = row.code.clone(); Currency { code: row.code, name: row.name, - // Handle potentially nullable symbol field - symbol: row.symbol.unwrap_or(code), + // symbol column非空:直接使用;保持与之前“若为空给默认”语义一致(当前 schema 下为空可能性极低) + symbol: row.symbol, decimal_places: row.decimal_places.unwrap_or(2), is_active: row.is_active.unwrap_or(true), } @@ -210,7 +210,7 @@ impl CurrencyService { Ok(FamilyCurrencySettings { family_id, // Handle potentially nullable base_currency field - base_currency: settings.base_currency.unwrap_or_else(|| "CNY".to_string()), + base_currency: if settings.base_currency.is_empty() { "CNY".to_string() } else { settings.base_currency }, allow_multi_currency: settings.allow_multi_currency.unwrap_or(false), auto_convert: settings.auto_convert.unwrap_or(false), supported_currencies: supported, diff --git a/jive-api/tests/integration/auth_login_metrics_test.rs b/jive-api/tests/integration/auth_login_metrics_test.rs new file mode 100644 index 00000000..1beb2232 --- /dev/null +++ 
diff --git a/jive-api/tests/integration/auth_login_metrics_test.rs b/jive-api/tests/integration/auth_login_metrics_test.rs
new file mode 100644
index 00000000..1beb2232
--- /dev/null
+++ b/jive-api/tests/integration/auth_login_metrics_test.rs
@@ -0,0 +1,66 @@
+#[cfg(test)]
+mod tests {
+    use axum::{routing::post, Router};
+    use http::{Request, header, StatusCode};
+    use hyper::Body;
+    use tower::ServiceExt;
+
+    use jive_money_api::{handlers::auth::login, AppMetrics, AppState};
+    use crate::fixtures::create_test_pool;
+    use uuid::Uuid;
+
+    fn extract_metric(body: &str, name: &str) -> Option<u64> {
+        body.lines().filter(|l| !l.starts_with('#')).find_map(|l| {
+            if l.starts_with(name) {
+                l.split_whitespace().last()?.parse().ok()
+            } else { None }
+        })
+    }
+
+    #[tokio::test]
+    async fn login_fail_and_inactive_counters_increment() {
+        let pool = create_test_pool().await;
+        let metrics = AppMetrics::new();
+        let state = AppState { pool: pool.clone(), ws_manager: None, redis: None, metrics: metrics.clone() };
+        let app = Router::new()
+            .route("/api/v1/auth/login", post(login))
+            .route("/metrics", axum::routing::get(jive_money_api::metrics::metrics_handler))
+            .with_state(state.clone());
+
+        // Seed one inactive user (is_active=false)
+        let email_inactive = format!("inactive_{}@example.com", Uuid::new_v4());
+        sqlx::query("INSERT INTO users (email,password_hash,name,is_active,created_at,updated_at) VALUES ($1,$2,$3,false,NOW(),NOW())")
+            .bind(&email_inactive)
+            .bind("$argon2id$v=19$m=4096,t=3,p=1$ZmFrZVNhbHQAAAAAAAAAAA$1YJzJ6x3P0fakefakefakefakefakefakefake")
+            .bind("Inactive User")
+            .execute(&pool).await.expect("insert inactive");
+
+        // 1) Unknown user login -> fail counter
+        let req_fail = Request::builder().method("POST").uri("/api/v1/auth/login")
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(Body::from("{\"email\":\"nouser@example.com\",\"password\":\"X\"}"))
+            .unwrap();
+        let resp_fail = app.clone().oneshot(req_fail).await.unwrap();
+        assert_eq!(resp_fail.status(), StatusCode::UNAUTHORIZED);
+
+        // 2) Inactive user login -> inactive counter
+        let req_inactive = Request::builder().method("POST").uri("/api/v1/auth/login")
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(Body::from(format!("{{\"email\":\"{}\",\"password\":\"whatever\"}}", email_inactive)))
+            .unwrap();
+        let resp_inactive = app.clone().oneshot(req_inactive).await.unwrap();
+        assert_eq!(resp_inactive.status(), StatusCode::FORBIDDEN);
+
+        // Fetch metrics
+        let mreq = Request::builder().uri("/metrics").body(Body::empty()).unwrap();
+        let mresp = app.clone().oneshot(mreq).await.unwrap();
+        assert_eq!(mresp.status(), StatusCode::OK);
+        let body = hyper::body::to_bytes(mresp.into_body()).await.unwrap();
+        let txt = String::from_utf8(body.to_vec()).unwrap();
+        let fail = extract_metric(&txt, "auth_login_fail_total").unwrap_or(0);
+        let inactive = extract_metric(&txt, "auth_login_inactive_total").unwrap_or(0);
+        assert!(fail >= 1, "expected fail >=1, got {}", fail);
+        assert!(inactive >= 1, "expected inactive >=1, got {}", inactive);
+    }
+}
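`extract_metric` matches by raw prefix, which can silently pick up labeled series or any metric whose name extends the requested one. If that ever bites, a stricter matcher (sketch only, not part of the diff) keys on the exact name followed by a space or `{`:

```rust
// Stricter variant: only match "name value" or "name{labels} value" lines.
fn extract_metric_strict(body: &str, name: &str) -> Option<u64> {
    body.lines()
        .filter(|l| !l.starts_with('#'))
        .find(|l| matches!(l.strip_prefix(name), Some(rest) if rest.starts_with(' ') || rest.starts_with('{')))
        .and_then(|l| l.split_whitespace().last())
        .and_then(|v| v.parse().ok())
}
```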
diff --git a/jive-api/tests/integration/auth_rehash_fail_metrics_test.rs b/jive-api/tests/integration/auth_rehash_fail_metrics_test.rs
new file mode 100644
index 00000000..1eb16402
--- /dev/null
+++ b/jive-api/tests/integration/auth_rehash_fail_metrics_test.rs
@@ -0,0 +1,60 @@
+#[cfg(test)]
+mod tests {
+    use axum::{routing::post, Router};
+    use http::{Request, header, StatusCode};
+    use hyper::Body;
+    use tower::ServiceExt;
+    use uuid::Uuid;
+
+    use jive_money_api::handlers::auth::login;
+    use jive_money_api::{AppMetrics, AppState};
+
+    use crate::fixtures::create_test_pool;
+
+    // One way to simulate a rehash failure is a read-only DB role for the UPDATE step.
+    // Here we instead force it by deleting the user between verify and update from a
+    // parallel task (timing-sensitive, hence the tolerant assertions below).
+    #[tokio::test]
+    async fn rehash_failure_increments_fail_counter() {
+        std::env::set_var("REHASH_ON_LOGIN", "1");
+        let pool = create_test_pool().await;
+        let metrics = AppMetrics::new();
+        let state = AppState { pool: pool.clone(), ws_manager: None, redis: None, metrics: metrics.clone() };
+
+        let email = format!("rehash_fail_{}@example.com", Uuid::new_v4());
+        let password = "Fail123!";
+        let bcrypt_hash = bcrypt::hash(password, bcrypt::DEFAULT_COST).unwrap();
+        sqlx::query("INSERT INTO users (email,password_hash,name,is_active,created_at,updated_at) VALUES ($1,$2,'RF User',true,NOW(),NOW())")
+            .bind(&email)
+            .bind(&bcrypt_hash)
+            .execute(&pool)
+            .await
+            .expect("insert user");
+
+        // Spawn a task that deletes the user after a short delay to race the UPDATE
+        let pool_del = pool.clone();
+        let email_del = email.clone();
+        tokio::spawn(async move {
+            // Small sleep so the handler gets past password verification first
+            tokio::time::sleep(std::time::Duration::from_millis(30)).await;
+            let _ = sqlx::query("DELETE FROM users WHERE LOWER(email)=LOWER($1)")
+                .bind(&email_del)
+                .execute(&pool_del).await;
+        });
+
+        let app = Router::new()
+            .route("/api/v1/auth/login", post(login))
+            .with_state(state.clone());
+
+        let req = Request::builder()
+            .method("POST")
+            .uri("/api/v1/auth/login")
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(Body::from(format!("{{\"email\":\"{}\",\"password\":\"{}\"}}", email, password)))
+            .unwrap();
+        let _ = app.clone().oneshot(req).await; // success or unauthorized are both fine here
+
+        // The fail counter should be >=1 (there may be no success increment since the user was removed)
+        assert!(metrics.get_rehash_fail() >= 1, "rehash_fail_count not incremented");
+    }
+}
diff --git a/jive-api/tests/integration/export_metrics_test.rs b/jive-api/tests/integration/export_metrics_test.rs
new file mode 100644
index 00000000..90078275
--- /dev/null
+++ b/jive-api/tests/integration/export_metrics_test.rs
@@ -0,0 +1,150 @@
+#[cfg(test)]
+mod tests {
+    use axum::{routing::{post, get}, Router};
+    use http::{Request, header};
+    use hyper::Body;
+    use tower::ServiceExt;
+    use serde_json::json;
+    use uuid::Uuid;
+    use chrono::NaiveDate;
+    use rust_decimal::Decimal;
+
+    use jive_money_api::handlers::transactions::{export_transactions, export_transactions_csv_stream};
+    use jive_money_api::auth::Claims;
+    use jive_money_api::AppState;
+
+    use crate::fixtures::{create_test_pool, create_test_user, create_test_family};
+
+    // Helper: build a bearer token for the given user/family
+    fn bearer_for(user_id: Uuid, family_id: Uuid) -> String {
+        let claims = Claims::new(user_id, format!("test_{}@example.com", user_id), Some(family_id));
+        format!("Bearer {}", claims.to_token().unwrap())
+    }
+
+    // Extract a metric value from the /metrics body (simple regex-less parse)
+    fn find_metric(body: &str, name: &str) -> Option<u64> {
+        for line in body.lines() {
+            if line.starts_with('#') { continue; }
+            if let Some(rest) = line.strip_prefix(name) {
+                let parts: Vec<&str> = rest.trim().split_whitespace().collect();
+                if let Some(val_str) = parts.last() {
+                    if let Ok(v) = val_str.parse::<u64>() { return Some(v); }
+                }
+            }
+        }
+        None
+    }
+
+    #[tokio::test]
+    async fn export_buffered_and_stream_metrics_increment() {
+        // Run with `--features export_stream` to cover both paths; the test tolerates
+        // a missing streaming feature by skipping that section on 404.
+        let pool = create_test_pool().await;
+        let user = create_test_user(&pool).await;
+        let family = create_test_family(&pool, user.id).await;
+        let token = bearer_for(user.id, family.id);
+
+        // Look up the default ledger id
+        let default_ledger_id: Uuid = sqlx::query_scalar(
+            "SELECT id FROM ledgers WHERE family_id = $1 AND is_default = true LIMIT 1"
+        )
+        .bind(family.id)
+        .fetch_one(&pool)
+        .await
+        .expect("default ledger");
+
+        // Seed an account plus a few transactions
+        let account_id = Uuid::new_v4();
+        sqlx::query(r#"INSERT INTO accounts (id, ledger_id, name, account_type, current_balance, created_at, updated_at)
+            VALUES ($1,$2,'Acct','checking',0,NOW(),NOW())"#)
+            .bind(account_id)
+            .bind(default_ledger_id)
+            .execute(&pool).await.expect("insert account");
+
+        for (idx, amt) in [1234, 5678, 9012].iter().enumerate() {
+            let tx_id = Uuid::new_v4();
+            sqlx::query(r#"INSERT INTO transactions (
+                id, account_id, ledger_id, amount, transaction_type, transaction_date,
+                description, status, is_recurring, created_at, updated_at)
+                VALUES ($1,$2,$3,$4,'expense',$5,$6,'cleared',false,NOW(),NOW())"#)
+                .bind(tx_id)
+                .bind(account_id)
+                .bind(default_ledger_id)
+                .bind(Decimal::new(*amt, 2))
+                .bind(NaiveDate::from_ymd_opt(2024, 9, 10 + idx as u32).unwrap())
+                .bind(format!("Item{}", idx))
+                .execute(&pool).await.expect("insert txn");
+        }
+
+        // Build AppState so the router exposes metrics
+        let state = AppState { pool: pool.clone(), ws_manager: None, redis: None, metrics: jive_money_api::AppMetrics::new() };
+        let app = Router::new()
+            .route("/api/v1/transactions/export", post(export_transactions))
+            .route("/api/v1/transactions/export.csv", get(export_transactions_csv_stream))
+            .route("/metrics", axum::routing::get(jive_money_api::metrics::metrics_handler))
+            .with_state(state.clone());
+
+        // Baseline metrics
+        let m0 = app.clone().oneshot(Request::builder().uri("/metrics").body(Body::empty()).unwrap()).await.unwrap();
+        assert_eq!(m0.status(), http::StatusCode::OK);
+        let base_body = hyper::body::to_bytes(m0.into_body()).await.unwrap();
+        let base = String::from_utf8(base_body.to_vec()).unwrap();
+        let base_buf_req = find_metric(&base, "export_requests_buffered_total").unwrap_or(0);
+        let base_buf_rows = find_metric(&base, "export_rows_buffered_total").unwrap_or(0);
+        let base_stream_req = find_metric(&base, "export_requests_stream_total").unwrap_or(0);
+        let base_stream_rows = find_metric(&base, "export_rows_stream_total").unwrap_or(0);
+
+        // Buffered POST CSV
+        let req_csv = Request::builder()
+            .method("POST")
+            .uri("/api/v1/transactions/export")
+            .header(header::AUTHORIZATION, token.clone())
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(Body::from(json!({"format":"csv"}).to_string()))
+            .unwrap();
+        let resp_csv = app.clone().oneshot(req_csv).await.unwrap();
+        assert_eq!(resp_csv.status(), http::StatusCode::OK);
+
+        // Buffered POST JSON
+        let req_json = Request::builder()
+            .method("POST")
+            .uri("/api/v1/transactions/export")
+            .header(header::AUTHORIZATION, token.clone())
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(Body::from(json!({"format":"json"}).to_string()))
+            .unwrap();
+        let resp_json = app.clone().oneshot(req_json).await.unwrap();
+        assert_eq!(resp_json.status(), http::StatusCode::OK);
+        // Streaming GET (may be disabled); if 200, expect increments
+        let req_stream = Request::builder()
+            .method("GET")
+            .uri("/api/v1/transactions/export.csv")
+            .header(header::AUTHORIZATION, token.clone())
+            .body(Body::empty())
+            .unwrap();
+        let resp_stream = app.clone().oneshot(req_stream).await.unwrap();
+        let streaming_enabled = resp_stream.status() == http::StatusCode::OK;
+        if streaming_enabled {
+            // Drain the body so the streaming task runs to completion
+            let _ = hyper::body::to_bytes(resp_stream.into_body()).await.unwrap();
+        }
+
+        // Fetch metrics again
+        let m1 = app.clone().oneshot(Request::builder().uri("/metrics").body(Body::empty()).unwrap()).await.unwrap();
+        assert_eq!(m1.status(), http::StatusCode::OK);
+        let body1 = hyper::body::to_bytes(m1.into_body()).await.unwrap();
+        let txt1 = String::from_utf8(body1.to_vec()).unwrap();
+        let buf_req_after = find_metric(&txt1, "export_requests_buffered_total").unwrap();
+        let buf_rows_after = find_metric(&txt1, "export_rows_buffered_total").unwrap();
+        assert_eq!(buf_req_after, base_buf_req + 2, "buffered request count mismatch");
+        assert!(buf_rows_after >= base_buf_rows + 3, "expected at least 3 data rows added");
+        if streaming_enabled {
+            let stream_req_after = find_metric(&txt1, "export_requests_stream_total").unwrap();
+            let stream_rows_after = find_metric(&txt1, "export_rows_stream_total").unwrap();
+            assert_eq!(stream_req_after, base_stream_req + 1, "stream request count mismatch");
+            assert!(stream_rows_after >= base_stream_rows + 3, "expected stream rows increment");
+        }
+    }
+}
diff --git a/jive-api/tests/integration/login_rate_limit_email_test.rs b/jive-api/tests/integration/login_rate_limit_email_test.rs
new file mode 100644
index 00000000..83a08e08
--- /dev/null
+++ b/jive-api/tests/integration/login_rate_limit_email_test.rs
@@ -0,0 +1,50 @@
+#[cfg(test)]
+mod tests {
+    use axum::{routing::post, Router};
+    use http::{Request, header, StatusCode};
+    use hyper::Body;
+    use tower::ServiceExt;
+    use uuid::Uuid;
+    use jive_money_api::{handlers::auth::login, AppMetrics, AppState};
+    use jive_money_api::middleware::rate_limit::{RateLimiter, login_rate_limit};
+    use crate::fixtures::create_test_pool;
+
+    // Helper to insert a user
+    async fn seed_user(pool: &sqlx::PgPool, email: &str) {
+        sqlx::query("INSERT INTO users (email,password_hash,name,is_active,created_at,updated_at) VALUES ($1,'$argon2id$v=19$m=4096,t=3,p=1$dGVzdHNhbHQAAAAAAAAAAA$Jr7Z5fakehashHashHashHashHashHash','RL U',true,NOW(),NOW())")
+            .bind(email).execute(pool).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn rate_limit_is_per_email() {
+        let pool = create_test_pool().await;
+        let email1 = format!("rl_email1_{}@example.com", Uuid::new_v4());
+        let email2 = format!("rl_email2_{}@example.com", Uuid::new_v4());
+        seed_user(&pool, &email1).await;
+        seed_user(&pool, &email2).await;
+        let metrics = AppMetrics::new();
+        let state = AppState { pool: pool.clone(), ws_manager: None, redis: None, metrics };
+        let limiter = RateLimiter::new(3, 60); // 3 attempts per key
+        let app = Router::new()
+            .route("/api/v1/auth/login", post(login).route_layer(
+                axum::middleware::from_fn_with_state((limiter, state.clone()), login_rate_limit)
+            ))
+            .with_state(state);
+
+        // Email1: 3 attempts allowed, 4th blocked
+        for i in 0..4 {
+            let req = Request::builder().method("POST").uri("/api/v1/auth/login")
+                .header(header::CONTENT_TYPE, "application/json")
+                .body(Body::from(format!("{{\"email\":\"{}\",\"password\":\"Bad{}\"}}", email1, i)))
+                .unwrap();
+            let resp = app.clone().oneshot(req).await.unwrap();
+            if i < 3 { assert_ne!(resp.status(), StatusCode::TOO_MANY_REQUESTS); }
+            else { assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS); }
+        }
+        // Email2 uses an independent key -> its first attempt is still allowed
+        let req2 = Request::builder().method("POST").uri("/api/v1/auth/login")
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(Body::from(format!("{{\"email\":\"{}\",\"password\":\"Bad\"}}", email2)))
+            .unwrap();
+        let resp2 = app.clone().oneshot(req2).await.unwrap();
+        assert_ne!(resp2.status(), StatusCode::TOO_MANY_REQUESTS);
+    }
+}
diff --git a/jive-api/tests/integration/login_rate_limit_test.rs b/jive-api/tests/integration/login_rate_limit_test.rs
new file mode 100644
index 00000000..d087e3e0
--- /dev/null
+++ b/jive-api/tests/integration/login_rate_limit_test.rs
@@ -0,0 +1,38 @@
+#[cfg(test)]
+mod tests {
+    use axum::{routing::post, Router};
+    use http::{Request, header, StatusCode};
+    use hyper::Body;
+    use tower::ServiceExt;
+    use jive_money_api::{handlers::auth::login, AppState, AppMetrics};
+    use jive_money_api::middleware::rate_limit::{RateLimiter, login_rate_limit};
+    use crate::fixtures::create_test_pool;
+    use uuid::Uuid;
+
+    #[tokio::test]
+    async fn login_rate_limit_blocks_after_threshold() {
+        let pool = create_test_pool().await;
+        // Seed a user so login attempts hit the real handler (wrong password avoids side effects)
+        let email = format!("rl_{}@example.com", Uuid::new_v4());
+        sqlx::query("INSERT INTO users (email,password_hash,name,is_active,created_at,updated_at) VALUES ($1,'$argon2id$v=19$m=4096,t=3,p=1$dGVzdHNhbHQAAAAAAAAAAA$Jr7Z5fakehashHashHashHashHashHash','RL User',true,NOW(),NOW())")
+            .bind(&email).execute(&pool).await.unwrap();
+        let metrics = AppMetrics::new();
+        let state = AppState { pool: pool.clone(), ws_manager: None, redis: None, metrics: metrics.clone() };
+        let limiter = RateLimiter::new(3, 60); // allow 3 attempts
+        let app = Router::new()
+            .route("/api/v1/auth/login", post(login).route_layer(
+                axum::middleware::from_fn_with_state((limiter, state.clone()), login_rate_limit)
+            ))
+            .with_state(state);
+
+        // Perform 4 attempts -> the last one should be 429
+        for i in 0..4 {
+            let req = Request::builder().method("POST").uri("/api/v1/auth/login")
+                .header(header::CONTENT_TYPE, "application/json")
+                .body(Body::from(format!("{{\"email\":\"{}\",\"password\":\"Bad{}\"}}", email, i)))
+                .unwrap();
+            let resp = app.clone().oneshot(req).await.unwrap();
+            if i < 3 { assert_ne!(resp.status(), StatusCode::TOO_MANY_REQUESTS); }
+            else { assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS); }
+        }
+    }
+}
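The IPv6 placeholder test below defers actual CIDR matching to the unit level. A unit test along these lines (a sketch, assuming `Cidr` remains publicly reachable from the test crate) would cover it:

```rust
use jive_money_api::middleware::metrics_guard::Cidr;
use std::net::IpAddr;

#[test]
fn cidr_matches_ipv6_prefixes() {
    // Exact host match
    let lo = Cidr::parse("::1/128").unwrap();
    assert!(lo.contains(&"::1".parse::<IpAddr>().unwrap()));

    // Prefix match: fd00::/8 covers all ULA addresses, not 2001:db8::/32
    let ula = Cidr::parse("fd00::/8").unwrap();
    assert!(ula.contains(&"fd12:3456::1".parse::<IpAddr>().unwrap()));
    assert!(!ula.contains(&"2001:db8::1".parse::<IpAddr>().unwrap()));
}
```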
diff --git a/jive-api/tests/integration/metrics_guard_ipv6_test.rs b/jive-api/tests/integration/metrics_guard_ipv6_test.rs
new file mode 100644
index 00000000..cdf207c7
--- /dev/null
+++ b/jive-api/tests/integration/metrics_guard_ipv6_test.rs
@@ -0,0 +1,20 @@
+#[cfg(test)]
+mod tests {
+    // Placeholder: a full integration test needs a real AppState plus a bound socket
+    // so ConnectInfo carries an IPv6 peer address; the CIDR matching itself is
+    // covered at the unit level (see the sketch above). Here we only check that the
+    // guard is a no-op when the whitelist is disabled.
+    #[tokio::test]
+    async fn metrics_v6_allowed_when_public() {
+        std::env::remove_var("ALLOW_PUBLIC_METRICS");
+        // No further assertions: with ALLOW_PUBLIC_METRICS unset the guard short-
+        // circuits, so there is nothing IPv6-specific to exercise without a socket.
+    }
+}
diff --git a/scripts/check_metrics_consistency.sh b/scripts/check_metrics_consistency.sh
new file mode 100755
index 00000000..2b5e83a5
--- /dev/null
+++ b/scripts/check_metrics_consistency.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+API_URL=${API_URL:-http://localhost:8012}
+
+health_json=$(curl -fsS "${API_URL}/health") || { echo "FAIL: cannot fetch /health" >&2; exit 1; }
+metrics_text=$(curl -fsS "${API_URL}/metrics") || { echo "FAIL: cannot fetch /metrics" >&2; exit 1; }
+
+health_bcrypt_sum=$(echo "$health_json" | jq '.metrics.hash_distribution.bcrypt | (."2a"+."2b"+."2y")')
+# "|| true" keeps set -e/pipefail from aborting before the explicit check below
+metrics_bcrypt_total=$(grep '^password_hash_bcrypt_total ' <<<"$metrics_text" | awk '{print $2}' || true)
+
+if [[ -z "$metrics_bcrypt_total" ]]; then
+  echo "FAIL: password_hash_bcrypt_total not present in /metrics" >&2
+  exit 2
+fi
+
+if [[ "$health_bcrypt_sum" != "$metrics_bcrypt_total" ]]; then
+  echo "FAIL: mismatch bcrypt total (health=$health_bcrypt_sum metrics=$metrics_bcrypt_total)" >&2
+  exit 3
+fi
+
+echo "OK: bcrypt total consistent ($metrics_bcrypt_total)"
+exit 0
diff --git a/scripts/verify_observability.sh b/scripts/verify_observability.sh
new file mode 100755
index 00000000..bafb092b
--- /dev/null
+++ b/scripts/verify_observability.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+API_URL="${API_URL:-http://localhost:8012}"
+EMAIL_BASE="obs_test_$RANDOM"
+PASS="Wrong123!"
+
+log() { echo "[verify] $*"; }
+fail() { echo "[FAIL] $*" >&2; exit 1; }
+
+check_metric() {
+  local name=$1
+  grep -E "^${name}(\{|\s)" metrics.out >/dev/null || fail "Missing metric: $name"
+}
+
+log "Simulating failed logins..."
+for i in 1 2; do
+  curl -s -o /dev/null -w '%{http_code}\n' -H 'Content-Type: application/json' \
+    -d "{\"email\":\"${EMAIL_BASE}${i}@example.com\",\"password\":\"$PASS\"}" \
+    "$API_URL/api/v1/auth/login" || true
+done
+
+log "Triggering rate limit (same email)..."
+# Note: with the default AUTH_RATE_LIMIT=30/60 four attempts will not trip the
+# limiter; run the API with a low limit (e.g. AUTH_RATE_LIMIT=3/60) for this check.
+for i in 1 2 3 4; do
+  curl -s -o /dev/null -H 'Content-Type: application/json' \
+    -d "{\"email\":\"${EMAIL_BASE}-limit@example.com\",\"password\":\"$PASS\"}" \
+    "$API_URL/api/v1/auth/login" || true
+done
+
+log "Fetching metrics..."
+curl -s "$API_URL/metrics" > metrics.out || fail "Cannot fetch /metrics"
+
+REQUIRED=( \
+  auth_login_fail_total \
+  auth_login_rate_limited_total \
+  jive_password_rehash_fail_breakdown_total \
+  export_duration_buffered_seconds_bucket \
+  process_uptime_seconds \
+)
+
+for m in "${REQUIRED[@]}"; do
+  check_metric "$m"
+done
+
+echo "PASS: Core observability metrics present."
+rm -f metrics.out