Skip to content

feat(ingest): implement idempotent batch upsert endpoint for sales_daily#19

Merged
w7-mgfcode merged 3 commits into
devfrom
feat/prp-3-ingest-layer
Jan 26, 2026
Merged

feat(ingest): implement idempotent batch upsert endpoint for sales_daily#19
w7-mgfcode merged 3 commits into
devfrom
feat/prp-3-ingest-layer

Conversation

@w7-mgfcode
Copy link
Copy Markdown
Owner

@w7-mgfcode w7-mgfcode commented Jan 26, 2026

Summary

  • Implement POST /ingest/sales-daily endpoint with natural key resolution (store_code → store_id, sku → product_id)
  • Add PostgreSQL ON CONFLICT DO UPDATE for replay-safe idempotent upserts
  • Support partial success pattern: valid rows processed, invalid rows returned with error details

Changes

New Files

  • app/features/ingest/ - Complete vertical slice (schemas, service, routes, tests)
  • examples/api/ingest_sales_daily.http - HTTP client examples

Modified Files

  • app/core/config.py - Added ingest_batch_size, ingest_timeout_seconds settings
  • app/main.py - Registered ingest router
  • pyproject.toml - Updated ruff per-file-ignores for routes

Features

  • KeyResolver service for batch key lookups
  • Calendar date FK validation (dates must exist in calendar table)
  • Structured logging: ingest.sales_daily.{action}_{state} events
  • Request correlation via X-Request-ID header

Test plan

  • Unit tests for schemas (18 tests)
  • Unit tests for service logic (15 tests)
  • Integration tests for API endpoint (9 tests)
  • Idempotency verification (replay same payload = update, not duplicate)
  • Partial success with mixed valid/invalid rows
  • All validation gates passing (ruff, mypy, pyright, pytest: 101 tests)

🤖 Generated with Claude Code

Summary by Sourcery

Add an ingest feature for idempotent batch upsert of daily sales data exposed via a new /ingest/sales-daily API endpoint, including key resolution, validation, and structured logging.

New Features:

  • Introduce a vertical ingest module with schemas, service layer, and FastAPI routes for POST /ingest/sales-daily batch upserts using natural keys.
  • Provide typed request/response models that support row-level error reporting and partial success semantics for sales daily ingestion.
  • Add HTTP example file for invoking the sales daily ingest endpoint.

Enhancements:

  • Extend application settings with configurable ingest batch size and timeout options.
  • Register the ingest router in the main FastAPI application to expose the new endpoint.
  • Introduce a KeyResolver service to batch-resolve store codes, SKUs, and calendar dates to internal IDs and validate referential integrity.
  • Implement replay-safe PostgreSQL upsert logic for SalesDaily using ON CONFLICT DO UPDATE with structured logging around ingest operations.

Build:

  • Relax Ruff linting in test files and ingest route modules to accommodate new patterns.

Documentation:

  • Add a detailed PRP design document describing the ingest layer architecture, behavior, and validation strategy.

Tests:

  • Add unit tests for ingest schemas, key resolution, and batch upsert service behavior including partial failures.
  • Add integration tests for the /ingest/sales-daily endpoint covering happy path, idempotency, validation errors, and request ID propagation.
  • Introduce shared ingest test fixtures for reusable sample payloads and mock key resolution.

w7-learn and others added 2 commits January 26, 2026 12:11
Comprehensive implementation plan for idempotent batch upsert endpoints:
- POST /ingest/sales-daily with natural key resolution (store_code, sku)
- PostgreSQL ON CONFLICT DO UPDATE for replay-safe ingestion
- Partial success pattern with row-level error reporting
- Configurable batch size and timeout settings
- 13 ordered implementation tasks with validation gates

Confidence score: 8/10

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Implements PRP-3 Ingest Layer with:

- POST /ingest/sales-daily endpoint accepting natural keys (store_code, sku)
- KeyResolver service for store_code → store_id, sku → product_id resolution
- Calendar date FK validation (dates must exist in calendar table)
- PostgreSQL ON CONFLICT DO UPDATE for replay-safe upserts
- Partial success pattern: valid rows processed, invalid rows returned as errors
- Configurable batch size and timeout via Settings
- Structured logging with ingest.sales_daily.{action}_{state} events

