-
Notifications
You must be signed in to change notification settings - Fork 34
/
EventBus.cs
151 lines (136 loc) · 6.13 KB
/
EventBus.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Redbus.Configuration;
using Redbus.Events;
using Redbus.Interfaces;
namespace Redbus
{
/// <summary>
/// Implements <see cref="IEventBus"/>.
/// </summary>
public class EventBus : IEventBus
{
private readonly IEventBusConfiguration _eventBusConfiguration;
public EventBus(IEventBusConfiguration configuration = null)
{
_eventBusConfiguration = configuration ?? EventBusConfiguration.Default;
_subscriptions = new Dictionary<Type, List<ISubscription>>();
}
/// <summary>
/// Subscribes to the specified event type with the specified action
/// </summary>
/// <typeparam name="TEventBase">The type of event</typeparam>
/// <param name="action">The Action to invoke when an event of this type is published</param>
/// <returns>A <see cref="SubscriptionToken"/> to be used when calling <see cref="Unsubscribe"/></returns>
public SubscriptionToken Subscribe<TEventBase>(Action<TEventBase> action) where TEventBase : EventBase
{
if (action == null)
throw new ArgumentNullException(nameof(action));
lock (SubscriptionsLock)
{
if (!_subscriptions.ContainsKey(typeof(TEventBase)))
_subscriptions.Add(typeof(TEventBase), new List<ISubscription>());
var token = new SubscriptionToken(typeof(TEventBase));
_subscriptions[typeof(TEventBase)].Add(new Subscription<TEventBase>(action, token));
return token;
}
}
/// <summary>
/// Unsubscribe from the Event type related to the specified <see cref="SubscriptionToken"/>
/// </summary>
/// <param name="token">The <see cref="SubscriptionToken"/> received from calling the Subscribe method</param>
public void Unsubscribe(SubscriptionToken token)
{
if (token == null)
throw new ArgumentNullException(nameof(token));
lock (SubscriptionsLock)
{
if (_subscriptions.ContainsKey(token.EventItemType))
{
var allSubscriptions = _subscriptions[token.EventItemType];
var subscriptionToRemove = allSubscriptions.FirstOrDefault(x => x.SubscriptionToken.Token == token.Token);
if (subscriptionToRemove != null)
_subscriptions[token.EventItemType].Remove(subscriptionToRemove);
}
}
}
/// <summary>
/// Publishes the specified event to any subscribers for the <see cref="TEventBase"/> event type
/// </summary>
/// <typeparam name="TEventBase">The type of event</typeparam>
/// <param name="eventItem">Event to publish</param>
public void Publish<TEventBase>(TEventBase eventItem) where TEventBase : EventBase
{
if (eventItem == null)
throw new ArgumentNullException(nameof(eventItem));
var allSubscriptions = new List<ISubscription>();
lock (SubscriptionsLock)
{
if (_subscriptions.ContainsKey(typeof(TEventBase)))
allSubscriptions = _subscriptions[typeof(TEventBase)].ToList();
}
for (var index = 0; index < allSubscriptions.Count; index++)
{
var subscription = allSubscriptions[index];
try
{
subscription.Publish(eventItem);
}
catch (Exception)
{
if (_eventBusConfiguration.ThrowSubscriberException)
throw;
}
}
}
/// <summary>
/// Publishes the specified event to any subscribers for the <see cref="TEventBase"/> event type asychronously
/// </summary>
/// <remarks> This is a wrapper call around the synchronous method as this method is naturally synchronous (CPU Bound) </remarks>
/// <typeparam name="TEventBase">The type of event</typeparam>
/// <param name="eventItem">Event to publish</param>
public void PublishAsync<TEventBase>(TEventBase eventItem) where TEventBase : EventBase
{
PublishAsyncInternal(eventItem, null);
}
/// <summary>
/// Publishes the specified event to any subscribers for the <see cref="TEventBase"/> event type asychronously
/// </summary>
/// <remarks> This is a wrapper call around the synchronous method as this method is naturally synchronous (CPU Bound) </remarks>
/// <typeparam name="TEventBase">The type of event</typeparam>
/// <param name="eventItem">Event to publish</param>
/// <param name="callback"><see cref="AsyncCallback"/> that is called on completion</param>
public void PublishAsync<TEventBase>(TEventBase eventItem, AsyncCallback callback) where TEventBase : EventBase
{
PublishAsyncInternal(eventItem, callback);
}
#region PRIVATE METHODS
private void PublishAsyncInternal<TEventBase>(TEventBase eventItem, AsyncCallback callback) where TEventBase : EventBase
{
Task<bool> publishTask = new Task<bool>(() =>
{
Publish(eventItem);
return true;
});
publishTask.Start();
if (callback == null)
return;
var tcs = new TaskCompletionSource<bool>();
publishTask.ContinueWith(t =>
{
if (t.IsFaulted)
tcs.TrySetException(t.Exception.InnerExceptions);
else if (t.IsCanceled)
tcs.TrySetCanceled();
else
tcs.TrySetResult(t.Result);
callback?.Invoke(tcs.Task);
}, TaskScheduler.Default);
}
#endregion
private readonly Dictionary<Type, List<ISubscription>> _subscriptions;
private static readonly object SubscriptionsLock = new object();
}
}