diff --git a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift index c2c1c6965..aca61c595 100644 --- a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift +++ b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift @@ -58,8 +58,9 @@ public final class JSONRPCConnection: Connection { /// The queue on which we send data. private let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated) - private let receiveIO: DispatchIO - private let sendIO: DispatchIO + private let inFD: FileHandle + private let outFD: FileHandle + let ioGroup: DispatchGroup private let messageRegistry: MessageRegistry /// If non-nil, all input received by this `JSONRPCConnection` will be written to the file handle @@ -86,7 +87,6 @@ public final class JSONRPCConnection: Connection { /// Buffer of received bytes that haven't been parsed. /// /// Access to this must be be guaranteed to be sequential to avoid data races. Currently, all access are - /// - The `receiveIO` handler: This is synchronized on `queue`. /// - `requestBufferIsEmpty`: Also synchronized on `queue`. private nonisolated(unsafe) var requestBuffer: [UInt8] = [] @@ -136,45 +136,12 @@ public final class JSONRPCConnection: Connection { state = .created self.messageRegistry = messageRegistry - let ioGroup = DispatchGroup() - - #if os(Windows) - let rawInFD = dispatch_fd_t(bitPattern: inFD._handle) - #else - let rawInFD = inFD.fileDescriptor - #endif - - ioGroup.enter() - receiveIO = DispatchIO( - type: .stream, - fileDescriptor: rawInFD, - queue: queue, - cleanupHandler: { (error: Int32) in - if error != 0 { - logger.fault("IO error \(error)") - } - ioGroup.leave() - } - ) + self.ioGroup = DispatchGroup() - #if os(Windows) - let rawOutFD = dispatch_fd_t(bitPattern: outFD._handle) - #else - let rawOutFD = outFD.fileDescriptor - #endif - - ioGroup.enter() - sendIO = DispatchIO( - type: .stream, - fileDescriptor: rawOutFD, - queue: sendQueue, - cleanupHandler: { (error: Int32) in - if error != 0 { - logger.fault("IO error \(error)") - } - ioGroup.leave() - } - ) + self.inFD = inFD + self.outFD = outFD + + self.ioGroup.enter() ioGroup.notify(queue: queue) { [weak self] in guard let self else { return } @@ -187,13 +154,6 @@ public final class JSONRPCConnection: Connection { await self.closeHandler?() } } - - // We cannot assume the client will send us bytes in packets of any particular size, so set the lower limit to 1. - receiveIO.setLimit(lowWater: 1) - receiveIO.setLimit(highWater: Int.max) - - sendIO.setLimit(lowWater: 1) - sendIO.setLimit(highWater: Int.max) } /// Creates and starts a `JSONRPCConnection` that connects to a subprocess launched with the specified arguments. @@ -293,27 +253,19 @@ public final class JSONRPCConnection: Connection { state = .running self.receiveHandler = receiveHandler self.closeHandler = closeHandler + } - receiveIO.read(offset: 0, length: Int.max, queue: queue) { done, data, errorCode in - guard errorCode == 0 else { - #if !os(Windows) - if errorCode != POSIXError.ECANCELED.rawValue { - logger.fault("IO error reading \(errorCode)") + self.inFD.readabilityHandler = { fileHandle in + let data = fileHandle.availableData + if data.isEmpty { + fileHandle.readabilityHandler = nil + self.queue.async { + self.closeAssumingOnQueue() } - #endif - if done { self.closeAssumingOnQueue() } - return - } - - if done { - self.closeAssumingOnQueue() - return - } - - guard let data = data, !data.isEmpty else { return - } + } + self.queue.sync { orLog("Writing input mirror file") { try self.inputMirrorFile?.write(contentsOf: data) } @@ -554,16 +506,16 @@ public final class JSONRPCConnection: Connection { orLog("Writing output mirror file") { try outputMirrorFile?.write(contentsOf: dispatchData) } - sendIO.write(offset: 0, data: dispatchData, queue: sendQueue) { [weak self] done, _, errorCode in - if errorCode != 0 { - logger.fault("IO error sending message \(errorCode)") - if done, let self { - // An unrecoverable error occurs on the channel’s file descriptor. - // Close the connection. - self.queue.async { - self.closeAssumingOnQueue() - } + sendQueue.sync { + do { + try outFD.write(contentsOf: dispatchData) + } catch { + logger.fault("IO error sending message \(error.forLogging)") + self.queue.async { + self.ioGroup.leave() + self.closeAssumingOnQueue() } + return } } } @@ -646,7 +598,10 @@ public final class JSONRPCConnection: Connection { /// The user-provided close handler will be called *asynchronously* when all outstanding I/O /// operations have completed. No new I/O will be accepted after `close` returns. public func close() { - queue.sync { closeAssumingOnQueue() } + queue.sync { + closeAssumingOnQueue() + ioGroup.leave() + } } /// Close the connection, assuming that the code is already executing on `queue`. @@ -660,9 +615,12 @@ public final class JSONRPCConnection: Connection { logger.log("Closing JSONRPCConnection...") // Attempt to close the reader immediately; we do not need to accept remaining inputs. - receiveIO.close(flags: .stop) // Close the writer after it finishes outstanding work. - sendIO.close() + do { + try outFD.close() + } catch { + logger.error("Failed to close outFD: \(error.forLogging)") + } } }