Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ReactiveXComponent/Common/Header.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System;

namespace ReactiveXComponent.Common
{
public class Header
Expand All @@ -17,5 +19,7 @@ public class Header
public string PublishTopic { get; set; }

public string ErrorMessage { get; set; }

public string MessageId { get; set; } = Guid.NewGuid().ToString();
}
}
2 changes: 2 additions & 0 deletions ReactiveXComponent/Common/HeaderElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ public static class HeaderElement
public const string SessionData = "SessionData";
public const string EventType = "EventType";
public const string ErrorMessage = "ErrorMessage";
public const string MessageId = "MessageId";
public const string WorkerId = "WorkerId";
}
}
35 changes: 6 additions & 29 deletions ReactiveXComponent/Common/StateMachineRefHeader.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,10 @@

using System;

namespace ReactiveXComponent.Common
{
public class StateMachineRefHeader
{
protected bool Equals(StateMachineRefHeader other)
{
return StateMachineId == other.StateMachineId && StateCode == other.StateCode && StateMachineCode == other.StateMachineCode && ComponentCode == other.ComponentCode && string.Equals(MessageType, other.MessageType) && string.Equals(PrivateTopic, other.PrivateTopic) && string.Equals(SessionData, other.SessionData) && string.Equals(ErrorMessage, other.ErrorMessage);
}

public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != this.GetType()) return false;
return Equals((StateMachineRefHeader) obj);
}

public override int GetHashCode()
{
unchecked
{
var hashCode = (StateMachineId != null ? StateMachineId.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ StateCode;
hashCode = (hashCode * 397) ^ StateMachineCode;
hashCode = (hashCode * 397) ^ ComponentCode;
hashCode = (hashCode * 397) ^ (MessageType != null ? MessageType.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ (PrivateTopic != null ? PrivateTopic.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ (SessionData != null ? SessionData.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ (ErrorMessage != null ? ErrorMessage.GetHashCode() : 0);
return hashCode;
}
}

public string StateMachineId { get; set; }

public int StateCode { get; set; }
Expand All @@ -47,5 +20,9 @@ public override int GetHashCode()
public string SessionData { get; set; }

public string ErrorMessage { get; set; }

public string MessageId { get; set; } = Guid.NewGuid().ToString();

public int WorkerId { get; set; }
}
}
13 changes: 9 additions & 4 deletions ReactiveXComponent/Configuration/BusDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ public BusDetails()

}

public BusDetails(string username, string password, string host, int port)
public BusDetails(string username, string password, string host, string virtualHost, int port)
{
Username = username;
Password = password;
Host = host;
VirtualHost = virtualHost;
Port = port;

}

public string Username { get; set; }
Expand All @@ -22,14 +24,17 @@ public BusDetails(string username, string password, string host, int port)

public string Host { get; set; }

public string VirtualHost { get; set; }

public int Port { get; set; }

public BusDetails Clone()
{
return new BusDetails(
(string)Username?.Clone(),
(string)Password?.Clone(),
(string)Host?.Clone(),
Username,
Password,
Host,
VirtualHost,
Port);
}
}
Expand Down
2 changes: 2 additions & 0 deletions ReactiveXComponent/Configuration/ConfigurationOverrides.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ public class ConfigurationOverrides
{
public string Host { get; set; }

public string VirtualHost { get; set; }

public string Port { get; set; }

public string Username { get; set; }
Expand Down
1 change: 1 addition & 0 deletions ReactiveXComponent/Parser/XCApiConfigParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public BusDetails GetBusDetails()
busInfos?.Attribute("user")?.Value,
busInfos?.Attribute("password")?.Value,
busInfos?.Attribute("host")?.Value,
busInfos?.Attribute("virtualHost")?.Value,
Convert.ToInt32(busInfos?.Attribute("port")?.Value));

return busDetails;
Expand Down
5 changes: 5 additions & 0 deletions ReactiveXComponent/RabbitMq/RabbitMqConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public IXCSession CreateSession(ConfigurationOverrides configurationOverrides =
busDetails.Host = configurationOverrides.Host;
}

if (!string.IsNullOrEmpty(configurationOverrides.VirtualHost))
{
busDetails.VirtualHost = configurationOverrides.VirtualHost;
}

