-
Notifications
You must be signed in to change notification settings - Fork 5
/
WebSocketServer.swift
146 lines (124 loc) · 4.61 KB
/
WebSocketServer.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import Combine
import Foundation
import NIO
import NIOHTTP1
import NIOSSL
import NIOWebSocket
import WebSocket
import WebSocketKit
/// A single instruction for `WebSocketServer`, delivered through the output
/// publisher that is handed to the server's initializer.
enum WebSocketServerOutput: Hashable {
/// Send the wrapped message to the connected client.
case message(WebSocketMessage)
/// Close the connection from the server side; the server closes the socket
/// with the `.goingAway` close code when it receives this value.
case remoteClose
}
// Module-qualified shorthand for WebSocketKit's socket type. This file also
// imports a module named `WebSocket`, so the qualified name keeps the
// reference unambiguous.
private typealias WS = WebSocketKit.WebSocket
/// A small WebSocket server bound to the loopback interface on a random port.
///
/// Everything emitted by the `outputPublisher` passed to `init` is forwarded
/// to the connected client, and everything the client sends is republished
/// through `inputPublisher`.
///
/// Only one output subscription is stored at a time: each WebSocket upgrade
/// replaces the subscription created for the previous connection.
final class WebSocketServer {
    /// The local port the server is listening on.
    ///
    /// Traps if the bound channel did not report a local port during `init`.
    var port: Int { _port! }
    private var _port: Int?

    /// Largest WebSocket frame, in bytes, the server accepts from a client.
    let maximumMessageSize: Int

    // Publisher provided by consumers of `WebSocketServer` to provide the output
    // `WebSocketServer` should send to its clients.
    private let outputPublisher: AnyPublisher<WebSocketServerOutput, Error>
    private var outputPublisherSubscription: AnyCancellable?

    // Publisher that repeats everything sent to it by clients.
    private let inputSubject = PassthroughSubject<WebSocketMessage, Never>()

    private let eventLoopGroup: EventLoopGroup
    private var channel: Channel?

    /// Creates the server and synchronously binds it to `127.0.0.1` on a
    /// random port.
    ///
    /// - Parameters:
    ///   - outputPublisher: Source of the messages and close requests the
    ///     server relays to its client.
    ///   - maximumMessageSize: Maximum accepted frame size in bytes.
    ///     Defaults to 1 MiB.
    /// - Throws: Any error raised while creating the socket address or
    ///   binding the server channel.
    init<P: Publisher>(
        outputPublisher: P,
        maximumMessageSize: Int = 1024 * 1024
    ) throws where P.Output == WebSocketServerOutput, P.Failure == Error {
        self.outputPublisher = outputPublisher.eraseToAnyPublisher()
        self.maximumMessageSize = maximumMessageSize

        eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)

        channel = try makeWebSocket(
            on: eventLoopGroup,
            onUpgrade: onWebSocketUpgrade
        )
        .bind(to: SocketAddress(
            ipAddress: "127.0.0.1",
            port: 0 // random port
        ))
        .wait()

        if let port = channel?.localAddress?.port {
            _port = port
        }
    }

    /// Builds the server bootstrap whose child channels speak HTTP/1.1 and
    /// accept every WebSocket upgrade request.
    ///
    /// - Parameters:
    ///   - eventLoopGroup: Event loop group the server channel runs on.
    ///   - onUpgrade: Called with the request head and the new socket once an
    ///     upgrade has completed.
    /// - Returns: A configured, not yet bound, `ServerBootstrap`.
    private func makeWebSocket(
        on eventLoopGroup: EventLoopGroup,
        onUpgrade: @escaping (HTTPRequestHead, WS) -> Void
    ) -> ServerBootstrap {
        // Capture the limit by value so the long-lived channel initializer
        // does not hold a strong reference to `self`.
        let maxFrameSize = maximumMessageSize

        return ServerBootstrap(group: eventLoopGroup)
            .serverChannelOption(
                ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR),
                value: 1
            )
            .childChannelInitializer { channel in
                let ws = NIOWebSocketServerUpgrader(
                    // Without this the upgrader falls back to its own default
                    // frame limit and `maximumMessageSize` has no effect.
                    maxFrameSize: maxFrameSize,
                    shouldUpgrade: { channel, _ in
                        // Accept every upgrade request, adding no extra headers.
                        channel.eventLoop.makeSucceededFuture([:])
                    },
                    upgradePipelineHandler: { channel, req in
                        WebSocket.server(on: channel) { ws in
                            onUpgrade(req, ws)
                        }
                    }
                )

                return channel.pipeline.configureHTTPServerPipeline(
                    withServerUpgrade: (
                        upgraders: [ws],
                        completionHandler: { _ in }
                    )
                )
            }
    }

    /// Handler run for every upgraded connection: subscribes the socket to
    /// `outputPublisher` and forwards the client's frames to `inputSubject`.
    private var onWebSocketUpgrade: (HTTPRequestHead, WS) -> Void {
        { [weak self] (_: HTTPRequestHead, ws: WS) in
            guard let self else { return }

            let sub = outputPublisher
                .sink(
                    receiveCompletion: { completion in
                        switch completion {
                        case .finished:
                            // Must be a call: a bare `ws.close(code:)` is only
                            // a method reference and never closes the socket.
                            _ = ws.close(code: .normalClosure)
                        case .failure:
                            _ = ws.close(code: .unexpectedServerError)
                        }
                    },
                    receiveValue: { output in
                        switch output {
                        case .remoteClose:
                            // Best-effort close that blocks the emitting
                            // thread until it completes; failures are
                            // deliberately ignored.
                            do { try ws.close(code: .goingAway).wait() }
                            catch {}
                        case let .message(message):
                            switch message {
                            case let .data(data):
                                ws.send(raw: data, opcode: .binary)
                            case let .text(text):
                                ws.send(text)
                            }
                        }
                    }
                )

            // Replaces (and thereby cancels) the previous connection's subscription.
            outputPublisherSubscription = sub

            ws.onText { [weak self] _, text in
                self?.inputSubject.send(.text(text))
            }

            ws.onBinary { [weak self] _, buffer in
                guard let self,
                      let data = buffer.getData(
                          at: buffer.readerIndex,
                          length: buffer.readableBytes
                      )
                else { return }
                inputSubject.send(.data(data))
            }
        }
    }

    /// Stops relaying output, closes the listening channel and shuts down the
    /// event loop group, blocking until both have finished.
    ///
    /// Errors raised while closing or shutting down are ignored.
    func shutDown() {
        // Drop the sink first so it stops sending to, and retaining, a socket
        // that is about to go away.
        outputPublisherSubscription?.cancel()
        outputPublisherSubscription = nil

        try? channel?.close(mode: .all).wait()
        try? eventLoopGroup.syncShutdownGracefully()
    }

    /// Every text and binary message received from clients.
    var inputPublisher: AnyPublisher<WebSocketMessage, Never> {
        inputSubject.eraseToAnyPublisher()
    }
}