New files:
- app/features/ingest/ - Complete vertical slice (schemas, service, routes, tests)
- examples/api/ingest_sales_daily.http - HTTP client examples

Test coverage: 42 new tests (18 schema + 15 service + 9 integration)
All validation gates passing: ruff, mypy, pyright, pytest (101 tests)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Jan 26, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@sourcery-ai
Copy link
Copy Markdown

sourcery-ai Bot commented Jan 26, 2026

Reviewer's Guide

Implements a new idempotent batch ingest endpoint POST /ingest/sales-daily with natural key resolution, partial-success semantics, and Postgres ON CONFLICT upserts, including full vertical slice (schemas, service, routes, tests), new ingest settings, router wiring, and lint config tweaks.

Sequence diagram for POST_ingest_sales_daily_batch_upsert_flow

sequenceDiagram
    actor Client
    participant FastAPIApp
    participant IngestRouter
    participant IngestService
    participant KeyResolver
    participant Database

    Client->>FastAPIApp: POST /ingest/sales-daily
    FastAPIApp->>IngestRouter: route ingest_sales_daily
    IngestRouter->>IngestRouter: validate SalesDailyIngestRequest
    IngestRouter->>IngestService: upsert_sales_daily_batch(records, db, KeyResolver)

    IngestService->>KeyResolver: resolve_store_codes(db, store_codes)
    KeyResolver-->>IngestService: store_code_to_id_map

    IngestService->>KeyResolver: resolve_skus(db, skus)
    KeyResolver-->>IngestService: sku_to_product_id_map

    IngestService->>KeyResolver: resolve_dates(db, dates)
    KeyResolver-->>IngestService: valid_dates

    IngestService->>IngestService: build valid_rows and IngestRowError list

    alt has_valid_rows
        IngestService->>Database: pg_insert(SalesDaily).on_conflict_do_update
        Database-->>IngestService: upsert result
    end

    IngestService-->>IngestRouter: UpsertResult(inserted, updated, rejected, errors)
    IngestRouter->>IngestRouter: build SalesDailyIngestResponse
    IngestRouter-->>FastAPIApp: 200 OK + SalesDailyIngestResponse
    FastAPIApp-->>Client: JSON response with counts and errors
Loading

Class_diagram_for_ingest_feature_schemas_and_services

classDiagram
    class SalesDailyIngestRow {
        date_type date
        str store_code
        str sku
        int quantity
        Decimal unit_price
        Decimal total_amount
        validate_total_amount_consistency() SalesDailyIngestRow
    }

    class SalesDailyIngestRequest {
        list~SalesDailyIngestRow~ records
    }

    class IngestRowError {
        int row_index
        str store_code
        str sku
        date_type date
        str error_code
        str error_message
    }

    class SalesDailyIngestResponse {
        int inserted_count
        int updated_count
        int rejected_count
        int total_processed
        list~IngestRowError~ errors
        float duration_ms
    }

    class UpsertResult {
        int inserted_count
        int updated_count
        int rejected_count
        list~IngestRowError~ errors
    }

    class KeyResolverProtocol {
        <<interface>>
        resolve_store_codes(db, codes) dict~str,int~
        resolve_skus(db, skus) dict~str,int~
        resolve_dates(db, dates) set~date_type~
    }

    class KeyResolver {
        resolve_store_codes(db, codes) dict~str,int~
        resolve_skus(db, skus) dict~str,int~
        resolve_dates(db, dates) set~date_type~
    }

    SalesDailyIngestRequest "1" --> "*" SalesDailyIngestRow : records
    SalesDailyIngestResponse "1" --> "*" IngestRowError : errors
    UpsertResult "1" --> "*" IngestRowError : errors

    KeyResolverProtocol <|.. KeyResolver : implements
Loading

Architecture_flow_for_new_ingest_sales_daily_endpoint

flowchart LR
    Client[Client]
    FastAPIApp[FastAPI_app]
    IngestRouter[Ingest_router_ingest_sales_daily]
    IngestService[upsert_sales_daily_batch]
    KeyResolver[KeyResolver]
    Database[(PostgreSQL_database)]

    Client -->|HTTP_POST_ingest_sales_daily| FastAPIApp
    FastAPIApp --> IngestRouter
    IngestRouter --> IngestService
    IngestService --> KeyResolver
    IngestService -->|pg_insert_ON_CONFLICT_DO_UPDATE| Database
    Database --> IngestService
    IngestService --> IngestRouter
    IngestRouter --> FastAPIApp
    FastAPIApp -->|JSON_response_counts_and_errors| Client
