# composition_root.py
# Forked from pgorecki/python-ddd.
import dependency_injector.containers as containers
import dependency_injector.providers as providers
from application.command_bus import CommandBus
from application.command_handlers import AddItemCommandHandler
from application.query_bus import QueryBus
from application.query_handlers import GetItemsQueryHandler
from application.services import IdentityHashingService
from infrastructure.framework.falcon.controllers import (InfoController,
ItemsController)
from infrastructure.repositories.auction_items_repository import AuctionItemsRepository
from infrastructure.repositories.users_repository import InMemoryUsersRepository
from infrastructure.framework.falcon.authentication import BasicAuthenticationService
class ObjectiveCommandHandler:
    """Class-based command handler that prints every command it handles."""

    def __init__(self, logger):
        """Keep the collaborator on the instance.

        Args:
            logger: Object stored as ``self.logger``. It is only echoed by
                ``handle``; no method is ever called on it.
        """
        self.logger = logger

    def handle(self, command):
        """Print the command together with the stored logger.

        Args:
            command: The command being handled; printed as-is.
        """
        print('objective handler is handling', command, self.logger)
def functional_handler(logger):
    """Build a closure-based command handler bound to ``logger``.

    Args:
        logger: Object captured by the returned closure. It is only echoed
            in the printed line; no method is ever called on it.

    Returns:
        A one-argument callable ``handle(command)`` that prints the command
        and the captured logger, and returns ``None``.
    """

    def handle(command):
        print('functional handler is handling', command, logger)

    return handle
class BaseContainer(containers.DeclarativeContainer):
    """Providers for the hashing and authentication services."""

    # Singleton provider: the IdentityHashingService instance is created on
    # first use and the same instance is returned afterwards.
    hashing_service_factory = providers.Singleton(IdentityHashingService)

    # Factory provider: each call builds a new BasicAuthenticationService.
    # Its ``users_repository`` argument is itself a Factory, so a new
    # InMemoryUsersRepository (wired to the shared hashing service) is built
    # for every authentication service.
    # NOTE(review): if InMemoryUsersRepository keeps state that must outlive
    # a single authentication service, this inner provider probably wants to
    # be a Singleton -- confirm against the repository implementation.
    authentication_service_factory = providers.Factory(
        BasicAuthenticationService,
        users_repository=providers.Factory(
            InMemoryUsersRepository,
            hashing_service=hashing_service_factory,
        ),
    )
class CommandBusContainer(containers.DeclarativeContainer):
    """Providers that assemble the command bus and its command handlers."""

    # Single AuctionItemsRepository instance reused by this container's
    # command handlers.
    items_repository = providers.Singleton(AuctionItemsRepository)

    # Aggregate of handler factories keyed by name; the key
    # ("AddItemCommand") selects which factory builds the handler.
    command_handler_factory = providers.FactoryAggregate(
        AddItemCommand=providers.Factory(
            AddItemCommandHandler,
            items_repository=items_repository,
        ),
    )

    # DelegatedFactory is injected as a provider rather than being called at
    # injection time, so CommandBus receives a callable that forwards to the
    # aggregate above instead of a pre-built handler.
    command_bus_factory = providers.Factory(
        CommandBus,
        command_handler_factory=providers.DelegatedFactory(
            command_handler_factory
        ),
    )
class QueryBusContainer(containers.DeclarativeContainer):
    """Providers that assemble the query bus and its query handlers."""

    # Single AuctionItemsRepository instance reused by this container's
    # query handlers.
    # NOTE(review): this is a separate Singleton provider from
    # CommandBusContainer.items_repository, so the command side and the query
    # side each get their own repository object. If AuctionItemsRepository
    # keeps its data in memory, queries will not see items added through the
    # command bus -- confirm, and share one provider if so.
    items_repository = providers.Singleton(AuctionItemsRepository)

    # Aggregate of handler factories keyed by name; the key
    # ("GetItemsQuery") selects which factory builds the handler.
    query_handler_factory = providers.FactoryAggregate(
        GetItemsQuery=providers.Factory(
            GetItemsQueryHandler,
            items_repository=items_repository,
        ),
    )

    # DelegatedFactory is injected as a provider rather than being called at
    # injection time, so QueryBus receives a callable that forwards to the
    # aggregate above instead of a pre-built handler.
    query_bus_factory = providers.Factory(
        QueryBus,
        query_handler_factory=providers.DelegatedFactory(
            query_handler_factory
        ),
    )
class FalconContainer(containers.DeclarativeContainer):
    """Providers for the Falcon controllers."""

    # ItemsController is built with a command bus, a query bus and an
    # authentication service, each taken from the class-level provider of
    # the container that declares it.
    items_controller_factory = providers.Factory(
        ItemsController,
        command_bus=CommandBusContainer.command_bus_factory,
        query_bus=QueryBusContainer.query_bus_factory,
        authentication_service=BaseContainer.authentication_service_factory,
    )

    # InfoController only needs the authentication service.
    info_controller_factory = providers.Factory(
        InfoController,
        authentication_service=BaseContainer.authentication_service_factory,
    )