You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Hello, and how are you doing? I hope this message finds you doing well!
I’ve a question for you concerning you’re (awesome!) CQRS implementation…
It has to do with setting up a “Message Queue” for incoming commands from a Client.
There are many ideas that suggest that a message queue can be used to publish Commands to the Event Store.
My application demands that a lot of clients will be issuing a lot of commands and a message queue of some kind is a highly recommended strategy for Distributed Services (as I’m sure you’re well aware of… )
The above diagram depicts the current implementation of my Message Queue (using NetMq.ReactiveExtensions); and, I must say: it was dead-easy to do, too.
Web API Controller // Create Securities. else if ( secsDto.ActionType.Equals( DbActionType.Create ) ) { var cmd = new CreateSecuritiesCommand( id: Guid.NewGuid() , securitiesReport: secsDto.SecuritiesReport ); var json = JsonConvert.SerializeObject( cmd ); m_Publisher.Publish( json ); await Task.Delay( 1000 ); }
Publisher public void Publish( string message ) { // Publish the command on the Message Bus. var cmd = JsonConvert.DeserializeObject<CreateSecuritiesCommand>( message ); var publisher = new PublisherNetMq<string>( m_EndPoint , loggerDelegate: msg => WriteLine( msg ) ); publisher.OnNext( message ); WriteLine( $"Published: {message}" ); }
Subscriber public void Subscribe() { subject.Subscribe( ( m ) => { // Valid JSON object. if ( string.IsNullOrEmpty( m ).Equals( false ) ) { var cmd = JsonConvert.DeserializeObject<CreateSecuritiesCommand>( m ); if ( m_Bus != null && cmd != null ) { // Publish on Command Bus. WriteLine( $"Received: {cmd.Id} , {cmd.SecuritiesReport.Id} , {cmd.SecuritiesReport.Ticker}" ); m_Bus.Publish( cmd ); m_Bus.Commit(); } } } ); }
The above is my simple demonstration and really works quite nicely.
I would prefer to use this implementation, as opposed to messing around with your beautifully constructed
Command Registration Action Method:
public Action<TCommand> CreatePublishActionWrappedInTransaction<TCommand, TCommandHandler>( TCommandHandler commandHandler , ref IContainer container ) { var trans = container?.GetInstance<TransactionHandler<TCommand , TCommandHandler>>(); return ( ( c ) => { trans?.Execute( c , commandHandler ); // Move this somewhere else… }); }
… and performing the Publish here…. And then in the Subscriber, actually performing the Commit…. Just seems like I’d have to re-wire a lot of your existing code (which, as I mentioned I don’t want to do because I like the way you’ve done Configuration).
Again, Mark thank you for all you’re wonder work on this project.
Mark:
Hello, and how are you doing? I hope this message finds you doing well!
I’ve a question for you concerning you’re (awesome!) CQRS implementation…
It has to do with setting up a “Message Queue” for incoming commands from a Client.
There are many ideas that suggest that a message queue can be used to publish Commands to the Event Store.
My application demands that a lot of clients will be issuing a lot of commands and a message queue of some kind is a highly recommended strategy for Distributed Services (as I’m sure you’re well aware of… )
The above diagram depicts the current implementation of my Message Queue (using NetMq.ReactiveExtensions); and, I must say: it was dead-easy to do, too.
Web API Controller
// Create Securities. else if ( secsDto.ActionType.Equals( DbActionType.Create ) ) { var cmd = new CreateSecuritiesCommand( id: Guid.NewGuid() , securitiesReport: secsDto.SecuritiesReport ); var json = JsonConvert.SerializeObject( cmd ); m_Publisher.Publish( json ); await Task.Delay( 1000 ); }
Publisher
public void Publish( string message ) { // Publish the command on the Message Bus. var cmd = JsonConvert.DeserializeObject<CreateSecuritiesCommand>( message ); var publisher = new PublisherNetMq<string>( m_EndPoint , loggerDelegate: msg => WriteLine( msg ) ); publisher.OnNext( message ); WriteLine( $"Published: {message}" ); }
Subscriber
public void Subscribe() { subject.Subscribe( ( m ) => { // Valid JSON object. if ( string.IsNullOrEmpty( m ).Equals( false ) ) { var cmd = JsonConvert.DeserializeObject<CreateSecuritiesCommand>( m ); if ( m_Bus != null && cmd != null ) { // Publish on Command Bus. WriteLine( $"Received: {cmd.Id} , {cmd.SecuritiesReport.Id} , {cmd.SecuritiesReport.Ticker}" ); m_Bus.Publish( cmd ); m_Bus.Commit(); } } } ); }
The above is my simple demonstration and really works quite nicely.
I would prefer to use this implementation, as opposed to messing around with your beautifully constructed
Command Registration Action Method:
public Action<TCommand> CreatePublishActionWrappedInTransaction<TCommand, TCommandHandler>( TCommandHandler commandHandler , ref IContainer container ) { var trans = container?.GetInstance<TransactionHandler<TCommand , TCommandHandler>>(); return ( ( c ) => { trans?.Execute( c , commandHandler ); // Move this somewhere else… }); }
… and performing the Publish here…. And then in the Subscriber, actually performing the Commit…. Just seems like I’d have to re-wire a lot of your existing code (which, as I mentioned I don’t want to do because I like the way you’ve done Configuration).
Again, Mark thank you for all you’re wonder work on this project.
Please Advise,
Robert Hyland
President & CEO,
Hyland Computer Systems, L.L.C
E: rhyland@hylandcomputer.systems
The text was updated successfully, but these errors were encountered: