From 43c936b5279644fac0e7f99805b6458eb3a061d0 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Thu, 27 Feb 2025 14:41:51 -0800 Subject: [PATCH] Add NetworkTransport --- Sources/MCP/Base/Transports.swift | 230 ++++++++++++++++++++++++++++++ 1 file changed, 230 insertions(+) diff --git a/Sources/MCP/Base/Transports.swift b/Sources/MCP/Base/Transports.swift index b9a06daa..704c0f5e 100644 --- a/Sources/MCP/Base/Transports.swift +++ b/Sources/MCP/Base/Transports.swift @@ -170,3 +170,233 @@ public actor StdioTransport: Transport { } } } + +#if canImport(Network) + import Network + + /// Network connection based transport implementation + public actor NetworkTransport: Transport { + private let connection: NWConnection + public nonisolated let logger: Logger + + private var isConnected = false + private let messageStream: AsyncStream + private let messageContinuation: AsyncStream.Continuation + + // Track connection state for continuations + private var connectionContinuationResumed = false + + public init(connection: NWConnection, logger: Logger? = nil) { + self.connection = connection + self.logger = + logger + ?? Logger( + label: "mcp.transport.network", + factory: { _ in SwiftLogNoOpLogHandler() } + ) + + // Create message stream + var continuation: AsyncStream.Continuation! + self.messageStream = AsyncStream { continuation = $0 } + self.messageContinuation = continuation + } + + /// Connects to the network transport + public func connect() async throws { + guard !isConnected else { return } + + // Reset continuation state + connectionContinuationResumed = false + + // Wait for connection to be ready + try await withCheckedThrowingContinuation { + [weak self] (continuation: CheckedContinuation) in + guard let self = self else { + continuation.resume(throwing: MCP.Error.internalError("Transport deallocated")) + return + } + + connection.stateUpdateHandler = { [weak self] state in + guard let self = self else { return } + + Task { @MainActor in + switch state { + case .ready: + await self.handleConnectionReady(continuation: continuation) + case .failed(let error): + await self.handleConnectionFailed( + error: error, continuation: continuation) + case .cancelled: + await self.handleConnectionCancelled(continuation: continuation) + default: + // Wait for ready or failed state + break + } + } + } + + // Start the connection if it's not already started + if connection.state != .ready { + connection.start(queue: .main) + } else { + Task { @MainActor in + await self.handleConnectionReady(continuation: continuation) + } + } + } + } + + private func handleConnectionReady(continuation: CheckedContinuation) + async + { + if !connectionContinuationResumed { + connectionContinuationResumed = true + isConnected = true + logger.info("Network transport connected successfully") + continuation.resume() + // Start the receive loop after connection is established + Task { await self.receiveLoop() } + } + } + + private func handleConnectionFailed( + error: Swift.Error, continuation: CheckedContinuation + ) async { + if !connectionContinuationResumed { + connectionContinuationResumed = true + logger.error("Connection failed: \(error)") + continuation.resume(throwing: error) + } + } + + private func handleConnectionCancelled(continuation: CheckedContinuation) + async + { + if !connectionContinuationResumed { + connectionContinuationResumed = true + logger.warning("Connection cancelled") + continuation.resume(throwing: MCP.Error.internalError("Connection cancelled")) + } + } + + public func disconnect() async { + guard isConnected else { return } + isConnected = false + connection.cancel() + messageContinuation.finish() + logger.info("Network transport disconnected") + } + + public func send(_ message: String) async throws { + guard isConnected else { + throw MCP.Error.internalError("Transport not connected") + } + + guard let data = (message + "\n").data(using: .utf8) else { + throw MCP.Error.internalError("Failed to encode message") + } + + // Use a local actor-isolated variable to track continuation state + var sendContinuationResumed = false + + try await withCheckedThrowingContinuation { + [weak self] (continuation: CheckedContinuation) in + guard let self = self else { + continuation.resume(throwing: MCP.Error.internalError("Transport deallocated")) + return + } + + connection.send( + content: data, + completion: .contentProcessed { [weak self] error in + guard let self = self else { return } + + Task { @MainActor in + if !sendContinuationResumed { + sendContinuationResumed = true + if let error = error { + self.logger.error("Send error: \(error)") + continuation.resume( + throwing: MCP.Error.internalError("Send error: \(error)")) + } else { + continuation.resume() + } + } + } + }) + } + } + + public func receive() -> AsyncThrowingStream { + return AsyncThrowingStream { continuation in + Task { + for await message in messageStream { + continuation.yield(message) + } + continuation.finish() + } + } + } + + private func receiveLoop() async { + var buffer = Data() + + while isConnected && !Task.isCancelled { + do { + let newData = try await receiveData() + buffer.append(newData) + + // Process complete messages + while let newlineIndex = buffer.firstIndex(of: UInt8(ascii: "\n")) { + let messageData = buffer[.. Data { + // Use a local actor-isolated variable to track continuation state + var receiveContinuationResumed = false + + return try await withCheckedThrowingContinuation { + [weak self] (continuation: CheckedContinuation) in + guard let self = self else { + continuation.resume(throwing: MCP.Error.internalError("Transport deallocated")) + return + } + + connection.receive(minimumIncompleteLength: 1, maximumLength: 65536) { + content, _, _, error in + Task { @MainActor in + if !receiveContinuationResumed { + receiveContinuationResumed = true + if let error = error { + continuation.resume(throwing: error) + } else if let content = content { + continuation.resume(returning: content) + } else { + continuation.resume( + throwing: MCP.Error.internalError("No data received")) + } + } + } + } + } + } + } +#endif