This repository has been archived by the owner on Apr 7, 2022. It is now read-only.
/
WebSocketHandler.swift
164 lines (146 loc) · 5.83 KB
/
WebSocketHandler.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import Foundation
extension WebSocket {
    /// Creates an `HTTPProtocolUpgrader` that will create instances of this class upon HTTP upgrade.
    ///
    /// - Parameters:
    ///   - shouldUpgrade: Called with an `HTTPRequest` built from the upgrade request head
    ///     (method, URI, version and headers; the body is empty). Its return value is handed
    ///     straight back to the underlying `WebSocketUpgrader`.
    ///   - onUpgrade: Called with the new `WebSocket` and the originating request once the
    ///     websocket handler has been added to the channel pipeline.
    /// - Returns: A `WebSocketUpgrader` configured with the two closures above.
    ///
    /// If the request URI cannot be parsed as a `URL`, the upgrade is declined
    /// (`shouldUpgrade` is not consulted) instead of trapping.
    public static func httpProtocolUpgrader(
        shouldUpgrade: @escaping (HTTPRequest) -> (HTTPHeaders?),
        onUpgrade: @escaping (WebSocket, HTTPRequest) -> ()
    ) -> HTTPProtocolUpgrader {
        return WebSocketUpgrader(shouldUpgrade: { head in
            // The URI comes from the remote peer, so it must never be force-unwrapped:
            // an unparseable request target would otherwise crash the process.
            guard let url = URL(string: head.uri) else {
                return nil
            }
            let req = HTTPRequest(
                method: head.method,
                url: url,
                version: head.version,
                headers: head.headers,
                body: .init()
            )
            return shouldUpgrade(req)
        }, upgradePipelineHandler: { channel, head in
            // Defensive: the same check is made in `shouldUpgrade` above. If the URI is
            // unparseable here anyway, close the channel rather than install a handler
            // for a request that cannot be represented.
            guard let url = URL(string: head.uri) else {
                return channel.close(mode: .all)
            }
            let req = HTTPRequest(
                method: head.method,
                url: url,
                version: head.version,
                headers: head.headers,
                body: .init()
            )
            let handler = WebsocketHandler()
            let websocket = WebSocket(handler: handler)
            // `onConnect` fires from `handlerAdded`, i.e. once the handler is in the pipeline.
            handler.onConnect = {
                onUpgrade(websocket, req)
            }
            return channel.pipeline.add(handler: handler)
        })
    }
}
/// NIO channel pipeline handler for web sockets.
///
/// Reads inbound `WebSocketFrame`s and dispatches them to the optional callbacks below,
/// answers pings with pongs, and echoes/initiates close frames.
internal final class WebsocketHandler: ChannelInboundHandler {
    typealias InboundIn = WebSocketFrame
    typealias OutboundOut = WebSocketFrame

    /// Set once `closeOnError` has written a close frame. While set, `send` drops outgoing
    /// frames and an inbound close frame finishes the shutdown via `close()`.
    private var awaitingClose: Bool = false

    /// Invoked with the payload of each inbound `.text` frame.
    internal var onText: ((String) -> ())?

    /// Invoked with the payload of each inbound `.binary` frame.
    internal var onData: ((Data) -> ())?

    /// Invoked once from `close()`, then cleared.
    internal var onClose: (() -> ())?

    /// Invoked from `errorCaught` with the error reported by the pipeline.
    internal var onError: ((Error) -> ())?

    /// Invoked once from `handlerAdded`, then cleared.
    internal var onConnect: (() -> ())?

    /// The context captured in `handlerAdded`; reset to `nil` by `close()`.
    private var currentCtx: ChannelHandlerContext?

    /// Stores the context for later use by `send`/`close` and fires `onConnect` exactly once.
    public func handlerAdded(ctx: ChannelHandlerContext) {
        self.currentCtx = ctx
        onConnect?()
        // Release the closure (and whatever it captured) once it has run.
        onConnect = nil
    }

    /// Tears down the handler state when it is removed from the pipeline.
    func handlerRemoved(ctx: ChannelHandlerContext) {
        close()
    }