if (configurationOverrides.Port != null)
{
busDetails.Port = int.Parse(configurationOverrides.Port);
Expand Down
20 changes: 17 additions & 3 deletions ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ public static Dictionary<string, object> ConvertHeader(Header header)
{HeaderElement.IncomingEventType, header?.IncomingEventType},
{HeaderElement.PublishTopic, header?.PublishTopic != null ? encoding.GetBytes(header.PublishTopic) : encoding.GetBytes(string.Empty) },
{HeaderElement.MessageType, header?.MessageType != null ? encoding.GetBytes(header.MessageType) : encoding.GetBytes(string.Empty)},
{HeaderElement.ErrorMessage, header?.ErrorMessage != null ? encoding.GetBytes(header.ErrorMessage) : encoding.GetBytes(string.Empty)}
{HeaderElement.ErrorMessage, header?.ErrorMessage != null ? encoding.GetBytes(header.ErrorMessage) : encoding.GetBytes(string.Empty)},
{HeaderElement.MessageId, header?.MessageId != null ? encoding.GetBytes(header.MessageId) : encoding.GetBytes(string.Empty) },


};
return dico;
}
Expand All @@ -40,7 +43,10 @@ public static Dictionary<string, object> CreateHeaderFromStateMachineRefHeader(S
{HeaderElement.MessageType, stateMachineRefHeader?.MessageType != null ? encoding.GetBytes(stateMachineRefHeader.MessageType) : encoding.GetBytes(string.Empty)},
{HeaderElement.EventType, eventCode},
{HeaderElement.IncomingEventType, (int)incomingEventType},
{HeaderElement.ErrorMessage, stateMachineRefHeader?.ErrorMessage != null ? encoding.GetBytes(stateMachineRefHeader.ErrorMessage) : encoding.GetBytes(string.Empty)}
{HeaderElement.ErrorMessage, stateMachineRefHeader?.ErrorMessage != null ? encoding.GetBytes(stateMachineRefHeader.ErrorMessage) : encoding.GetBytes(string.Empty)},
{HeaderElement.MessageId, stateMachineRefHeader?.MessageId != null ? encoding.GetBytes(stateMachineRefHeader.MessageId) : encoding.GetBytes(string.Empty) },
{HeaderElement.WorkerId, stateMachineRefHeader?.WorkerId},

};
return dico;
}
Expand All @@ -56,6 +62,8 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary<str
var messageType = string.Empty;
var sessionData = string.Empty;
var errorMessage = string.Empty;
var messageId = string.Empty;
var workerId = -1;

if (stateMachineRefHeader.ContainsKey(HeaderElement.StateMachineId) && stateMachineRefHeader[HeaderElement.StateMachineId] != null)
stateMachineId = Encoding.UTF8.GetString((byte[])stateMachineRefHeader[HeaderElement.StateMachineId]);
Expand All @@ -73,6 +81,10 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary<str
sessionData = encoding.GetString(stateMachineRefHeader[HeaderElement.SessionData] as byte[]);
if (stateMachineRefHeader.ContainsKey(HeaderElement.ErrorMessage))
errorMessage = encoding.GetString(stateMachineRefHeader[HeaderElement.ErrorMessage] as byte[]);
if (stateMachineRefHeader.ContainsKey(HeaderElement.MessageId))
messageId = encoding.GetString(stateMachineRefHeader[HeaderElement.MessageId] as byte[]);
if (stateMachineRefHeader.ContainsKey(HeaderElement.WorkerId))
workerId = Convert.ToInt32(stateMachineRefHeader[HeaderElement.WorkerId]);

return new StateMachineRefHeader()
{
Expand All @@ -83,7 +95,9 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary<str
MessageType = messageType,
PrivateTopic = publishTopic,
SessionData = sessionData,
ErrorMessage = errorMessage
ErrorMessage = errorMessage,
MessageId = messageId,
WorkerId = workerId,
};
}
}
Expand Down
6 changes: 4 additions & 2 deletions ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private Header CreateHeader(string component, string stateMachine, object messag
StateCode = defaultValue,
EventCode = _configuration.GetPublisherEventCode(messageType),
IncomingEventType = (int)IncomingEventType.Transition,
PublishTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier)? _privateCommunicationIdentifier : string.Empty
PublishTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier)? _privateCommunicationIdentifier : string.Empty,
};

return header;
Expand All @@ -133,7 +133,9 @@ private StateMachineRefHeader CreateStateMachineRefHeader(StateMachineRefHeader
StateMachineCode = stateMachineRefHeader.StateMachineCode,
ComponentCode = stateMachineRefHeader.ComponentCode,
MessageType = messageType,
PrivateTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier) ? _privateCommunicationIdentifier : string.Empty
PrivateTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier) ? _privateCommunicationIdentifier : string.Empty,
MessageId = stateMachineRefHeader.MessageId,
WorkerId = stateMachineRefHeader.WorkerId,
};

return stateMachineRefheader;
Expand Down
2 changes: 1 addition & 1 deletion ReactiveXComponent/RabbitMq/RabbitMqSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private void InitConnection(BusDetails busDetails)
{
UserName = busDetails.Username ?? XCApiTags.DefaultUserName,
Password = busDetails.Password ?? XCApiTags.DefaultPassword,
VirtualHost = ConnectionFactory.DefaultVHost,
VirtualHost = string.IsNullOrEmpty(busDetails.VirtualHost) ? ConnectionFactory.DefaultVHost : busDetails.VirtualHost,
HostName = busDetails.Host,
Port = busDetails.Port,
Protocol = Protocols.DefaultProtocol
Expand Down
3 changes: 2 additions & 1 deletion ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize =
StateMachineId = item.StateMachineId,
StateMachineCode = item.StateMachineCode,
ComponentCode = item.ComponentCode,
StateCode = item.StateCode
StateCode = item.StateCode,
WorkerId = item.WorkerId,
},
item.PublicMember,
_serializationType)).ToList();
Expand Down
Loading