-
Notifications
You must be signed in to change notification settings - Fork 85
/
WebSocketMessageReadRfc6455Stream.cs
134 lines (111 loc) · 5.16 KB
/
WebSocketMessageReadRfc6455Stream.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
using System;
using System.Threading;
using System.Threading.Tasks;
namespace vtortola.WebSockets.Rfc6455
{
/// <summary>
/// Stream over the payload of one incoming message on a <see cref="WebSocketConnectionRfc6455"/>.
/// The message type and extension flags are captured from the connection's current frame header
/// at construction; while more frames are pending, reads move on to the next frame header once
/// the current frame has been consumed.
/// </summary>
internal sealed class WebSocketMessageReadRfc6455Stream : WebSocketMessageReadStream
{
    readonly WebSocketConnectionRfc6455 _webSocket;
    readonly WebSocketMessageType _messageType;
    readonly WebSocketExtensionFlags _flags;

    // True while the message is expected to continue in further frames:
    // initialised from the first header's FIN flag and refreshed every time a new header is loaded.
    bool _hasPendingFrames;

    /// <summary>Message type taken from the option of the frame header that started this message.</summary>
    public override WebSocketMessageType MessageType { get { return _messageType; } }

    /// <summary>RSV1-3 flags copied from the frame header that started this message.</summary>
    public override WebSocketExtensionFlags Flags { get { return _flags; } }

    /// <summary>
    /// Creates a message stream positioned on the connection's current frame header.
    /// </summary>
    /// <param name="webSocket">Connection whose current header describes the first frame of the message.</param>
    /// <exception cref="WebSocketException">The current frame is neither a Text nor a Binary frame.</exception>
    public WebSocketMessageReadRfc6455Stream(WebSocketConnectionRfc6455 webSocket)
    {
        Guard.ParameterCannotBeNull(webSocket, nameof(webSocket));

        _webSocket = webSocket;
        _messageType = (WebSocketMessageType)_webSocket.CurrentHeader.Flags.Option;
        _flags = GetExtensionFlags(_webSocket.CurrentHeader.Flags);
        _hasPendingFrames = !_webSocket.CurrentHeader.Flags.FIN;

        if (_webSocket.CurrentHeader.Flags.Option != WebSocketFrameOption.Binary && _webSocket.CurrentHeader.Flags.Option != WebSocketFrameOption.Text)
            throw new WebSocketException("WebSocketMessageReadNetworkStream can only start with a Text or Binary frame, not " + _webSocket.CurrentHeader.Flags.Option.ToString());
    }

    /// <summary>Copies the RSV1, RSV2 and RSV3 bits of a frame header into a new <see cref="WebSocketExtensionFlags"/>.</summary>
    private WebSocketExtensionFlags GetExtensionFlags(WebSocketFrameHeaderFlags webSocketFrameHeaderFlags)
    {
        var flags = new WebSocketExtensionFlags();
        flags.Rsv1 = webSocketFrameHeaderFlags.RSV1;
        flags.Rsv2 = webSocketFrameHeaderFlags.RSV2;
        flags.Rsv3 = webSocketFrameHeaderFlags.RSV3;
        return flags;
    }

    /// <summary>
    /// Validates the caller's buffer window and clamps <paramref name="count"/> to what the current frame can still supply.
    /// </summary>
    /// <returns>
    /// 0 when there is no current header; otherwise <paramref name="count"/> limited to the header's
    /// content length and remaining bytes.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The window described by offset and count does not fit in the buffer.</exception>
    private int CheckBoundaries(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException(nameof(buffer));
        if (offset < 0)
            throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0)
            throw new ArgumentOutOfRangeException(nameof(count));
        if (count > buffer.Length - offset)
            throw new ArgumentException("There is not space in the array for that length considering that offset.");

        if (_webSocket.CurrentHeader == null)
            return 0;

        if (_webSocket.CurrentHeader.ContentLength < count)
            count = (int)_webSocket.CurrentHeader.ContentLength;

        if (_webSocket.CurrentHeader.RemainingBytes < count)
            count = (int)_webSocket.CurrentHeader.RemainingBytes;

