Skip to content
logicbomb edited this page Jun 19, 2011 · 2 revisions

Summary

Euclid's Dispatch system ensures that messages are processed. The dispatcher is also responsible for updating the [Registry] (https://github.com/smhinsey/Euclid/wiki/Registry) about the state of the message.

Location

Defined in the Euclid.Common project with a default implementation provided in Euclid.Common.Transport project.

Configuration

An object type of [MessageDispatcherSettings] (https://github.com/smhinsey/Euclid/blob/master/src/Euclid.Common.Transport/MessageDispatcherSettings.cs)

note each setting is implemented as an [OverridableSetting] (https://github.com/smhinsey/Euclid/blob/master/src/Euclid.Common/Configuration/IOverridableSetting.cs)

Usage

Euclid provides an abstract class [MultitaskingMessageDispatcher] (https://github.com/smhinsey/Euclid/blob/master/src/Euclid.Common.Transport/MultitaskingMessageDispatcher.cs) so that composite app developers can create an appropriate dispatcher.

The following example of configuring and starting a dispatcher is based on the [unit tests] (https://github.com/smhinsey/Euclid/blob/master/tests/Euclid.Common.UnitTests/Transport/MessageDispatcherTests.cs).

IServiceLocator is defined in the [Common Service Locator library] (http://commonservicelocator.codeplex.com/) which allows composite application developers to use the dependency injection container of their choice

Adapters have been written to allow common dependency injection frameworks to be used with the Common Service Locator library, this example uses the WindsorServiceLocator adapter to wrap the IWindsorContainer object.

A number of adapaters are packaged for NuGet and can be found by searching online for 'CommonService'

namespace Euclid.Common.TestingFakes.Transport
{
	public class FakeDispatcher : MultitaskingMessageDispatcher<FakeRegistry>
	{
		public FakeDispatcher(/*see note*/IServiceLocator container, FakeRegistry registry) : base(container, registry)
		{
		}
	}
}

namespace Euclid.Common.UnitTests.Transport
{
    public class MessageDispatcherTests
    {
        private FakeDispatcher _dispatcher;
        private InMemoryMessageTransport _transport = new InMemoryTransport();
        private FakeRegistry _registry = new FakeRegistry(new InMemoryRecordRepository<FakeRecord>(), new InMemoryBlobStorage(), new JsonMessageSerializer());;
        
        public void OutlineOfSteps()
        {
                var myPreferredDependencyInjectionContainer = ConfigureContainer();

                ConfigureDispatcher(myPreferredDependencyInjectionContainer);

                StartDispatchingMessages();

                //application runs

                StopDispatchingMessages();
        }

        public IWindsorContainer ConfigureContainer()
        {
                var processor = new FakeMessageProcessor();
                container.Register(
                                Component
                                        .For<FakeMessageProcessor>()
                                        .Instance(processor));

                return container;
        }

        public void ConfigureDispatcher(IWindsorContainer container)
        {
            /* see note */
            var locator = new WindsorServiceLocator(container);

            _dispatcher = new FakeDispatcher(locator, _registry);

            //typically dispatcher configuration would happen in a composite application's startup routine
            var settings = new MessageDispatcherSettings();
            settings.InputTransport.WithDefault(_transport);
            settings.MessageProcessorTypes.WithDefault(new List<Type> { typeof(FakeMessageProcessor) });
            settings.DurationOfDispatchingSlice.WithDefault(new TimeSpan(0, 0, 0, 0, 200));
            settings.NumberOfMessagesToDispatchPerSlice.WithDefault(30);

            _dispatcher.Configure(settings);
        }

        public void StartDispatcher()
        {
                _transport.Open();
                
                _dispatcher.Enable();

                /*
                1. other processes send messages on the transport which are received by the dispatcher 
                2. the dispatcher is responsible for calling the appropriate MessageProcessor as it reads messages off the transport
                3. the dispatcher updates the registry for handled messages (the Completed/Error/UnableToDispatch fields are set)
                */
        }

        public void StopDispatcher()
        {
                _dispatcher.Disable();

                _transport.Close();
        }
}

Creating a new dispatcher: