/
WebSocket+Concurrency.swift
140 lines (131 loc) · 5.1 KB
/
WebSocket+Concurrency.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#if compiler(>=5.5) && canImport(_Concurrency)
import NIOCore
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension Request {
    /// Upgrades this request to a websocket connection using `async` callbacks.
    ///
    /// Both callbacks are adapted and handed to the `webSocket` overload that takes a
    /// future-returning `shouldUpgrade` closure and a synchronous `onUpgrade` closure.
    ///
    /// - parameters:
    ///     - maxFrameSize: The maximum allowed frame size.
    ///     - shouldUpgrade: Closure to apply before the upgrade happens. Returns additional
    ///                      `HTTPHeaders` for the response, `nil` to deny upgrading.
    ///                      The default returns empty headers.
    ///     - onUpgrade: Closure to apply with the request and the socket after the upgrade.
    /// - returns: The `Response` produced by the wrapped `webSocket` call.
    public func webSocket(
        maxFrameSize: WebSocketMaxFrameSize = .`default`,
        shouldUpgrade: @escaping ((Request) async throws -> HTTPHeaders?) = { _ in [:] },
        onUpgrade: @escaping (Request, WebSocket) async -> ()
    ) -> Response {
        return self.webSocket(
            maxFrameSize: maxFrameSize,
            shouldUpgrade: { req in
                // Run the async decision in a task and surface its outcome through a
                // promise created on the request's event loop.
                let upgradeHeaders = req.eventLoop.makePromise(of: HTTPHeaders?.self)
                upgradeHeaders.completeWithTask { try await shouldUpgrade(req) }
                return upgradeHeaders.futureResult
            },
            onUpgrade: { req, ws in
                // The wrapped callback is synchronous, so the async handler is started
                // in its own task and this closure returns without awaiting it.
                Task { await onUpgrade(req, ws) }
            }
        )
    }
}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension RoutesBuilder {
    /// Registers a websocket route whose path is given as a variadic list of components.
    ///
    /// Forwards every argument unchanged to the array-based `webSocket` overload.
    ///
    /// - parameters:
    ///     - path: Path components separated by commas.
    ///     - maxFrameSize: The maximum allowed frame size. See `NIOWebSocketServerUpgrader`.
    ///     - shouldUpgrade: Closure to apply before upgrade to web socket happens.
    ///                      Returns additional `HTTPHeaders` for response, `nil` to deny upgrading.
    ///                      See `NIOWebSocketServerUpgrader`.
    ///     - onUpgrade: Closure to apply after web socket is upgraded successfully.
    /// - returns: `Route` instance for the newly created web socket endpoint.
    @discardableResult
    public func webSocket(
        _ path: PathComponent...,
        maxFrameSize: WebSocketMaxFrameSize = .`default`,
        shouldUpgrade: @escaping ((Request) async throws -> HTTPHeaders?) = { _ in [:] },
        onUpgrade: @escaping (Request, WebSocket) async -> ()
    ) -> Route {
        // `path` is an array inside this body, so this selects the array overload.
        webSocket(
            path,
            maxFrameSize: maxFrameSize,
            shouldUpgrade: shouldUpgrade,
            onUpgrade: onUpgrade
        )
    }

    /// Registers a websocket route whose path is given as an array of components.
    ///
    /// The route is added for `GET` requests; its handler upgrades the incoming request
    /// through `Request.webSocket` with the supplied arguments.
    ///
    /// - parameters:
    ///     - path: Array of path components.
    ///     - maxFrameSize: The maximum allowed frame size. See `NIOWebSocketServerUpgrader`.
    ///     - shouldUpgrade: Closure to apply before upgrade to web socket happens.
    ///                      Returns additional `HTTPHeaders` for response, `nil` to deny upgrading.
    ///                      See `NIOWebSocketServerUpgrader`.
    ///     - onUpgrade: Closure to apply after web socket is upgraded successfully.
    /// - returns: `Route` instance for the newly created web socket endpoint.
    @discardableResult
    public func webSocket(
        _ path: [PathComponent],
        maxFrameSize: WebSocketMaxFrameSize = .`default`,
        shouldUpgrade: @escaping ((Request) async throws -> HTTPHeaders?) = { _ in [:] },
        onUpgrade: @escaping (Request, WebSocket) async -> ()
    ) -> Route {
        on(.GET, path) { incoming -> Response in
            incoming.webSocket(
                maxFrameSize: maxFrameSize,
                shouldUpgrade: shouldUpgrade,
                onUpgrade: onUpgrade
            )
        }
    }
}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension WebSocket {
    /// Opens a client websocket connection to the URL given as a string.
    ///
    /// - parameters:
    ///     - url: String form of the websocket URL.
    ///     - headers: Headers forwarded with the connection request. Defaults to none.
    ///     - configuration: Client configuration. Defaults to `.init()`.
    ///     - eventLoopGroup: Event loop group the client is created on.
    ///     - onUpgrade: Closure forwarded to the client; it receives the `WebSocket`.
    /// - throws: `WebSocketClient.Error.invalidURL` when `url` cannot be parsed as a `URL`,
    ///           otherwise whatever the `URL`-based overload throws.
    public static func connect(
        to url: String,
        headers: HTTPHeaders = [:],
        configuration: WebSocketClient.Configuration = .init(),
        on eventLoopGroup: EventLoopGroup,
        onUpgrade: @escaping (WebSocket) -> ()
    ) async throws {
        guard let url = URL(string: url) else {
            throw WebSocketClient.Error.invalidURL
        }
        return try await self.connect(
            to: url,
            headers: headers,
            configuration: configuration,
            on: eventLoopGroup,
            onUpgrade: onUpgrade
        )
    }

    /// Opens a client websocket connection to a `URL`.
    ///
    /// Missing URL parts fall back to defaults: scheme `ws`, host `localhost`, and port
    /// `443` for `wss` or `80` for any other scheme. An empty path becomes `/`, and the
    /// URL's query string, when present, is appended to the path so it is sent with the
    /// request instead of being dropped.
    ///
    /// - parameters:
    ///     - url: The websocket URL.
    ///     - headers: Headers forwarded with the connection request. Defaults to none.
    ///     - configuration: Client configuration. Defaults to `.init()`.
    ///     - eventLoopGroup: Event loop group the client is created on.
    ///     - onUpgrade: Closure forwarded to the client; it receives the `WebSocket`.
    /// - throws: Whatever the scheme/host/port overload throws.
    public static func connect(
        to url: URL,
        headers: HTTPHeaders = [:],
        configuration: WebSocketClient.Configuration = .init(),
        on eventLoopGroup: EventLoopGroup,
        onUpgrade: @escaping (WebSocket) -> ()
    ) async throws {
        let scheme = url.scheme ?? "ws"

        // `URL.path` carries neither the query nor a default root, so rebuild the
        // request target here.
        // NOTE(review): assumes the client uses `path` verbatim as the request target;
        // confirm against `WebSocketClient.connect`.
        var requestPath = url.path.isEmpty ? "/" : url.path
        if let query = url.query, !query.isEmpty {
            requestPath += "?" + query
        }

        return try await self.connect(
            scheme: scheme,
            host: url.host ?? "localhost",
            port: url.port ?? (scheme == "wss" ? 443 : 80),
            path: requestPath,
            headers: headers,
            configuration: configuration,
            on: eventLoopGroup,
            onUpgrade: onUpgrade
        )
    }

    /// Opens a client websocket connection from individual URL parts.
    ///
    /// Creates a `WebSocketClient` that shares `eventLoopGroup`, starts the connection
    /// and awaits the resulting future.
    ///
    /// - parameters:
    ///     - scheme: URL scheme. Defaults to `ws`.
    ///     - host: Host to connect to.
    ///     - port: Port to connect to. Defaults to `80`, regardless of `scheme`.
    ///     - path: Request path. Defaults to `/`.
    ///     - headers: Headers forwarded with the connection request. Defaults to none.
    ///     - configuration: Client configuration. Defaults to `.init()`.
    ///     - eventLoopGroup: Event loop group shared with the client.
    ///     - onUpgrade: Closure forwarded to the client; it receives the `WebSocket`.
    /// - throws: The error the connection future fails with.
    public static func connect(
        scheme: String = "ws",
        host: String,
        port: Int = 80,
        path: String = "/",
        headers: HTTPHeaders = [:],
        configuration: WebSocketClient.Configuration = .init(),
        on eventLoopGroup: EventLoopGroup,
        onUpgrade: @escaping (WebSocket) -> ()
    ) async throws {
        return try await WebSocketClient(
            eventLoopGroupProvider: .shared(eventLoopGroup),
            configuration: configuration
        ).connect(
            scheme: scheme,
            host: host,
            port: port,
            path: path,
            headers: headers,
            onUpgrade: onUpgrade
        ).get()
    }
}
#endif