In [1]:
from morpheus.modflow.types.Settings import Metadata, Name, Description, Tags
from morpheus.modflow.types.User import UserId
from morpheus.modflow.types.Project import Project, ProjectId
from morpheus.modflow.domain.events.ProjectCreatedEvent import ProjectCreatedEvent, ProjectCreatedEventPayload

user_id = UserId.new()
project_id = ProjectId.new()

# Create a new project, then replace its settings with a copy carrying the metadata below.
project = Project.new(user_id=user_id, project_id=project_id)
settings = project.settings
project_metadata = Metadata(
    name=Name.from_str('Test'),
    description=Description.from_str('Test project'),
    tags=Tags.from_list([]),
)
project = project.with_updated_settings(settings.with_updated_metadata(project_metadata))

# Build the ProjectCreatedEvent from the project's id and its (just updated) name.
created_payload = ProjectCreatedEventPayload(
    project_id=project_id,
    project_name=project.settings.metadata.name,
)
event = ProjectCreatedEvent.from_payload(created_payload)

# Last expression of the cell: show the event's repr as the cell output.
event

ProjectCreatedEvent(payload=ProjectCreatedEventPayload(project_id=ProjectId(value='cdfb8191-e93e-41b7-ba63-640ee33fa7ee'), project_name=Name(value='Test')))

In [2]:
from morpheus.common.types.event_sourcing.EventMetadata import EventMetadata
from morpheus.common.types import DateTime
from morpheus.common.types.event_sourcing.EventEnvelope import EventEnvelope

# Wrap the event together with a timestamp and a small metadata dict.
occurred_at = DateTime.now()
event_metadata = EventMetadata.from_dict({'test': 123})
envelope = EventEnvelope(event, occurred_at, event_metadata)

# Last expression of the cell: show the envelope's repr as the cell output.
envelope

EventEnvelope(event=ProjectCreatedEvent(payload=ProjectCreatedEventPayload(project_id=ProjectId(value='cdfb8191-e93e-41b7-ba63-640ee33fa7ee'), project_name=Name(value='Test'))), occurred_at=DateTime(value=datetime.datetime(2024, 1, 23, 15, 41, 56, 707110)), metadata=EventMetadata(obj={'test': 123}))

# Listeners must have arguments "event" and "metadata", type hinted with subclasses of EventBase and EventMetadata respectively

In [3]:
from morpheus.common.infrastructure.event_sourcing.EventPublisher import EventPublisher, listen_to

class Invalid:
    """Empty placeholder class, used in this cell as a deliberately wrong type hint."""

class ProjectorInvalid0:
    """Negative example: the listener declares neither an ``event`` nor a ``metadata`` argument."""

    @listen_to(ProjectCreatedEvent)
    def on_project_created(self):
        # The body is incidental; this class exists for its signature.
        print('Created project')

class ProjectorInvalid1:
    """Negative example: the first listener argument is named ``invalid`` instead of ``event``."""

    @listen_to(ProjectCreatedEvent)
    def on_project_created(self, invalid: ProjectCreatedEvent, metadata: EventMetadata):
        # The body is incidental; this class exists for its signature.
        print('Created project')

class ProjectorInvalid2:
    """Negative example: the second listener argument is named ``invalid`` instead of ``metadata``."""

    @listen_to(ProjectCreatedEvent)
    def on_project_created(self, event: ProjectCreatedEvent, invalid: EventMetadata):
        # The body is incidental; this class exists for its signature.
        print('Created project')

class ProjectorInvalid3:
    """Negative example: ``event`` is type hinted with ``Invalid`` rather than an event class."""

    @listen_to(ProjectCreatedEvent)
    def on_project_created(self, event: Invalid, metadata: EventMetadata):
        # The body is incidental; this class exists for its signature.
        print('Created project')
      
class ProjectorInvalid4:
    """Negative example: ``metadata`` is type hinted with ``Invalid`` rather than ``EventMetadata``."""

    @listen_to(ProjectCreatedEvent)
    def on_project_created(self, event: ProjectCreatedEvent, metadata: Invalid):
        # The body is incidental; this class exists for its signature.
        print('Created project')


event_publisher = EventPublisher()

# Attempt to register one instance of each malformed projector, in order.
# Any exception is printed rather than raised so the remaining cases still run.
invalid_projector_classes = (
    ProjectorInvalid0,
    ProjectorInvalid1,
    ProjectorInvalid2,
    ProjectorInvalid3,
    ProjectorInvalid4,
)
for projector_cls in invalid_projector_classes:
    try:
        projector = projector_cls()
        event_publisher.register(projector)
    except Exception as error:
        print(error)

Event listener <bound method ProjectorInvalid0.on_project_created of <__main__.ProjectorInvalid0 object at 0x7fc226458b30>> must have an argument called "event" that should by type hinted with a subclass of EventBase
Event listener <bound method ProjectorInvalid1.on_project_created of <__main__.ProjectorInvalid1 object at 0x7fc255a5be60>> must have an argument called "event" that should by type hinted with a subclass of EventBase
Event listener <bound method ProjectorInvalid2.on_project_created of <__main__.ProjectorInvalid2 object at 0x7fc226458b30>> must have an argument called "metadata" that should by type hinted with a subclass of EventMetadata
Event listener <bound method ProjectorInvalid3.on_project_created of <__main__.ProjectorInvalid3 object at 0x7fc255a5be60>> must have an argument called "event" that should by type hinted with a subclass of EventBase
Event listener <bound method ProjectorInvalid4.on_project_created of <__main__.ProjectorInvalid4 object at 0x7fc226458b30>> m

# Cannot register the same listener twice

In [4]:
class Projector:
    """Listener whose ``event`` and ``metadata`` arguments are hinted as ProjectCreatedEvent and EventMetadata."""

    @listen_to(ProjectCreatedEvent)
    def on_project_created(self, event: ProjectCreatedEvent, metadata: EventMetadata):
        """Print the project id taken from the event payload, followed by the metadata."""
        print(f'Created project {event.get_payload().project_id} with metadata {metadata}')
   
try:
    # Use a fresh publisher so the two calls below are its only registrations.
    event_publisher = EventPublisher()
    projector = Projector()
    event_publisher.register(projector)
    # Second registration of the very same instance; any exception it raises
    # is caught and printed instead of aborting the cell.
    event_publisher.register(projector)
except Exception as error:
    print(error)

Event handler already registered


In [5]:
from morpheus.modflow.infrastructure.event_sourcing.ModflowEventBus import modflow_event_bus

# Pass the envelope built in an earlier cell to the imported modflow_event_bus.
# NOTE(review): what record() does with it (persist, dispatch to listeners, ...) is not
# visible in this notebook — confirm against ModflowEventBus before relying on side effects.
modflow_event_bus.record(envelope) 