Skip to content

Commit

Permalink
(#224) Create dedicated channel for Message Sequence
Browse files Browse the repository at this point in the history
  • Loading branch information
pardahlman committed May 7, 2017
1 parent d2032f2 commit 94c8fff
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<ProjectReference Include="..\RawRabbit.Operations.Publish\RawRabbit.Operations.Publish.csproj" />
<ProjectReference Include="..\RawRabbit.Enrichers.MessageContext.Subscribe\RawRabbit.Enrichers.MessageContext.Subscribe.csproj" />
<ProjectReference Include="..\RawRabbit.Operations.StateMachine\RawRabbit.Operations.StateMachine.csproj" />
<ProjectReference Include="..\RawRabbit.Operations.Tools\RawRabbit.Operations.Tools.csproj" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net451' ">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RawRabbit.Logging;
using RawRabbit.Operations.MessageSequence.Configuration;
using RawRabbit.Operations.MessageSequence.Configuration.Abstraction;
Expand All @@ -11,6 +12,7 @@
using RawRabbit.Operations.StateMachine;
using RawRabbit.Operations.StateMachine.Trigger;
using RawRabbit.Pipe;
using RawRabbit.Pipe.Middleware;
using Stateless;

namespace RawRabbit.Operations.MessageSequence.StateMachine
Expand All @@ -24,6 +26,7 @@ public class MessageSequence : StateMachineBase<SequenceState, Type, SequenceMod
private readonly Queue<StepDefinition> _stepDefinitions;
private readonly List<Subscription.ISubscription> _subscriptions;
private readonly ILogger _logger = LogManager.GetLogger<MessageSequence>();
private IModel _channel;

public MessageSequence(IBusClient client, SequenceModel model = null) : base(model)
{
Expand Down Expand Up @@ -186,6 +189,7 @@ MessageSequence<TMessage> IMessageSequenceBuilder.Complete<TMessage>()
{
subscription.Dispose();
}
_channel.Dispose();
});

var trigger = StateMachine.SetTriggerParameters<TMessage>(typeof(TMessage));
Expand Down Expand Up @@ -222,12 +226,15 @@ MessageSequence<TMessage> IMessageSequenceBuilder.Complete<TMessage>()
)
);

_channel = _client.CreateChannelAsync().GetAwaiter().GetResult();

foreach (var triggerCfg in _triggerConfigurer.TriggerConfiguration)
{
triggerCfg.Context += context =>
{
context.Properties.Add(StateMachineKey.ModelId, Model.Id);
context.Properties.Add(StateMachineKey.Machine, this);
context.Properties.TryAdd(PipeKey.Channel, _channel);
};
var ctx = _client.InvokeAsync(triggerCfg.Pipe, triggerCfg.Context).GetAwaiter().GetResult();
_subscriptions.Add(ctx.GetSubscription());
Expand Down

0 comments on commit 94c8fff

Please sign in to comment.