Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 230 additions & 0 deletions Sources/MCP/Base/Transports.swift
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,233 @@ public actor StdioTransport: Transport {
}
}
}

#if canImport(Network)
import Network

/// Network connection based transport implementation
public actor NetworkTransport: Transport {
private let connection: NWConnection
public nonisolated let logger: Logger

private var isConnected = false
private let messageStream: AsyncStream<String>
private let messageContinuation: AsyncStream<String>.Continuation

// Track connection state for continuations
private var connectionContinuationResumed = false

public init(connection: NWConnection, logger: Logger? = nil) {
self.connection = connection
self.logger =
logger
?? Logger(
label: "mcp.transport.network",
factory: { _ in SwiftLogNoOpLogHandler() }
)

// Create message stream
var continuation: AsyncStream<String>.Continuation!
self.messageStream = AsyncStream { continuation = $0 }
self.messageContinuation = continuation
}

/// Connects to the network transport
public func connect() async throws {
guard !isConnected else { return }

// Reset continuation state
connectionContinuationResumed = false

// Wait for connection to be ready
try await withCheckedThrowingContinuation {
[weak self] (continuation: CheckedContinuation<Void, Swift.Error>) in
guard let self = self else {
continuation.resume(throwing: MCP.Error.internalError("Transport deallocated"))
return
}

connection.stateUpdateHandler = { [weak self] state in
guard let self = self else { return }

Task { @MainActor in
switch state {
case .ready:
await self.handleConnectionReady(continuation: continuation)
case .failed(let error):
await self.handleConnectionFailed(
error: error, continuation: continuation)
case .cancelled:
await self.handleConnectionCancelled(continuation: continuation)
default:
// Wait for ready or failed state
break
}
}
}

// Start the connection if it's not already started
if connection.state != .ready {
connection.start(queue: .main)
} else {
Task { @MainActor in
await self.handleConnectionReady(continuation: continuation)
}
}
}
}

private func handleConnectionReady(continuation: CheckedContinuation<Void, Swift.Error>)
async
{
if !connectionContinuationResumed {
connectionContinuationResumed = true
isConnected = true
logger.info("Network transport connected successfully")
continuation.resume()
// Start the receive loop after connection is established
Task { await self.receiveLoop() }
}
}

private func handleConnectionFailed(
error: Swift.Error, continuation: CheckedContinuation<Void, Swift.Error>
) async {
if !connectionContinuationResumed {
connectionContinuationResumed = true
logger.error("Connection failed: \(error)")
continuation.resume(throwing: error)
}
}

private func handleConnectionCancelled(continuation: CheckedContinuation<Void, Swift.Error>)
async
{
if !connectionContinuationResumed {
connectionContinuationResumed = true
logger.warning("Connection cancelled")
continuation.resume(throwing: MCP.Error.internalError("Connection cancelled"))
}
}

public func disconnect() async {
guard isConnected else { return }
isConnected = false
connection.cancel()
messageContinuation.finish()
logger.info("Network transport disconnected")
}

public func send(_ message: String) async throws {
guard isConnected else {
throw MCP.Error.internalError("Transport not connected")
}

guard let data = (message + "\n").data(using: .utf8) else {
throw MCP.Error.internalError("Failed to encode message")
}

// Use a local actor-isolated variable to track continuation state
var sendContinuationResumed = false

try await withCheckedThrowingContinuation {
[weak self] (continuation: CheckedContinuation<Void, Swift.Error>) in
guard let self = self else {
continuation.resume(throwing: MCP.Error.internalError("Transport deallocated"))
return
}

connection.send(
content: data,
completion: .contentProcessed { [weak self] error in
guard let self = self else { return }

Task { @MainActor in
if !sendContinuationResumed {
sendContinuationResumed = true
if let error = error {
self.logger.error("Send error: \(error)")
continuation.resume(
throwing: MCP.Error.internalError("Send error: \(error)"))
} else {
continuation.resume()
}
}
}
})
}
}

public func receive() -> AsyncThrowingStream<String, Swift.Error> {
return AsyncThrowingStream { continuation in
Task {
for await message in messageStream {
continuation.yield(message)
}
continuation.finish()
}
}
}

private func receiveLoop() async {
var buffer = Data()

while isConnected && !Task.isCancelled {
do {
let newData = try await receiveData()
buffer.append(newData)

// Process complete messages
while let newlineIndex = buffer.firstIndex(of: UInt8(ascii: "\n")) {
let messageData = buffer[..<newlineIndex]
buffer = buffer[(newlineIndex + 1)...]

if let message = String(data: messageData, encoding: .utf8),
!message.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty
{
logger.debug("Message received", metadata: ["message": "\(message)"])
messageContinuation.yield(message)
}
}
} catch {
if !Task.isCancelled {
logger.error("Receive error: \(error)")
}
break
}
}

messageContinuation.finish()
}

private func receiveData() async throws -> Data {
// Use a local actor-isolated variable to track continuation state
var receiveContinuationResumed = false

return try await withCheckedThrowingContinuation {
[weak self] (continuation: CheckedContinuation<Data, Swift.Error>) in
guard let self = self else {
continuation.resume(throwing: MCP.Error.internalError("Transport deallocated"))
return
}

connection.receive(minimumIncompleteLength: 1, maximumLength: 65536) {
content, _, _, error in
Task { @MainActor in
if !receiveContinuationResumed {
receiveContinuationResumed = true
if let error = error {
continuation.resume(throwing: error)
} else if let content = content {
continuation.resume(returning: content)
} else {
continuation.resume(
throwing: MCP.Error.internalError("No data received"))
}
}
}
}
}
}
}
#endif