
---

# 📨 Kafka + Event Schemas (Pydantic v2)

### 🎯 Intent

Use **Pydantic v2** to define **typed, versioned event schemas** for Kafka (or any MQ) so producers/consumers stay safe, compatible, and lean.

---

### 🧩 Core Components

1. **📦 Event Envelope (standard shape)**

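   ```python
   from typing import Literal
   from uuid import UUID

   from pydantic import AwareDatetime, BaseModel

   class EventEnvelope(BaseModel):
       id: UUID                    # unique event ID, reused for deduplication
       type: Literal["user.created", "order.paid"]
       occurred_at: AwareDatetime  # must carry a timezone; naive datetimes are rejected
       source: str                 # name of the producing service
       schema_version: int = 1
       data: dict                  # event-specific payload
   ```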

   * Wrap every event in this envelope so consumers can route, deduplicate, and version messages from the common metadata alone.

2. **🔀 Variant Payloads**

   * Define one model per event type; the `type` value tells the consumer which model to validate the payload against.
   * Example:

     ```python
     class UserCreated(BaseModel): user_id: UUID; email: EmailStr
     class OrderPaid(BaseModel): order_id: UUID; amount: Decimal
     Event = Union[UserCreated, OrderPaid]
     ```
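
One way to make the `type` discrimination explicit is a Pydantic discriminated union. The sketch below is an assumption about how the pieces could fit together, not the only layout: the wrapper classes `UserCreatedEvent` / `OrderPaidEvent` and the name `AnyEvent` are hypothetical, and `email` is a plain `str` so the example runs without the optional `email-validator` package that `EmailStr` needs.

```python
from decimal import Decimal
from typing import Annotated, Literal, Union
from uuid import UUID

from pydantic import BaseModel, Field, TypeAdapter


class UserCreated(BaseModel):
    user_id: UUID
    email: str  # plain str here; EmailStr requires email-validator


class OrderPaid(BaseModel):
    order_id: UUID
    amount: Decimal


# Hypothetical typed wrappers: each pins `type` to one literal and `data` to one model.
class UserCreatedEvent(BaseModel):
    type: Literal["user.created"]
    data: UserCreated


class OrderPaidEvent(BaseModel):
    type: Literal["order.paid"]
    data: OrderPaid


# Pydantic reads `type` first and validates only against the matching wrapper.
AnyEvent = Annotated[
    Union[UserCreatedEvent, OrderPaidEvent], Field(discriminator="type")
]
event_adapter = TypeAdapter(AnyEvent)

event = event_adapter.validate_python(
    {
        "type": "order.paid",
        "data": {"order_id": "3f2b8c1e-5d4a-4c6b-9e7f-0a1b2c3d4e5f", "amount": "19.99"},
    }
)
```

With a discriminator, a bad payload produces errors for the one matching variant only, rather than one error set per union member.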

3. **📤 Serialization**

   * **Producer**:
     `event.model_dump_json(by_alias=True, exclude_none=True)`
   * **Consumer**:
     `EventEnvelope.model_validate_json(msg.value())`
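
A minimal round trip of the producer and consumer calls above, with no Kafka client involved. The service name and payload values are made up for illustration; the bytes in `wire_bytes` stand in for what `msg.value()` would return on the consumer side.

```python
from datetime import datetime, timezone
from typing import Literal
from uuid import UUID

from pydantic import AwareDatetime, BaseModel


class EventEnvelope(BaseModel):
    id: UUID
    type: Literal["user.created", "order.paid"]
    occurred_at: AwareDatetime
    source: str
    schema_version: int = 1
    data: dict


event = EventEnvelope(
    id=UUID("3f2b8c1e-5d4a-4c6b-9e7f-0a1b2c3d4e5f"),
    type="user.created",
    occurred_at=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    source="user-service",  # hypothetical service name
    data={"user_id": "u-1"},
)

# Producer side: serialize to a JSON string, then encode for the message value.
wire_bytes = event.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")

# Consumer side: model_validate_json accepts str or bytes directly.
received = EventEnvelope.model_validate_json(wire_bytes)
```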

4. **🧰 Bulk Validation**

   * `TypeAdapter(list[EventEnvelope]).validate_python(batch)`
   * A `ValidationError` reports every failing item; the first element of each error's `loc` is the item's index in the batch.
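
The sketch below shows how the failing indexes could be pulled out of a batch error. The batch contents are invented; the second item carries an invalid UUID on purpose.

```python
from typing import Literal
from uuid import UUID

from pydantic import AwareDatetime, BaseModel, TypeAdapter, ValidationError


class EventEnvelope(BaseModel):
    id: UUID
    type: Literal["user.created", "order.paid"]
    occurred_at: AwareDatetime
    source: str
    schema_version: int = 1
    data: dict


batch_adapter = TypeAdapter(list[EventEnvelope])

batch = [
    {"id": "3f2b8c1e-5d4a-4c6b-9e7f-0a1b2c3d4e5f", "type": "user.created",
     "occurred_at": "2024-01-01T12:00:00Z", "source": "user-service", "data": {}},
    {"id": "not-a-uuid", "type": "order.paid",
     "occurred_at": "2024-01-01T12:00:00Z", "source": "order-service", "data": {}},
]

failed_indexes: list[int] = []
try:
    batch_adapter.validate_python(batch)
except ValidationError as e:
    # Each error's loc starts with the list index, e.g. (1, "id").
    failed_indexes = sorted({err["loc"][0] for err in e.errors()})
```

Note that validating the list as a whole is all-or-nothing: one bad item raises for the entire batch, so the indexes are what lets a caller retry the good items separately.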

5. **📜 Versioning**

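   * Keep `schema_version` in the envelope and bump it on breaking changes.
   * Prefer additive changes: give new fields a default (or make them optional) so older messages still validate.
   * Mark deprecated fields with `Field(deprecated=True)` (available from Pydantic 2.7).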
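
As an illustration of an additive change, the sketch below evolves a payload so that both old and new messages validate against one model. The field names `username` and `display_name` are hypothetical, and `deprecated=True` assumes Pydantic 2.7 or later.

```python
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, Field


class UserCreated(BaseModel):
    user_id: UUID
    email: str
    # Old field: still accepted so v1 producers keep working, but flagged as deprecated.
    username: Optional[str] = Field(default=None, deprecated=True)
    # New field: has a default, so v1 payloads that lack it still validate.
    display_name: Optional[str] = None


v1_payload = {
    "user_id": "3f2b8c1e-5d4a-4c6b-9e7f-0a1b2c3d4e5f",
    "email": "alice@example.com",
    "username": "alice",
}
v2_payload = {
    "user_id": "3f2b8c1e-5d4a-4c6b-9e7f-0a1b2c3d4e5f",
    "email": "alice@example.com",
    "display_name": "Alice",
}

old_event = UserCreated.model_validate(v1_payload)
new_event = UserCreated.model_validate(v2_payload)
```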

6. **🏷️ Contracts / Docs**

   * Publish JSON schema:
     `EventEnvelope.model_json_schema(by_alias=True)`
   * Commit a snapshot of the published schema and diff against it in CI to detect breaking changes.
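
A rough sketch of a snapshot check, assuming "breaking" means a removed field or a newly required one. The helper `breaking_changes` and the two model versions are hypothetical; a real check would likely also compare types and enum values.

```python
import json
from decimal import Decimal
from uuid import UUID

from pydantic import BaseModel


class OrderPaidV1(BaseModel):
    order_id: UUID
    amount: Decimal


class OrderPaidV2(BaseModel):
    order_id: UUID
    amount: Decimal
    currency: str  # required with no default: breaks existing producers


def breaking_changes(old: dict, new: dict) -> list[str]:
    """Compare two JSON Schemas and list removed or newly required fields."""
    problems = []
    for name in old.get("properties", {}):
        if name not in new.get("properties", {}):
            problems.append(f"removed field: {name}")
    for name in new.get("required", []):
        if name not in old.get("required", []):
            problems.append(f"newly required field: {name}")
    return problems


# The snapshot would normally be read from a file committed to the repository.
snapshot = json.loads(json.dumps(OrderPaidV1.model_json_schema(by_alias=True)))
current = OrderPaidV2.model_json_schema(by_alias=True)

problems = breaking_changes(snapshot, current)
```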

7. **🧭 Kafka Keys & Ordering**

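   * Use a stable key (`user_id`, `order_id`) for partitioning; Kafka only preserves order within a partition, so all events for one entity must share a key.
   * Validate that the key field exists in the payload before producing.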
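
One possible way to enforce the key check is a model-level validator. Everything named here (`KEY_FIELDS`, `KeyedEnvelope`, `kafka_key`) is a hypothetical helper for illustration, and the envelope is trimmed to the two fields that matter.

```python
from typing import Literal

from pydantic import BaseModel, ValidationError, model_validator

# Hypothetical mapping from event type to the payload field used as the Kafka key.
KEY_FIELDS = {"user.created": "user_id", "order.paid": "order_id"}


class KeyedEnvelope(BaseModel):
    type: Literal["user.created", "order.paid"]
    data: dict

    @model_validator(mode="after")
    def key_must_exist(self) -> "KeyedEnvelope":
        field = KEY_FIELDS[self.type]
        if not self.data.get(field):
            raise ValueError(f"payload for {self.type!r} is missing key field {field!r}")
        return self

    def kafka_key(self) -> bytes:
        """Return the partitioning key as bytes, ready for the producer."""
        return str(self.data[KEY_FIELDS[self.type]]).encode("utf-8")


good = KeyedEnvelope(type="order.paid", data={"order_id": "o-1"})
key = good.kafka_key()

try:
    KeyedEnvelope(type="order.paid", data={})
    rejected = False
except ValidationError:
    rejected = True
```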

8. **🛡️ Sensitive Data**

   * Mask sensitive fields on output with `@field_serializer`.
   * Better still, send only IDs and let consumers fetch sensitive details from the owning service.
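
A small sketch of masking with `@field_serializer`. The masking rule (keep the first character of the local part) is an arbitrary choice for illustration.

```python
from pydantic import BaseModel, field_serializer


class UserCreated(BaseModel):
    user_id: str
    email: str

    @field_serializer("email")
    def mask_email(self, value: str) -> str:
        # Keep the first character and the domain; hide the rest of the local part.
        local, _, domain = value.partition("@")
        return f"{local[:1]}***@{domain}"


user = UserCreated(user_id="u-1", email="alice@example.com")
masked = user.model_dump()
```

The serializer only affects output: `user.email` still holds the full address in memory, and the masked value cannot be reversed by consumers.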

9. **🧪 Consumer Validation**

   * On `ValidationError`, log `e.errors()` and push the raw message to a dead-letter queue (**DLQ**).
   * Don’t crash the whole consumer loop over one bad message.
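
The loop below sketches that pattern with plain Python lists standing in for the Kafka consumer and the DLQ producer, so it runs without a broker. The `consume` function and the trimmed `OrderPaid` model are assumptions for illustration.

```python
import logging
from typing import Callable, Iterable

from pydantic import BaseModel, ValidationError

logger = logging.getLogger("consumer")


class OrderPaid(BaseModel):
    order_id: str
    amount: float


def consume(
    raw_messages: Iterable[bytes],
    handle: Callable[[OrderPaid], None],
    dead_letters: list,
) -> None:
    for raw in raw_messages:
        try:
            event = OrderPaid.model_validate_json(raw)
        except ValidationError as e:
            # Malformed JSON and schema violations both land here.
            logger.warning("invalid event: %s", e.errors(include_url=False))
            dead_letters.append(raw)  # stand-in for producing to a DLQ topic
            continue  # keep the loop alive for the next message
        handle(event)


handled: list = []
dlq: list = []
consume(
    [b'{"order_id": "o-1", "amount": 10}', b"not json", b'{"order_id": "o-2"}'],
    handled.append,
    dlq,
)
```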

10. **⚙️ Avro / Schema Registry**

    * Pydantic generates JSON Schema natively (`model_json_schema()`); Avro schemas need a third-party converter or a hand-maintained `.avsc` file.
    * Keep enums/literals consistent across systems.

11. **⏱ Performance**

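    * Keep payloads **flat & small**.
    * Compress at the Kafka level (producer `compression.type` set to `gzip` or `zstd`).
    * Build `TypeAdapter` instances once and reuse them across messages; constructing one per message rebuilds the validator every time.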
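
A brief sketch of both points, assuming a librdkafka-style client such as `confluent-kafka` for the configuration keys. The broker address and the `linger.ms` value are placeholders, not recommendations.

```python
from pydantic import BaseModel, TypeAdapter


class OrderPaid(BaseModel):
    order_id: str
    amount: float


# Built once at import time and shared by every call to parse_batch.
ORDER_BATCH = TypeAdapter(list[OrderPaid])


def parse_batch(raw: bytes) -> list[OrderPaid]:
    """Validate a JSON array of events straight from bytes."""
    return ORDER_BATCH.validate_json(raw)


# Producer settings using librdkafka property names; values are illustrative.
PRODUCER_CONFIG = {
    "bootstrap.servers": "localhost:9092",  # placeholder address
    "compression.type": "zstd",             # or "gzip"
    "linger.ms": 20,                        # small batching delay helps compression
}

orders = parse_batch(b'[{"order_id": "o-1", "amount": 5}, {"order_id": "o-2", "amount": 7.5}]')
```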

12. **🧯 Idempotency & Replays**

    * Deduplicate on the envelope's `id` (a `UUID`).
    * Consumers must be **idempotent** (safe to retry), because messages can be redelivered after a rebalance or a replay.
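
A minimal in-memory sketch of deduplication by event ID. The `Deduplicator` class is hypothetical; a real consumer would more likely use a durable store with an expiry, since this set grows without bound and is lost on restart.

```python
from uuid import UUID


class Deduplicator:
    """Remember event IDs that were already processed (in memory only)."""

    def __init__(self) -> None:
        self._seen: set[UUID] = set()

    def first_time(self, event_id: UUID) -> bool:
        """Return True the first time an ID is seen, False on every repeat."""
        if event_id in self._seen:
            return False
        self._seen.add(event_id)
        return True


dedup = Deduplicator()
first = UUID("3f2b8c1e-5d4a-4c6b-9e7f-0a1b2c3d4e5f")
second = UUID("7a1c9d2e-6b3f-4e8a-8c5d-1f2e3d4c5b6a")

# Simulated replay: the first event is delivered twice.
processed = [eid for eid in (first, second, first) if dedup.first_time(eid)]
```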

13. **📊 Observability**

    * Add `trace_id` and `span_id` to the envelope.
    * Log errors per event type and schema version.
    * Export metrics (for example validation failures and DLQ counts) for monitoring.
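
The sketch below adds optional trace fields to a trimmed envelope and counts validation failures per `(type, schema_version)`. The `Counter` is a stand-in for a real metrics client, and `TracedEnvelope` and `validate_or_count` are hypothetical names.

```python
from collections import Counter
from typing import Optional

from pydantic import BaseModel, ValidationError


class TracedEnvelope(BaseModel):
    type: str
    schema_version: int = 1
    trace_id: Optional[str] = None  # propagated from the producer's trace context
    span_id: Optional[str] = None
    data: dict


# Stand-in for a metrics client: failures keyed by (type, schema_version).
validation_failures: Counter = Counter()


def validate_or_count(raw: dict) -> Optional[TracedEnvelope]:
    try:
        return TracedEnvelope.model_validate(raw)
    except ValidationError:
        key = (raw.get("type", "unknown"), raw.get("schema_version", "unknown"))
        validation_failures[key] += 1
        return None


ok = validate_or_count({"type": "order.paid", "trace_id": "t-1", "span_id": "s-1", "data": {}})
bad = validate_or_count({"type": "order.paid", "schema_version": 2, "data": "not-a-dict"})
```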

---