        return count;
    }

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes of message payload into <paramref name="buffer"/>.
    /// </summary>
    /// <returns>
    /// The number of bytes read; 0 when the connection is not connected, when nothing is left to read
    /// in the message, or when <paramref name="count"/> is 0.
    /// </returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        var bytesRead = 0;
        do
        {
            if (!_webSocket.IsConnected)
                break;

            var checkedCount = CheckBoundaries(buffer, offset, count);
            if (checkedCount == 0)
            {
                if (!_hasPendingFrames)
                {
                    _webSocket.DisposeCurrentHeaderIfFinished();
                    break;
                }

                // A zero-length request while the current frame still has unread payload
                // must not advance to the next header: there is simply nothing to copy.
                if (CurrentFrameHasUnreadBytes())
                    break;

                LoadNewHeader();
            }
            else
            {
                bytesRead = _webSocket.ReadInternal(buffer, offset, checkedCount);
                _webSocket.DisposeCurrentHeaderIfFinished();
                if (_webSocket.CurrentHeader == null && _hasPendingFrames)
                    LoadNewHeader();
            }
            // The header may be null here (disposed, or no new header was obtained), so the check is null-safe.
        } while (bytesRead == 0 && CurrentFrameHasUnreadBytes());

        return bytesRead;
    }

    /// <summary>
    /// Asynchronously reads up to <paramref name="count"/> bytes of message payload into <paramref name="buffer"/>.
    /// </summary>
    /// <returns>
    /// The number of bytes read; 0 when the connection is not connected, when cancellation was already
    /// requested at the start of an iteration, when nothing is left to read in the message, or when
    /// <paramref name="count"/> is 0.
    /// </returns>
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancel)
    {
        var bytesRead = 0;
        do
        {
            if (!_webSocket.IsConnected || cancel.IsCancellationRequested)
                break;

            var checkedCount = CheckBoundaries(buffer, offset, count);
            if (checkedCount == 0)
            {
                if (!_hasPendingFrames)
                {
                    _webSocket.DisposeCurrentHeaderIfFinished();
                    break;
                }

                // A zero-length request while the current frame still has unread payload
                // must not advance to the next header: there is simply nothing to copy.
                if (CurrentFrameHasUnreadBytes())
                    break;

                await LoadNewHeaderAsync(cancel).ConfigureAwait(false);
            }
            else
            {
                bytesRead = await _webSocket.ReadInternalAsync(buffer, offset, checkedCount, cancel).ConfigureAwait(false);
                _webSocket.DisposeCurrentHeaderIfFinished();
                if (_webSocket.CurrentHeader == null && _hasPendingFrames)
                    await LoadNewHeaderAsync(cancel).ConfigureAwait(false);
            }
            // The header may be null here (disposed, or no new header was obtained), so the check is null-safe.
        } while (bytesRead == 0 && CurrentFrameHasUnreadBytes());

        return bytesRead;
    }

    /// <summary>Waits for the next frame header on the connection and refreshes the pending-frames state.</summary>
    private void LoadNewHeader()
    {
        _webSocket.AwaitHeader();
        _hasPendingFrames = HasPendingFrames();
    }

    /// <summary>Asynchronous counterpart of <see cref="LoadNewHeader"/>.</summary>
    private async Task LoadNewHeaderAsync(CancellationToken cancel)
    {
        await _webSocket.AwaitHeaderAsync(cancel).ConfigureAwait(false);
        _hasPendingFrames = HasPendingFrames();
    }

    /// <summary>
    /// True when there is a current header, it is a Continuation frame, and its FIN flag is not set.
    /// </summary>
    private bool HasPendingFrames()
        => _webSocket.CurrentHeader != null && !_webSocket.CurrentHeader.Flags.FIN && _webSocket.CurrentHeader.Flags.Option == WebSocketFrameOption.Continuation;

    /// <summary>
    /// True when there is a current header and it still reports remaining bytes.
    /// Safe to call when the connection has no current header.
    /// </summary>
    private bool CurrentFrameHasUnreadBytes()
    {
        var header = _webSocket.CurrentHeader;
        return header != null && header.RemainingBytes != 0;
    }
}
}