api: auth negative tests, family ledger test, superadmin doc, export streaming draft #39
@@ -0,0 +1,76 @@
## Export Streaming Design (Draft)

The current implementation (GET /transactions/export.csv and the POST JSON export) builds the entire CSV payload in memory before responding. This is acceptable for small/medium datasets (< ~5–10k rows) but risks:
- Elevated peak memory usage proportional to row count × serialized row width
- Increased latency before the first byte (TTFB) for large exports
- Potential timeouts on slow clients or with large result sets

### Target Goals
- Stream CSV rows incrementally to the client
- Maintain existing endpoint semantics (query params + include_header)
- Preserve authorization and filtering logic
- Avoid loading all rows simultaneously; use a DB cursor / chunked fetch
- Keep memory O(chunk_size) instead of O(total_rows)

### Proposed Approach
1. Introduce an async stream body (e.g. `axum::body::Body` built from a `tokio_stream::wrappers::ReceiverStream`); a handler sketch follows below.
2. Acquire a server-side channel (mpsc) or use `async_stream::try_stream!` to yield `Result<Bytes>` chunks.
3. Write the header first (conditional on `include_header`).
4. Fetch rows in chunks: `LIMIT $N OFFSET loop*chunk`, or preferably a server-side cursor / keyset pagination if the ordering is stable.
5. Serialize each row to a CSV line and push it into the stream; flush periodically (small `Bytes` frames of ~8–32 KB to balance syscall overhead vs latency).
6. Close the stream when no more rows remain; ensure cancellation drops the DB cursor.

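A minimal handler sketch of steps 1–6, assuming axum 0.7-style `Body::from_stream` plus the `async_stream` and `bytes` crates. `ExportQuery`, `TxRow`, `fetch_chunk`, and `row_to_csv_line` are hypothetical names (the latter two are sketched in later sections), not existing project items:

```rust
// Sketch only: auth extraction, query filters, and error responses are omitted.
// `ExportQuery`, `TxRow`, `fetch_chunk`, and `row_to_csv_line` are placeholder names.
use axum::{body::Body, extract::{Query, State}, http::header, response::Response};
use bytes::Bytes;
use sqlx::PgPool;

// Hypothetical subset of the real export query parameters.
#[derive(serde::Deserialize)]
struct ExportQuery {
    include_header: bool,
}

async fn export_csv_stream(
    State(pool): State<PgPool>,
    Query(params): Query<ExportQuery>,
) -> Response {
    let stream = async_stream::try_stream! {
        // Step 3: header frame first, if requested.
        if params.include_header {
            yield Bytes::from_static(b"id,date,description,amount\n");
        }
        // Steps 4-6: chunked fetch loop with a keyset position carried between iterations.
        let mut after: Option<(chrono::NaiveDate, uuid::Uuid)> = None;
        loop {
            let rows = fetch_chunk(&pool, after, 500).await?; // sqlx errors propagate into the stream
            if rows.is_empty() {
                break; // no more rows: the stream (and the response body) ends here
            }
            let mut frame = String::with_capacity(rows.len() * 150);
            for row in &rows {
                row_to_csv_line(row, &mut frame); // appends one CSV line per row
            }
            after = rows.last().map(|r| (r.date, r.id));
            yield Bytes::from(frame); // one Bytes frame per chunk
        }
    };

    Response::builder()
        .header(header::CONTENT_TYPE, "text/csv")
        .body(Body::from_stream(stream))
        .unwrap()
}
```

Wiring would mirror the existing routes, e.g. `.route("/api/v1/transactions/export.csv", get(export_csv_stream)).with_state(pool)`; on axum 0.6 (hyper 0.14, as the tests below suggest), `axum::body::StreamBody::new(stream)` plays the role of `Body::from_stream`.
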
### Database Access Pattern
- Option A (simple): repeated `SELECT ... ORDER BY id LIMIT $chunk OFFSET $offset` until fewer than chunk_size rows are returned.
  - Pros: trivial to implement.
  - Cons: the OFFSET penalty grows with large tables.
- Option B (keyset): track the last (date, id) composite and use `WHERE (date, id) > ($last_date, $last_id) ORDER BY date, id LIMIT $chunk`.
  - Pros: stable performance.
  - Cons: requires deterministic ordering and a composite index.
- Option C (cursor): use a PostgreSQL declared cursor inside a transaction with `FETCH FORWARD $chunk`.
  - Pros: minimal SQL complexity, effective for very large sets.
  - Cons: keeps a transaction open; needs timeout/abort on client disconnect.

Initial recommendation: start with keyset pagination if the transactions table already has suitable indexes (date + id); fall back to OFFSET if the index is not present, then iterate. A keyset fetch sketch follows.

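A hedged sketch of the Option B fetch. The column set `transactions(id, date, description, amount)`, the `TxRow` type, and the sqlx `chrono`/`uuid`/`rust_decimal` features are assumptions; the real export query also carries the authorization/filter predicates:

```rust
use sqlx::PgPool;

// Placeholder row type; the amount column type is an assumption and should match the schema.
#[derive(sqlx::FromRow)]
struct TxRow {
    id: uuid::Uuid,
    date: chrono::NaiveDate,
    description: String,
    amount: rust_decimal::Decimal,
}

// Keyset-paginated chunk fetch: resumes strictly after the last (date, id) seen.
async fn fetch_chunk(
    pool: &PgPool,
    after: Option<(chrono::NaiveDate, uuid::Uuid)>,
    chunk: i64,
) -> Result<Vec<TxRow>, sqlx::Error> {
    match after {
        Some((last_date, last_id)) => {
            sqlx::query_as::<_, TxRow>(
                r#"SELECT id, date, description, amount
                   FROM transactions
                   WHERE (date, id) > ($1, $2)
                   ORDER BY date, id
                   LIMIT $3"#,
            )
            .bind(last_date)
            .bind(last_id)
            .bind(chunk)
            .fetch_all(pool)
            .await
        }
        None => {
            sqlx::query_as::<_, TxRow>(
                r#"SELECT id, date, description, amount
                   FROM transactions
                   ORDER BY date, id
                   LIMIT $1"#,
            )
            .bind(chunk)
            .fetch_all(pool)
            .await
        }
    }
}
```

The row comparison `(date, id) > ($1, $2)` stays an index range scan only if a composite index on `(date, id)` exists and the `ORDER BY` matches it exactly.
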
### Error Handling
- If an error occurs mid-stream (DB/network), terminate the stream and rely on the client detecting the incomplete CSV (documented). Optionally append a trailing comment line starting with `# ERROR:` for internal tooling (not enabled in production by default); see the fragment below.
- Authorization is validated before streaming begins; per-row permissions should already be enforced by the query predicate.

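One way to surface the failure, shown as a fragment of the chunk loop in the handler sketch above (replacing the bare `?`); `internal_tooling_mode` is a hypothetical flag, not an existing setting:

```rust
// Inside the try_stream! loop from the handler sketch above.
let rows = match fetch_chunk(&pool, after, 500).await {
    Ok(rows) => rows,
    Err(e) => {
        tracing::error!(error = %e, "export aborted mid-stream");
        if internal_tooling_mode {
            // Trailing comment line so internal consumers can spot truncation.
            yield Bytes::from(format!("# ERROR: export truncated: {e}\n"));
        }
        break; // terminate the stream; the client sees an incomplete CSV
    }
};
```
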
### Backpressure & Chunk Size
- Default chunk size: 500 rows.
- Tune by measuring latency vs memory: at ~150 bytes per serialized row on average, 500 rows ≈ 75 KB per chunk before encoding to `Bytes` (still reasonable); see the constants below.
- Emit each chunk as one `Bytes` frame; the header goes out as a separate first frame.

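Expressed as illustrative constants (the averages are assumptions to be revisited after benchmarks):

```rust
/// Rows fetched per DB round-trip and emitted per frame.
const CHUNK_ROWS: usize = 500;
/// Assumed average serialized width of one CSV line, in bytes.
const AVG_ROW_BYTES: usize = 150;
/// Approximate frame size: 500 * 150 = 75_000 bytes, roughly 75 KB.
const APPROX_FRAME_BYTES: usize = CHUNK_ROWS * AVG_ROW_BYTES;
```
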
### Include Header Logic
- If `include_header=false`, skip the header frame.
- Otherwise the first frame is `b"col1,col2,...\n"`.

### CSV Writer
- Reuse the existing row-to-CSV logic; adapt it into a function that produces a single line without accumulating all lines in a `Vec`.
- Avoid allocation churn: write into a reusable `String` buffer, calling `clear()` per row; see the sketch below.

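A minimal sketch, assuming the row type is the `TxRow` placeholder from the keyset example; the real serializer should keep the column order and quoting rules of the current export:

```rust
use std::fmt::Write;

/// Serialize one row into the provided buffer as a single CSV line.
/// The caller either clears the buffer per row or lets lines accumulate into a chunk frame.
fn row_to_csv_line(row: &TxRow, out: &mut String) {
    // Escape embedded double quotes in the free-text field; other columns print as-is.
    let description = row.description.replace('"', "\"\"");
    let _ = writeln!(out, "{},{},\"{}\",{}", row.id, row.date, description, row.amount);
}

// Per-row reuse, as suggested above: one scratch buffer, cleared each iteration.
// (Writing straight into `frame` also works; the scratch buffer mirrors the clear()-per-row note.)
fn rows_to_frame(rows: &[TxRow]) -> String {
    let mut frame = String::with_capacity(rows.len() * 150);
    let mut line = String::with_capacity(160);
    for row in rows {
        line.clear();
        row_to_csv_line(row, &mut line);
        frame.push_str(&line);
    }
    frame
}
```
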
### Observability
- Add tracing spans/events: `export.start` (with row_estimate if available), `export.chunk_emitted` (chunk_index, rows_in_chunk), `export.complete` (total_rows, duration_ms); see the sketch below.
- Consider a soft limit guard (e.g. if > 200k rows, warn the user or require the async-job + presigned-URL pattern; out of scope for the first iteration).

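A possible shape for the events using the `tracing` crate; the event and field names simply mirror the bullet above and are proposals rather than existing conventions in the codebase (`export.start` would be an `info_span!` wrapping the handler):

```rust
use std::time::Instant;

// Emit once per chunk pushed into the stream.
fn trace_chunk_emitted(chunk_index: u64, rows_in_chunk: usize) {
    tracing::debug!(chunk_index, rows_in_chunk, "export.chunk_emitted");
}

// Emit after the final frame, with the wall-clock duration of the whole export.
fn trace_export_complete(total_rows: u64, started: Instant) {
    tracing::info!(
        total_rows,
        duration_ms = started.elapsed().as_millis() as u64,
        "export.complete"
    );
}
```
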
### Compatibility
- Existing clients that expect the entire body still work; streaming is transparent at the HTTP level.
- For very small datasets the overhead is negligible (one header frame + one chunk frame).

### Future Extensions
1. Async job offload + download token when the row count exceeds a threshold.
2. Compression: optional `Accept-Encoding: gzip` support via a layered body wrapper.
3. Column selection / dynamic schema negotiation.
4. Rate limiting or concurrency caps per user.

### Minimal Implementation Checklist
- [ ] Refactor the current CSV builder into a per-row serializer
- [ ] Add the streaming variant behind a feature flag `export_stream` (optional)
- [ ] Implement a keyset pagination helper
- [ ] Write an integration test for a large export (e.g. 5k rows) ensuring early first byte (<500 ms locally) and that the total content hash matches the non-streaming version (see the sketch below)
- [ ] Bench memory before/after (heap snapshot or simple RSS sampling)

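A hedged sketch of the parity half of that test. `export_csv_buffered` and `export_csv_streamed` are hypothetical helpers that drive the two endpoints and return the full response bodies; the seeding step is elided, and the first-byte timing check would be measured separately:

```rust
#[tokio::test]
async fn streamed_export_matches_buffered_export() {
    let pool = create_test_pool().await;
    // ... seed ~5k transactions for a dedicated test user here ...

    let buffered: Vec<u8> = export_csv_buffered(&pool).await; // current in-memory implementation
    let streamed: Vec<u8> = export_csv_streamed(&pool).await; // new streaming variant

    // Byte-for-byte equality implies equal content hashes.
    assert_eq!(buffered, streamed, "streamed CSV must match the buffered CSV exactly");
}
```
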
---
Status: Draft for review. Adjust chunk strategy after initial benchmarks.

@@ -0,0 +1,108 @@
#[cfg(test)]
mod tests {
    use axum::{routing::post, Router};
    use http::StatusCode;
    use hyper::Body;
    use tower::ServiceExt;

    use jive_money_api::handlers::auth::{login, refresh_token};

    use crate::fixtures::create_test_pool;

    async fn post_json(app: &Router, path: &str, body: serde_json::Value) -> axum::response::Response {
        let req = http::Request::builder()
            .method("POST")
            .uri(path)
            .header(http::header::CONTENT_TYPE, "application/json")
            .body(Body::from(body.to_string()))
            .unwrap();
        app.clone().oneshot(req).await.unwrap()
    }

    #[tokio::test]
    async fn login_fails_with_wrong_password_bcrypt() {
        let pool = create_test_pool().await;
        let email = format!("bcrypt_fail_{}@example.com", uuid::Uuid::new_v4());
        let good_plain = "CorrectPass123!";
        let bcrypt_hash = bcrypt::hash(good_plain, bcrypt::DEFAULT_COST).unwrap();

        sqlx::query(
            r#"INSERT INTO users (email, password_hash, name, is_active, created_at, updated_at)
               VALUES ($1,$2,$3,true,NOW(),NOW())"#,
        )
        .bind(&email)
        .bind(&bcrypt_hash)
        .bind("Bcrypt Fail")
        .execute(&pool)
        .await
        .expect("insert bcrypt user");

        let app = Router::new()
            .route("/api/v1/auth/login", post(login))
            .with_state(pool.clone());

        // Wrong password must be rejected even though the account exists and is active.
        let resp = post_json(&app, "/api/v1/auth/login", serde_json::json!({
            "email": email,
            "password": "BadPass999!",
        })).await;
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);

        // Cleanup
        sqlx::query("DELETE FROM users WHERE LOWER(email)=LOWER($1)")
            .bind(&email)
            .execute(&pool)
            .await
            .ok();
    }

    #[tokio::test]
    async fn refresh_fails_for_inactive_user() {
        let pool = create_test_pool().await;
        let email = format!("inactive_refresh_{}@example.com", uuid::Uuid::new_v4());

        // Create an inactive user (argon2 hash); the trait must be in scope for hash_password().
        use argon2::password_hash::PasswordHasher;
        let salt = argon2::password_hash::SaltString::generate(&mut argon2::password_hash::rand_core::OsRng);
        let argon2 = argon2::Argon2::default();
        let hash = argon2
            .hash_password("InactivePass123!".as_bytes(), &salt)
            .unwrap()
            .to_string();
        let user_id: uuid::Uuid = uuid::Uuid::new_v4();
        sqlx::query(
            r#"INSERT INTO users (id, email, password_hash, name, is_active, created_at, updated_at)
               VALUES ($1,$2,$3,$4,false,NOW(),NOW())"#,
        )
        .bind(user_id)
        .bind(&email)
        .bind(&hash)
        .bind("Inactive Refresh")
        .execute(&pool)
        .await
        .expect("insert inactive user");

        // Generate a JWT manually to simulate a prior login (even though the user is now inactive).
        let token = jive_money_api::auth::generate_jwt(user_id, None).unwrap();

        let app = Router::new()
            .route("/api/v1/auth/refresh", post(refresh_token))
            .with_state(pool.clone());

        // Attempt refresh with the token of an inactive user.
        let req = http::Request::builder()
            .method("POST")
            .uri("/api/v1/auth/refresh")
            .header("Authorization", format!("Bearer {}", token))
            .body(Body::empty())
            .unwrap();
        let resp = app.clone().oneshot(req).await.unwrap();
        assert_eq!(resp.status(), StatusCode::FORBIDDEN);

        // Cleanup
        sqlx::query("DELETE FROM users WHERE id = $1")
            .bind(user_id)
            .execute(&pool)
            .await
            .ok();
    }
}

@@ -0,0 +1,49 @@
#[cfg(test)]
mod tests {
    use jive_money_api::services::{auth_service::{AuthService, RegisterRequest}, FamilyService};
    use crate::fixtures::create_test_pool;

    #[tokio::test]
    async fn family_creation_sets_default_ledger() {
        let pool = create_test_pool().await;
        let auth = AuthService::new(pool.clone());
        let email = format!("family_def_{}@example.com", uuid::Uuid::new_v4());
        let uc = auth.register_with_family(RegisterRequest {
            email: email.clone(),
            password: "FamilyDef123!".to_string(),
            name: Some("Family Owner".to_string()),
            username: None,
        }).await.expect("register user");

        let user_id = uc.user_id;
        let family_id = uc.current_family_id.expect("family id");

        // Query ledger(s)
        #[derive(sqlx::FromRow, Debug)]
        struct LedgerRow {
            id: uuid::Uuid,
            family_id: uuid::Uuid,
            is_default: Option<bool>,
            created_by: Option<uuid::Uuid>,
            name: String,
        }
        let ledgers = sqlx::query_as::<_, LedgerRow>(
            "SELECT id, family_id, is_default, created_by, name FROM ledgers WHERE family_id = $1"
        )
        .bind(family_id)
        .fetch_all(&pool).await.expect("fetch ledgers");

        assert_eq!(ledgers.len(), 1, "exactly one default ledger expected");
        let ledger = &ledgers[0];
        assert_eq!(ledger.family_id, family_id);
        assert_eq!(ledger.is_default.unwrap_or(false), true, "ledger should be default");

Review suggestion on the `is_default` assertion:

-        assert_eq!(ledger.is_default.unwrap_or(false), true, "ledger should be default");
+        assert_eq!(ledger.is_default, Some(true), "ledger should be default");

Review comment: Missing word: should be 'until fewer than chunk_size results' or 'until fewer results than chunk_size'.