    /// Dispatches an inbound frame based on its opcode.
    ///
    /// NOTE(review): `fin` is not inspected and frames that fall into `default`
    /// (presumably continuation and pong) are ignored, so fragmented messages do not
    /// appear to be reassembled here — confirm whether that is handled elsewhere.
    public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
        let frame = self.unwrapInboundIn(data)
        switch frame.opcode {
        case .connectionClose:
            self.receivedClose(ctx: ctx, frame: frame)
        case .ping:
            self.pong(ctx: ctx, frame: frame)
        case .unknownControl, .unknownNonControl:
            self.closeOnError(ctx: ctx)
        case .text:
            // Only unmask and decode the payload when somebody is listening.
            if let onText = self.onText {
                var payload = frame.unmaskedData
                if let text = payload.readString(length: payload.readableBytes) {
                    onText(text)
                }
            }
        case .binary:
            if let onData = self.onData {
                var payload = frame.unmaskedData
                if let bytes = payload.readData(length: payload.readableBytes) {
                    onData(bytes)
                }
            }
        default:
            // We ignore all other frames.
            break
        }
    }

    /// Flushes anything written (but not flushed) while handling the reads, e.g. pong
    /// and close responses which are queued with `ctx.write`.
    public func channelReadComplete(ctx: ChannelHandlerContext) {
        ctx.flush()
    }

    /// Notifies `onClose`, drops all callbacks, and closes the channel.
    ///
    /// Calling this again is harmless: the callbacks and the context have
    /// already been cleared, so every step becomes a no-op.
    func close() {
        onClose?()
        onClose = nil
        onData = nil
        onText = nil
        onError = nil
        currentCtx?.close(promise: nil)
        currentCtx = nil
    }

    /// Sends a single, final (`fin: true`) frame with the given opcode.
    ///
    /// - Parameters:
    ///   - count: Initial capacity of the buffer allocated for the payload.
    ///   - opcode: Opcode of the frame to send.
    ///   - bufferWriter: Fills the freshly allocated buffer with the payload. Runs on the
    ///     channel's event loop.
    ///
    /// The frame is silently dropped when the handler has no context (not yet added, or
    /// already closed), when the channel is inactive, or after a close frame was sent.
    func send(count: Int, opcode: WebSocketOpcode, bufferWriter: @escaping (inout ByteBuffer) -> ()) {
        // `close()` clears `currentCtx`, so sending after close must not trap.
        guard let ctx = currentCtx else { return }
        ctx.eventLoop.execute {
            guard ctx.channel.isActive else { return }
            // We can't send if we sent a close message.
            guard !self.awaitingClose else { return }
            var buffer = ctx.channel.allocator.buffer(capacity: count)
            bufferWriter(&buffer)
            let frame = WebSocketFrame(fin: true, opcode: opcode, data: buffer)
            ctx.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
        }
    }

    /// Handles a received close frame: either completes a close we started, or echoes
    /// a close frame back and then closes.
    private func receivedClose(ctx: ChannelHandlerContext, frame: WebSocketFrame) {
        if awaitingClose {
            // We already sent our own close frame and were waiting for the peer. We're done.
            close()
        } else {
            // This is an unsolicited close. Send a response frame and, once it has been
            // written, close up shop. Echo back the close code the remote peer sent us
            // (the first two payload bytes), or an empty payload if they sent none.
            var payload = frame.unmaskedData
            let closeDataCode = payload.readSlice(length: 2) ?? ctx.channel.allocator.buffer(capacity: 0)
            let closeFrame = WebSocketFrame(fin: true, opcode: .connectionClose, data: closeDataCode)
            _ = ctx.write(self.wrapOutboundOut(closeFrame)).map { () in
                self.close()
            }
        }
    }

    /// Replies to a ping with a pong carrying the same (unmasked) payload.
    private func pong(ctx: ChannelHandlerContext, frame: WebSocketFrame) {
        // `unmaskedData` applies the frame's masking key if there is one, matching how
        // every other opcode in this handler reads its payload.
        let responseFrame = WebSocketFrame(fin: true, opcode: .pong, data: frame.unmaskedData)
        ctx.write(self.wrapOutboundOut(responseFrame), promise: nil)
    }

    /// Forwards pipeline errors to `onError`. The channel is not closed here.
    func errorCaught(ctx: ChannelHandlerContext, error: Error) {
        onError?(error)
    }

    /// Reacts to a protocol violation by sending a close frame carrying
    /// `WebSocketErrorCode.protocolError` and then shutting down the write side
    /// of the connection.
    private func closeOnError(ctx: ChannelHandlerContext) {
        var data = ctx.channel.allocator.buffer(capacity: 2)
        let error = WebSocketErrorCode.protocolError
        data.write(webSocketErrorCode: error)
        let frame = WebSocketFrame(fin: true, opcode: .connectionClose, data: data)
        _ = ctx.write(self.wrapOutboundOut(frame)).then {
            ctx.close(mode: .output)
        }
        // From now on outgoing sends are dropped and the peer's close frame ends the session.
        awaitingClose = true
    }
}