/
SignalerWss.cs
166 lines (135 loc) · 5.34 KB
/
SignalerWss.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
using System;
using Cysharp.Threading.Tasks;
using NativeWebSocket;
using UniRx;
using Unity.WebRTC;
using UnityEngine;
namespace WebRTC.Signaler
{
public class SignalerWss : ISignaler, IDisposable
{
#region WSS
private readonly WebSocket _webSocket;
public WebSocket WebSocket => _webSocket;
public bool IsConnected => _webSocket.State == WebSocketState.Open;
private readonly CompositeDisposable _compositeDisposable = new();
public SignalerWss(string url)
{
_webSocket = new WebSocket(url);
Observable.FromEvent<WebSocketMessageEventHandler, byte[]>(
h => h.Invoke,
h => _webSocket.OnMessage += h,
h => _webSocket.OnMessage -= h
).Subscribe(OnReceiveMessage)
.AddTo(_compositeDisposable);
Observable.FromEvent<WebSocketOpenEventHandler>(
h => h.Invoke,
h => _webSocket.OnOpen += h,
h => _webSocket.OnOpen -= h
)
.Subscribe(OnOpen).AddTo(_compositeDisposable);
Observable.FromEvent<WebSocketCloseEventHandler, WebSocketCloseCode>(
h => h.Invoke,
h => _webSocket.OnClose += h,
h => _webSocket.OnClose -= h
).Subscribe(OnClose).AddTo(_compositeDisposable);
Observable.FromEvent<WebSocketErrorEventHandler, string>(
h => h.Invoke,
h => _webSocket.OnError += h,
h => _webSocket.OnError -= h
).Subscribe(OnError).AddTo(_compositeDisposable);
Observable.EveryUpdate()
.Subscribe(_ => _webSocket.DispatchMessageQueue())
.AddTo(_compositeDisposable);
}
public async UniTask Connect()
{
// never return
_webSocket.Connect().AsUniTask().Forget();
bool IsConnecting() => _webSocket.State == WebSocketState.Connecting;
// *** -> Connecting
await UniTask.WaitUntil(IsConnecting);
// Connecting -> ***
await UniTask.WaitWhile(IsConnecting);
}
public void Dispose()
{
_webSocket?.Close();
_compositeDisposable?.Dispose();
_receiveOffer?.Dispose();
_receiveAnswer?.Dispose();
_receiveIce?.Dispose();
}
private void OnOpen(Unit _)
{
Debug.Log($"{nameof(SignalerWss)} WS {nameof(OnOpen)}");
}
private void OnClose(WebSocketCloseCode code)
{
Debug.Log($"{nameof(SignalerWss)} WS {nameof(OnClose)} {code}");
}
private void OnError(string error)
{
Debug.LogError($"{nameof(SignalerWss)} WS {nameof(OnError)} {error}");
}
#endregion
#region Message
public enum MessageType : byte
{
Offer = 1,
Answer = 2,
Ice = 3,
}
private void OnReceiveMessage(byte[] message)
{
var t = message[message.Length - 1];
Array.Resize(ref message, message.Length - 1);
var s = System.Text.Encoding.UTF8.GetString(message);
switch (t)
{
case (byte)MessageType.Offer:
_receiveOffer?.OnNext(SessionDescriptionToJson.FromJson(s).To());
break;
case (byte)MessageType.Answer:
_receiveAnswer?.OnNext(SessionDescriptionToJson.FromJson(s).To());
break;
case (byte)MessageType.Ice:
_receiveIce?.OnNext(IceCandidateToJson.FromJson(s).To());
break;
default:
return;
}
}
public UniTask SendMessage(MessageType type, string json)
{
var s = System.Text.Encoding.UTF8.GetBytes(json);
Array.Resize(ref s, s.Length + 1);
s[s.Length - 1] = (byte)type;
return _webSocket.Send(s).AsUniTask();
}
#endregion
#region ISignaler
public UniTask SendOffer(RTCSessionDescription description)
{
Debug.Log($"{nameof(SignalerWss)} {nameof(SendOffer)} {description}");
return SendMessage(MessageType.Offer, description.From().ToJson());
}
public UniTask SendAnswer(RTCSessionDescription description)
{
Debug.Log($"{nameof(SignalerWss)} {nameof(SendAnswer)} {description}");
return SendMessage(MessageType.Answer, description.From().ToJson());
}
public UniTask SendIce(RTCIceCandidate candidate)
{
Debug.Log($"{nameof(SignalerWss)} {nameof(SendIce)} {candidate}");
return SendMessage(MessageType.Ice, candidate.From().ToJson());
}
private readonly Subject<RTCSessionDescription> _receiveOffer = new();
public IObservable<RTCSessionDescription> ReceiveOffer => _receiveOffer;
private readonly Subject<RTCSessionDescription> _receiveAnswer = new();
public IObservable<RTCSessionDescription> ReceiveAnswer => _receiveAnswer;
private readonly Subject<RTCIceCandidate> _receiveIce = new();
public IObservable<RTCIceCandidate> ReceiveIce => _receiveIce;
#endregion
}
}