diff --git a/ReactiveXComponent/Common/Header.cs b/ReactiveXComponent/Common/Header.cs index f67e880..36d017c 100644 --- a/ReactiveXComponent/Common/Header.cs +++ b/ReactiveXComponent/Common/Header.cs @@ -1,3 +1,5 @@ +using System; + namespace ReactiveXComponent.Common { public class Header @@ -17,5 +19,7 @@ public class Header public string PublishTopic { get; set; } public string ErrorMessage { get; set; } + + public string MessageId { get; set; } = Guid.NewGuid().ToString(); } } diff --git a/ReactiveXComponent/Common/HeaderElement.cs b/ReactiveXComponent/Common/HeaderElement.cs index cc0e82a..4c3393b 100644 --- a/ReactiveXComponent/Common/HeaderElement.cs +++ b/ReactiveXComponent/Common/HeaderElement.cs @@ -13,5 +13,7 @@ public static class HeaderElement public const string SessionData = "SessionData"; public const string EventType = "EventType"; public const string ErrorMessage = "ErrorMessage"; + public const string MessageId = "MessageId"; + public const string WorkerId = "WorkerId"; } } diff --git a/ReactiveXComponent/Common/StateMachineRefHeader.cs b/ReactiveXComponent/Common/StateMachineRefHeader.cs index b39bb8d..4de2084 100644 --- a/ReactiveXComponent/Common/StateMachineRefHeader.cs +++ b/ReactiveXComponent/Common/StateMachineRefHeader.cs @@ -1,37 +1,10 @@  +using System; + namespace ReactiveXComponent.Common { public class StateMachineRefHeader { - protected bool Equals(StateMachineRefHeader other) - { - return StateMachineId == other.StateMachineId && StateCode == other.StateCode && StateMachineCode == other.StateMachineCode && ComponentCode == other.ComponentCode && string.Equals(MessageType, other.MessageType) && string.Equals(PrivateTopic, other.PrivateTopic) && string.Equals(SessionData, other.SessionData) && string.Equals(ErrorMessage, other.ErrorMessage); - } - - public override bool Equals(object obj) - { - if (ReferenceEquals(null, obj)) return false; - if (ReferenceEquals(this, obj)) return true; - if (obj.GetType() != this.GetType()) return false; - return Equals((StateMachineRefHeader) obj); - } - - public override int GetHashCode() - { - unchecked - { - var hashCode = (StateMachineId != null ? StateMachineId.GetHashCode() : 0); - hashCode = (hashCode * 397) ^ StateCode; - hashCode = (hashCode * 397) ^ StateMachineCode; - hashCode = (hashCode * 397) ^ ComponentCode; - hashCode = (hashCode * 397) ^ (MessageType != null ? MessageType.GetHashCode() : 0); - hashCode = (hashCode * 397) ^ (PrivateTopic != null ? PrivateTopic.GetHashCode() : 0); - hashCode = (hashCode * 397) ^ (SessionData != null ? SessionData.GetHashCode() : 0); - hashCode = (hashCode * 397) ^ (ErrorMessage != null ? ErrorMessage.GetHashCode() : 0); - return hashCode; - } - } - public string StateMachineId { get; set; } public int StateCode { get; set; } @@ -47,5 +20,9 @@ public override int GetHashCode() public string SessionData { get; set; } public string ErrorMessage { get; set; } + + public string MessageId { get; set; } = Guid.NewGuid().ToString(); + + public int WorkerId { get; set; } } } diff --git a/ReactiveXComponent/Configuration/BusDetails.cs b/ReactiveXComponent/Configuration/BusDetails.cs index 34c33fb..a4c5ca4 100644 --- a/ReactiveXComponent/Configuration/BusDetails.cs +++ b/ReactiveXComponent/Configuration/BusDetails.cs @@ -8,12 +8,14 @@ public BusDetails() } - public BusDetails(string username, string password, string host, int port) + public BusDetails(string username, string password, string host, string virtualHost, int port) { Username = username; Password = password; Host = host; + VirtualHost = virtualHost; Port = port; + } public string Username { get; set; } @@ -22,14 +24,17 @@ public BusDetails(string username, string password, string host, int port) public string Host { get; set; } + public string VirtualHost { get; set; } + public int Port { get; set; } public BusDetails Clone() { return new BusDetails( - (string)Username?.Clone(), - (string)Password?.Clone(), - (string)Host?.Clone(), + Username, + Password, + Host, + VirtualHost, Port); } } diff --git a/ReactiveXComponent/Configuration/ConfigurationOverrides.cs b/ReactiveXComponent/Configuration/ConfigurationOverrides.cs index af044ec..33782ab 100644 --- a/ReactiveXComponent/Configuration/ConfigurationOverrides.cs +++ b/ReactiveXComponent/Configuration/ConfigurationOverrides.cs @@ -6,6 +6,8 @@ public class ConfigurationOverrides { public string Host { get; set; } + public string VirtualHost { get; set; } + public string Port { get; set; } public string Username { get; set; } diff --git a/ReactiveXComponent/Parser/XCApiConfigParser.cs b/ReactiveXComponent/Parser/XCApiConfigParser.cs index a8e0dfc..2059f76 100644 --- a/ReactiveXComponent/Parser/XCApiConfigParser.cs +++ b/ReactiveXComponent/Parser/XCApiConfigParser.cs @@ -186,6 +186,7 @@ public BusDetails GetBusDetails() busInfos?.Attribute("user")?.Value, busInfos?.Attribute("password")?.Value, busInfos?.Attribute("host")?.Value, + busInfos?.Attribute("virtualHost")?.Value, Convert.ToInt32(busInfos?.Attribute("port")?.Value)); return busDetails; diff --git a/ReactiveXComponent/RabbitMq/RabbitMqConnection.cs b/ReactiveXComponent/RabbitMq/RabbitMqConnection.cs index 2d4c0bc..a3db190 100644 --- a/ReactiveXComponent/RabbitMq/RabbitMqConnection.cs +++ b/ReactiveXComponent/RabbitMq/RabbitMqConnection.cs @@ -30,6 +30,11 @@ public IXCSession CreateSession(ConfigurationOverrides configurationOverrides = busDetails.Host = configurationOverrides.Host; } + if (!string.IsNullOrEmpty(configurationOverrides.VirtualHost)) + { + busDetails.VirtualHost = configurationOverrides.VirtualHost; + } + if (configurationOverrides.Port != null) { busDetails.Port = int.Parse(configurationOverrides.Port); diff --git a/ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs b/ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs index d592bb2..b22f146 100644 --- a/ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs +++ b/ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs @@ -21,7 +21,10 @@ public static Dictionary ConvertHeader(Header header) {HeaderElement.IncomingEventType, header?.IncomingEventType}, {HeaderElement.PublishTopic, header?.PublishTopic != null ? encoding.GetBytes(header.PublishTopic) : encoding.GetBytes(string.Empty) }, {HeaderElement.MessageType, header?.MessageType != null ? encoding.GetBytes(header.MessageType) : encoding.GetBytes(string.Empty)}, - {HeaderElement.ErrorMessage, header?.ErrorMessage != null ? encoding.GetBytes(header.ErrorMessage) : encoding.GetBytes(string.Empty)} + {HeaderElement.ErrorMessage, header?.ErrorMessage != null ? encoding.GetBytes(header.ErrorMessage) : encoding.GetBytes(string.Empty)}, + {HeaderElement.MessageId, header?.MessageId != null ? encoding.GetBytes(header.MessageId) : encoding.GetBytes(string.Empty) }, + + }; return dico; } @@ -40,7 +43,10 @@ public static Dictionary CreateHeaderFromStateMachineRefHeader(S {HeaderElement.MessageType, stateMachineRefHeader?.MessageType != null ? encoding.GetBytes(stateMachineRefHeader.MessageType) : encoding.GetBytes(string.Empty)}, {HeaderElement.EventType, eventCode}, {HeaderElement.IncomingEventType, (int)incomingEventType}, - {HeaderElement.ErrorMessage, stateMachineRefHeader?.ErrorMessage != null ? encoding.GetBytes(stateMachineRefHeader.ErrorMessage) : encoding.GetBytes(string.Empty)} + {HeaderElement.ErrorMessage, stateMachineRefHeader?.ErrorMessage != null ? encoding.GetBytes(stateMachineRefHeader.ErrorMessage) : encoding.GetBytes(string.Empty)}, + {HeaderElement.MessageId, stateMachineRefHeader?.MessageId != null ? encoding.GetBytes(stateMachineRefHeader.MessageId) : encoding.GetBytes(string.Empty) }, + {HeaderElement.WorkerId, stateMachineRefHeader?.WorkerId}, + }; return dico; } @@ -56,6 +62,8 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary GetSnapshot(string stateMachine, int? chunkSize = StateMachineId = item.StateMachineId, StateMachineCode = item.StateMachineCode, ComponentCode = item.ComponentCode, - StateCode = item.StateCode + StateCode = item.StateCode, + WorkerId = item.WorkerId, }, item.PublicMember, _serializationType)).ToList(); diff --git a/ReactiveXComponent/RabbitMq/RabbitMqSubscriber.cs b/ReactiveXComponent/RabbitMq/RabbitMqSubscriber.cs index 0385b64..b501fe8 100644 --- a/ReactiveXComponent/RabbitMq/RabbitMqSubscriber.cs +++ b/ReactiveXComponent/RabbitMq/RabbitMqSubscriber.cs @@ -1,246 +1,246 @@ -using System; -using System.Collections.Concurrent; -using System.IO; -using System.Linq; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using System.Reactive.Linq; -using RabbitMQ.Client.Exceptions; -using ReactiveXComponent.Common; -using ReactiveXComponent.Connection; -using ReactiveXComponent.Configuration; -using ReactiveXComponent.Serializer; - -namespace ReactiveXComponent.RabbitMq -{ - public class RabbitMqSubscriber : IXCSubscriber - { - private readonly IConnection _connection; - private readonly IXCConfiguration _xcConfiguration; - private readonly string _privateCommunicationIdentifier; +using System; +using System.Collections.Concurrent; +using System.IO; +using System.Linq; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using System.Reactive.Linq; +using RabbitMQ.Client.Exceptions; +using ReactiveXComponent.Common; +using ReactiveXComponent.Connection; +using ReactiveXComponent.Configuration; +using ReactiveXComponent.Serializer; + +namespace ReactiveXComponent.RabbitMq +{ + public class RabbitMqSubscriber : IXCSubscriber + { + private readonly IConnection _connection; + private readonly IXCConfiguration _xcConfiguration; + private readonly string _privateCommunicationIdentifier; private readonly ISerializer _serializer; - private readonly string _component; - private SerializationType _serializationType; - - private readonly ConcurrentDictionary _subscribersDico; + private readonly string _component; + private SerializationType _serializationType; + + private readonly ConcurrentDictionary _subscribersDico; private readonly ConcurrentDictionary _streamSubscriptionsDico; - - private event EventHandler MessageReceived; - private event EventHandler ConnectionFailed; - - public RabbitMqSubscriber(string component, IXCConfiguration xcConfiguration, IConnection connection, ISerializer serializer, string privateCommunicationIdentifier = null) - { - _component = component; - _xcConfiguration = xcConfiguration; - _connection = connection; - _subscribersDico = new ConcurrentDictionary(); + + private event EventHandler MessageReceived; + private event EventHandler ConnectionFailed; + + public RabbitMqSubscriber(string component, IXCConfiguration xcConfiguration, IConnection connection, ISerializer serializer, string privateCommunicationIdentifier = null) + { + _component = component; + _xcConfiguration = xcConfiguration; + _connection = connection; + _subscribersDico = new ConcurrentDictionary(); _streamSubscriptionsDico = new ConcurrentDictionary(); - _privateCommunicationIdentifier = privateCommunicationIdentifier; - _serializer = serializer; - InitSerializationType(); - InitObservableCollection(); + _privateCommunicationIdentifier = privateCommunicationIdentifier; + _serializer = serializer; + InitSerializationType(); + InitObservableCollection(); } - #region IXCSubscriber implementation - - public IObservable StateMachineUpdatesStream { get; private set; } - - public void Subscribe(string stateMachine, Action stateMachineListener) - { - if (stateMachineListener == null) - { - return; - } - - var componentCode = _xcConfiguration.GetComponentCode(_component); - var stateMachineCode = _xcConfiguration.GetStateMachineCode(_component, stateMachine); - var routingKey = string.IsNullOrEmpty(_privateCommunicationIdentifier) ? _xcConfiguration.GetSubscriberTopic(_component, stateMachine) : _privateCommunicationIdentifier; + #region IXCSubscriber implementation + + public IObservable StateMachineUpdatesStream { get; private set; } + + public void Subscribe(string stateMachine, Action stateMachineListener) + { + if (stateMachineListener == null) + { + return; + } + + var componentCode = _xcConfiguration.GetComponentCode(_component); + var stateMachineCode = _xcConfiguration.GetStateMachineCode(_component, stateMachine); + var routingKey = string.IsNullOrEmpty(_privateCommunicationIdentifier) ? _xcConfiguration.GetSubscriberTopic(_component, stateMachine) : _privateCommunicationIdentifier; var subscriptionKey = new SubscriptionKey(componentCode, stateMachineCode, routingKey); var streamSusbcriptionKey = new StreamSubscriptionKey(subscriptionKey, stateMachineListener); - - if (!string.IsNullOrEmpty(_privateCommunicationIdentifier)) - { - InitSubscriber(stateMachine, _privateCommunicationIdentifier); - } - - if (!_streamSubscriptionsDico.ContainsKey(streamSusbcriptionKey)) - { - var handler = new Action(args => { - if (args.StateMachineRefHeader.StateMachineCode == stateMachineCode) - { - stateMachineListener(args); - } + + if (!string.IsNullOrEmpty(_privateCommunicationIdentifier)) + { + InitSubscriber(stateMachine, _privateCommunicationIdentifier); + } + + if (!_streamSubscriptionsDico.ContainsKey(streamSusbcriptionKey)) + { + var handler = new Action(args => { + if (args.StateMachineRefHeader.StateMachineCode == stateMachineCode) + { + stateMachineListener(args); + } }); var subscription = StateMachineUpdatesStream.Subscribe(handler); - _streamSubscriptionsDico.AddOrUpdate(streamSusbcriptionKey, subscription, (key, oldSubscription) => subscription); - } + _streamSubscriptionsDico.AddOrUpdate(streamSusbcriptionKey, subscription, (key, oldSubscription) => subscription); + } - InitSubscriber(stateMachine); - } - - public void Unsubscribe(string stateMachine, Action stateMachineListener) - { - var componentCode = _xcConfiguration.GetComponentCode(_component); + InitSubscriber(stateMachine); + } + + public void Unsubscribe(string stateMachine, Action stateMachineListener) + { + var componentCode = _xcConfiguration.GetComponentCode(_component); var stateMachineCode = _xcConfiguration.GetStateMachineCode(_component, stateMachine); - var publicRoutingKey = _xcConfiguration.GetSubscriberTopic(_component, stateMachine); + var publicRoutingKey = _xcConfiguration.GetSubscriberTopic(_component, stateMachine); var publicSubscriberKey = new SubscriptionKey(componentCode, stateMachineCode, publicRoutingKey); DeleteSubscription(publicSubscriberKey, stateMachineListener); if (!string.IsNullOrEmpty(_privateCommunicationIdentifier)) - { - var privateSubscriberKey = new SubscriptionKey(componentCode, stateMachineCode, _privateCommunicationIdentifier); + { + var privateSubscriberKey = new SubscriptionKey(componentCode, stateMachineCode, _privateCommunicationIdentifier); DeleteSubscription(privateSubscriberKey, stateMachineListener); - } - } - - #endregion - - private void InitObservableCollection() - { - StateMachineUpdatesStream = Observable.FromEvent, MessageEventArgs>( - handler => (sender, e) => handler(e), - h => MessageReceived += h, - h => MessageReceived -= h); - } - - private void OnMessageReceived(MessageEventArgs e) - { - MessageReceived?.Invoke(this, e); - } - - private void InitSubscriber(string stateMachine, string privateCommunicationIdentifier = null) - { - if (_xcConfiguration == null) - { - return; - } - - var exchangeName = _xcConfiguration.GetComponentCode(_component).ToString(); - var componentCode = _xcConfiguration.GetComponentCode(_component); - var stateMachineCode = _xcConfiguration.GetStateMachineCode(_component, stateMachine); - var routingKey = string.IsNullOrEmpty(privateCommunicationIdentifier) ? _xcConfiguration.GetSubscriberTopic(_component, stateMachine) : privateCommunicationIdentifier; - - try - { - var subscriptionKey = new SubscriptionKey(componentCode, stateMachineCode, routingKey); - if (!_subscribersDico.ContainsKey(subscriptionKey)) - { - bool createExchangeChannel = false; - RabbitMqSubscriberInfos rabbitMqSubscriberInfos = null; - - EventHandler handler = (o, basicAckEventArgs) => - { - var stateMachineRefHeader = RabbitMqHeaderConverter.ConvertStateMachineRefHeader(basicAckEventArgs.BasicProperties.Headers); - - if (stateMachineRefHeader.StateMachineCode == stateMachineCode) - { - rabbitMqSubscriberInfos.Channel?.BasicAck(basicAckEventArgs.DeliveryTag, false); - - var obj = _serializer.Deserialize(new MemoryStream(basicAckEventArgs.Body)); - - var msgEventArgs = new MessageEventArgs(stateMachineRefHeader, obj, _serializationType); - - OnMessageReceived(msgEventArgs); - } - }; - - if (!string.IsNullOrEmpty(privateCommunicationIdentifier)) - { - var privateSubscriptionKey = - _subscribersDico.Keys.FirstOrDefault( - k => k.ComponentCode == componentCode && k.RoutingKey == routingKey); - - createExchangeChannel = (privateSubscriptionKey == null); - - if (!createExchangeChannel) - { - rabbitMqSubscriberInfos = _subscribersDico[privateSubscriptionKey]; - } - } - else - { - createExchangeChannel = true; - } - - if (createExchangeChannel) - { - IModel channel; - EventingBasicConsumer subscriber; - - CreateExchangeChannel(exchangeName, routingKey, out channel, out subscriber); - - if (channel == null || subscriber == null) - { - return; - } - - rabbitMqSubscriberInfos = new RabbitMqSubscriberInfos(channel, subscriber, handler); - // Add the new element.. - _subscribersDico.AddOrUpdate(subscriptionKey, rabbitMqSubscriberInfos, - (key, oldValue) => rabbitMqSubscriberInfos); - } - else - { - // Update the existing subscription for that routing key to subscribe the new handler.. - rabbitMqSubscriberInfos.AddHandler(handler); - } - } - } - catch (OperationInterruptedException e) - { - throw new ReactiveXComponentException("Failed to init Subscriber: " + e.Message, e); - } - } - - private void InitSerializationType() - { - var serialization = _xcConfiguration.GetSerializationType(); - - switch (serialization) - { - case XCApiTags.Binary: - _serializationType = SerializationType.Binary; - break; - case XCApiTags.Json: - _serializationType = SerializationType.Json; - break; - case XCApiTags.Bson: - _serializationType = SerializationType.Bson; - break; + } + } + + #endregion + + private void InitObservableCollection() + { + StateMachineUpdatesStream = Observable.FromEvent, MessageEventArgs>( + handler => (sender, e) => handler(e), + h => MessageReceived += h, + h => MessageReceived -= h); + } + + private void OnMessageReceived(MessageEventArgs e) + { + MessageReceived?.Invoke(this, e); + } + + private void InitSubscriber(string stateMachine, string privateCommunicationIdentifier = null) + { + if (_xcConfiguration == null) + { + return; + } + + var exchangeName = _xcConfiguration.GetComponentCode(_component).ToString(); + var componentCode = _xcConfiguration.GetComponentCode(_component); + var stateMachineCode = _xcConfiguration.GetStateMachineCode(_component, stateMachine); + var routingKey = string.IsNullOrEmpty(privateCommunicationIdentifier) ? _xcConfiguration.GetSubscriberTopic(_component, stateMachine) : privateCommunicationIdentifier; + + try + { + var subscriptionKey = new SubscriptionKey(componentCode, stateMachineCode, routingKey); + if (!_subscribersDico.ContainsKey(subscriptionKey)) + { + bool createExchangeChannel = false; + RabbitMqSubscriberInfos rabbitMqSubscriberInfos = null; + + EventHandler handler = (o, basicAckEventArgs) => + { + var stateMachineRefHeader = RabbitMqHeaderConverter.ConvertStateMachineRefHeader(basicAckEventArgs.BasicProperties.Headers); + + if (stateMachineRefHeader.StateMachineCode == stateMachineCode) + { + rabbitMqSubscriberInfos.Channel?.BasicAck(basicAckEventArgs.DeliveryTag, false); + + var obj = _serializer.Deserialize(new MemoryStream(basicAckEventArgs.Body)); + + var msgEventArgs = new MessageEventArgs(stateMachineRefHeader, obj, _serializationType); + + OnMessageReceived(msgEventArgs); + } + }; + + if (!string.IsNullOrEmpty(privateCommunicationIdentifier)) + { + var privateSubscriptionKey = + _subscribersDico.Keys.FirstOrDefault( + k => k.ComponentCode == componentCode && k.RoutingKey == routingKey); + + createExchangeChannel = (privateSubscriptionKey == null); + + if (!createExchangeChannel) + { + rabbitMqSubscriberInfos = _subscribersDico[privateSubscriptionKey]; + } + } + else + { + createExchangeChannel = true; + } + + if (createExchangeChannel) + { + IModel channel; + EventingBasicConsumer subscriber; + + CreateExchangeChannel(exchangeName, routingKey, out channel, out subscriber); + + if (channel == null || subscriber == null) + { + return; + } + + rabbitMqSubscriberInfos = new RabbitMqSubscriberInfos(channel, subscriber, handler); + // Add the new element.. + _subscribersDico.AddOrUpdate(subscriptionKey, rabbitMqSubscriberInfos, + (key, oldValue) => rabbitMqSubscriberInfos); + } + else + { + // Update the existing subscription for that routing key to subscribe the new handler.. + rabbitMqSubscriberInfos.AddHandler(handler); + } + } + } + catch (OperationInterruptedException e) + { + throw new ReactiveXComponentException("Failed to init Subscriber: " + e.Message, e); + } + } + + private void InitSerializationType() + { + var serialization = _xcConfiguration.GetSerializationType(); + + switch (serialization) + { + case XCApiTags.Binary: + _serializationType = SerializationType.Binary; + break; + case XCApiTags.Json: + _serializationType = SerializationType.Json; + break; + case XCApiTags.Bson: + _serializationType = SerializationType.Bson; + break; default: - throw new XCSerializationException("Serialization type not supported"); - } - } - - private void CreateExchangeChannel(string exchangeName, string routingKey, out IModel channel, out EventingBasicConsumer subscriber) - { - if (_connection == null || !_connection.IsOpen) - { - channel = null; - subscriber = null; - return; - } - - channel = _connection.CreateModel(); - - channel.ModelShutdown += ChannelOnModelShutdown; - channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); - var queueName = channel.QueueDeclare().QueueName; - subscriber = new EventingBasicConsumer(channel); - channel.BasicConsume(queueName, false, subscriber); - channel.QueueBind(queueName, exchangeName, routingKey, null); - } - - private void ChannelOnModelShutdown(object sender, ShutdownEventArgs shutdownEventArgs) - { - ConnectionFailed?.Invoke(this, shutdownEventArgs.ReplyText); - } - - private void DeleteSubscriber(SubscriptionKey subscriptionKey) - { + throw new XCSerializationException("Serialization type not supported"); + } + } + + private void CreateExchangeChannel(string exchangeName, string routingKey, out IModel channel, out EventingBasicConsumer subscriber) + { + if (_connection == null || !_connection.IsOpen) + { + channel = null; + subscriber = null; + return; + } + + channel = _connection.CreateModel(); + + channel.ModelShutdown += ChannelOnModelShutdown; + channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); + var queueName = channel.QueueDeclare().QueueName; + subscriber = new EventingBasicConsumer(channel); + channel.BasicConsume(queueName, false, subscriber); + channel.QueueBind(queueName, exchangeName, routingKey, null); + } + + private void ChannelOnModelShutdown(object sender, ShutdownEventArgs shutdownEventArgs) + { + ConnectionFailed?.Invoke(this, shutdownEventArgs.ReplyText); + } + + private void DeleteSubscriber(SubscriptionKey subscriptionKey) + { var canBeRemoved = true; foreach (var element in _streamSubscriptionsDico.Keys) @@ -250,42 +250,42 @@ private void DeleteSubscriber(SubscriptionKey subscriptionKey) canBeRemoved = false; } } - - if (canBeRemoved) + + if (canBeRemoved) { - RabbitMqSubscriberInfos rabbitMqSubscriberInfos; + RabbitMqSubscriberInfos rabbitMqSubscriberInfos; if (_subscribersDico.TryRemove(subscriptionKey, out rabbitMqSubscriberInfos)) - { - if (rabbitMqSubscriberInfos.Channel.IsOpen) - { + { + if (rabbitMqSubscriberInfos.Channel.IsOpen) + { rabbitMqSubscriberInfos.Channel.ModelShutdown -= ChannelOnModelShutdown; - rabbitMqSubscriberInfos.Channel.BasicCancel(rabbitMqSubscriberInfos.Subscriber.ConsumerTag); + rabbitMqSubscriberInfos.Channel.BasicCancel(rabbitMqSubscriberInfos.Subscriber.ConsumerTag); } - rabbitMqSubscriberInfos.Channel.Dispose(); - - foreach (var handler in rabbitMqSubscriberInfos.Handlers) - { - rabbitMqSubscriberInfos.Subscriber.Received -= handler; - } - } - } - } - + rabbitMqSubscriberInfos.Channel.Dispose(); + + foreach (var handler in rabbitMqSubscriberInfos.Handlers) + { + rabbitMqSubscriberInfos.Subscriber.Received -= handler; + } + } + } + } + private void DeleteSubscription(SubscriptionKey subscriptionKey, Action stateMachineListener) - { - var streamSubscriptionKey = new StreamSubscriptionKey(subscriptionKey, stateMachineListener); - - IDisposable subscription; - - if (_streamSubscriptionsDico.TryRemove(streamSubscriptionKey, out subscription)) - { + { + var streamSubscriptionKey = new StreamSubscriptionKey(subscriptionKey, stateMachineListener); + + IDisposable subscription; + + if (_streamSubscriptionsDico.TryRemove(streamSubscriptionKey, out subscription)) + { subscription.Dispose(); - DeleteSubscriber(subscriptionKey); + DeleteSubscriber(subscriptionKey); } } - #region IDisposable implementation + #region IDisposable implementation private bool _disposed; @@ -296,27 +296,27 @@ private void Dispose(bool disposing) if (disposing) { // clear managed resources - foreach (var subscriberInfo in _subscribersDico.Values) - { - if (subscriberInfo.Channel.IsOpen) - { + foreach (var subscriberInfo in _subscribersDico.Values) + { + if (subscriberInfo.Channel.IsOpen) + { subscriberInfo.Channel.ModelShutdown -= ChannelOnModelShutdown; - subscriberInfo.Channel.BasicCancel(subscriberInfo.Subscriber.ConsumerTag); + subscriberInfo.Channel.BasicCancel(subscriberInfo.Subscriber.ConsumerTag); } - - subscriberInfo.Channel.Dispose(); - + + subscriberInfo.Channel.Dispose(); + foreach (var handler in subscriberInfo.Handlers) { subscriberInfo.Subscriber.Received -= handler; - } + } } - _subscribersDico.Clear(); - - foreach (var subscription in _streamSubscriptionsDico.Values) - { - subscription.Dispose(); + _subscribersDico.Clear(); + + foreach (var subscription in _streamSubscriptionsDico.Values) + { + subscription.Dispose(); } _streamSubscriptionsDico.Clear(); @@ -338,7 +338,7 @@ public void Dispose() Dispose(false); } - #endregion + #endregion - } -} + } +} diff --git a/ReactiveXComponentTest/Configuration/ConfigurationTests.cs b/ReactiveXComponentTest/Configuration/ConfigurationTests.cs index d0be8cb..1ce9830 100644 --- a/ReactiveXComponentTest/Configuration/ConfigurationTests.cs +++ b/ReactiveXComponentTest/Configuration/ConfigurationTests.cs @@ -74,6 +74,7 @@ public void GetBusDetailsTest() var busDetails = _rabbitMqConfiguration.GetBusDetails(); Check.That(busDetails.Host).IsEqualTo("127.0.0.1"); + Check.That(busDetails.VirtualHost).IsEqualTo("myVirtualHost"); Check.That(busDetails.Username).IsEqualTo("guest"); Check.That(busDetails.Password).IsEqualTo("guest"); Check.That(busDetails.Port).IsEqualTo(5672); diff --git a/ReactiveXComponentTest/Connection/ConnectionTests.cs b/ReactiveXComponentTest/Connection/ConnectionTests.cs index be1d6cb..f936c0a 100644 --- a/ReactiveXComponentTest/Connection/ConnectionTests.cs +++ b/ReactiveXComponentTest/Connection/ConnectionTests.cs @@ -25,7 +25,7 @@ public void SetUp() _rabbitMqConfiguration = Substitute.For(parser); _webSocketConfiguration = Substitute.For(parser); - var rabbitBusDetails = new BusDetails("guest", "guest", "localhost", 5672); + var rabbitBusDetails = new BusDetails("guest", "guest", "localhost", null, 5672); var webSocketEndpoint = new WebSocketEndpoint("websocket", "localhost", "443", WebSocketType.Secure); _rabbitMqConfiguration.GetBusDetails().Returns(rabbitBusDetails); diff --git a/ReactiveXComponentTest/RabbitMq/RabbitMqTests.cs b/ReactiveXComponentTest/RabbitMq/RabbitMqTests.cs index 3a2100a..332a060 100644 --- a/ReactiveXComponentTest/RabbitMq/RabbitMqTests.cs +++ b/ReactiveXComponentTest/RabbitMq/RabbitMqTests.cs @@ -37,7 +37,7 @@ public class RabbitMqTests private const string PublicRoutingKey = "202"; private const string Serialization = XCApiTags.Binary; private const string SnapshotSerialization = XCApiTags.Json; - private readonly BusDetails _busDetails = new BusDetails(User, Password, Host, Port); + private readonly BusDetails _busDetails = new BusDetails(User, Password, Host, null, Port); [TestCase(true, false)] [TestCase(false, false)] diff --git a/ReactiveXComponentTest/RabbitMqTestApi.xcApi b/ReactiveXComponentTest/RabbitMqTestApi.xcApi index 2996b19..c95c6d9 100644 --- a/ReactiveXComponentTest/RabbitMqTestApi.xcApi +++ b/ReactiveXComponentTest/RabbitMqTestApi.xcApi @@ -3,7 +3,7 @@ Binary - +