Loading

File-Level Changes

Change Details Files
Add ingest vertical slice for POST /ingest/sales-daily with natural-key-based, idempotent batch upsert and partial success handling.
  • Define Pydantic request/response models for sales-daily ingest rows, batch payloads, and per-row errors, enforcing non-negative numeric fields, string length bounds, and light cross-field monetary consistency checks.
  • Implement KeyResolver service to batch-resolve store_code, sku, and dates against Store, Product, and Calendar tables using async SQLAlchemy queries.
  • Implement upsert_sales_daily_batch to validate resolved keys and calendar dates, build valid_rows vs per-row IngestRowError list, and execute a PostgreSQL insert ... on_conflict_do_update upsert on SalesDaily(date, store_id, product_id) with updated financial fields and timestamps, returning aggregated counts.
  • Expose a FastAPI router with POST /ingest/sales-daily that calls the service, logs structured ingest.sales_daily.* events, measures duration, wraps results into the response schema, and converts unexpected failures into DatabaseError.
  • Export ingest feature surface (router, schemas, service) via package init and add feature-specific pytest fixtures for sample rows and a MockKeyResolver.
app/features/ingest/__init__.py
app/features/ingest/schemas.py
app/features/ingest/service.py
app/features/ingest/routes.py
app/features/ingest/tests/__init__.py
app/features/ingest/tests/conftest.py
Add comprehensive unit and integration tests for the ingest feature, including end-to-end API behavior and idempotency.
  • Add schema-focused tests for SalesDailyIngestRow, SalesDailyIngestRequest, IngestRowError, and SalesDailyIngestResponse covering happy paths and validation failures.
  • Add service-focused tests for KeyResolver (stores, products, dates) and upsert_sales_daily_batch for all-valid, all-rejected, and mixed valid/invalid batches using the mock resolver.
  • Add integration tests that spin up a temporary DB schema, seed Store/Product/Calendar data, exercise POST /ingest/sales-daily for happy path, idempotent replays, partial success, unknown keys/dates, validation errors, and assert X-Request-ID propagation plus persisted SalesDaily rows.
app/features/ingest/tests/test_schemas.py
app/features/ingest/tests/test_service.py
app/features/ingest/tests/test_routes.py
Wire ingest feature into the application and configuration, and adjust lint settings for routes and tests.
  • Introduce ingest_batch_size and ingest_timeout_seconds settings on the core Settings model for future batching/timeout control.
  • Register the ingest router in the FastAPI app so /ingest/sales-daily is exposed alongside existing routers.
  • Relax Ruff per-file-ignores to allow additional unused-arg-style warnings in tests and ignore B008 for all feature route modules.
  • Add an HTTP example file documenting typical ingest requests, idempotent replay, and partial success responses (file body not shown in diff).
app/core/config.py
app/main.py
pyproject.toml
examples/api/ingest_sales_daily.http
Add internal design/PRP document describing the ingest layer architecture and constraints.
  • Document goals, success criteria, codebase context, implementation blueprint, testing/validation strategy, and anti-patterns for the ingest layer and sales-daily endpoint.
  • Capture SQLAlchemy/Postgres/Pydantic/FastAPI nuances (ON CONFLICT usage, Decimal handling, FK requirements, batching constraints) to guide future changes and reviewers.
PRPs/PRP-3-ingest-layer.md

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Copy Markdown

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 5 issues, and left some high level feedback:

  • In SalesDailyIngestResponse, the errors field uses Field(default=[]), which creates a shared mutable default list across instances; switch this to default_factory=list to avoid cross-request state leakage.
  • The upsert_sales_daily_batch implementation always reports inserted_count and leaves updated_count at 0, even on conflict updates; either implement the xmax-based distinction you mention in the comments or adjust the API/response docs and tests to make it clear that both inserts and updates are currently counted as inserted.
  • The validate_total_amount_consistency model validator in SalesDailyIngestRow is effectively a no-op (it just passes even when outside tolerance); consider either enforcing the check (and surfacing a structured error) or removing the validator and documenting that total_amount is trusted as-is.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `SalesDailyIngestResponse`, the `errors` field uses `Field(default=[])`, which creates a shared mutable default list across instances; switch this to `default_factory=list` to avoid cross-request state leakage.
- The `upsert_sales_daily_batch` implementation always reports `inserted_count` and leaves `updated_count` at 0, even on conflict updates; either implement the xmax-based distinction you mention in the comments or adjust the API/response docs and tests to make it clear that both inserts and updates are currently counted as `inserted`.
- The `validate_total_amount_consistency` model validator in `SalesDailyIngestRow` is effectively a no-op (it just `pass`es even when outside tolerance); consider either enforcing the check (and surfacing a structured error) or removing the validator and documenting that `total_amount` is trusted as-is.

## Individual Comments

### Comment 1
<location> `app/features/ingest/service.py:215-216` </location>
<code_context>
+        ).returning(SalesDaily.id)
+
+        # Execute and get results
+        result = await db.execute(upsert_stmt)
+        rows_affected = len(result.fetchall())
+
+        # Note: Distinguishing inserted vs updated accurately requires xmax check
</code_context>

<issue_to_address>
**suggestion (performance):** Avoid fetching all returned rows just to compute the affected count.

Since the returned rows are unused, consider either dropping `RETURNING` and relying on `result.rowcount` for the affected count, or otherwise avoid materializing all rows. This prevents unnecessary memory use and improves efficiency for large batches.

Suggested implementation:

```python
            set_={
                "quantity": insert_stmt.excluded.quantity,
                "unit_price": insert_stmt.excluded.unit_price,
                "total_amount": insert_stmt.excluded.total_amount,
                "updated_at": func.now(),
            },
        )

        # Execute and get results
        result = await db.execute(upsert_stmt)
        rows_affected = result.rowcount or 0

```

```python
        # Note: Distinguishing inserted vs updated accurately requires xmax check
        # For simplicity, count all as processed (could enhance later)
        # A row is "inserted" if xmax = 0, "updated" if xmax > 0
        # For now, we report total as "inserted" for new batches

```
</issue_to_address>

