/
AsyncSub.cs
executable file
·130 lines (107 loc) · 3.34 KB
/
AsyncSub.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright 2015 Apcera Inc. All rights reserved.
using System;
using System.Threading;
using System.Threading.Tasks;
// disable XML comment warnings
#pragma warning disable 1591
namespace NATS.Client
{
public sealed class AsyncSubscription : Subscription, IAsyncSubscription, ISubscription
{
public event EventHandler<MsgHandlerEventArgs> MessageHandler;
private MsgHandlerEventArgs msgHandlerArgs = new MsgHandlerEventArgs();
private Task msgFeeder = null;
private bool started = false;
internal AsyncSubscription(Connection conn, string subject, string queue)
: base(conn, subject, queue)
{
mch = conn.getMessageChannel();
if ((ownsChannel = (mch == null)))
{
mch = new Channel<Msg>();
}
}
internal protected override bool processMsg(Msg msg)
{
Connection localConn;
EventHandler<MsgHandlerEventArgs> localHandler;
long localMax;
lock (mu)
{
if (closed)
return false;
localConn = conn;
localHandler = MessageHandler;
localMax = max;
}
// the message handler has not been setup yet, drop the
// message.
if (MessageHandler == null)
return true;
if (conn == null)
return false;
long d = tallyDeliveredMessage(msg);
if (localMax <= 0 || d <= localMax)
{
msgHandlerArgs.msg = msg;
try
{
localHandler(this, msgHandlerArgs);
}
catch (Exception) { }
if (d == max)
{
unsubscribe(false);
conn = null;
}
}
return true;
}
internal bool isStarted()
{
return started;
}
internal void enableAsyncProcessing()
{
if (ownsChannel && msgFeeder == null)
{
msgFeeder = new Task(() => { conn.deliverMsgs(mch); },
TaskCreationOptions.LongRunning);
msgFeeder.Start();
}
started = true;
}
internal void disableAsyncProcessing()
{
if (msgFeeder != null)
{
mch.close();
msgFeeder = null;
}
started = false;
}
public void Start()
{
if (started)
return;
if (conn == null)
throw new NATSBadSubscriptionException();
conn.sendSubscriptionMessage(this);
enableAsyncProcessing();
}
override public void Unsubscribe()
{
disableAsyncProcessing();
base.Unsubscribe();
}
public override void AutoUnsubscribe(int max)
{
Start();
base.AutoUnsubscribe(max);
}
internal override void close()
{
close(ownsChannel);
}
}
}