Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
pokryfka committed Oct 24, 2023
1 parent 49c33eb commit 08ae2b0
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 46 deletions.
10 changes: 4 additions & 6 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ let package = Package(
name: "swift-graphql",
platforms: [
.iOS(.v15),
.macOS(.v10_15),
.tvOS(.v13),
.watchOS(.v6)
.macOS(.v12),
.tvOS(.v15),
.watchOS(.v8)
],
products: [
// SwiftGraphQL
Expand All @@ -27,7 +27,6 @@ let package = Package(
.package(url: "https://github.com/apple/swift-argument-parser", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-format", "508.0.0"..<"510.0.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/daltoniam/Starscream.git", from: "4.0.0"),
.package(url: "https://github.com/dominicegginton/Spinner", from: "2.0.0"),
.package(url: "https://github.com/JohnSundell/Files", from: "4.0.0"),
.package(url: "https://github.com/jpsim/Yams.git", from: "5.0.0"),
Expand All @@ -40,8 +39,7 @@ let package = Package(
name: "GraphQLWebSocket",
dependencies: [
"GraphQL",
.product(name: "Logging", package: "swift-log"),
"Starscream"
.product(name: "Logging", package: "swift-log")
],
path: "Sources/GraphQLWebSocket",
exclude: ["README.md"]
Expand Down
205 changes: 176 additions & 29 deletions Sources/GraphQLWebSocket/Client.swift
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
// This file is heavily inspired by https://github.com/enisdenjo/graphql-ws.

import Combine
import Dispatch
import GraphQL
import Foundation
import Starscream

/// A GraphQL client that lets you send queries over WebSocket protocol.
///
/// - NOTE: The client assumes that you'll manually establish the socket connection
/// and that it may send requests.
public class GraphQLWebSocket: WebSocketDelegate {
public class GraphQLWebSocket: NSObject, URLSessionWebSocketDelegate {

/// URL session used to create the Web socket.
private let session: URLSession

/// Configuration of the behaviour of the client.
private let config: GraphQLWebSocketConfiguration
Expand All @@ -20,7 +23,12 @@ public class GraphQLWebSocket: WebSocketDelegate {
// MARK: - State

/// Transport WebSocket connection with the server.
private var socket: WebSocket?
private var socket: URLSessionWebSocketTask?

/// The task used to receive messages from the server.
private var receiveTask: Task<Void, Never>?

private let dataQueue = DispatchQueue(label: "GraphQLWebSocket-\(UUID().uuidString)")

/// Holds information about the connection health and what the client is doing about it.
private var health: Health = Health.notconnected
Expand Down Expand Up @@ -74,7 +82,7 @@ public class GraphQLWebSocket: WebSocketDelegate {
case connecting

/// WebSocket has opened.
case opened(socket: WebSocketClient)
case opened

/// Open WebSocket connection has been acknowledged
case acknowledged(payload: [String: AnyCodable]?)
Expand Down Expand Up @@ -131,18 +139,32 @@ public class GraphQLWebSocket: WebSocketDelegate {
/// Creates a new GraphQL WebSocket client from the given connection.
public init(
request: URLRequest,
session: URLSession = .shared,
config: GraphQLWebSocketConfiguration = GraphQLWebSocketConfiguration()
) {
self.request = request
self.session = session
self.config = config

super.init()

if self.config.behaviour == .eager {
self.connect()
}
}

deinit {
assert(socket == nil, "Make sure to disconnect")
self.socket?.cancel(with: .normalClosure, reason: nil)

assert(receiveTask?.isCancelled != true, "Task not cancelled")
receiveTask?.cancel()
}

// MARK: - Internals

// TODO: keeping for reference, remove in final version
#if false
public func didReceive(event: WebSocketEvent, client: WebSocketClient) {
self.config.logger.debug("Received a new message from the server!")

Expand Down Expand Up @@ -223,6 +245,7 @@ public class GraphQLWebSocket: WebSocketDelegate {
self.close(code: 1006)
}
}
#endif

/// Creates a new socket connection and kicks-off the communication with the server.
private func connect() {
Expand All @@ -240,14 +263,12 @@ public class GraphQLWebSocket: WebSocketDelegate {
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
request.httpMethod = "GET"

let socket = WebSocket(request: request)
let socket = session.webSocketTask(with: request)
socket.delegate = self
self.socket = socket
self.config.logger.debug("Socket created!")

socket.connect()
self.health = .connecting

socket.resume()
self.config.logger.debug("Socket connecting...")
break

Expand All @@ -274,7 +295,11 @@ public class GraphQLWebSocket: WebSocketDelegate {
}

self.config.logger.debug("Closing the connection!")
self.socket?.disconnect(closeCode: CloseCode.normalClosure)
self.socket?.cancel(with: .normalClosure, reason: nil)
self.socket = nil

self.receiveTask?.cancel()
self.receiveTask = nil
})
}

Expand Down Expand Up @@ -323,6 +348,52 @@ public class GraphQLWebSocket: WebSocketDelegate {
})
}

/// Starts receiving incoming messages.
private func receive() {
guard let socket = self.socket, case health = .connected else {
assertionFailure("Socket not ready")
return
}

assert(receiveTask?.isCancelled != true, "Task not cancelled")
receiveTask?.cancel()

receiveTask = Task { [weak self] in
while Task.isCancelled == false {
guard let self else { return }
await Task.yield()
socket.receive { result in
switch result {
case .success(.data(let data)):
self.config.logger.debug("Received 'binary' data from the server.")
guard let message = try? self.config.decoder.decode(ServerMessage.self, from: data) else {
break
}
self.dataQueue.async {
self.tick(result: .success(message))
}
case .success(.string(let string)):
self.config.logger.debug("Received 'text' data from the server.")
guard let message = try? self.config.decoder.decode(ServerMessage.self, from: Data(string.utf8)) else {
break
}
self.dataQueue.async {
self.tick(result: .success(message))
}
case .success(_):
self.config.logger.debug("Received unknown data from the server.")
break
case .failure(let error):
self.config.logger.debug("Received error from the server.: \(error)")
self.dataQueue.async {
self.tick(result: .failure(error))
}
}
}
}
}
}

/// Sends a message using the websocket transport.
private func send(message: ClientMessage) {
guard let socket = self.socket else {
Expand All @@ -335,8 +406,15 @@ public class GraphQLWebSocket: WebSocketDelegate {
switch (self.health, message) {
case (.acknowledged, _), (.disposed, _), (_, .initialise):
// We can send any message when the connection has been ACK and meta messages when the server hasn't ACK the connection yet.
socket.write(data: data)
self.config.logger.debug("\(message.description) sent to the server!")
socket.send(.data(data)) { [weak self] error in
guard let self else { return }
if let error {
self.config.logger.debug("Failed to send \(message.description) to the server: \(error)")
self.tick(result: .failure(error))
} else {
self.config.logger.debug("\(message.description) sent to the server!")
}
}

default:
self.config.logger.debug("Transport not ready, queueing message \(message.description)...")
Expand All @@ -362,21 +440,29 @@ public class GraphQLWebSocket: WebSocketDelegate {

/// Sends a ping request and starts the response timeout.
private func ping() {
self.send(message: ClientMessage.ping())
self.emitter.send(Event.ping(received: false, payload: nil))
self.config.logger.debug("Emitted a PING message!")

// We schedule a response timeout that has to be cleared
// in a timely manner by receiveing a new message from the server.
self.connectionDroppedTimer = Timer.scheduledTimer(
withTimeInterval: TimeInterval(3 * self.config.keepAlive),
repeats: true,
block: { [weak self] _ in
guard let self = self else {
return
}
self.socket?.disconnect(closeCode: CloseCode.noStatusReceived)
})
self.socket?.sendPing(pongReceiveHandler: { [weak self] error in
guard let self else { return }
if let error {
self.config.logger.debug("Failed to send a PING message: \(error)")
self.tick(result: .failure(error))
} else {
self.emitter.send(Event.ping(received: false, payload: nil))
self.config.logger.debug("Emitted a PING message!")

// We schedule a response timeout that has to be cleared
// in a timely manner by receiveing a new message from the server.
self.connectionDroppedTimer = Timer.scheduledTimer(
withTimeInterval: TimeInterval(3 * self.config.keepAlive),
repeats: true,
block: { [weak self] _ in
guard let self = self else {
return
}
self.socket?.cancel(with: .noStatusReceived, reason: nil)
})
}
})

}

/// Processes a management message from the server and forwards data message
Expand Down Expand Up @@ -421,7 +507,8 @@ public class GraphQLWebSocket: WebSocketDelegate {

// If the previous statement didn't catch this message, we got an invalid
// message before acknowledgement.
self.socket?.disconnect(closeCode: CloseCode.badResponse)
// NOTE: badResponse code (4404) not defined
self.socket?.cancel(with: .invalid, reason: nil)
return

case (_, .success(.ping(let msg))):
Expand Down Expand Up @@ -452,7 +539,8 @@ public class GraphQLWebSocket: WebSocketDelegate {
// As soon as the reading of the message fails, emit an error and stop
// processing new messages.
self.emitter.send(Event.error(err))
self.socket?.disconnect(closeCode: CloseCode.badResponse.rawValue)
// NOTE: badResponse code (4404) not defined
self.socket?.cancel(with: .invalid, reason: nil)
return
}
}
Expand Down Expand Up @@ -591,5 +679,64 @@ public class GraphQLWebSocket: WebSocketDelegate {
return results
}

// MARK: URLSessionTaskDelegate

nonisolated public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) {
dataQueue.async { [weak self] in
guard let self else { return }

assert(webSocketTask == socket)
// check if the server supports the same transport protocol
guard ClientMessage.PROTOCOL == `protocol` else {
self.close(code: UInt16(URLSessionWebSocketTask.CloseCode.protocolError.rawValue))
return
}

self.config.logger.debug("Socket connected!")

// Immediatelly notify all listeners about the health change.
self.emitter.send(Event.opened)
self.health = .connected

// Once the connection opens, we start recursively processing server messages.
if self.config.connectionAckTimeout > 0 {
self.config.logger.debug("Scheduling connection acknowledge timeout timer...")

self.ackTimer = Timer.scheduledTimer(
withTimeInterval: TimeInterval(self.config.connectionAckTimeout),
repeats: false,
block: { _ in
// NOTE: connectionAcknowledgementTimeout code (4504) not defined
self.socket?.cancel()
})
}

let payload = self.config.connectionParams()
send(message: ClientMessage.initalise(payload: payload))

// start receiving messages
self.receive()
}
}

nonisolated public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
// NOTE: previous implementation used hard coded abnormalClosure (1006) code
assert(closeCode == .abnormalClosure)
dataQueue.async { [weak self] in
self?.close(code: UInt16(closeCode.rawValue))
}
}

nonisolated public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
dataQueue.async { [weak self] in
guard let self else { return }
let msg = error?.localizedDescription ?? "unknown"
self.config.logger.debug("There was an error in socket (\(msg)).")

assert(error != nil)
if let error = error {
self.tick(result: .failure(error))
}
}
}
}

10 changes: 0 additions & 10 deletions Sources/GraphQLWebSocket/Extensions/WebSocket+Extensions.swift

This file was deleted.

4 changes: 3 additions & 1 deletion Tests/GraphQLWebSocketTests/ClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ final class ClientTests: XCTestCase {
let ysexpect = expectation(description: "ys complete")

let request = URLRequest(url: URL(string: "ws://127.0.0.1:4000/graphql")!)
let client = GraphQLWebSocket(request: request)
let config = GraphQLWebSocketConfiguration()
config.logger.logLevel = .debug
let client = GraphQLWebSocket(request: request, config: config)

client.onEvent()
.compactMap({ msg -> Error? in
Expand Down

0 comments on commit 08ae2b0

Please sign in to comment.