### Comment 2
<location> `app/features/ingest/service.py:218-199` </location>
<code_context>
+        result = await db.execute(upsert_stmt)
+        rows_affected = len(result.fetchall())
+
+        # Note: Distinguishing inserted vs updated accurately requires xmax check
+        # For simplicity, count all as processed (could enhance later)
+        # A row is "inserted" if xmax = 0, "updated" if xmax > 0
+        # For now, we report total as "inserted" for new batches
+        inserted = rows_affected
+        updated = 0
+
+    logger.info(
</code_context>

<issue_to_address>
**issue:** The `updated_count` in the result and API response is always 0, which can be misleading to clients.

Because all affected rows are counted as `inserted` and `updated` is always 0, the exposed counters don’t reflect actual inserts vs updates and can mislead API consumers. Please either (1) distinguish inserts vs updates using system columns (e.g., `xmax`), (2) expose a single `processed_count`, or (3) explicitly define and implement semantics where `updated_count` mirrors `inserted_count` when they can’t be differentiated.
</issue_to_address>

### Comment 3
<location> `app/features/ingest/routes.py:66-75` </location>
<code_context>
+
+        logger.info("ingest.sales_daily.request_received", record_count=len(request.records))
+
+        try:
+            key_resolver = KeyResolver()
+            result = await upsert_sales_daily_batch(db, request.records, key_resolver)
+
+            duration_ms = (time.time() - start_time) * 1000
+
+            return SalesDailyIngestResponse(
+                inserted_count=result.inserted_count,
+                updated_count=result.updated_count,
+                rejected_count=result.rejected_count,
+                total_processed=len(request.records),
+                errors=result.errors,
+                duration_ms=round(duration_ms, 2),
+            )
+        except Exception as e:
+            logger.error(
+                "ingest.sales_daily.request_failed",
</code_context>

<issue_to_address>
**issue (bug_risk):** Catching bare `Exception` and wrapping as `DatabaseError` may hide non-database issues.

This `try/except Exception` block masks all failures (including programming errors and misconfigurations) as `DatabaseError`, which obscures the real root cause and complicates debugging. Prefer catching only the specific DB/SQLAlchemy exceptions you expect here and let other exceptions propagate so they surface with their original type and stack trace.
</issue_to_address>

### Comment 4
<location> `app/features/ingest/tests/test_routes.py:124-133` </location>
<code_context>
+    async def test_ingest_valid_records(self, client, db_session, seed_data):
</code_context>

<issue_to_address>
**issue (testing):** Integration tests use a standalone db_session that is not wired into the FastAPI app dependencies

The `db_session` fixture creates its own engine/session, while the `/ingest/sales-daily` endpoint uses the app-wide `get_db` dependency, so your assertions may be querying a different database/session than the one the route writes to. Please override `get_db` in these tests so the endpoint and assertions share the same session/engine (e.g. `app.dependency_overrides[get_db] = lambda: db_session` or an equivalent async version) to ensure the test actually verifies persisted data.
</issue_to_address>

### Comment 5
<location> `app/features/ingest/schemas.py:25` </location>
<code_context>
+    unit_price: Decimal = Field(..., ge=0, description="Price per unit")
+    total_amount: Decimal = Field(..., ge=0, description="Total sales amount")
+
+    @model_validator(mode="after")
+    def validate_total_amount_consistency(self) -> "SalesDailyIngestRow":
+        """Warn if total_amount doesn't match quantity * unit_price."""
</code_context>

<issue_to_address>
**issue (complexity):** Consider either implementing concrete behavior in this validator or removing it and documenting the intent on the field instead, so its purpose is clear and not misleading.

You can reduce complexity here by either removing the no-op branch or wiring in the intended behavior. Right now the validator looks important but does nothing, which increases mental overhead.

If you do want to keep it for consistency checks, make the behavior explicit, e.g. log a warning:

```python
import logging

logger = logging.getLogger(__name__)

class SalesDailyIngestRow(BaseModel):
    ...
    @model_validator(mode="after")
    def validate_total_amount_consistency(self) -> "SalesDailyIngestRow":
        """Warn if total_amount doesn't match quantity * unit_price.

        Allows small floating-point discrepancies (0.01 tolerance).
        """
        expected = self.quantity * self.unit_price
        if abs(self.total_amount - expected) > Decimal("0.01"):
            logger.warning(
                "total_amount mismatch for sales row",
                extra={
                    "date": self.date,
                    "store_code": self.store_code,
                    "sku": self.sku,
                    "quantity": self.quantity,
                    "unit_price": str(self.unit_price),
                    "total_amount": str(self.total_amount),
                    "expected": str(expected),
                },
            )
        return self
```

If you don’t need any behavior yet, simplify by removing the validator entirely and leaving the intent in a comment near `total_amount`:

```python
class SalesDailyIngestRow(BaseModel):
    ...
    total_amount: Decimal = Field(
        ...,
        ge=0,
        decimal_places=2,
        description="Total sales amount (≈ quantity * unit_price; validated downstream)",
    )
```

Either way, you avoid a validator that appears to enforce a contract but is currently a no-op.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread app/features/ingest/service.py Outdated
Comment on lines +215 to +216
result = await db.execute(upsert_stmt)
rows_affected = len(result.fetchall())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): Avoid fetching all returned rows just to compute the affected count.

Since the returned rows are unused, consider either dropping RETURNING and relying on result.rowcount for the affected count, or otherwise avoid materializing all rows. This prevents unnecessary memory use and improves efficiency for large batches.

Suggested implementation:

            set_={
                "quantity": insert_stmt.excluded.quantity,
                "unit_price": insert_stmt.excluded.unit_price,
                "total_amount": insert_stmt.excluded.total_amount,
                "updated_at": func.now(),
            },
        )

        # Execute and get results
        result = await db.execute(upsert_stmt)
        rows_affected = result.rowcount or 0
        # Note: Distinguishing inserted vs updated accurately requires xmax check
        # For simplicity, count all as processed (could enhance later)
        # A row is "inserted" if xmax = 0, "updated" if xmax > 0
        # For now, we report total as "inserted" for new batches

Comment thread app/features/ingest/service.py Outdated

# Perform upsert for valid rows
inserted = 0
updated = 0
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: The updated_count in the result and API response is always 0, which can be misleading to clients.

