feat(ingest): implement idempotent batch upsert endpoint for sales_daily#19
Comprehensive implementation plan for idempotent batch upsert endpoints:
- POST /ingest/sales-daily with natural key resolution (store_code, sku)
- PostgreSQL ON CONFLICT DO UPDATE for replay-safe ingestion
- Partial success pattern with row-level error reporting
- Configurable batch size and timeout settings
- 13 ordered implementation tasks with validation gates
Confidence score: 8/10
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Implements PRP-3 Ingest Layer with:
- POST /ingest/sales-daily endpoint accepting natural keys (store_code, sku)
- KeyResolver service for store_code → store_id, sku → product_id resolution
- Calendar date FK validation (dates must exist in calendar table)
- PostgreSQL ON CONFLICT DO UPDATE for replay-safe upserts
- Partial success pattern: valid rows processed, invalid rows returned as errors
- Configurable batch size and timeout via Settings
- Structured logging with ingest.sales_daily.{action}_{state} events
New files:
- app/features/ingest/ - Complete vertical slice (schemas, service, routes, tests)
- examples/api/ingest_sales_daily.http - HTTP client examples
Test coverage: 42 new tests (18 schema + 15 service + 9 integration)
All validation gates passing: ruff, mypy, pyright, pytest (101 tests)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
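To make "replay-safe" concrete, here is a minimal sketch of the ON CONFLICT DO UPDATE pattern the commit describes. It is not the PR's code: it uses Python's built-in sqlite3 module as a stand-in for PostgreSQL (SQLite supports the same clause since 3.24), and the table columns and the `upsert_batch` and `snapshot` helpers are assumptions made for illustration.

```python
import sqlite3

# Hypothetical, simplified schema: the real sales_daily table and its
# unique constraint are not shown in this PR excerpt.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sales_daily ("
    " date TEXT NOT NULL,"
    " store_id INTEGER NOT NULL,"
    " product_id INTEGER NOT NULL,"
    " quantity INTEGER NOT NULL,"
    " UNIQUE (date, store_id, product_id))"
)

UPSERT_SQL = (
    "INSERT INTO sales_daily (date, store_id, product_id, quantity) "
    "VALUES (?, ?, ?, ?) "
    "ON CONFLICT (date, store_id, product_id) "
    "DO UPDATE SET quantity = excluded.quantity"
)


def upsert_batch(rows):
    """Insert new rows and overwrite rows whose natural key already exists."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT_SQL, rows)


def snapshot():
    """Return the full table contents in a stable order."""
    return conn.execute(
        "SELECT date, store_id, product_id, quantity FROM sales_daily "
        "ORDER BY date, store_id, product_id"
    ).fetchall()


batch = [("2024-01-15", 1, 10, 5), ("2024-01-15", 1, 11, 3)]
upsert_batch(batch)
first = snapshot()
upsert_batch(batch)  # replaying the same batch leaves the table unchanged
replayed = snapshot()
upsert_batch([("2024-01-15", 1, 10, 7)])  # same key, new quantity: an update
updated = snapshot()
```

Replaying a batch produces the same table state because the unique key routes the second write to the update branch rather than creating a duplicate row.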
Reviewer's Guide
Implements a new idempotent batch ingest endpoint, POST /ingest/sales-daily, with natural key resolution, partial-success semantics, and Postgres ON CONFLICT upserts, including the full vertical slice (schemas, service, routes, tests), new ingest settings, router wiring, and lint config tweaks.
Sequence diagram for POST /ingest/sales-daily batch upsert flow
sequenceDiagram
actor Client
participant FastAPIApp
participant IngestRouter
participant IngestService
participant KeyResolver
participant Database
Client->>FastAPIApp: POST /ingest/sales-daily
FastAPIApp->>IngestRouter: route ingest_sales_daily
IngestRouter->>IngestRouter: validate SalesDailyIngestRequest
IngestRouter->>IngestService: upsert_sales_daily_batch(records, db, KeyResolver)
IngestService->>KeyResolver: resolve_store_codes(db, store_codes)
KeyResolver-->>IngestService: store_code_to_id_map
IngestService->>KeyResolver: resolve_skus(db, skus)
KeyResolver-->>IngestService: sku_to_product_id_map
IngestService->>KeyResolver: resolve_dates(db, dates)
KeyResolver-->>IngestService: valid_dates
IngestService->>IngestService: build valid_rows and IngestRowError list
alt has_valid_rows
IngestService->>Database: pg_insert(SalesDaily).on_conflict_do_update
Database-->>IngestService: upsert result
end
IngestService-->>IngestRouter: UpsertResult(inserted, updated, rejected, errors)
IngestRouter->>IngestRouter: build SalesDailyIngestResponse
IngestRouter-->>FastAPIApp: 200 OK + SalesDailyIngestResponse
FastAPIApp-->>Client: JSON response with counts and errors
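The "build valid_rows and IngestRowError list" step in the sequence above can be sketched in plain Python. This is an illustration under assumptions, not the PR's service code: `RowError`, `split_batch`, and the error codes are hypothetical names, and the lookup maps stand in for what the KeyResolver calls return.

```python
from dataclasses import dataclass


@dataclass
class RowError:
    """Hypothetical, trimmed-down counterpart of IngestRowError."""
    row_index: int
    error_code: str
    error_message: str


def split_batch(records, store_ids, product_ids, valid_dates):
    """Resolve natural keys row by row; reject bad rows without failing the batch."""
    valid_rows, errors = [], []
    for index, record in enumerate(records):
        store_id = store_ids.get(record["store_code"])
        product_id = product_ids.get(record["sku"])
        if store_id is None:
            message = f"unknown store_code {record['store_code']!r}"
            errors.append(RowError(index, "UNKNOWN_STORE", message))
        elif product_id is None:
            message = f"unknown sku {record['sku']!r}"
            errors.append(RowError(index, "UNKNOWN_SKU", message))
        elif record["date"] not in valid_dates:
            message = f"date {record['date']!r} is not in the calendar"
            errors.append(RowError(index, "UNKNOWN_DATE", message))
        else:
            valid_rows.append(
                {
                    "date": record["date"],
                    "store_id": store_id,
                    "product_id": product_id,
                    "quantity": record["quantity"],
                }
            )
    return valid_rows, errors


records = [
    {"date": "2024-01-15", "store_code": "S001", "sku": "SKU-001", "quantity": 10},
    {"date": "2024-01-15", "store_code": "S999", "sku": "SKU-001", "quantity": 2},
    {"date": "1999-01-01", "store_code": "S001", "sku": "SKU-001", "quantity": 1},
]
valid_rows, errors = split_batch(
    records, {"S001": 1}, {"SKU-001": 10}, {"2024-01-15"}
)
```

Only `valid_rows` would go on to the upsert, while `errors` keeps each rejected row's original index so the client can match it back to its payload.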
Class diagram for ingest feature schemas and services
classDiagram
class SalesDailyIngestRow {
date_type date
str store_code
str sku
int quantity
Decimal unit_price
Decimal total_amount
validate_total_amount_consistency() SalesDailyIngestRow
}
class SalesDailyIngestRequest {
list~SalesDailyIngestRow~ records
}
class IngestRowError {
int row_index
str store_code
str sku
date_type date
str error_code
str error_message
}
class SalesDailyIngestResponse {
int inserted_count
int updated_count
int rejected_count
int total_processed
list~IngestRowError~ errors
float duration_ms
}
class UpsertResult {
int inserted_count
int updated_count
int rejected_count
list~IngestRowError~ errors
}
class KeyResolverProtocol {
<<interface>>
resolve_store_codes(db, codes) dict~str,int~
resolve_skus(db, skus) dict~str,int~
resolve_dates(db, dates) set~date_type~
}
class KeyResolver {
resolve_store_codes(db, codes) dict~str,int~
resolve_skus(db, skus) dict~str,int~
resolve_dates(db, dates) set~date_type~
}
SalesDailyIngestRequest "1" --> "*" SalesDailyIngestRow : records
SalesDailyIngestResponse "1" --> "*" IngestRowError : errors
UpsertResult "1" --> "*" IngestRowError : errors
KeyResolverProtocol <|.. KeyResolver : implements
Architecture flow for new ingest sales-daily endpoint
flowchart LR
Client[Client]
FastAPIApp[FastAPI_app]
IngestRouter[Ingest_router_ingest_sales_daily]
IngestService[upsert_sales_daily_batch]
KeyResolver[KeyResolver]
Database[(PostgreSQL_database)]
Client -->|HTTP_POST_ingest_sales_daily| FastAPIApp
FastAPIApp --> IngestRouter
IngestRouter --> IngestService
IngestService --> KeyResolver
IngestService -->|pg_insert_ON_CONFLICT_DO_UPDATE| Database
Database --> IngestService
IngestService --> IngestRouter
IngestRouter --> FastAPIApp
FastAPIApp -->|JSON_response_counts_and_errors| Client
File-Level Changes
Hey - I've found 5 issues, and left some high level feedback:
- In `SalesDailyIngestResponse`, the `errors` field uses `Field(default=[])`, which creates a shared mutable default list across instances; switch this to `default_factory=list` to avoid cross-request state leakage.
- The `upsert_sales_daily_batch` implementation always reports `inserted_count` and leaves `updated_count` at 0, even on conflict updates; either implement the xmax-based distinction you mention in the comments or adjust the API/response docs and tests to make it clear that both inserts and updates are currently counted as `inserted`.
- The `validate_total_amount_consistency` model validator in `SalesDailyIngestRow` is effectively a no-op (it just `pass`es even when outside tolerance); consider either enforcing the check (and surfacing a structured error) or removing the validator and documenting that `total_amount` is trusted as-is.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `SalesDailyIngestResponse`, the `errors` field uses `Field(default=[])`, which creates a shared mutable default list across instances; switch this to `default_factory=list` to avoid cross-request state leakage.
- The `upsert_sales_daily_batch` implementation always reports `inserted_count` and leaves `updated_count` at 0, even on conflict updates; either implement the xmax-based distinction you mention in the comments or adjust the API/response docs and tests to make it clear that both inserts and updates are currently counted as `inserted`.
- The `validate_total_amount_consistency` model validator in `SalesDailyIngestRow` is effectively a no-op (it just `pass`es even when outside tolerance); consider either enforcing the check (and surfacing a structured error) or removing the validator and documenting that `total_amount` is trusted as-is.
## Individual Comments
### Comment 1
<location> `app/features/ingest/service.py:215-216` </location>
<code_context>
+ ).returning(SalesDaily.id)
+
+ # Execute and get results
+ result = await db.execute(upsert_stmt)
+ rows_affected = len(result.fetchall())
+
+ # Note: Distinguishing inserted vs updated accurately requires xmax check
</code_context>
<issue_to_address>
**suggestion (performance):** Avoid fetching all returned rows just to compute the affected count.
Since the returned rows are unused, consider either dropping `RETURNING` and relying on `result.rowcount` for the affected count, or otherwise avoid materializing all rows. This prevents unnecessary memory use and improves efficiency for large batches.
Suggested implementation:
```python
        set_={
            "quantity": insert_stmt.excluded.quantity,
            "unit_price": insert_stmt.excluded.unit_price,
            "total_amount": insert_stmt.excluded.total_amount,
            "updated_at": func.now(),
        },
    )
    # Execute and get results
    result = await db.execute(upsert_stmt)
    rows_affected = result.rowcount or 0
```
```python
# Note: Distinguishing inserted vs updated accurately requires xmax check
# For simplicity, count all as processed (could enhance later)
# A row is "inserted" if xmax = 0, "updated" if xmax > 0
# For now, we report total as "inserted" for new batches
```
</issue_to_address>
### Comment 2
<location> `app/features/ingest/service.py:218-199` </location>
<code_context>
+ result = await db.execute(upsert_stmt)
+ rows_affected = len(result.fetchall())
+
+ # Note: Distinguishing inserted vs updated accurately requires xmax check
+ # For simplicity, count all as processed (could enhance later)
+ # A row is "inserted" if xmax = 0, "updated" if xmax > 0
+ # For now, we report total as "inserted" for new batches
+ inserted = rows_affected
+ updated = 0
+
+ logger.info(
</code_context>
<issue_to_address>
**issue:** The `updated_count` in the result and API response is always 0, which can be misleading to clients.
Because all affected rows are counted as `inserted` and `updated` is always 0, the exposed counters don’t reflect actual inserts vs updates and can mislead API consumers. Please either (1) distinguish inserts vs updates using system columns (e.g., `xmax`), (2) expose a single `processed_count`, or (3) explicitly define and implement semantics where `updated_count` mirrors `inserted_count` when they can’t be differentiated.
</issue_to_address>
### Comment 3
<location> `app/features/ingest/routes.py:66-75` </location>
<code_context>
+
+ logger.info("ingest.sales_daily.request_received", record_count=len(request.records))
+
+ try:
+ key_resolver = KeyResolver()
+ result = await upsert_sales_daily_batch(db, request.records, key_resolver)
+
+ duration_ms = (time.time() - start_time) * 1000
+
+ return SalesDailyIngestResponse(
+ inserted_count=result.inserted_count,
+ updated_count=result.updated_count,
+ rejected_count=result.rejected_count,
+ total_processed=len(request.records),
+ errors=result.errors,
+ duration_ms=round(duration_ms, 2),
+ )
+ except Exception as e:
+ logger.error(
+ "ingest.sales_daily.request_failed",
</code_context>
<issue_to_address>
**issue (bug_risk):** Catching bare `Exception` and wrapping as `DatabaseError` may hide non-database issues.
This `try/except Exception` block masks all failures (including programming errors and misconfigurations) as `DatabaseError`, which obscures the real root cause and complicates debugging. Prefer catching only the specific DB/SQLAlchemy exceptions you expect here and let other exceptions propagate so they surface with their original type and stack trace.
</issue_to_address>
### Comment 4
<location> `app/features/ingest/tests/test_routes.py:124-133` </location>
<code_context>
+ async def test_ingest_valid_records(self, client, db_session, seed_data):
</code_context>
<issue_to_address>
**issue (testing):** Integration tests use a standalone db_session that is not wired into the FastAPI app dependencies
The `db_session` fixture creates its own engine/session, while the `/ingest/sales-daily` endpoint uses the app-wide `get_db` dependency, so your assertions may be querying a different database/session than the one the route writes to. Please override `get_db` in these tests so the endpoint and assertions share the same session/engine (e.g. `app.dependency_overrides[get_db] = lambda: db_session` or an equivalent async version) to ensure the test actually verifies persisted data.
</issue_to_address>
### Comment 5
<location> `app/features/ingest/schemas.py:25` </location>
<code_context>
+ unit_price: Decimal = Field(..., ge=0, description="Price per unit")
+ total_amount: Decimal = Field(..., ge=0, description="Total sales amount")
+
+ @model_validator(mode="after")
+ def validate_total_amount_consistency(self) -> "SalesDailyIngestRow":
+ """Warn if total_amount doesn't match quantity * unit_price."""
</code_context>
<issue_to_address>
**issue (complexity):** Consider either implementing concrete behavior in this validator or removing it and documenting the intent on the field instead, so its purpose is clear and not misleading.
You can reduce complexity here by either removing the no-op branch or wiring in the intended behavior. Right now the validator looks important but does nothing, which increases mental overhead.
If you do want to keep it for consistency checks, make the behavior explicit, e.g. log a warning:
```python
import logging

logger = logging.getLogger(__name__)

class SalesDailyIngestRow(BaseModel):
    ...
    @model_validator(mode="after")
    def validate_total_amount_consistency(self) -> "SalesDailyIngestRow":
        """Warn if total_amount doesn't match quantity * unit_price.
        Allows small floating-point discrepancies (0.01 tolerance).
        """
        expected = self.quantity * self.unit_price
        if abs(self.total_amount - expected) > Decimal("0.01"):
            logger.warning(
                "total_amount mismatch for sales row",
                extra={
                    "date": self.date,
                    "store_code": self.store_code,
                    "sku": self.sku,
                    "quantity": self.quantity,
                    "unit_price": str(self.unit_price),
                    "total_amount": str(self.total_amount),
                    "expected": str(expected),
                },
            )
        return self
```
If you don’t need any behavior yet, simplify by removing the validator entirely and leaving the intent in a comment near `total_amount`:
```python
class SalesDailyIngestRow(BaseModel):
    ...
    total_amount: Decimal = Field(
        ...,
        ge=0,
        decimal_places=2,
        description="Total sales amount (≈ quantity * unit_price; validated downstream)",
    )
```
Either way, you avoid a validator that appears to enforce a contract but is currently a no-op.
</issue_to_address>
result = await db.execute(upsert_stmt)
rows_affected = len(result.fetchall())
suggestion (performance): Avoid fetching all returned rows just to compute the affected count.
Since the returned rows are unused, consider either dropping RETURNING and relying on result.rowcount for the affected count, or otherwise avoid materializing all rows. This prevents unnecessary memory use and improves efficiency for large batches.
Suggested implementation:
set_={
"quantity": insert_stmt.excluded.quantity,
"unit_price": insert_stmt.excluded.unit_price,
"total_amount": insert_stmt.excluded.total_amount,
"updated_at": func.now(),
},
)
# Execute and get results
result = await db.execute(upsert_stmt)
rows_affected = result.rowcount or 0
# Note: Distinguishing inserted vs updated accurately requires xmax check
# For simplicity, count all as processed (could enhance later)
# A row is "inserted" if xmax = 0, "updated" if xmax > 0
# For now, we report total as "inserted" for new batches

# Perform upsert for valid rows
inserted = 0
updated = 0
issue: The updated_count in the result and API response is always 0, which can be misleading to clients.
Because all affected rows are counted as inserted and updated is always 0, the exposed counters don’t reflect actual inserts vs updates and can mislead API consumers. Please either (1) distinguish inserts vs updates using system columns (e.g., xmax), (2) expose a single processed_count, or (3) explicitly define and implement semantics where updated_count mirrors inserted_count when they can’t be differentiated.
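Option (2), a single processed count, could be sketched as follows. This is a hedged illustration that uses Python's built-in sqlite3 module in place of PostgreSQL and SQLAlchemy, with a hypothetical two-column table; how `rowcount` behaves for upserts depends on the driver, so the real stack would need its own check.

```python
import sqlite3

# Hypothetical two-column table; the real sales_daily schema is larger.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sales_daily (sku TEXT PRIMARY KEY, quantity INTEGER NOT NULL)"
)
conn.execute("INSERT INTO sales_daily (sku, quantity) VALUES ('SKU-001', 1)")

# One row conflicts with the existing key (update), one row is new (insert).
cursor = conn.execute(
    "INSERT INTO sales_daily (sku, quantity) VALUES ('SKU-001', 10), ('SKU-002', 4) "
    "ON CONFLICT (sku) DO UPDATE SET quantity = excluded.quantity"
)

# rowcount covers both the updated and the inserted row, without RETURNING
# and without materializing any result rows.
processed_count = cursor.rowcount
stored = conn.execute(
    "SELECT sku, quantity FROM sales_daily ORDER BY sku"
).fetchall()
```

Reporting one `processed_count` avoids implying an insert/update split that the statement result does not actually provide.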
try:
    key_resolver = KeyResolver()
    result = await upsert_sales_daily_batch(db, request.records, key_resolver)

    duration_ms = (time.perf_counter() - start_time) * 1000

    logger.info(
        "ingest.sales_daily.request_completed",
        inserted=result.inserted_count,
        updated=result.updated_count,
issue (bug_risk): Catching bare Exception and wrapping as DatabaseError may hide non-database issues.
This try/except Exception block masks all failures (including programming errors and misconfigurations) as DatabaseError, which obscures the real root cause and complicates debugging. Prefer catching only the specific DB/SQLAlchemy exceptions you expect here and let other exceptions propagate so they surface with their original type and stack trace.
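A minimal sketch of the narrower handler the comment asks for is shown below. The exception classes and `run_ingest` are hypothetical stand-ins written for this example: `DriverError` plays the role of the database library's base exception (for instance SQLAlchemy's `SQLAlchemyError`), and `DatabaseError` the application-level error raised by the route.

```python
class DatabaseError(Exception):
    """Hypothetical application-level error, standing in for the PR's DatabaseError."""


class DriverError(Exception):
    """Hypothetical stand-in for the database library's base exception."""


def run_ingest(operation):
    """Translate only database failures; let everything else propagate unchanged."""
    try:
        return operation()
    except DriverError as exc:
        # Chain the original error so the root cause stays in the traceback.
        raise DatabaseError("Failed to ingest sales data") from exc


def failing_query():
    raise DriverError("connection reset")


def buggy_code():
    return {}["missing"]  # programming error: raises KeyError


try:
    run_ingest(failing_query)
except DatabaseError as exc:
    wrapped_cause = exc.__cause__  # the original DriverError

try:
    run_ingest(buggy_code)
except KeyError as exc:
    surfaced = exc  # not wrapped: keeps its own type and stack trace
```

With this shape, a typo or a misconfiguration fails loudly as itself instead of being reported to clients as a database problem.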
async def test_ingest_valid_records(self, client, db_session, seed_data):
    """Test ingesting valid sales records."""
    payload = {
        "records": [
            {
                "date": "2024-01-15",
                "store_code": "S001",
                "sku": "SKU-001",
                "quantity": 10,
                "unit_price": "9.99",
issue (testing): Integration tests use a standalone db_session that is not wired into the FastAPI app dependencies
The db_session fixture creates its own engine/session, while the /ingest/sales-daily endpoint uses the app-wide get_db dependency, so your assertions may be querying a different database/session than the one the route writes to. Please override get_db in these tests so the endpoint and assertions share the same session/engine (e.g. app.dependency_overrides[get_db] = lambda: db_session or an equivalent async version) to ensure the test actually verifies persisted data.
unit_price: Decimal = Field(..., ge=0, decimal_places=2, description="Price per unit")
total_amount: Decimal = Field(..., ge=0, decimal_places=2, description="Total sales amount")

@model_validator(mode="after")
issue (complexity): Consider either implementing concrete behavior in this validator or removing it and documenting the intent on the field instead, so its purpose is clear and not misleading.
You can reduce complexity here by either removing the no-op branch or wiring in the intended behavior. Right now the validator looks important but does nothing, which increases mental overhead.
If you do want to keep it for consistency checks, make the behavior explicit, e.g. log a warning:
import logging

logger = logging.getLogger(__name__)

class SalesDailyIngestRow(BaseModel):
    ...
    @model_validator(mode="after")
    def validate_total_amount_consistency(self) -> "SalesDailyIngestRow":
        """Warn if total_amount doesn't match quantity * unit_price.
        Allows small floating-point discrepancies (0.01 tolerance).
        """
        expected = self.quantity * self.unit_price
        if abs(self.total_amount - expected) > Decimal("0.01"):
            logger.warning(
                "total_amount mismatch for sales row",
                extra={
                    "date": self.date,
                    "store_code": self.store_code,
                    "sku": self.sku,
                    "quantity": self.quantity,
                    "unit_price": str(self.unit_price),
                    "total_amount": str(self.total_amount),
                    "expected": str(expected),
                },
            )
        return self
If you don’t need any behavior yet, simplify by removing the validator entirely and leaving the intent in a comment near total_amount:
class SalesDailyIngestRow(BaseModel):
    ...
    total_amount: Decimal = Field(
        ...,
        ge=0,
        decimal_places=2,
        description="Total sales amount (≈ quantity * unit_price; validated downstream)",
    )
Either way, you avoid a validator that appears to enforce a contract but is currently a no-op.
- Replace inserted_count/updated_count with processed_count (ON CONFLICT can't distinguish inserts vs updates without xmax check complexity)
- Remove misleading updated_count field that was always 0
- Use result.rowcount instead of len(result.fetchall()) for efficiency
- Change errors field from mutable default=[] to default_factory=list
- Catch specific SQLAlchemyError instead of bare Exception
- Wire db_session into FastAPI deps override in integration tests
- Remove no-op validate_total_amount_consistency validator
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary
- `POST /ingest/sales-daily` endpoint with natural key resolution (store_code → store_id, sku → product_id)
- `ON CONFLICT DO UPDATE` for replay-safe idempotent upserts
Changes
New Files
- `app/features/ingest/` - Complete vertical slice (schemas, service, routes, tests)
- `examples/api/ingest_sales_daily.http` - HTTP client examples
Modified Files
- `app/core/config.py` - Added `ingest_batch_size`, `ingest_timeout_seconds` settings
- `app/main.py` - Registered ingest router
- `pyproject.toml` - Updated ruff per-file-ignores for routes
Features
- `ingest.sales_daily.{action}_{state}` events
Test plan
🤖 Generated with Claude Code
Summary by Sourcery
Add an ingest feature for idempotent batch upsert of daily sales data exposed via a new /ingest/sales-daily API endpoint, including key resolution, validation, and structured logging.