This project implements a simple e-commerce system using Domain-Driven Design (DDD), Command Query Responsibility Segregation (CQRS), and event-driven patterns. The application is built as a monolith with two main domains: Orders and Customers, designed with eventual microservice extraction in mind.
- E-Commerce DDD, CQRS, and Event Sourcing
The application follows a layered architecture, separating concerns to promote maintainability and scalability.
graph TD
API[API Layer] --> |Commands/Queries| AppLayer[Application Layer]
AppLayer --> |Uses| DomainLayer[Domain Layer]
AppLayer --> |Uses| Infra[Infrastructure Layer]
subgraph AppLayer [Application Layer]
CmdHandlers[Command Handlers]
QueryHandlers[Query Handlers]
end
subgraph DomainLayer [Domain Layer]
Agg[Aggregates & Entities]
InternalDomEvents[Internal Domain Events]
end
subgraph Infra [Infrastructure Layer]
WM[Write Model DB - SQLite];
RM[Read Model DB - MongoDB];
InternalDispatcher[DomainEventDispatcher];
IntegrationDispatcher[IntegrationEventDispatcher];
IntegrationPublishers[Integration Event Publishers];
Mappers[Event Mappers Internal to Public];
EventHandlerRegistry[Event Handler Registry];
EvStore[Event Store - Auditing];
Repos[Repositories];
end
subgraph IntegrationContracts [Integration Event Contracts]
PublicEvents[Public Event DTOs - Pydantic];
end
CmdHandlers --> Agg
Agg -- Generates --> InternalDomEvents
CmdHandlers -- Dispatches via --> InternalDispatcher
CmdHandlers -- Publishes specific events via --> IntegrationPublishers
IntegrationPublishers -- Uses --> Mappers
Mappers -- Creates --> PublicEvents
IntegrationPublishers -- Dispatches via --> IntegrationDispatcher
InternalDispatcher -- Uses --> EventHandlerRegistry
IntegrationDispatcher -- Uses --> EventHandlerRegistry
EventHandlerRegistry -- Routes internal events to --> InternalHandlers[Internal Event Handlers - Same BC];
EventHandlerRegistry -- Routes public contracts to --> IntegrationHandlers[Integration Event Handlers - Other BCs];
InternalHandlers -- Updates --> RM
InternalHandlers -- Updates --> EvStore
IntegrationHandlers -- Updates --> RM
QueryHandlers -- Reads from --> RM
CmdHandlers -- Writes to --> WM
Key Layers:
- API Layer: Exposes HTTP endpoints (FastAPI). Converts HTTP requests to Commands or Queries.
- Application Layer: Orchestrates use cases. Contains Command Handlers and Query Handlers. It uses the Domain Layer for business logic and the Infrastructure Layer for persistence and other services.
- Domain Layer: The heart of the business logic. Contains Aggregates, Entities, Value Objects, and internal Domain Events. This layer is independent of other layers.
- Infrastructure Layer: Provides technical implementations for cross-cutting concerns like database access (Repositories), event dispatching (both internal and integration), event publishing, and external service integrations.
- Integration Contracts Layer: Defines the public, shared event DTOs (Pydantic models) that are used for communication between bounded contexts.
Domain logic is encapsulated within Entities and Aggregates. AggregateRoot
is a base class for aggregates, providing a mechanism to collect domain events.
# src/domain/core/AggregateRoot.py
class AggregateRoot(Entity):
def __init__(self):
self._domain_events: List[DomainEvent] = [] # Internal domain events
def add_domain_event(self, event: DomainEvent):
self._domain_events.append(event)
# ... other methods
These events represent significant occurrences within a specific domain. They are simple data classes inheriting from a base DomainEvent
and are handled by the DomainEventDispatcher
.
# src/domain/core/events/DomainEvent.py
class DomainEvent(Message): # Assuming Message is a common base
# ... common event properties
pass
# Example: src/domain/orders/events/OrderPlacedEvent.py
class OrderPlacedEvent(DomainEvent): # Internal event
# ... order specific details
pass
Examples: OrderPlacedEvent
(internal to Orders), CustomerRegisteredEvent
(internal to Customers).
Commands (write operations) and Queries (read operations) are strictly separated.
Commands express an intent to change the state of the system. Command Handlers process these commands, interact with Aggregates, and utilize repositories for persistence. They are now also responsible for explicitly deciding which internal events should be published as public integration events.
# src/application/orders/commands/PlaceOrderCommand.py
class PlaceOrderCommand(BaseModel):
customer_id: UUID
items: list[OrderItem]
# src/application/orders/commands/PlaceOrderHandler.py
class PlaceOrderHandler:
def __init__(
self,
write_repository: IOrderWriteRepository,
domain_event_dispatcher: DomainEventDispatcher, # For internal events
integration_event_publisher: OrderIntegrationEventPublisher # For publishing public events
):
# ...
async def handle(self, command: PlaceOrderCommand):
# 1. Create/load Order Aggregate
# 2. Execute business logic on Aggregate (which generates internal domain events)
# 3. Save Aggregate via Write Repository
# 4. Dispatch generated internal Domain Events (using domain_event_dispatcher)
# 5. For specific internal events, explicitly publish them as public integration events
# (using integration_event_publisher, which maps and uses IntegrationEventDispatcher)
for event in order.domain_events:
await self._domain_event_dispatcher.dispatch(event) # Internal dispatch
if isinstance(event, InternalOrderPlacedEvent): # InternalOrderPlacedEvent is an alias
await self._integration_event_publisher.publish(event) # Publish as public contract
Queries are used to retrieve data for presentation. Query Handlers directly fetch data from the Read Model (e.g., MongoDB) without involving domain aggregates. (This part remains largely unchanged).
The system uses separate data stores:
- Write Model: SQLite (via SQLAlchemy) for transactional consistency for aggregates.
- Read Model: MongoDB for optimized querying and denormalized views.
The system now employs a dual dispatcher mechanism to clearly separate the handling of internal domain events from public integration events.
graph TD
subgraph ApplicationLayer
CmdHandler[Command Handler]
end
subgraph DomainLayer
Agg[Aggregate]
InternalEvent[Internal DomainEvent]
end
Agg -- Generates --> InternalEvent
CmdHandler -- Dispatches --> DomainDispatcher[DomainEventDispatcher]
DomainDispatcher -- Routes to --> InternalHandler[Internal Handler - Same BC]
InternalHandler -- Modifies --> ReadModelA[BC-Specific Read Model A]
InternalHandler -- Interacts with --> EventStoreForAudit[Event Store - Audit]
CmdHandler -- Publishes via --> Publisher[BC-Specific IntegrationEventPublisher - e.g., OrderPublisher]
Publisher -- Uses --> Mapper[Event Mapper - Internal -> Public Contract]
Mapper -- Creates --> PublicContract[Public EventContract Pydantic DTO]
Publisher -- Dispatches via --> IntegrationDispatcher[IntegrationEventDispatcher]
IntegrationDispatcher -- Routes to --> IntegrationHandler[Integration Handler -Other BCs]
IntegrationHandler -- Modifies --> ReadModelB[BC-Specific Read Model B]
subgraph BootstrapAndRegistry ["Bootstrap & Registry - Infrastructure"]
direction LR
BS[bootstrap.py]
REG[registry.py]
end
BS -- Creates & Configures --> DomainDispatcher
BS -- Creates & Configures --> IntegrationDispatcher
BS -- Calls --> REG
REG -- Registers handlers with --> DomainDispatcher
REG -- Registers handlers with --> IntegrationDispatcher
- Path:
src/domain/core/events/DomainEventDispatcher.py
- Responsibility: Handles
DomainEvent
subclasses that are internal to a bounded context. It notifies handlers within the same bounded context. - Registration: Handlers for these internal events are registered with this dispatcher via
registry.py
.
- Path:
src/infrastructure/core/events/integration_event_dispatcher.py
- Responsibility: Handles public
EventContractVx
(Pydantic models) that are designed for cross-bounded context communication. It simulates a message bus, notifying handlers in other bounded contexts that subscribe to these public contracts. - Registration: Handlers in one BC that consume public event contracts from another BC are registered with this dispatcher via
registry.py
.
-
Bootstrap (
src/infrastructure/core/events/bootstrap.py
):create_domain_event_dispatcher(container)
: Creates theDomainEventDispatcher
and registers essential global handlers (likeEventStoreHandler
for auditing internal events).create_integration_event_dispatcher()
: Creates theIntegrationEventDispatcher
.register_all_event_handlers(domain_dispatcher, integration_dispatcher, container)
: Calls functions inregistry.py
to register all domain-specific and integration handlers with their respective dispatchers.
-
Registry (
src/infrastructure/core/events/registry.py
):- Contains functions like
register_customer_event_handlers(...)
andregister_order_event_handlers(...)
. - These functions now accept both
domain_event_dispatcher
andintegration_event_dispatcher
. - Internal handlers (reacting to
DomainEvent
within the same BC) are registered with thedomain_event_dispatcher
. - Integration handlers (reacting to public
EventContractVx
from other BCs) are registered with theintegration_event_dispatcher
.
# src/infrastructure/core/events/registry.py (Conceptual) def register_customer_event_handlers( domain_event_dispatcher: DomainEventDispatcher, integration_event_dispatcher: IntegrationEventDispatcher, container: dict ): # Handler for internal CustomerRegisteredEvent domain_event_dispatcher.register_handler( InternalCustomerRegisteredEvent, CustomerInternalHandler(container.get("...")) ) # Handler for public OrderPlacedEventContractV1 from Orders BC integration_event_dispatcher.register_handler( OrderPlacedEventContractV1, CustomerReactsToOrderHandler(container.get("...")) )
- Contains functions like
-
Application Startup (
src/main.py
): During FastAPI lifespan setup:- The two dispatchers are created using functions from
bootstrap.py
. - All handlers are registered using
register_all_event_handlers
frombootstrap.py
. - The dispatchers (and relevant publishers) are stored (e.g., in
app.state
) to be accessible via API dependencies.
- The two dispatchers are created using functions from
To facilitate clean, decoupled communication between bounded contexts (even within the monolith, preparing for microservices), a formal system for public integration events is established.
- Location:
src/integration_contracts/events/
(e.g.,order_events.py
,customer_events.py
) - Definition: These are Pydantic models (e.g.,
OrderPlacedEventContractV1
) that define the schema for events shared across bounded context boundaries. They act as Data Transfer Objects (DTOs). - Purpose: Provide a stable, explicit contract that consuming domains can rely on, avoiding direct dependencies on internal domain event classes from other contexts.
# src/integration_contracts/events/order_events.py
from pydantic import BaseModel, UUID
from datetime import datetime
class OrderItemContract(BaseModel):
product_id: UUID
quantity: int
unit_price: float
class OrderPlacedEventContractV1(BaseModel):
event_id: UUID
event_type: str = "OrderPlacedEvent.v1"
occurred_on: datetime
order_id: UUID
customer_id: UUID
items: List[OrderItemContract]
total_price: float
- Location: Within the infrastructure layer of the publishing domain (e.g.,
src/infrastructure/orders/event_mapping/mappers.py
). - Responsibility: Functions that take an internal domain event (e.g.,
InternalOrderPlacedEvent
) and transform it into its corresponding public event contract DTO (e.g.,OrderPlacedEventContractV1
).
# src/infrastructure/orders/event_mapping/mappers.py
from domain.orders.events.OrderPlacedEvent import OrderPlacedEvent as InternalOrderPlacedEvent
from integration_contracts.events.order_events import OrderPlacedEventContractV1, OrderItemContract
def map_internal_order_placed_to_v1_contract(event: InternalOrderPlacedEvent) -> OrderPlacedEventContractV1:
return OrderPlacedEventContractV1(
event_id=event.id, # Assuming internal event has an id
occurred_on=event.occurred_on,
order_id=event.aggregate_id,
customer_id=event.customer_id,
items=[OrderItemContract(**item.model_dump()) for item in event.items], # Or item.dict()
total_price=event.total_price
)
- Location: Within the infrastructure layer of the publishing domain (e.g.,
src/infrastructure/orders/event_publishing/order_integration_event_publisher.py
). - Responsibility:
- Takes a specific internal domain event from its command handler.
- Uses a registered mapper function to convert the internal event to its public contract DTO.
- Uses the
IntegrationEventDispatcher
to "broadcast" this public contract DTO to any interested subscribers in other bounded contexts.
- Invocation: Command Handlers explicitly call the
publish(internal_event)
method of these publishers for internal events that need to be made public.
# src/infrastructure/orders/event_publishing/order_integration_event_publisher.py
class OrderIntegrationEventPublisher:
def __init__(self, integration_event_dispatcher: IntegrationEventDispatcher):
self._integration_event_dispatcher = integration_event_dispatcher
self._mappers = {
InternalOrderPlacedEvent: map_internal_order_placed_to_v1_contract,
# ... other mappers
}
async def publish(self, internal_event: DomainEvent):
mapper = self._mappers.get(type(internal_event))
if mapper:
public_contract = mapper(internal_event)
if public_contract:
await self._integration_event_dispatcher.dispatch(public_contract)
- Origin: An action in
Domain A
(e.g., Order placed) generates anInternalDomainEventA
. - Internal Dispatch:
CommandHandlerA
dispatchesInternalDomainEventA
viaDomainEventDispatcher
for listeners within Domain A. - Explicit Publish Decision:
CommandHandlerA
decidesInternalDomainEventA
needs to be public and callsDomainAIntegrationEventPublisher.publish(InternalDomainEventA)
. - Mapping:
DomainAIntegrationEventPublisher
looks up a mapper forInternalDomainEventA
, converts it toPublicEventContractA_V1
. - Public Dispatch:
DomainAIntegrationEventPublisher
usesIntegrationEventDispatcher
to dispatchPublicEventContractA_V1
. - Subscription & Handling:
DomainBIntegrationHandler
(inDomain B
), which is registered with theIntegrationEventDispatcher
to listen forPublicEventContractA_V1
, receives the contract. - Action:
DomainBIntegrationHandler
processes the public contract and performs actions inDomain B
.
This approach ensures that Domain B
only depends on the stable PublicEventContractA_V1
from src/integration_contracts
, not on the internal details or event classes of Domain A
.
- An
EventStoreHandler
is registered with theDomainEventDispatcher
(typically for the baseDomainEvent
type). - Its responsibility is to persist all internal domain events to a dedicated store (e.g., could be a separate table/collection). This provides an audit trail and can be used for debugging or replaying events if needed (though full event sourcing for state reconstruction is not the primary focus here).
- The current
EventStore
is a simple in-memory store for demonstration but can be replaced with a persistent implementation.
src/
├── api/ # FastAPI routers, request/response models, dependencies
│ ├── customers/
│ └── orders/
│ └── dependencies.py
├── application/ # Application services (Command/Query Handlers)
│ ├── customers/
│ │ ├── commands/
│ │ └── queries/
│ └── orders/
│ ├── commands/ # e.g., PlaceOrderHandler.py
│ └── queries/
├── domain/ # Core business logic, independent of infrastructure
│ ├── core/ # Shared domain concepts (AggregateRoot, DomainEvent, etc.)
│ │ └── events/ # DomainEvent.py, DomainEventDispatcher.py
│ ├── customers/ # Customer Bounded Context
│ │ ├── events/ # e.g., CustomerRegisteredEvent.py (Internal)
│ │ ├── models/ # Customer Aggregate, Entities
│ │ └── repositories/ # Interfaces for customer repositories (IWrite, IRead)
│ │ └── events/
│ │ └── handlers/ # Handlers for events WITHIN Customer BC
│ │ └── integration/ # Handlers for events FROM OTHER BCs (consuming public contracts)
│ └── orders/ # Order Bounded Context
│ ├── events/ # e.g., OrderPlacedEvent.py (Internal)
│ ├── models/ # Order Aggregate, Entities
│ └── repositories/ # Interfaces for order repositories
│ └── events/
│ └── handlers/ # Handlers for events WITHIN Order BC
├── infrastructure/ # Implementation of external concerns
│ ├── core/ # Shared infrastructure (settings, DB base, core event components)
│ │ ├── events/ # Event bootstrap, registry, IntegrationEventDispatcher
│ │ │ ├── bootstrap.py
│ │ │ ├── integration_event_dispatcher.py
│ │ │ └── registry.py
│ │ ├── persistence/
│ │ └── settings.py
│ ├── customers/ # Customer BC specific infrastructure
│ │ ├── persistence/ # SQLAlchemy/MongoDB repositories
│ │ └── services/ # e.g., EmailService, AuditService
│ └── orders/ # Order BC specific infrastructure
│ ├── event_mapping/ # Mappers from internal Order events to public contracts
│ │ └── mappers.py
│ ├── event_publishing/ # OrderIntegrationEventPublisher
│ │ └── order_integration_event_publisher.py
│ └── persistence/ # SQLAlchemy/MongoDB repositories
├── integration_contracts/ # Public event contracts (Pydantic DTOs) for cross-BC communication
│ └── events/
│ ├── common_base.py # (Optional) Base for public contracts
│ ├── order_events.py # e.g., OrderPlacedEventContractV1
│ └── customer_events.py # e.g., CustomerRegisteredEventContractV1
├── tests/ # Unit and integration tests
└── main.py # FastAPI application entry point & lifespan management
(The rest of the README content like setup, running, testing instructions can follow)