Because all affected rows are counted as inserted and updated is always 0, the exposed counters don’t reflect actual inserts vs updates and can mislead API consumers. Please either (1) distinguish inserts vs updates using system columns (e.g., xmax), (2) expose a single processed_count, or (3) explicitly define and implement semantics where updated_count mirrors inserted_count when they can’t be differentiated.

Comment thread app/features/ingest/routes.py Outdated
Comment on lines +66 to +75
try:
key_resolver = KeyResolver()
result = await upsert_sales_daily_batch(db, request.records, key_resolver)

duration_ms = (time.perf_counter() - start_time) * 1000

logger.info(
"ingest.sales_daily.request_completed",
inserted=result.inserted_count,
updated=result.updated_count,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Catching bare Exception and wrapping as DatabaseError may hide non-database issues.

This try/except Exception block masks all failures (including programming errors and misconfigurations) as DatabaseError, which obscures the real root cause and complicates debugging. Prefer catching only the specific DB/SQLAlchemy exceptions you expect here and let other exceptions propagate so they surface with their original type and stack trace.

Comment on lines +124 to +133
async def test_ingest_valid_records(self, client, db_session, seed_data):
"""Test ingesting valid sales records."""
payload = {
"records": [
{
"date": "2024-01-15",
"store_code": "S001",
"sku": "SKU-001",
"quantity": 10,
"unit_price": "9.99",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (testing): Integration tests use a standalone db_session that is not wired into the FastAPI app dependencies

The db_session fixture creates its own engine/session, while the /ingest/sales-daily endpoint uses the app-wide get_db dependency, so your assertions may be querying a different database/session than the one the route writes to. Please override get_db in these tests so the endpoint and assertions share the same session/engine (e.g. app.dependency_overrides[get_db] = lambda: db_session or an equivalent async version) to ensure the test actually verifies persisted data.

Comment thread app/features/ingest/schemas.py Outdated
unit_price: Decimal = Field(..., ge=0, decimal_places=2, description="Price per unit")
total_amount: Decimal = Field(..., ge=0, decimal_places=2, description="Total sales amount")

@model_validator(mode="after")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider either implementing concrete behavior in this validator or removing it and documenting the intent on the field instead, so its purpose is clear and not misleading.

You can reduce complexity here by either removing the no-op branch or wiring in the intended behavior. Right now the validator looks important but does nothing, which increases mental overhead.

If you do want to keep it for consistency checks, make the behavior explicit, e.g. log a warning:

import logging

logger = logging.getLogger(__name__)

class SalesDailyIngestRow(BaseModel):
    ...
    @model_validator(mode="after")
    def validate_total_amount_consistency(self) -> "SalesDailyIngestRow":
        """Warn if total_amount doesn't match quantity * unit_price.

        Allows small floating-point discrepancies (0.01 tolerance).
        """
        expected = self.quantity * self.unit_price
        if abs(self.total_amount - expected) > Decimal("0.01"):
            logger.warning(
                "total_amount mismatch for sales row",
                extra={
                    "date": self.date,
                    "store_code": self.store_code,
                    "sku": self.sku,
                    "quantity": self.quantity,
                    "unit_price": str(self.unit_price),
                    "total_amount": str(self.total_amount),
                    "expected": str(expected),
                },
            )
        return self

If you don’t need any behavior yet, simplify by removing the validator entirely and leaving the intent in a comment near total_amount:

class SalesDailyIngestRow(BaseModel):
    ...
    total_amount: Decimal = Field(
        ...,
        ge=0,
        decimal_places=2,
        description="Total sales amount (≈ quantity * unit_price; validated downstream)",
    )

Either way, you avoid a validator that appears to enforce a contract but is currently a no-op.

- Replace inserted_count/updated_count with processed_count (ON CONFLICT
  can't distinguish inserts vs updates without xmax check complexity)
- Remove misleading updated_count field that was always 0
- Use result.rowcount instead of len(result.fetchall()) for efficiency
- Change errors field from mutable default=[] to default_factory=list
- Catch specific SQLAlchemyError instead of bare Exception
- Wire db_session into FastAPI deps override in integration tests
- Remove no-op validate_total_amount_consistency validator

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@w7-mgfcode w7-mgfcode merged commit 0e15cb3 into dev Jan 26, 2026
8 checks passed
@w7-mgfcode w7-mgfcode deleted the feat/prp-3-ingest-layer branch January 26, 2026 12:57
This was referenced Jan 26, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants