From 5948b78b03f3cb5e30496c6197f9b485db0a7aee Mon Sep 17 00:00:00 2001 From: Helge Hess Date: Fri, 10 Nov 2017 16:16:20 +0100 Subject: [PATCH 01/14] Drop PoCSocket, and the HTTPStreamingParser The little socket stuff we need we can do directly in the code. No need to wrap such basics. The StreamingParser was some combination of response writer and parser. Those are really distinct things (the writer doesn't really need to know about the parser, and the reverse). --- Sources/HTTP/HTTPStreamingParser.swift | 523 ------------------ Sources/HTTP/PoCSocket/PoCSocket.swift | 273 --------- .../PoCSocketConnectionListener.swift | 323 ----------- .../PoCSocket/PoCSocketSimpleServer.swift | 194 ------- 4 files changed, 1313 deletions(-) delete mode 100644 Sources/HTTP/HTTPStreamingParser.swift delete mode 100644 Sources/HTTP/PoCSocket/PoCSocket.swift delete mode 100644 Sources/HTTP/PoCSocket/PoCSocketConnectionListener.swift delete mode 100644 Sources/HTTP/PoCSocket/PoCSocketSimpleServer.swift diff --git a/Sources/HTTP/HTTPStreamingParser.swift b/Sources/HTTP/HTTPStreamingParser.swift deleted file mode 100644 index 5ed1103..0000000 --- a/Sources/HTTP/HTTPStreamingParser.swift +++ /dev/null @@ -1,523 +0,0 @@ -// This source file is part of the Swift.org Server APIs open source project -// -// Copyright (c) 2017 Swift Server API project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See http://swift.org/LICENSE.txt for license information -// - -import CHTTPParser -import Foundation -import Dispatch - -/// Class that wraps the CHTTPParser and calls the `HTTPRequestHandler` to get the response -/// :nodoc: -public class StreamingParser: HTTPResponseWriter { - - let handle: HTTPRequestHandler - - /// Time to leave socket open waiting for next request to start - public let keepAliveTimeout: TimeInterval - - /// Flag to track if the client wants to send consecutive requests on the same TCP connection - var clientRequestedKeepAlive = false - - /// Tracks when socket should be closed. Needs to have a lock, since it's updated often - private let _keepAliveUntilLock = DispatchSemaphore(value: 1) - private var _keepAliveUntil: TimeInterval? - public var keepAliveUntil: TimeInterval? { - get { - _keepAliveUntilLock.wait() - defer { - _keepAliveUntilLock.signal() - } - return _keepAliveUntil - } - set { - _keepAliveUntilLock.wait() - defer { - _keepAliveUntilLock.signal() - } - _keepAliveUntil = newValue - } - } - - /// Optional delegate that can tell us how many connections are in-flight. - public weak var connectionCounter: CurrentConnectionCounting? - - /// Holds the bytes that come from the CHTTPParser until we have enough of them to do something with it - var parserBuffer: Data? - - /// HTTP Parser - var httpParser = http_parser() - var httpParserSettings = http_parser_settings() - - /// Block that takes a chunk from the HTTPParser as input and writes to a Response as a result - var httpBodyProcessingCallback: HTTPBodyProcessing? - - //Note: we want this to be strong so it holds onto the connector until it's explicitly cleared - /// Protocol that we use to send data (and status info) back to the Network layer - public var parserConnector: ParserConnecting? - - ///Flag to track whether our handler has told us not to call it anymore - private let _shouldStopProcessingBodyLock = DispatchSemaphore(value: 1) - private var _shouldStopProcessingBody: Bool = false - private var shouldStopProcessingBody: Bool { - get { - _shouldStopProcessingBodyLock.wait() - defer { - _shouldStopProcessingBodyLock.signal() - } - return _shouldStopProcessingBody - } - set { - _shouldStopProcessingBodyLock.wait() - defer { - _shouldStopProcessingBodyLock.signal() - } - _shouldStopProcessingBody = newValue - } - } - - var lastCallBack = CallbackRecord.idle - var lastHeaderName: String? - var parsedHeaders = HTTPHeaders() - var parsedHTTPMethod: HTTPMethod? - var parsedHTTPVersion: HTTPVersion? - var parsedURL: String? - - /// Is the currently parsed request an upgrade request? - public private(set) var upgradeRequested = false - - /// Class that wraps the CHTTPParser and calls the `HTTPRequestHandler` to get the response - /// - /// - Parameter handler: function that is used to create the response - public init(handler: @escaping HTTPRequestHandler, connectionCounter: CurrentConnectionCounting? = nil, keepAliveTimeout: Double = 5.0) { - self.handle = handler - self.connectionCounter = connectionCounter - self.keepAliveTimeout = keepAliveTimeout - - //Set up all the callbacks for the CHTTPParser library - httpParserSettings.on_message_begin = { parser -> Int32 in - guard let listener = StreamingParser.getSelf(parser: parser) else { - return Int32(0) - } - return listener.messageBegan() - } - - httpParserSettings.on_message_complete = { parser -> Int32 in - guard let listener = StreamingParser.getSelf(parser: parser) else { - return 0 - } - return listener.messageCompleted() - } - - httpParserSettings.on_headers_complete = { parser -> Int32 in - guard let listener = StreamingParser.getSelf(parser: parser) else { - return 0 - } - let methodId = parser?.pointee.method - let methodName = String(validatingUTF8: http_method_str(http_method(rawValue: methodId ?? 0))) ?? "GET" - let major = Int(parser?.pointee.http_major ?? 0) - let minor = Int(parser?.pointee.http_minor ?? 0) - - //This needs to be set here and not messageCompleted if it's going to work here - let keepAlive = http_should_keep_alive(parser) == 1 - let upgradeRequested = parser?.pointee.upgrade == 1 - - return listener.headersCompleted(methodName: methodName, - majorVersion: major, - minorVersion: minor, - keepAlive: keepAlive, - upgrade: upgradeRequested) - } - - httpParserSettings.on_header_field = { (parser, chunk, length) -> Int32 in - guard let listener = StreamingParser.getSelf(parser: parser) else { - return 0 - } - return listener.headerFieldReceived(data: chunk, length: length) - } - - httpParserSettings.on_header_value = { (parser, chunk, length) -> Int32 in - guard let listener = StreamingParser.getSelf(parser: parser) else { - return 0 - } - return listener.headerValueReceived(data: chunk, length: length) - } - - httpParserSettings.on_body = { (parser, chunk, length) -> Int32 in - guard let listener = StreamingParser.getSelf(parser: parser) else { - return 0 - } - return listener.bodyReceived(data: chunk, length: length) - } - - httpParserSettings.on_url = { (parser, chunk, length) -> Int32 in - guard let listener = StreamingParser.getSelf(parser: parser) else { - return 0 - } - return listener.urlReceived(data: chunk, length: length) - } - - http_parser_init(&httpParser, HTTP_REQUEST) - - self.httpParser.data = Unmanaged.passUnretained(self).toOpaque() - } - - /// Read a stream from the network, pass it to the parser and return number of bytes consumed - /// - /// - Parameter data: data coming from network - /// - Returns: number of bytes that we sent to the parser - public func readStream(data: Data) -> Int { - return data.withUnsafeBytes { (ptr) -> Int in - return http_parser_execute(&self.httpParser, &self.httpParserSettings, ptr, data.count) - } - } - - /// States to track where we are in parsing the HTTP Stream from the client - enum CallbackRecord { - case idle, messageBegan, messageCompleted, headersCompleted, headerFieldReceived, headerValueReceived, bodyReceived, urlReceived - } - - /// Process change of state as we get more and more parser callbacks - /// - /// - Parameter currentCallBack: state we are entering, as specified by the CHTTPParser - /// - Returns: Whether or not the state actually changed - @discardableResult - func processCurrentCallback(_ currentCallBack: CallbackRecord) -> Bool { - if lastCallBack == currentCallBack { - return false - } - switch lastCallBack { - case .headerFieldReceived: - if let parserBuffer = self.parserBuffer { - self.lastHeaderName = String(data: parserBuffer, encoding: .utf8) - self.parserBuffer = nil - } else { - print("Missing parserBuffer after \(lastCallBack)") - } - case .headerValueReceived: - if let parserBuffer = self.parserBuffer, - let lastHeaderName = self.lastHeaderName, - let headerValue = String(data: parserBuffer, encoding: .utf8) { - self.parsedHeaders.append([HTTPHeaders.Name(lastHeaderName): headerValue]) - self.lastHeaderName = nil - self.parserBuffer = nil - } else { - print("Missing parserBuffer after \(lastCallBack)") - } - case .headersCompleted: - self.parserBuffer = nil - - if !upgradeRequested { - self.httpBodyProcessingCallback = self.handle(self.createRequest(), self) - } - case .urlReceived: - if let parserBuffer = self.parserBuffer { - //Under heaptrack, this may appear to leak via _CFGetTSDCreateIfNeeded, - // apparently, that's because it triggers thread metadata to be created - self.parsedURL = String(data: parserBuffer, encoding: .utf8) - self.parserBuffer = nil - } else { - print("Missing parserBuffer after \(lastCallBack)") - } - case .idle: - break - case .messageBegan: - break - case .messageCompleted: - break - case .bodyReceived: - break - } - lastCallBack = currentCallBack - return true - } - - func messageBegan() -> Int32 { - processCurrentCallback(.messageBegan) - self.parserConnector?.responseBeginning() - return 0 - } - - func messageCompleted() -> Int32 { - let didChangeState = processCurrentCallback(.messageCompleted) - if let chunkHandler = self.httpBodyProcessingCallback, didChangeState { - var dummy = false //We're sending `.end`, which means processing is stopping anyway, so the bool here is pointless - switch chunkHandler { - case .processBody(let handler): - handler(.end, &dummy) - case .discardBody: - done() - } - } - return 0 - } - - func headersCompleted(methodName: String, - majorVersion: Int, - minorVersion: Int, - keepAlive: Bool, - upgrade: Bool) -> Int32 { - processCurrentCallback(.headersCompleted) - self.parsedHTTPMethod = HTTPMethod(methodName) - self.parsedHTTPVersion = HTTPVersion(major: majorVersion, minor: minorVersion) - - //This needs to be set here and not messageCompleted if it's going to work here - self.clientRequestedKeepAlive = keepAlive - self.keepAliveUntil = Date(timeIntervalSinceNow: keepAliveTimeout).timeIntervalSinceReferenceDate - self.upgradeRequested = upgrade - return 0 - } - - func headerFieldReceived(data: UnsafePointer?, length: Int) -> Int32 { - processCurrentCallback(.headerFieldReceived) - guard let data = data else { return 0 } - data.withMemoryRebound(to: UInt8.self, capacity: length) { (ptr) -> Void in - if var parserBuffer = parserBuffer { - parserBuffer.append(ptr, count: length) - } else { - parserBuffer = Data(bytes: data, count: length) - } - } - return 0 - } - - func headerValueReceived(data: UnsafePointer?, length: Int) -> Int32 { - processCurrentCallback(.headerValueReceived) - guard let data = data else { return 0 } - data.withMemoryRebound(to: UInt8.self, capacity: length) { (ptr) -> Void in - if var parserBuffer = parserBuffer { - parserBuffer.append(ptr, count: length) - } else { - parserBuffer = Data(bytes: data, count: length) - } - } - return 0 - } - - func bodyReceived(data: UnsafePointer?, length: Int) -> Int32 { - processCurrentCallback(.bodyReceived) - guard let data = data else { return 0 } - if shouldStopProcessingBody { - return 0 - } - data.withMemoryRebound(to: UInt8.self, capacity: length) { (ptr) -> Void in - #if swift(>=4.0) - let buff = UnsafeRawBufferPointer(start: ptr, count: length) - #else - let buff = UnsafeBufferPointer(start: ptr, count: length) - #endif - let chunk = DispatchData(bytes: buff) - if let chunkHandler = self.httpBodyProcessingCallback { - switch chunkHandler { - case .processBody(let handler): - //OK, this sucks. We can't access the value of the `inout` inside this block - // due to exclusivity. Which means that if we were to pass a local variable, we'd - // have to put a semaphore or something up here to wait for the block to be done before - // we could get its value and pass that on to the instance variable. So instead, we're - // just passing in a pointer to the internal ivar. But that ivar can't be modified in - // more than one place, so we have to put a semaphore around it to prevent that. - _shouldStopProcessingBodyLock.wait() - handler(.chunk(data: chunk, finishedProcessing: {self._shouldStopProcessingBodyLock.signal()}), &_shouldStopProcessingBody) - case .discardBody: - break - } - } - } - return 0 - } - - func urlReceived(data: UnsafePointer?, length: Int) -> Int32 { - processCurrentCallback(.urlReceived) - guard let data = data else { return 0 } - data.withMemoryRebound(to: UInt8.self, capacity: length) { (ptr) -> Void in - if var parserBuffer = parserBuffer { - parserBuffer.append(ptr, count: length) - } else { - parserBuffer = Data(bytes: data, count: length) - } - } - return 0 - } - - static func getSelf(parser: UnsafeMutablePointer?) -> StreamingParser? { - guard let pointee = parser?.pointee.data else { return nil } - return Unmanaged.fromOpaque(pointee).takeUnretainedValue() - } - - var headersWritten = false - var isChunked = false - - /// Create a `HTTPRequest` struct from the parsed information - public func createRequest() -> HTTPRequest { - return HTTPRequest(method: parsedHTTPMethod!, - target: parsedURL!, - httpVersion: parsedHTTPVersion!, - headers: parsedHeaders) - } - - public func writeHeader(status: HTTPResponseStatus, headers: HTTPHeaders, completion: @escaping (Result) -> Void) { - - guard !headersWritten else { - return - } - - var header = "HTTP/1.1 \(status.code) \(status.reasonPhrase)\r\n" - - let isContinue = status == .continue - - var headers = headers - if !isContinue { - adjustHeaders(status: status, headers: &headers) - } - - for (key, value) in headers { - // TODO encode value using [RFC5987] - header += "\(key): \(value)\r\n" - } - header.append("\r\n") - - // FIXME headers are US-ASCII, anything else should be encoded using [RFC5987] some lines above - // TODO use requested encoding if specified - if let data = header.data(using: .utf8) { - self.parserConnector?.queueSocketWrite(data, completion: completion) - if !isContinue { - headersWritten = true - } - } else { - //TODO handle encoding error - } - } - - func adjustHeaders(status: HTTPResponseStatus, headers: inout HTTPHeaders) { - for header in status.suppressedHeaders { - headers[header] = nil - } - - if headers[.contentLength] != nil { - headers[.transferEncoding] = "identity" - } else if parsedHTTPVersion! >= HTTPVersion(major: 1, minor: 1) { - switch headers[.transferEncoding] { - case .some("identity"): // identity without content-length - clientRequestedKeepAlive = false - case .some("chunked"): - isChunked = true - default: - isChunked = true - headers[.transferEncoding] = "chunked" - } - } else { - // HTTP 1.0 does not support chunked - clientRequestedKeepAlive = false - headers[.transferEncoding] = nil - } - - if clientRequestedKeepAlive { - headers[.connection] = "Keep-Alive" - } else { - headers[.connection] = "Close" - } - } - - public func writeTrailer(_ trailers: HTTPHeaders, completion: @escaping (Result) -> Void) { - fatalError("Not implemented") - } - - public func writeBody(_ data: UnsafeHTTPResponseBody, completion: @escaping (Result) -> Void) { - guard headersWritten else { - //TODO error or default headers? - return - } - - guard data.withUnsafeBytes({ $0.count > 0 }) else { - completion(.ok) - return - } - - let dataToWrite: Data - if isChunked { - dataToWrite = data.withUnsafeBytes { - let chunkStart = (String($0.count, radix: 16) + "\r\n").data(using: .utf8)! - var dataToWrite = chunkStart - dataToWrite.append(UnsafeBufferPointer(start: $0.baseAddress?.assumingMemoryBound(to: UInt8.self), count: $0.count)) - let chunkEnd = "\r\n".data(using: .utf8)! - dataToWrite.append(chunkEnd) - return dataToWrite - } - } else if let data = data as? Data { - dataToWrite = data - } else { - dataToWrite = data.withUnsafeBytes { Data($0) } - } - - self.parserConnector?.queueSocketWrite(dataToWrite, completion: completion) - } - - public func done(completion: @escaping (Result) -> Void) { - if isChunked { - let chunkTerminate = "0\r\n\r\n".data(using: .utf8)! - self.parserConnector?.queueSocketWrite(chunkTerminate, completion: completion) - } - - self.parsedHTTPMethod = nil - self.parsedURL = nil - self.parsedHeaders = HTTPHeaders() - self.lastHeaderName = nil - self.parserBuffer = nil - self.parsedHTTPMethod = nil - self.parsedHTTPVersion = nil - self.lastCallBack = .idle - self.headersWritten = false - self.httpBodyProcessingCallback = nil - self.upgradeRequested = false - self.shouldStopProcessingBody = false - - //Note: This used to be passed into the completion block that `Result` used to have - // But since that block was removed, we're calling it directly - if self.clientRequestedKeepAlive { - self.keepAliveUntil = Date(timeIntervalSinceNow: keepAliveTimeout).timeIntervalSinceReferenceDate - self.parserConnector?.responseComplete() - } else { - self.parserConnector?.responseCompleteCloseWriter() - } - completion(.ok) - } - - public func abort() { - fatalError("abort called, not sure what to do with it") - } - - deinit { - httpParser.data = nil - } - -} - -/// Protocol implemented by the thing that sits in between us and the network layer -/// :nodoc: -public protocol ParserConnecting: class { - /// Send data to the network do be written to the client - func queueSocketWrite(_ from: Data, completion: @escaping (Result) -> Void) - - /// Let the network know that a response has started to avoid closing a connection during a slow write - func responseBeginning() - - /// Let the network know that a response is complete, so it can be closed after timeout - func responseComplete() - - /// Let the network know that a response is complete and we're ready to close the connection - func responseCompleteCloseWriter() - - /// Used to let the network know we're ready to close the connection - func closeWriter() -} - -/// Delegate that can tell us how many connections are in-flight so we can set the Keep-Alive header -/// to the correct number of available connections -/// :nodoc: -public protocol CurrentConnectionCounting: class { - /// Current number of active connections - var connectionCount: Int { get } -} diff --git a/Sources/HTTP/PoCSocket/PoCSocket.swift b/Sources/HTTP/PoCSocket/PoCSocket.swift deleted file mode 100644 index 8b23e50..0000000 --- a/Sources/HTTP/PoCSocket/PoCSocket.swift +++ /dev/null @@ -1,273 +0,0 @@ -// This source file is part of the Swift.org Server APIs open source project -// -// Copyright (c) 2017 Swift Server API project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See http://swift.org/LICENSE.txt for license information -// - -import Foundation -import Dispatch - -///:nodoc: -public enum PoCSocketError: Error { - case SocketOSError(errno: Int32) - case InvalidSocketError - case InvalidReadLengthError - case InvalidWriteLengthError - case InvalidBufferError -} - -/// Simple Wrapper around the `socket(2)` functions we need for Proof of Concept testing -/// Intentionally a thin layer over `recv(2)`/`send(2)` so uses the same argument types. -/// Note that no method names here are the same as any system call names. -/// This is because we expect the caller might need functionality we haven't implemented here. -internal class PoCSocket { - - /// hold the file descriptor for the socket supplied by the OS. `-1` is invalid socket - internal var socketfd: Int32 = -1 - - /// The TCP port the server is actually listening on. Set after system call completes - internal var listeningPort: Int32 = -1 - - /// Track state between `listen(2)` and `shutdown(2)` - internal private(set) var isListening = false - - /// Track state between `accept(2)/bind(2)` and `close(2)` - internal private(set) var isConnected = false - - /// track whether a shutdown is in progress so we can suppress error messages - private let _isShuttingDownLock = DispatchSemaphore(value: 1) - private var _isShuttingDown: Bool = false - private var isShuttingDown: Bool { - get { - _isShuttingDownLock.wait() - defer { - _isShuttingDownLock.signal() - } - return _isShuttingDown - } - set { - _isShuttingDownLock.wait() - defer { - _isShuttingDownLock.signal() - } - _isShuttingDown = newValue - } - } - - /// Call recv(2) with buffer allocated by our caller and return the output - /// - /// - Parameters: - /// - readBuffer: Buffer to read into. Note this needs to be `inout` because we're modfying it and we want Swift4+'s ownership checks to make sure no one else is at the same time - /// - maxLength: Max length that can be read. Buffer *must* be at least this big!!! - /// - Returns: Number of bytes read or -1 on failure as per `recv(2)` - /// - Throws: PoCSocketError if sanity checks fail - internal func socketRead(into readBuffer: inout UnsafeMutablePointer, maxLength: Int) throws -> Int { - if maxLength <= 0 || maxLength > Int(Int32.max) { - throw PoCSocketError.InvalidReadLengthError - } - if socketfd <= 0 { - throw PoCSocketError.InvalidSocketError - } - - //Make sure no one passed a nil pointer to us - let readBufferPointer: UnsafeMutablePointer! = readBuffer - if readBufferPointer == nil { - throw PoCSocketError.InvalidBufferError - } - - //Make sure data isn't re-used - readBuffer.initialize(to: 0x0, count: maxLength) - - let read = recv(self.socketfd, readBuffer, maxLength, Int32(0)) - //Leave this as a local variable to facilitate Setting a Watchpoint in lldb - return read - } - - /// Pass buffer passed into to us into send(2). - /// - /// - Parameters: - /// - buffer: buffer containing data to write. - /// - bufSize: number of bytes to write. Buffer must be this long - /// - Returns: number of bytes written or -1. See `send(2)` - /// - Throws: PoCSocketError if sanity checks fail - @discardableResult internal func socketWrite(from buffer: UnsafeRawPointer, bufSize: Int) throws -> Int { - if socketfd <= 0 { - throw PoCSocketError.InvalidSocketError - } - if bufSize < 0 || bufSize > Int(Int32.max) { - throw PoCSocketError.InvalidWriteLengthError - } - - //Make sure we weren't handed a nil buffer - let writeBufferPointer: UnsafeRawPointer! = buffer - if writeBufferPointer == nil { - throw PoCSocketError.InvalidBufferError - } - - let sent = send(self.socketfd, buffer, Int(bufSize), Int32(0)) - //Leave this as a local variable to facilitate Setting a Watchpoint in lldb - return sent - } - - /// Calls `shutdown(2)` and `close(2)` on a socket - internal func shutdownAndClose() { - self.isShuttingDown = true - if socketfd < 1 { - //Nothing to do. Maybe it was closed already - return - } - //print("Shutting down socket \(self.socketfd)") - if self.isListening || self.isConnected { - //print("Shutting down socket") - _ = shutdown(self.socketfd, Int32(SHUT_RDWR)) - self.isListening = false - } - self.isConnected = false - close(self.socketfd) - } - - /// Thin wrapper around `accept(2)` - /// - /// - Returns: PoCSocket object for newly connected socket or nil if we've been told to shutdown - /// - Throws: PoCSocketError on sanity check fails or if accept fails after several retries - internal func acceptClientConnection() throws -> PoCSocket? { - if socketfd <= 0 || !isListening { - throw PoCSocketError.InvalidSocketError - } - - let retVal = PoCSocket() - - var maxRetryCount = 100 - - var acceptFD: Int32 = -1 - repeat { - var acceptAddr = sockaddr_in() - var addrSize = socklen_t(MemoryLayout.size) - - acceptFD = withUnsafeMutablePointer(to: &acceptAddr) { pointer in - return accept(self.socketfd, UnsafeMutableRawPointer(pointer).assumingMemoryBound(to: sockaddr.self), &addrSize) - } - if acceptFD < 0 && errno != EINTR { - //fail - if (isShuttingDown) { - return nil - } - maxRetryCount = maxRetryCount - 1 - print("Could not accept on socket \(socketfd). Error is \(errno). Will retry.") - } - } - while acceptFD < 0 && maxRetryCount > 0 - - if acceptFD < 0 { - throw PoCSocketError.SocketOSError(errno: errno) - } - - retVal.isConnected = true - retVal.socketfd = acceptFD - - return retVal - } - - /// call `bind(2)` and `listen(2)` - /// - /// - Parameters: - /// - port: `sin_port` value, see `bind(2)` - /// - maxBacklogSize: backlog argument to `listen(2)` - /// - Throws: PoCSocketError - internal func bindAndListen(on port: Int = 0, maxBacklogSize: Int32 = 100) throws { - #if os(Linux) - socketfd = socket(Int32(AF_INET), Int32(SOCK_STREAM.rawValue), Int32(IPPROTO_TCP)) - #else - socketfd = socket(Int32(AF_INET), Int32(SOCK_STREAM), Int32(IPPROTO_TCP)) - #endif - - if socketfd <= 0 { - throw PoCSocketError.InvalidSocketError - } - - var on: Int32 = 1 - // Allow address reuse - if setsockopt(self.socketfd, SOL_SOCKET, SO_REUSEADDR, &on, socklen_t(MemoryLayout.size)) < 0 { - throw PoCSocketError.SocketOSError(errno: errno) - } - - // Allow port reuse - if setsockopt(self.socketfd, SOL_SOCKET, SO_REUSEPORT, &on, socklen_t(MemoryLayout.size)) < 0 { - throw PoCSocketError.SocketOSError(errno: errno) - } - - #if os(Linux) - var addr = sockaddr_in( - sin_family: sa_family_t(AF_INET), - sin_port: htons(UInt16(port)), - sin_addr: in_addr(s_addr: in_addr_t(0)), - sin_zero: (0, 0, 0, 0, 0, 0, 0, 0)) - #else - var addr = sockaddr_in( - sin_len: UInt8(MemoryLayout.stride), - sin_family: UInt8(AF_INET), - sin_port: (Int(OSHostByteOrder()) != OSLittleEndian ? UInt16(port) : _OSSwapInt16(UInt16(port))), - sin_addr: in_addr(s_addr: in_addr_t(0)), - sin_zero: (0, 0, 0, 0, 0, 0, 0, 0)) - #endif - - _ = withUnsafePointer(to: &addr) { - bind(self.socketfd, UnsafePointer(OpaquePointer($0)), socklen_t(MemoryLayout.size)) - } - - //print("bindResult is \(bindResult)") - - _ = listen(self.socketfd, maxBacklogSize) - - isListening = true - - //print("listenResult is \(listenResult)") - - var addr_in = sockaddr_in() - - listeningPort = try withUnsafePointer(to: &addr_in) { pointer in - var len = socklen_t(MemoryLayout.size) - if getsockname(socketfd, UnsafeMutablePointer(OpaquePointer(pointer)), &len) != 0 { - throw PoCSocketError.SocketOSError(errno: errno) - } - #if os(Linux) - return Int32(ntohs(addr_in.sin_port)) - #else - return Int32(Int(OSHostByteOrder()) != OSLittleEndian ? addr_in.sin_port.littleEndian : addr_in.sin_port.bigEndian) - #endif - } - - //print("listeningPort is \(listeningPort)") - } - - /// Check to see if socket is being used - /// - /// - Returns: whether socket is listening or connected - internal func isOpen() -> Bool { - return isListening || isConnected - } - - /// Sets the socket to Blocking or non-blocking mode. - /// - /// - Parameter mode: true for blocking, false for nonBlocking - /// - Returns: `fcntl(2)` flags - /// - Throws: PoCSocketError if `fcntl` fails - @discardableResult internal func setBlocking(mode: Bool) throws -> Int32 { - let flags = fcntl(self.socketfd, F_GETFL) - if flags < 0 { - //Failed - throw PoCSocketError.SocketOSError(errno: errno) - } - - let newFlags = mode ? flags & ~O_NONBLOCK : flags | O_NONBLOCK - - let result = fcntl(self.socketfd, F_SETFL, newFlags) - if result < 0 { - //Failed - throw PoCSocketError.SocketOSError(errno: errno) - } - return result - } -} diff --git a/Sources/HTTP/PoCSocket/PoCSocketConnectionListener.swift b/Sources/HTTP/PoCSocket/PoCSocketConnectionListener.swift deleted file mode 100644 index 8e54030..0000000 --- a/Sources/HTTP/PoCSocket/PoCSocketConnectionListener.swift +++ /dev/null @@ -1,323 +0,0 @@ -// This source file is part of the Swift.org Server APIs open source project -// -// Copyright (c) 2017 Swift Server API project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See http://swift.org/LICENSE.txt for license information -// - -import Foundation -import Dispatch - -///:nodoc: -public class PoCSocketConnectionListener: ParserConnecting { - - ///socket(2) wrapper object - var socket: PoCSocket? - - ///ivar for the thing that manages the CHTTP Parser - var parser: StreamingParser? - - ///Save the socket file descriptor so we can loook at it for debugging purposes - var socketFD: Int32 - var shouldShutdown: Bool = false - - /// Queues for managing access to the socket without blocking the world - let socketReaderQueue: DispatchQueue - let socketWriterQueue: DispatchQueue - - ///Event handler for reading from the socket - private var readerSource: DispatchSourceRead? - - ///Flag to track whether we're in the middle of a response or not (with lock) - private let _responseCompletedLock = DispatchSemaphore(value: 1) - private var _responseCompleted: Bool = false - var responseCompleted: Bool { - get { - _responseCompletedLock.wait() - defer { - _responseCompletedLock.signal() - } - return _responseCompleted - } - set { - _responseCompletedLock.wait() - defer { - _responseCompletedLock.signal() - } - _responseCompleted = newValue - } - } - - ///Flag to track whether we've received a socket error or not (with lock) - private let _errorOccurredLock = DispatchSemaphore(value: 1) - private var _errorOccurred: Bool = false - var errorOccurred: Bool { - get { - _errorOccurredLock.wait() - defer { - _errorOccurredLock.signal() - } - return _errorOccurred - } - set { - _errorOccurredLock.wait() - defer { - _errorOccurredLock.signal() - } - _errorOccurred = newValue - } - } - - ///Largest number of bytes we're willing to allocate for a Read - // it's an anti-heartbleed-type paranoia check - private var maxReadLength: Int = 1048576 - - /// initializer - /// - /// - Parameters: - /// - socket: thin PoCSocket wrapper around system calls - /// - parser: Manager of the CHTTPParser library - internal init(socket: PoCSocket, parser: StreamingParser, readQueue: DispatchQueue, writeQueue: DispatchQueue, maxReadLength: Int = 0) { - self.socket = socket - socketFD = socket.socketfd - socketReaderQueue = readQueue - socketWriterQueue = writeQueue - self.parser = parser - parser.parserConnector = self - if maxReadLength > 0 { - self.maxReadLength = maxReadLength - } - } - - /// Check if socket is still open. Used to decide whether it should be closed/pruned after timeout - public var isOpen: Bool { - guard let socket = self.socket else { - return false - } - return socket.isOpen() - } - - /// Close the socket and free up memory unless we're in the middle of a request - func close() { - self.shouldShutdown = true - - if !self.responseCompleted && !self.errorOccurred { - return - } - if (self.socket?.socketfd ?? -1) > 0 { - self.socket?.shutdownAndClose() - } - - //In a perfect world, we wouldn't have to clean this all up explicitly, - // but KDE/heaptrack informs us we're in far from a perfect world - - if !(self.readerSource?.isCancelled ?? true) { - /* - OK, so later macOS wants `cancel()` to be called from inside the readerSource, - otherwise, there's a very intermittent thread-dependent crash, (ask me how I know) - so in that case, we set a Bool variable and call `activate()`. Older macOS doesn't - have `activate()` so we call back to calling `cancel()` directly. - - Linux *DOES* have activate(), but it doesn't seem to do anything at present, so we call `cancel()` - directly in that case, too (Although I suspect that might need to change in future releases). - */ - #if os(Linux) - // Call Cancel directory on Linux - self.readerSource?.cancel() - self.cleanup() - #else - if #available(OSX 10.12, *) { - //Set Flag and Activate the readerSource so it can run `cancel()` for us - self.shouldShutdown = true - self.readerSource?.activate() - } else { - // Fallback on earlier versions - self.readerSource?.cancel() - self.cleanup() - } - #endif - } - } - - /// Called by the parser to let us know that it's done with this socket - public func closeWriter() { - self.socketWriterQueue.async { [weak self] in - if self?.readerSource?.isCancelled ?? true { - self?.close() - } - } - } - - /// Check if the socket is idle, and if so, call close() - func closeIfIdleSocket() { - if !self.responseCompleted { - //We're in the middle of a connection - we're not idle - return - } - let now = Date().timeIntervalSinceReferenceDate - if let keepAliveUntil = parser?.keepAliveUntil, now >= keepAliveUntil { - print("Closing idle socket \(socketFD)") - close() - } - } - - func cleanup() { - self.readerSource?.setEventHandler(handler: nil) - self.readerSource?.setCancelHandler(handler: nil) - - self.readerSource = nil - self.socket = nil - self.parser?.parserConnector = nil //allows for memory to be reclaimed - self.parser = nil - } - - /// Called by the parser to let us know that a response has started being created - public func responseBeginning() { - self.responseCompleted = false - } - - /// Called by the parser to let us know that a response is complete, and we can close after timeout - public func responseComplete() { - self.responseCompleted = true - self.socketWriterQueue.async { [weak self] in - if self?.readerSource?.isCancelled ?? true { - self?.close() - } - } - } - - /// Called by the parser to let us know that a response is complete and we should close the socket - public func responseCompleteCloseWriter() { - self.responseCompleted = true - self.socketWriterQueue.async { [weak self] in - self?.close() - } - } - - /// Starts reading from the socket and feeding that data to the parser - public func process() { - let tempReaderSource: DispatchSourceRead - //Make sure we have a socket here. Don't use guard so that - // we don't encourage strongSocket to be used in the - // event handler, which could cause a leak - if let strongSocket = socket { - do { - try strongSocket.setBlocking(mode: true) - tempReaderSource = DispatchSource.makeReadSource(fileDescriptor: strongSocket.socketfd, - queue: socketReaderQueue) - } catch { - print("Socket cannot be set to Blocking in process(): \(error)") - return - } - } else { - print("Socket is nil in process()") - return - } - - tempReaderSource.setEventHandler { [weak self] in - guard let strongSelf = self else { - return - } - guard strongSelf.socket?.socketfd ?? -1 > 0 else { - strongSelf.readerSource?.cancel() - strongSelf.cleanup() - return - } - guard !strongSelf.shouldShutdown else { - strongSelf.readerSource?.cancel() - strongSelf.cleanup() - return - } - - var length = 1 //initial value - - do { - if strongSelf.socket?.socketfd ?? -1 > 0 { - var maxLength: Int = Int(strongSelf.readerSource?.data ?? 0) - if (maxLength > strongSelf.maxReadLength) || (maxLength <= 0) { - maxLength = strongSelf.maxReadLength - } - var readBuffer: UnsafeMutablePointer = UnsafeMutablePointer.allocate(capacity: maxLength) - length = try strongSelf.socket?.socketRead(into: &readBuffer, maxLength: maxLength) ?? -1 - if length > 0 { - self?.responseCompleted = false - - let data = Data(bytes: readBuffer, count: length) - let numberParsed = strongSelf.parser?.readStream(data: data) ?? 0 - - if numberParsed != data.count { - print("Error: wrong number of bytes consumed by parser (\(numberParsed) instead of \(data.count)") - } - } - readBuffer.deallocate(capacity: maxLength) - } else { - print("bad socket FD while reading") - length = -1 - } - } catch { - print("ReaderSource Event Error: \(error)") - self?.readerSource?.cancel() - self?.errorOccurred = true - self?.close() - } - if length == 0 { - self?.readerSource?.cancel() - } - if length < 0 { - self?.errorOccurred = true - self?.readerSource?.cancel() - self?.close() - } - } - - tempReaderSource.setCancelHandler { [weak self] in - self?.close() //close if we can - } - - self.readerSource = tempReaderSource - self.readerSource?.resume() - } - - /// Called by the parser to give us data to send back out of the socket - /// - /// - Parameter bytes: Data object to be queued to be written to the socket - public func queueSocketWrite(_ bytes: Data, completion:@escaping (Result) -> Void) { - self.socketWriterQueue.async { [weak self] in - self?.write(bytes) - completion(.ok) - } - } - - /// Write data to a socket. Should be called in an `async` block on the `socketWriterQueue` - /// - /// - Parameter data: data to be written - public func write(_ data: Data) { - do { - var written: Int = 0 - var offset = 0 - - while written < data.count && !errorOccurred { - try data.withUnsafeBytes { (ptr: UnsafePointer) in - let result = try socket?.socketWrite(from: ptr + offset, bufSize: - data.count - offset) ?? -1 - if result < 0 { - print("Received broken write socket indication") - errorOccurred = true - } else { - written += result - } - } - offset = data.count - written - } - if errorOccurred { - close() - return - } - } catch { - print("Received write socket error: \(error)") - errorOccurred = true - close() - } - } -} diff --git a/Sources/HTTP/PoCSocket/PoCSocketSimpleServer.swift b/Sources/HTTP/PoCSocket/PoCSocketSimpleServer.swift deleted file mode 100644 index aab5261..0000000 --- a/Sources/HTTP/PoCSocket/PoCSocketSimpleServer.swift +++ /dev/null @@ -1,194 +0,0 @@ -// This source file is part of the Swift.org Server APIs open source project -// -// Copyright (c) 2017 Swift Server API project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See http://swift.org/LICENSE.txt for license information -// - -import Dispatch -import Foundation - -// MARK: Server - -/// An HTTP server that listens for connections on a TCP socket and spawns Listeners to handle them. -///:nodoc: -public class PoCSocketSimpleServer: CurrentConnectionCounting { - /// PoCSocket to listen on for connections - private let serverSocket: PoCSocket = PoCSocket() - - /// Collection of listeners of sockets. Used to kill connections on timeout or shutdown - private var connectionListenerList = ConnectionListenerCollection() - - // Timer that cleans up idle sockets on expire - private let pruneSocketTimer: DispatchSourceTimer = DispatchSource.makeTimerSource(queue: DispatchQueue(label: "pruneSocketTimer")) - - /// The port we're listening on. Used primarily to query a randomly assigned port during XCTests - public var port: Int { - return Int(serverSocket.listeningPort) - } - - /// Tuning parameter to set the number of queues - private var queueMax: Int = 4 //sensible default - - /// Tuning parameter to set the number of sockets we can accept at one time - private var acceptMax: Int = 8 //sensible default - - ///Used to stop `accept(2)`ing while shutdown in progress to avoid spurious logs - private let _isShuttingDownLock = DispatchSemaphore(value: 1) - private var _isShuttingDown: Bool = false - var isShuttingDown: Bool { - get { - _isShuttingDownLock.wait() - defer { - _isShuttingDownLock.signal() - } - return _isShuttingDown - } - set { - _isShuttingDownLock.wait() - defer { - _isShuttingDownLock.signal() - } - _isShuttingDown = newValue - } - } - - /// Starts the server listening on a given port - /// - /// - Parameters: - /// - port: TCP port. See listen(2) - /// - handler: Function that creates the HTTP Response from the HTTP Request - /// - Throws: Error (usually a socket error) generated - public func start(port: Int = 0, - queueCount: Int = 0, - acceptCount: Int = 0, - maxReadLength: Int = 1048576, - keepAliveTimeout: Double = 5.0, - handler: @escaping HTTPRequestHandler) throws { - - // Don't let a signal generated by a broken socket kill the server - signal(SIGPIPE, SIG_IGN) - - if queueCount > 0 { - queueMax = queueCount - } - if acceptCount > 0 { - acceptMax = acceptCount - } - try self.serverSocket.bindAndListen(on: port) - - pruneSocketTimer.setEventHandler { [weak self] in - self?.connectionListenerList.prune() - } - #if swift(>=4.0) - pruneSocketTimer.schedule(deadline: .now() + keepAliveTimeout, - repeating: .seconds(Int(keepAliveTimeout))) - #else - pruneSocketTimer.scheduleRepeating(deadline: .now() + keepAliveTimeout, - interval: .seconds(Int(keepAliveTimeout))) - #endif - pruneSocketTimer.resume() - - var readQueues = [DispatchQueue]() - var writeQueues = [DispatchQueue]() - let acceptQueue = DispatchQueue(label: "Accept Queue", qos: .default, attributes: .concurrent) - - let acceptSemaphore = DispatchSemaphore.init(value: acceptMax) - - for idx in 0.. { - weak var value: T? - init (_ value: T) { - self.value = value - } - } - - let lock = DispatchSemaphore(value: 1) - - /// Storage for weak connection listeners - var storage = [WeakConnectionListener]() - - /// Add a new connection to the collection - /// - /// - Parameter listener: socket manager object - func add(_ listener: PoCSocketConnectionListener) { - lock.wait() - storage.append(WeakConnectionListener(listener)) - lock.signal() - } - - /// Used when shutting down the server to close all connections - func closeAll() { - lock.wait() - storage.filter { nil != $0.value }.forEach { $0.value?.close() } - lock.signal() - } - - /// Close any idle sockets and remove any weak pointers to closed (and freed) sockets from the collection - func prune() { - lock.wait() - storage.filter { nil != $0.value }.forEach { $0.value?.closeIfIdleSocket() } - storage = storage.filter { nil != $0.value }.filter { $0.value?.isOpen ?? false } - lock.signal() - } - - /// Count of collections - var count: Int { - lock.wait() - let count = storage.filter { nil != $0.value }.count - lock.signal() - return count - } -} From 0bdbe68a0253d470b31a97ea6eb15b5d4d5f39b7 Mon Sep 17 00:00:00 2001 From: Helge Hess Date: Fri, 10 Nov 2017 16:23:13 +0100 Subject: [PATCH 02/14] Add `queue` argument to handler Similar to PR #86. In this case it is non-optional, that thing being a concrete implementation. All API functions need to run on the given queue. But since the handler itself also runs on that queue, it only needs to be done if the handler actually dispatches to a different queue. This is an optimization to avoid excessive dispatching in async environments. (if API calls could be issued from arbitrary queues, they would always need to be queued to the handler queue. which is possible but pretty expensive). P.S.: The argument is necessary because there is nothing like `DispatchQueue.current` in GCD anymore. --- Sources/HTTP/HTTPCommon.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/HTTP/HTTPCommon.swift b/Sources/HTTP/HTTPCommon.swift index e605fb5..10e051f 100644 --- a/Sources/HTTP/HTTPCommon.swift +++ b/Sources/HTTP/HTTPCommon.swift @@ -32,7 +32,7 @@ import Foundation /// - Parameter req: the incoming HTTP request. /// - Parameter res: a writer providing functions to create an HTTP reponse to the request. /// - Returns HTTPBodyProcessing: a enum that either discards the request data, or provides a closure to process it. -public typealias HTTPRequestHandler = (HTTPRequest, HTTPResponseWriter) -> HTTPBodyProcessing +public typealias HTTPRequestHandler = (HTTPRequest, HTTPResponseWriter, DispatchQueue) -> HTTPBodyProcessing /// Class protocol containing a `handle()` function that implements `HTTPRequestHandler` to respond to incoming HTTP requests. /// - See: `HTTPRequestHandler` for more information @@ -42,7 +42,7 @@ public protocol HTTPRequestHandling: class { /// - Parameter response: a writer providing functions to create an HTTP response to the request. /// - Returns HTTPBodyProcessing: a enum that either discards the request data, or provides a closure to process it. /// - See: `HTTPRequestHandler` for more information - func handle(request: HTTPRequest, response: HTTPResponseWriter) -> HTTPBodyProcessing + func handle(request: HTTPRequest, response: HTTPResponseWriter, queue: DispatchQueue) -> HTTPBodyProcessing } /// The result returned as part of a completion handler From 45d9df664d736f1d46d90f886e6fd68cea929b23 Mon Sep 17 00:00:00 2001 From: Helge Hess Date: Fri, 10 Nov 2017 16:56:35 +0100 Subject: [PATCH 03/14] Drop `HTTPResponseWriter` as a protocol This is a concrete implementation. Unless we do anything as protocols, there is no need to have this as one. --- Sources/HTTP/HTTPResponse.swift | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/Sources/HTTP/HTTPResponse.swift b/Sources/HTTP/HTTPResponse.swift index 3d7680e..60de458 100644 --- a/Sources/HTTP/HTTPResponse.swift +++ b/Sources/HTTP/HTTPResponse.swift @@ -19,33 +19,6 @@ public struct HTTPResponse { public var headers: HTTPHeaders } -/// HTTPResponseWriter provides functions to create an HTTP response -public protocol HTTPResponseWriter: class { - /// Writer function to create the headers for an HTTP response - /// - Parameter status: The status code to include in the HTTP response - /// - Parameter headers: The HTTP headers to include in the HTTP response - /// - Parameter completion: Closure that is called when the HTTP headers have been written to the HTTP respose - func writeHeader(status: HTTPResponseStatus, headers: HTTPHeaders, completion: @escaping (Result) -> Void) - - /// Writer function to write a trailer header as part of the HTTP response - /// - Parameter trailers: The trailers to write as part of the HTTP response - /// - Parameter completion: Closure that is called when the trailers has been written to the HTTP response - /// This is not currently implemented - func writeTrailer(_ trailers: HTTPHeaders, completion: @escaping (Result) -> Void) - - /// Writer function to write data to the body of the HTTP response - /// - Parameter data: The data to write as part of the HTTP response - /// - Parameter completion: Closure that is called when the data has been written to the HTTP response - func writeBody(_ data: UnsafeHTTPResponseBody, completion: @escaping (Result) -> Void) - - /// Writer function to complete the HTTP response - /// - Parameter completion: Closure that is called when the HTTP response has been completed - func done(completion: @escaping (Result) -> Void) - - /// abort: Abort the HTTP response - func abort() -} - /// Convenience methods for HTTP response writer. extension HTTPResponseWriter { /// Convenience function to write the headers for an HTTP response without a completion handler From 897d96aee16145da045c61c65f6850800addf0c7 Mon Sep 17 00:00:00 2001 From: Helge Hess Date: Fri, 10 Nov 2017 17:18:07 +0100 Subject: [PATCH 04/14] Add asynchronous, GCD based HTTP server HTTP server which uses GCD to handle async I/O. --- Sources/HTTP/HTTPConnection.swift | 620 ++++++++++++++++++++++++++ Sources/HTTP/HTTPResponseWriter.swift | 446 ++++++++++++++++++ Sources/HTTP/HTTPServer.swift | 350 +++++++++++++-- 3 files changed, 1379 insertions(+), 37 deletions(-) create mode 100644 Sources/HTTP/HTTPConnection.swift create mode 100644 Sources/HTTP/HTTPResponseWriter.swift diff --git a/Sources/HTTP/HTTPConnection.swift b/Sources/HTTP/HTTPConnection.swift new file mode 100644 index 0000000..8696468 --- /dev/null +++ b/Sources/HTTP/HTTPConnection.swift @@ -0,0 +1,620 @@ +// +// HTTPConnection.swift +// HTTP +// +// Created by Helge Hess on 22.10.17. +// Copyright © 2017 ZeeZide GmbH. All rights reserved. +// + +import Foundation +import Dispatch +import CHTTPParser + +/// A connection to a single HTTP client. +/// +/// The connection owns a queue which is used to synchronize all access to the +/// API. Writers and `finishedProcessing` callbacks alike. +/// +/// Back-pressure is supported. That is, until the body processor of a handler +/// marks a chunk as processed, the connection suspends reading more data. +/// Note that some reads might still be in-flight and will be buffered until +/// the connection is resumed again. +/// +/// Since HTTP clients can pipeline calls, a connection can run multiple +/// handlers at the same time. To maintain proper response ordering, +/// response-writer objects will be 'corked' until the responses ahead of them +/// are completely written out. +/// +internal class HTTPConnection : CustomStringConvertible { + + /// The `HTTPServer` which created the connection. If the connection shuts + /// down, it will unregister from that server. + private let server : HTTPServer + + /// Serialized access to the state of the connection. Everything needs to + /// happen on this queue. It is the `DispatchQueue.main` for a handler. + internal let queue : DispatchQueue + + /// The socket. + private let channel : DispatchIO + + /// User-level closure responsible for handling any incoming requests. Passed + /// in from `HTTPServer`. The connection maintains a strong reference to it, + /// until it is gone. + private let requestHandler : HTTPRequestHandler + + /// The body handler as returned by the `HTTPRequestHandler`. If the handler + /// returns `.discard`, this will be nil and all content will be dropped. + /// At any time, there can only be one `bodyHandler`. + private var bodyHandler : HTTPBodyHandler? + + /// The array of active writers. There can be many due to HTTP pipelining. + /// I.e. a new request can arrive while the previous request is still being + /// delivered to the client. + /// HTTP/1 requires strict ordering of responses. That is, the first response + /// must be fully sent before the next can be sent. To support that, inactive + /// writers are 'corked' and will be flushed when they are due. + private var writers = [ HTTPResponseWriter ]() + // FIXME: make this a linked list + + internal init(fd : Int32, + queue : DispatchQueue, + requestHandler : @escaping HTTPRequestHandler, + server : HTTPServer) + { + self.server = server + self.queue = queue + self.requestHandler = requestHandler + + self.channel = DispatchIO(type: .stream, fileDescriptor: fd, queue: queue) { + error in + close(fd) + } + channel.setLimit(lowWater: 1) + + wireUpParser() + } + + /// Defines whether the receiving side of the socket is still open. + /// Note: some content (and EOF) could still be buffered! + private var isReadSideOpen = true { + didSet { + assert(isReadSideOpen == false, "unexpected value set: \(self)") + if writers.isEmpty { _connectionHasFinished() } + } + } + + /// Close the receiving side of the channel. Note that we may still have stuff + /// in the read buffer! + private func closeReadSide() { + guard isReadSideOpen else { return } + isReadSideOpen = false + shutdown(channel.fileDescriptor, SHUT_RD) + } + + private var isSuspended = true // started in suspended state + private var isReading = false + private var readBuffer : DispatchData? = nil + private var bufferedEOF = false + + internal func suspend() { + assert(!isSuspended, "suspending suspended connection") + if isSuspended { return } + + // Note: The channel itself is not actually suspended, we wait until the + // current `read` is through. + isSuspended = true + } + internal func resume() { + assert(isSuspended, "resuming running connection") + if !isSuspended { return } + + guard isReadSideOpen else { return } + isSuspended = false + + // flush read buffer + if let readBuffer = readBuffer { + self.readBuffer = nil + + // Note: this can trigger suspend/resume! + readBuffer.enumerateBytes { bptr, _, shouldStop in + if !feedParser(bptr) { + closeReadSide() + shouldStop = true + } + } + } + if bufferedEOF { + bufferedEOF = false + if !feedParser(nil) { closeReadSide() } + } + + // Careful here. Additional suspend/resume calls may have happened. Or the + // socket may have been closed. + if isReadSideOpen { + maybeReadMore() + } + } + + /// Feed the parser w/ more data. Pass in nil to signal EOF/shutdown. + @inline(__always) + private func feedParser(_ data: UnsafeBufferPointer?) -> Bool { + var hitParserError = false + + if let data = data { + data.baseAddress?.withMemoryRebound(to: Int8.self, capacity: data.count) { + let rc = http_parser_execute(&self.httpParser, + &self.httpParserSettings, + $0, data.count) + guard rc == data.count else { // error + hitParserError = true + return + } + } + } + else { + let rc = http_parser_execute(&self.httpParser, + &self.httpParserSettings, + nil, 0) + hitParserError = rc != 0 + } + return !hitParserError + } + + private let readBufferSize = 4096 + + internal func maybeReadMore() { + assert(isReadSideOpen, "starting, but reading-side is not open?") + guard !isReading else { return } // still reading + guard isReadSideOpen else { return } + + isReading = true + channel.read(offset: 0, length: readBufferSize, queue: queue) { + // Note: This is retaining the connection! So if we stop, we also need to + // stop this read-call! (it should error out if we close the read + // side) + done, data, error in + + assert(self.isReading, "read callback called, but not reading anymore?") + guard self.isReadSideOpen else { return } + + if done { + self.isReading = false + } + + var hitParseError = false + + if error == 0, let data = data { + if done && data.isEmpty { // error = 0, empty data, + done means EOF + if self.isSuspended { + self.bufferedEOF = true + } + else { + hitParseError = !self.feedParser(nil) + } + self.isReadSideOpen = false + } + else if self.isSuspended { + if self.readBuffer == nil { self.readBuffer = data } + else { self.readBuffer!.append(data) } + } + else { + data.enumerateBytes { bptr, _, stop in + if !self.feedParser(bptr) { + hitParseError = true + stop = true + } + } + } + } + + if error != 0 || hitParseError { + self.abortOnError() + } + else if done && self.isReadSideOpen && !self.isSuspended { + // continue reading next block + self.maybeReadMore() + } + } + } + + /// A hard close, makes no sense to continue anything once we got a read + /// error. + func abortOnError() { + closeReadSide() + channel.close(flags: .stop) + bodyHandler = nil + for writer in writers { + writer.abort() + } + writers = [] + + _connectionHasFinished() + } + + internal func serverWillStop() { // sent by server on server queue + queue.async { self._serverWillStop() } + } + private func _serverWillStop() { + // TODO: set some flag, teardown + // Note: the server object won't go away, we have a hard retain on it. + } + + /// Called by the writer if it is done and all queued writes have completed. + internal func _responseWriterIsDone(_ writer: HTTPResponseWriter) { + let isFirst = writers.first === writer + + guard isFirst else { + assert(writers.first == nil || writers.first === writer, + "writer done, but it is not the head writer? \(self) \(writer)") + return + } + + if let idx = writers.index(where: { $0 === writer }) { + // break retain cycle + writers.remove(at: idx) + } + else { + assert(false, "did not find writer which went done: \(self)") + } + + // activate next writer in the pipeline + if let newFirst = writers.first { + newFirst.channel = channel + } + + // Close connection if the read side is down, and no writers need to write + if writers.isEmpty && !isReadSideOpen { // && keep alive? + // we are done? + _connectionHasFinished() + } + } + + private var didFinish = false + private func _connectionHasFinished() { + guard !didFinish else { return } + didFinish = true + server._connectionIsDone(self) + } + + + // MARK: - Parser + + // This is inline for speedz. We could separate it out, but that is harder + // than it looks to get fast. Because you can't mix C closures w/ generics + // and using a protocol is not exactly fast either. + + @inline(__always) + private final func headersCompleted(parser: UnsafeMutablePointer) { + // this is a little lame + let method : HTTPMethod + let version : HTTPVersion + + method = http_method(rawValue: parser.pointee.method).getMethod() + version = HTTPVersion(major: Int(parser.pointee.http_major), + minor: Int(parser.pointee.http_minor)) + + #if false // TODO: do the right thing here + let keepAlive = http_should_keep_alive(parser) == 1 + let upgradeRequested = parser.pointee.upgrade == 1 + #endif + + // create request object + + let request = HTTPRequest(method: method, target: parsedURL ?? "", + httpVersion: version, headers: parsedHeaders) + resetHeaderParseState() + + // create response object + + let response = HTTPResponseWriter(requestVersion: version, + connection: self, queue: queue) + if writers.isEmpty { // yay, we are active + response.channel = channel + } + writers.append(response) + + // run user-level handler closure + + let bh = requestHandler(request, response, queue) + switch bh { + case .discardBody: bodyHandler = nil + case .processBody(let handler): bodyHandler = handler + } + } + + /// The body handler set the 'stop' flag. + private func bodyHandlerRequestedStop() { + // Note: This is the physical close. We still may have stuff in the + // buffer. Drop it. Do NOT resume. + readBuffer = nil + closeReadSide() // physical close + } + + /// The number of `finishedProcessing` callbacks we are waiting for. + private var inFlightBodySends = 0 + + @inline(__always) + private final func handleBodyChunk(_ chunk: UnsafeRawBufferPointer) { + guard let bodyHandler = bodyHandler else { return } // .discard + + var wasStopped = false, doneGotCalled = false + + let doneCB = { // The done callback MUST run on the handler-queue + doneGotCalled = true + guard !wasStopped else { return } // already handled + + self.inFlightBodySends -= 1 + if self.inFlightBodySends == 0 { self.resume() } + } + + let data = DispatchData(bytes: chunk) + // copies (we kinda need to, for async) + + if inFlightBodySends == 0 { self.suspend() } // stop reading from socket + inFlightBodySends += 1 + + var shouldStop = false + bodyHandler(.chunk(data: data, finishedProcessing: doneCB), + &shouldStop) + + if shouldStop { + self.bodyHandler = nil + wasStopped = true + if !doneGotCalled { + self.inFlightBodySends -= 1 + } + + bodyHandlerRequestedStop() + } + } + + @inline(__always) + private final func messageCompleted() { + // The request has been fully parsed (including all body data). + + var stop = false + bodyHandler?(.end, &stop) + + // TBD: Do we need to hang on to the body handler until the response is + // done? I don't think so. If we want to, retain it in the handler. + bodyHandler = nil + + if stop { + bodyHandlerRequestedStop() + } + } + + + /// Holds the bytes that come from the CHTTPParser until we have enough of + /// them to do something with it + private var parserBuffer = Data() + + /// HTTP Parser + private var httpParser = http_parser() + private var httpParserSettings = http_parser_settings() + + private var lastCallBack = LastCallback.none + private var lastHeaderName : HTTPHeaders.Name? + private var parsedHeaders = HTTPHeaders() + private var parsedURL : String? + + enum LastCallback { case none, field, value, url } + + private func resetHeaderParseState() { + lastCallBack = .none + lastHeaderName = nil + parsedURL = nil + parsedHeaders = HTTPHeaders() // TBD: hm. Maybe do this differently (reuse) + parserBuffer.removeAll() + } + + private func wireUpParser() { + // Set up all the callbacks for the CHTTPParser library. + httpParserSettings.on_message_begin = { parser -> Int32 in + guard let me = HTTPConnection.getSelf(parser: parser) else { return 0 } + me.resetHeaderParseState() + return 0 + } + httpParserSettings.on_message_complete = { parser -> Int32 in + guard let me = HTTPConnection.getSelf(parser: parser) else { return 0 } + me.messageCompleted() + me.resetHeaderParseState() // should be done already + return 0 + } + + httpParserSettings.on_headers_complete = { parser -> Int32 in + guard let me = HTTPConnection.getSelf(parser: parser) else { return 0 } + var dummy : CChar = 0 + _ = me.processDataCB(newState: .none, p: &dummy, len: 0) // finish up + + me.headersCompleted(parser: parser!) + return 0 + } + + httpParserSettings.on_header_field = { (parser, chunk, length) -> Int32 in + guard let me = HTTPConnection.getSelf(parser: parser) else { return 0 } + return me.processDataCB(newState: .field, p: chunk, len: length) + } + + httpParserSettings.on_header_value = { (parser, chunk, length) -> Int32 in + guard let me = HTTPConnection.getSelf(parser: parser) else { return 0 } + return me.processDataCB(newState: .value, p: chunk, len: length) + } + + httpParserSettings.on_body = { (parser, chunk, length) -> Int32 in + guard let me = HTTPConnection.getSelf(parser: parser) else { return 0 } + me.handleBodyChunk(UnsafeRawBufferPointer(start: chunk, count: length)) + return 0 + } + + httpParserSettings.on_url = { (parser, chunk, length) -> Int32 in + guard let me = HTTPConnection.getSelf(parser: parser) else { return 0 } + return me.processDataCB(newState: .url, p: chunk, len: length) + } + + http_parser_init(&httpParser, HTTP_REQUEST) + self.httpParser.data = Unmanaged.passUnretained(self).toOpaque() + } + + private final func processDataCB(newState s: LastCallback, + p: UnsafePointer?, len: size_t) + -> Int32 + { + let newState = s + if lastCallBack == newState { // continue value + if let p = p { + let bp = UnsafeBufferPointer(start: p, count: len) + parserBuffer.append(bp) + } + return 0 // done already. state is the same + } + + switch lastCallBack { // != newState! + case .url: // finished URL + if !parserBuffer.isEmpty { + parsedURL = String(data: parserBuffer, encoding: .utf8) + } + + case .field: // last field was a name + lastHeaderName = headerNameForData(parserBuffer) + + case .value: // last field was a value, now something new + let value = String(data: parserBuffer, encoding: .utf8) + #if DEBUG + assert(lastHeaderName != nil, "header value w/o a name?") + assert(value != nil, "header value missing?") + #endif + + // TBD: Uh, oh, why the need to create a literal here?? + if let name = lastHeaderName, let value = value { + parsedHeaders.append([name: value]) + } + lastHeaderName = nil + + default: + break + } + + // store new data & state + parserBuffer.removeAll() + lastCallBack = newState + if len > 0, let p = p { + let bp = UnsafeBufferPointer(start: p, count: len) + parserBuffer.append(bp) + } + return 0 + } + + @inline(__always) + static func getSelf(parser: UnsafeMutablePointer?) + -> HTTPConnection? + { + guard let pointee = parser?.pointee.data else { return nil } + return Unmanaged + .fromOpaque(pointee) + .takeUnretainedValue() + } + + + // MARK: - CustomStringConvertible + + var description : String { + var ms = " HTTPHeaders.Name? { + guard !data.isEmpty else { return nil } + // TODO: reuse header values by matching the data via memcmp(), maybe first + // switch on length, compare c0 + guard let s = String(data: data, encoding: .utf8) else { return nil } + return HTTPHeaders.Name(s) +} + +fileprivate extension http_method { + + @inline(__always) + func getMethod() -> HTTPMethod { + // We have this, so that we can use static strings most of the time! + switch self { + case HTTP_DELETE: return HTTPMethod.delete + case HTTP_GET: return HTTPMethod.get + case HTTP_HEAD: return HTTPMethod.head + case HTTP_POST: return HTTPMethod.post + case HTTP_PUT: return HTTPMethod.put + /* pathological */ + case HTTP_CONNECT: return HTTPMethod.connect + case HTTP_OPTIONS: return HTTPMethod.options + case HTTP_TRACE: return HTTPMethod.trace + /* WebDAV */ + case HTTP_COPY: return HTTPMethod.copy + case HTTP_LOCK: return HTTPMethod.lock + case HTTP_MKCOL: return HTTPMethod.mkcol + case HTTP_MOVE: return HTTPMethod.move + case HTTP_PROPFIND: return HTTPMethod.propfind + case HTTP_PROPPATCH: return HTTPMethod.proppatch + case HTTP_SEARCH: return HTTPMethod.search + case HTTP_UNLOCK: return HTTPMethod.unlock + case HTTP_BIND: return HTTPMethod.bind + case HTTP_REBIND: return HTTPMethod.rebind + case HTTP_UNBIND: return HTTPMethod.unbind + case HTTP_ACL: return HTTPMethod.acl + /* subversion */ + case HTTP_REPORT: return HTTPMethod.report + case HTTP_MKACTIVITY: return HTTPMethod.mkactivity + case HTTP_CHECKOUT: return HTTPMethod.checkout + case HTTP_MERGE: return HTTPMethod.merge + /* upnp */ + case HTTP_MSEARCH: return HTTPMethod.msearch + case HTTP_NOTIFY: return HTTPMethod.notify + case HTTP_SUBSCRIBE: return HTTPMethod.subscribe + case HTTP_UNSUBSCRIBE: return HTTPMethod.unsubscribe + /* RFC-5789 */ + case HTTP_PATCH: return HTTPMethod.patch + case HTTP_PURGE: return HTTPMethod.purge + /* CalDAV */ // - Helge was here + case HTTP_MKCALENDAR: return HTTPMethod.mkcalendar + /* RFC-2068, section 19.6.1.2 */ + case HTTP_LINK: return HTTPMethod.link + case HTTP_UNLINK: return HTTPMethod.unlink + + default: + // TBD: - should return nil instead? + return HTTPMethod(String(cString: http_method_str(self))) + } + } +} diff --git a/Sources/HTTP/HTTPResponseWriter.swift b/Sources/HTTP/HTTPResponseWriter.swift new file mode 100644 index 0000000..e773d68 --- /dev/null +++ b/Sources/HTTP/HTTPResponseWriter.swift @@ -0,0 +1,446 @@ +// +// HTTPResponseWriter.swift +// HTTP +// +// Created by Helge Hess on 22.10.17. +// Copyright © 2017 ZeeZide GmbH. All rights reserved. +// +// Copyright (c) 2017 Swift Server API project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// + +import Foundation +import Dispatch + +/// HTTPResponseWriter provides functions to create an HTTP response +/// +/// ## Queueing +/// +/// All functions expect to be called on the queue of the request handler (which +/// is passed in as an argument). +/// For example, if you dispatch to a different queue, you need to dispatch back +/// the the handler queue for writes: +/// +/// server.start { request, response, queue in +/// backgroundQueue.async { +/// // do expensive processing ... +/// let result = 42 +/// // bounce back to the handler queue to write: +/// queue.async { +/// response.write("The answer is: \(result)") +/// response.done() +/// } +/// } +/// +public class HTTPResponseWriter : CustomStringConvertible { + + public enum Error : Swift.Error { + case connectionGone // unexpected! + case encodingError + case headersWrittenAlready + case writeFailed(Int32) + case writerIsDone + } + + /// The same queue as the `HTTPConnection` queue. + /// Separate variable because `connection` is an Optional. + final private let queue : DispatchQueue + // TODO: maybe the connection should not be an optional + + /// The connection owning the writer. + final private var connection : HTTPConnection? + // TODO: maybe the connection should not be an optional. + + final var channel : DispatchIO? = nil { + didSet { + if channel != nil, let buffer = buffer, !buffer.isEmpty { + let done = self.bufferDone + self.buffer = nil + self.bufferDone = nil + + self._write(buffer, completion: done ?? {_ in }) + } + } + } + + private var isDone = false + + /// The write buffer. + /// All writes and the done callbacks are merged into a single call. TBD + private var buffer : DispatchData? = nil + + /// The `done` callbacks of the write buffer. + /// All writes and the done callbacks are merged into a single call. TBD + private var bufferDone : ((Result) -> Void)? = nil + + /// The HTTP version requested by the client. + private var requestVersion : HTTPVersion + + private var clientRequestedKeepAlive = true + // TODO: I think we should use the HTTPParser setting for this. + private var isChunked = true + + + internal init(requestVersion : HTTPVersion, + connection : HTTPConnection, + queue : DispatchQueue) + { + self.connection = connection + self.queue = queue + self.requestVersion = requestVersion + } + + private var headersSent = false + + /// Writer function to create the headers for an HTTP response + /// + /// Queue: You need to call this on the queue that was passed into the + /// handler. + /// + /// - Parameter status: The status code to include in the HTTP response + /// - Parameter headers: The HTTP headers to include in the HTTP response + /// - Parameter completion: Closure that is called when the HTTP headers have + /// been written to the HTTP respose + public func writeHeader(status: HTTPResponseStatus, headers: HTTPHeaders, + completion: @escaping (Result) -> Void) + { + guard !self.headersSent else { + completion(Result.error(Error.headersWrittenAlready)) + return + } + guard self.gotDone == nil else { + completion(.error(Error.writerIsDone)) + return + } + + // TBD: is it more efficient to add up to a DispatchData or Data? + var header = "HTTP/1.1 \(status.code) \(status.reasonPhrase)\r\n" + + let isContinue = status == .continue + + var headers = headers + if !isContinue { + // FIXME: This has extra side-effects, don't. + self.adjustHeaders(status: status, headers: &headers) + } + for (key, value) in headers { + // TODO encode value using [RFC5987] + header += "\(key): \(value)\r\n" + } + header.append("\r\n") + + // FIXME headers are US-ASCII, anything else should be encoded using + // [RFC5987] some lines above + // TODO use requested encoding if specified + guard let data = DispatchData.fromString(header) else { + completion(Result.error(Error.encodingError)) + return + } + + if !isContinue { + self.headersSent = true + } + + self._write(data, completion: completion) + } + + /// Writer function to write a trailer header as part of the HTTP response + /// + /// Queue: You need to call this on the queue that was passed into the + /// handler. + /// + /// - Parameter trailers: The trailers to write as part of the HTTP response + /// - Parameter completion: Closure that is called when the trailers has been written to the HTTP response + /// This is not currently implemented + public func writeTrailer(_ trailers: HTTPHeaders, + completion: @escaping (Result) -> Void) + { + // TODO + // - render trailers into a DispatchData we can pass on + // - fail when done + } + + /// Writer function to write data to the body of the HTTP response + /// + /// Queue: You need to call this on the queue that was passed into the + /// handler. + /// + /// - Parameter data: The data to write as part of the HTTP response + /// - Parameter completion: Closure that is called when the data has been written to the HTTP response + public func writeBody(_ data: DispatchData, + completion: @escaping (Result) -> Void) + { + guard self.gotDone == nil else { + completion(.error(Error.writerIsDone)) + return + } + + if !self.headersSent { + self.writeHeader(status: .ok, headers: HTTPHeaders()) { + result in + + if case .error = result { + completion(result) + return + } + + self._write(self.isChunked ? data.withHTTPChunkedFrame : data, + completion: completion) + } + } + else { + self._write(self.isChunked ? data.withHTTPChunkedFrame : data, + completion: completion) + } + } + + /// Writer function to write data to the body of the HTTP response + /// + /// Queue: You need to call this on the queue that was passed into the + /// handler. + /// + /// - Parameter data: The data to write as part of the HTTP response + /// - Parameter completion: Closure that is called when the data has been written to the HTTP response + public func writeBody(_ data: UnsafeHTTPResponseBody, + completion: @escaping (Result) -> Void) + { + // Note: Using this is lame, it requires a copy. Be smart, use DispatchData. + data.withUnsafeBytes { rbp in + let data = DispatchData(bytes: rbp) + writeBody(data, completion: completion) + } + } + + /// This is set if `done` was called on the writer. It holds the completion + /// callback for that done function. + private var gotDone : ((Result) -> Void)? = nil + + /// Writer function to complete the HTTP response. + /// + /// Queue: You need to call this on the queue that was passed into the + /// handler. + /// + /// - Parameter completion: Closure that is called when the HTTP response has + /// been completed + public func done(completion: @escaping ( Result ) -> Void) { + guard self.gotDone == nil && !isDone else { + completion(.error(Error.writerIsDone)) + return + } + isDone = true + gotDone = completion + + // send chunkedEnd + if self.isChunked { + self._write(DataChunks.chunkedEnd) { result in } + // this calls reallyDone as part of gotDone + } + else if self.pendingWrites == 0 { + self._reallyDone() + } + } + + /// All writes have completed + private func _reallyDone(result: Result = .ok) { + withExtendedLifetime(self) { // TBD: necessary? + channel = nil + connection?._responseWriterIsDone(self) + connection = nil // break cycle (TBD: do we really have to?) + + if let done = gotDone { + gotDone = nil + done(result) + } + } + } + + /// abort: Abort the HTTP response + public func abort() { + // TODO: + // - do we really need this? Or is `stop` in the body parser enough? I guess + // it is kinda nice + } + + private final var pendingWrites = 0 + + private func _write(_ data: DispatchData, + completion: @escaping (Result) -> Void) + { + if let channel = channel { + pendingWrites += 1 + + channel.write(offset: 0, data: data, queue: queue) { + done, data, error in + + if done { self.pendingWrites -= 1 } + + if error != 0 { + let result = Result.error(Error.writeFailed(error)) + completion(result) + self._reallyDone(result: result) + return + } + + if done { + completion(.ok) + } + + if self.gotDone != nil && self.pendingWrites == 0 { + self._reallyDone() + } + } + } + else { // we are corked. queue writes which don't wait for completion + // TBD: We could also queue them as individual writes, which may have + // some latency advantages, but well. + if buffer != nil { buffer!.append(data) } + else { buffer = data } + + if let old = bufferDone { + bufferDone = { result in + old(result) + completion(result) + } + } + else { bufferDone = completion } + } + } + + private func adjustHeaders(status: HTTPResponseStatus, + headers: inout HTTPHeaders) + { + for header in status.suppressedHeaders { + headers[header] = nil + } + + if headers[.contentLength] != nil { + headers[.transferEncoding] = "identity" + } + else if requestVersion >= HTTPVersion(major: 1, minor: 1) { + switch headers[.transferEncoding] { + case .some("identity"): // identity without content-length + clientRequestedKeepAlive = false + case .some("chunked"): + isChunked = true + default: + isChunked = true + headers[.transferEncoding] = "chunked" + } + } + else { + // HTTP 1.0 does not support chunked + clientRequestedKeepAlive = false + headers[.transferEncoding] = nil + } + + headers[.connection] = clientRequestedKeepAlive ? "Keep-Alive" : "Close" + } + + + // MARK: - CustomStringConvertible + + public var description : String { + // requestVersion + var ms = " DispatchData? { + // vs: let utf8 = marker.utf8CString (but included \0) + guard let data = s.data(using: .utf8) else { return nil } + return data.withUnsafeBytes { DispatchData(bytes: $0) } + // TODO: stupid copying, do this better + } + + var withHTTPChunkedFrame : DispatchData { + let count = self.count + guard count > 0 else { return self } + + var result = count.chunkLenDispatchData // TBD: cache/global common sizes? + result.append(self) + result.append(DataChunks.crlf) + return result + } + +} + +fileprivate extension FixedWidthInteger { + + var chunkLenDispatchData : DispatchData { + // thanks go to @regexident + var bigEndian = self.bigEndian + + return Swift.withUnsafeBytes(of: &bigEndian) { bp in + let maxlen = bitWidth / 8 * 2 + let cstr = UnsafeMutablePointer.allocate(capacity: maxlen + 3) + var idx = 0 + + for byte in bp { + if idx == 0 && byte == 0 { continue } + + func hexFromNibble(_ nibble: UInt8) -> UInt8 { + let cA : UInt8 = 65 + let c0 : UInt8 = 48 + let corr : UInt8 = cA - c0 - 10 + let c = nibble + c0 + let mask : UInt8 = (nibble > 9) ? 0xff : 0x00; + return c + (mask & corr) + } + + cstr[idx] = hexFromNibble((byte & 0b11110000) >> 4); idx += 1 + cstr[idx] = hexFromNibble((byte & 0b00001111)); idx += 1 + } + if idx == 0 { + let c0 : UInt8 = 48 + cstr[idx] = c0; idx += 1 + } + cstr[idx] = 13; idx += 1 + cstr[idx] = 10; idx += 1 + cstr[idx] = 0 // having a valid cstr in memory is well worth a byte + + let bbp = UnsafeRawBufferPointer(start: cstr, count: idx) + return DispatchData(bytesNoCopy: bbp, deallocator: .free) + } + } +} + diff --git a/Sources/HTTP/HTTPServer.swift b/Sources/HTTP/HTTPServer.swift index 5ddc9af..637c3d7 100644 --- a/Sources/HTTP/HTTPServer.swift +++ b/Sources/HTTP/HTTPServer.swift @@ -1,54 +1,330 @@ -// This source file is part of the Swift.org Server APIs open source project // -// Copyright (c) 2017 Swift Server API project authors -// Licensed under Apache License v2.0 with Runtime Library Exception +// HTTPServer.swift +// HTTP // -// See http://swift.org/LICENSE.txt for license information +// Created by Helge Hess on 22.10.17. +// Copyright © 2017 ZeeZide GmbH. All rights reserved. // -/// Definition of an HTTP server. -public protocol HTTPServing: class { +import Foundation +import Dispatch +#if os(Linux) + import Glibc +#else + import Darwin +#endif - /// Start the HTTP server on the given `port`, using `handler` to process incoming requests - func start(port: Int, handler: @escaping HTTPRequestHandler) throws +public class HTTPServer { + + public enum SocketError : Swift.Error { + case setupFailed (Int32) + case couldNotSetOption(Int32) + case bindFailed (Int32) + case listenFailed (Int32) + } + public enum Error : Swift.Error { + // TBD: this could be done in a better way + case socketError(SocketError) + } + + private var boundAddress : sockaddr_in? = nil + + private var listenSource : DispatchSourceRead? + + internal let queue = + DispatchQueue(label: "de.zeezide.swift.server.http.server") + private var connections = [ HTTPConnection ]() + + private var acceptCount = 0 + private let handlerBaseQueues : [ DispatchQueue ] = { + var queues = [ DispatchQueue ]() + for i in 0...stride) + var addr = sockaddr_in() + + let newFD = withUnsafeMutablePointer(to: &addr) { + return $0.withMemoryRebound(to: sockaddr.self, capacity: 1) { + return accept(fd, $0, &addrlen) + } + } + let error = errno + + guard newFD != -1 else { + if error == EWOULDBLOCK { break } + // what do we want to do w/ accept errors? + break + } + + #if os(Linux) + // No: SO_NOSIGPIPE on Linux, use MSG_NOSIGNAL in send() + #else + var val : Int32 = 1 + let rc = setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, + &val, socklen_t(MemoryLayout.stride)) + if rc != 0 { + // let error = errno + // I guess this is non-fatal. At most we fail at SIGPIPE :-> + } + #endif + + // We need to synchronize on the queue to grab a new base queue, and to + // register out connection w/ the server (though that has dubious value). + queue.async { + self.handleAcceptedSocket(newFD, handler: handler) + } + } + while true // we break when we would block or on error + } + + private func handleAcceptedSocket(_ fd : Int32, + handler : @escaping HTTPRequestHandler) + { + if acceptCount == Int.max { acceptCount = 0 } // wow, this is stable code! + else { acceptCount += 1 } + + // Simple round robin. + let baseQueue = handlerBaseQueues[acceptCount % handlerBaseQueues.count] + + // Create a new queue, but share the base queue (and its thread). + let connectionQueue = + DispatchQueue(label : "de.zeezide.swift.server.http.con\(acceptCount)", + target : baseQueue) + + // Create connection, register and start it. + let connection = HTTPConnection(fd: fd, queue: connectionQueue, + requestHandler: handler, server: self) + connections.append(connection) + + connection.resume() // start reading from socket + } + + internal func _connectionIsDone(_ connection: HTTPConnection) { + queue.async { + guard let idx = self.connections.index(where: { $0 === connection }) else { + assert(false, "did not find finished connection: \(connection)") + return + } + + // break retain cycle + self.connections.remove(at: idx) + } + } + + private func createSocket(boundTo address: T) throws -> ( Int32, T ) { + // TODO: We should constraint T to a protocol adopted by sockaddr_in/in6/un + // like in Noze.io. + // FIXME: This doesn't work right for AF_LOCAL/sockaddr_un + + let fd = socket(AF_INET, SOCK_STREAM, 0) + var error = errno + if fd == -1 { throw Error.socketError(.setupFailed(error)) } + + var closeSocket = true + defer { if closeSocket { close(fd) } } + + var buf = Int32(1) + var rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, + &buf, socklen_t(MemoryLayout.stride)) + error = errno + if rc != 0 { throw Error.socketError(.couldNotSetOption(error)) } + + + // TBD: I think the O_NONBLOCK flag is actually set by GCD when we bind the + // listener, is that right? + let flags = fcntl(fd, F_GETFL) + error = errno + if flags < 0 { throw Error.socketError(.couldNotSetOption(error)) } + + rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) + error = errno + if rc < 0 { throw Error.socketError(.couldNotSetOption(error)) } - /// The number of current connections - var connectionCount: Int { get } + + /* bind */ + + var addrlen = socklen_t(MemoryLayout.stride) + var address = address + rc = withUnsafePointer(to: &address) { + return $0.withMemoryRebound(to: sockaddr.self, capacity: 1) { + return bind(fd, $0, addrlen) + } + } + error = errno + if rc != 0 { throw Error.socketError(.bindFailed(error)) } + + rc = withUnsafeMutablePointer(to: &address) { + return $0.withMemoryRebound(to: sockaddr.self, capacity: 1) { + return getsockname(fd, $0, &addrlen) + } + } + error = errno + if rc != 0 { throw Error.socketError(.bindFailed(error)) } + + closeSocket = false + return ( fd, address ) + } + } -/// A basic HTTP server. Currently this is implemented using the PoCSocket -/// abstraction, but the intention is to remove this dependency and reimplement -/// the class using transport APIs provided by the Server APIs working group. -public class HTTPServer: HTTPServing { - private let server = PoCSocketSimpleServer() - /// Create an instance of the server. This needs to be followed with a call to `start(port:handler:)` - public init() { - } +// MARK: - Socket utilities - /// Start the HTTP server on the given `port` number, using a `HTTPRequestHandler` to process incoming requests. - public func start(port: Int = 0, handler: @escaping HTTPRequestHandler) throws { - try server.start(port: port, handler: handler) +fileprivate extension sockaddr_in { + init(address: in_addr = in_addr(), port: Int?) { + self.init() + if let port = port { + #if os(Linux) + sin_port = htons(UInt16(port)) + #else + sin_port = Int(OSHostByteOrder()) == OSLittleEndian + ? _OSSwapInt16(UInt16(port)) : UInt16(port) + #endif } - - /// Stop the server - public func stop() { - server.stop() + else { + sin_port = 0 } + sin_addr = address + } + + var isWildcardPort : Bool { return sin_port == 0 } + + var port : Int { + #if os(Linux) + return Int(ntohs(addr_in.sin_port)) + #else + return Int(Int(OSHostByteOrder()) == OSLittleEndian + ? sin_port.bigEndian : sin_port) + #endif + } - /// The port number the server is listening on - public var port: Int { - return server.port - } + var asString : String { + let addr = sin_addr.asString + return isWildcardPort ? "\(addr):*" : "\(addr):\(port)" + } +} - /// The number of current connections - public var connectionCount: Int { - return server.connectionCount - } +fileprivate extension in_addr { + init() { s_addr = INADDR_ANY } + + var asString : String { + if self.s_addr == INADDR_ANY { return "*.*.*.*" } + + let len = Int(INET_ADDRSTRLEN) + 2 + var buf = [CChar](repeating:0, count: len) + + var selfCopy = self + let cs = inet_ntop(AF_INET, &selfCopy, &buf, socklen_t(len)) + return cs != nil ? String(cString: cs!) : "" + } } From 288a1474e6ba757ad657b546b696574b5217a1a7 Mon Sep 17 00:00:00 2001 From: Helge Hess Date: Fri, 10 Nov 2017 17:34:15 +0100 Subject: [PATCH 05/14] Add a very basic imp of connectionCount ... on Darwin. Use OSAtomicIncrement/Dec. --- Sources/HTTP/HTTPServer.swift | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/Sources/HTTP/HTTPServer.swift b/Sources/HTTP/HTTPServer.swift index 637c3d7..bec0091 100644 --- a/Sources/HTTP/HTTPServer.swift +++ b/Sources/HTTP/HTTPServer.swift @@ -129,13 +129,7 @@ public class HTTPServer { } - #if false - public var connectionCount: Int - // (do we really need this? requires another sync) - // - yeah, we need the connections for stop - // - `getConnectionCount { count in }` would be better, but we could also do - // an AtomicIncr/Decr I guess - #endif + private(set) public var connectionCount : Int32 = 0 private func handleListenEvent(on fd : Int32, @@ -205,6 +199,12 @@ public class HTTPServer { let connection = HTTPConnection(fd: fd, queue: connectionQueue, requestHandler: handler, server: self) connections.append(connection) + #if os(Linux) + // TODO + #else + OSAtomicIncrement32(&connectionCount) + #endif + connection.resume() // start reading from socket } @@ -216,6 +216,12 @@ public class HTTPServer { return } + #if os(Linux) + // TODO + #else + OSAtomicDecrement32(&self.connectionCount) + #endif + // break retain cycle self.connections.remove(at: idx) } From 2756b03480eb4c38235709f0d9d74f8821eaea49 Mon Sep 17 00:00:00 2001 From: Helge Hess Date: Fri, 10 Nov 2017 17:34:47 +0100 Subject: [PATCH 06/14] Make tests compile (but they won't work!) Presumably tests would need to be done differently in an async setup. Not sure how much we could reuse here. --- .../Helpers/AbortAndSendHelloHandler.swift | 2 +- Tests/HTTPTests/Helpers/EchoHandler.swift | 2 +- .../HTTPTests/Helpers/HelloWorldHandler.swift | 2 +- .../Helpers/HelloWorldKeepAliveHandler.swift | 2 +- Tests/HTTPTests/Helpers/OkHandler.swift | 2 +- .../Helpers/SimpleResponseCreator.swift | 2 +- .../Helpers/TestResponseResolver.swift | 21 +++++++++++++++++++ .../Helpers/UnchunkedHelloWorldHandler.swift | 2 +- Tests/HTTPTests/ServerTests.swift | 9 ++++++-- 9 files changed, 35 insertions(+), 9 deletions(-) diff --git a/Tests/HTTPTests/Helpers/AbortAndSendHelloHandler.swift b/Tests/HTTPTests/Helpers/AbortAndSendHelloHandler.swift index a640bf2..717b7fe 100644 --- a/Tests/HTTPTests/Helpers/AbortAndSendHelloHandler.swift +++ b/Tests/HTTPTests/Helpers/AbortAndSendHelloHandler.swift @@ -15,7 +15,7 @@ class AbortAndSendHelloHandler: HTTPRequestHandling { var chunkCalledCount=0 var chunkLength=0 - func handle(request: HTTPRequest, response: HTTPResponseWriter ) -> HTTPBodyProcessing { + func handle(request: HTTPRequest, response: HTTPResponseWriter, queue: DispatchQueue ) -> HTTPBodyProcessing { //Assume the router gave us the right request - at least for now response.writeHeader(status: .ok, headers: [.transferEncoding: "chunked", "X-foo": "bar"]) return .processBody { (chunk, stop) in diff --git a/Tests/HTTPTests/Helpers/EchoHandler.swift b/Tests/HTTPTests/Helpers/EchoHandler.swift index ffd3f72..fd3b1d0 100644 --- a/Tests/HTTPTests/Helpers/EchoHandler.swift +++ b/Tests/HTTPTests/Helpers/EchoHandler.swift @@ -11,7 +11,7 @@ import HTTP /// Simple `HTTPRequestHandler` that just echoes back whatever input it gets class EchoHandler: HTTPRequestHandling { - func handle(request: HTTPRequest, response: HTTPResponseWriter ) -> HTTPBodyProcessing { + func handle(request: HTTPRequest, response: HTTPResponseWriter, queue: DispatchQueue ) -> HTTPBodyProcessing { //Assume the router gave us the right request - at least for now response.writeHeader(status: .ok, headers: ["Transfer-Encoding": "chunked", "X-foo": "bar"]) return .processBody { (chunk, stop) in diff --git a/Tests/HTTPTests/Helpers/HelloWorldHandler.swift b/Tests/HTTPTests/Helpers/HelloWorldHandler.swift index 57e5777..01ff0d5 100644 --- a/Tests/HTTPTests/Helpers/HelloWorldHandler.swift +++ b/Tests/HTTPTests/Helpers/HelloWorldHandler.swift @@ -11,7 +11,7 @@ import HTTP /// Simple `HTTPRequestHandler` that prints "Hello, World" as per K&R class HelloWorldHandler: HTTPRequestHandling { - func handle(request: HTTPRequest, response: HTTPResponseWriter ) -> HTTPBodyProcessing { + func handle(request: HTTPRequest, response: HTTPResponseWriter, queue: DispatchQueue ) -> HTTPBodyProcessing { //Assume the router gave us the right request - at least for now response.writeHeader(status: .ok, headers: [.transferEncoding: "chunked", "X-foo": "bar"]) return .processBody { (chunk, stop) in diff --git a/Tests/HTTPTests/Helpers/HelloWorldKeepAliveHandler.swift b/Tests/HTTPTests/Helpers/HelloWorldKeepAliveHandler.swift index f27447e..0b5933f 100644 --- a/Tests/HTTPTests/Helpers/HelloWorldKeepAliveHandler.swift +++ b/Tests/HTTPTests/Helpers/HelloWorldKeepAliveHandler.swift @@ -11,7 +11,7 @@ import HTTP /// `HelloWorldRequestHandler` that sets the keep alive header for XCTest purposes class HelloWorldKeepAliveHandler: HTTPRequestHandling { - func handle(request: HTTPRequest, response: HTTPResponseWriter ) -> HTTPBodyProcessing { + func handle(request: HTTPRequest, response: HTTPResponseWriter, queue: DispatchQueue ) -> HTTPBodyProcessing { //Assume the router gave us the right request - at least for now response.writeHeader(status: .ok, headers: [ "Transfer-Encoding": "chunked", diff --git a/Tests/HTTPTests/Helpers/OkHandler.swift b/Tests/HTTPTests/Helpers/OkHandler.swift index 0fc53c8..e83f2a1 100644 --- a/Tests/HTTPTests/Helpers/OkHandler.swift +++ b/Tests/HTTPTests/Helpers/OkHandler.swift @@ -11,7 +11,7 @@ import HTTP /// Simple `HTTPRequestHandler` that returns 200: OK without a body class OkHandler: HTTPRequestHandling { - func handle(request: HTTPRequest, response: HTTPResponseWriter ) -> HTTPBodyProcessing { + func handle(request: HTTPRequest, response: HTTPResponseWriter, queue: DispatchQueue ) -> HTTPBodyProcessing { //Assume the router gave us the right request - at least for now response.writeHeader(status: .ok, headers: ["Transfer-Encoding": "chunked", "X-foo": "bar"]) return .discardBody diff --git a/Tests/HTTPTests/Helpers/SimpleResponseCreator.swift b/Tests/HTTPTests/Helpers/SimpleResponseCreator.swift index 5134d31..c5785eb 100644 --- a/Tests/HTTPTests/Helpers/SimpleResponseCreator.swift +++ b/Tests/HTTPTests/Helpers/SimpleResponseCreator.swift @@ -32,7 +32,7 @@ public class SimpleResponseCreator: HTTPRequestHandling { var buffer = Data() - public func handle(request: HTTPRequest, response: HTTPResponseWriter ) -> HTTPBodyProcessing { + public func handle(request: HTTPRequest, response: HTTPResponseWriter, queue: DispatchQueue ) -> HTTPBodyProcessing { return .processBody { (chunk, stop) in switch chunk { case .chunk(let data, let finishedProcessing): diff --git a/Tests/HTTPTests/Helpers/TestResponseResolver.swift b/Tests/HTTPTests/Helpers/TestResponseResolver.swift index 7c119c5..3d6f1f7 100644 --- a/Tests/HTTPTests/Helpers/TestResponseResolver.swift +++ b/Tests/HTTPTests/Helpers/TestResponseResolver.swift @@ -10,6 +10,25 @@ import Foundation import Dispatch import HTTP +#if true + // FIXME: `HTTPResponseWriter` is not a protocol anymore, hence we can't + // mock it. + class TestResponseResolver { + // Just to please the compiler. Doesn't actually work. + + var response: (status: HTTPResponseStatus, headers: HTTPHeaders)? + var responseBody: HTTPResponseBody? + + init(request: HTTPRequest, requestBody: Data) { + } + + func resolveHandler(_ handler: HTTPRequestHandler) { + } + + } + +#else + /// Acts as a fake/mock `HTTPServer` so we can write XCTests without having to worry about Sockets and such class TestResponseResolver: HTTPResponseWriter { let request: HTTPRequest @@ -95,3 +114,5 @@ class TestResponseResolver: HTTPResponseWriter { fatalError("abort called, not sure what to do with it") } } +#endif + diff --git a/Tests/HTTPTests/Helpers/UnchunkedHelloWorldHandler.swift b/Tests/HTTPTests/Helpers/UnchunkedHelloWorldHandler.swift index 73eef30..ef6df93 100644 --- a/Tests/HTTPTests/Helpers/UnchunkedHelloWorldHandler.swift +++ b/Tests/HTTPTests/Helpers/UnchunkedHelloWorldHandler.swift @@ -11,7 +11,7 @@ import HTTP /// Simple `HTTPRequestHandler` that prints "Hello, World" as per K&R class UnchunkedHelloWorldHandler: HTTPRequestHandling { - func handle(request: HTTPRequest, response: HTTPResponseWriter ) -> HTTPBodyProcessing { + func handle(request: HTTPRequest, response: HTTPResponseWriter, queue: DispatchQueue ) -> HTTPBodyProcessing { //Assume the router gave us the right request - at least for now let responseString = "Hello, World!" response.writeHeader(status: .ok, headers: [.contentLength: "\(responseString.lengthOfBytes(using: .utf8))"]) diff --git a/Tests/HTTPTests/ServerTests.swift b/Tests/HTTPTests/ServerTests.swift index 3ca9df4..7fd748b 100644 --- a/Tests/HTTPTests/ServerTests.swift +++ b/Tests/HTTPTests/ServerTests.swift @@ -368,7 +368,7 @@ class ServerTests: XCTestCase { } } - + #if false // we have no PoCSocketSimpleServer func testRequestLargeEchoEndToEnd() { let receivedExpectation = self.expectation(description: "Received web response \(#function)") @@ -425,7 +425,9 @@ class ServerTests: XCTestCase { XCTFail("Error listening on port \(0): \(error). Use server.failed(callback:) to handle") } } - + #endif + + #if false // we have no PoCSocketSimpleServer func testRequestLargePostHelloWorld() { let receivedExpectation = self.expectation(description: "Received web response \(#function)") @@ -522,6 +524,7 @@ class ServerTests: XCTestCase { XCTFail("Error listening on port \(0): \(error). Use server.failed(callback:) to handle") } } + #endif static var allTests = [ ("testEcho", testEcho), @@ -533,8 +536,10 @@ class ServerTests: XCTestCase { ("testSimpleHelloEndToEnd", testSimpleHelloEndToEnd), ("testRequestEchoEndToEnd", testRequestEchoEndToEnd), ("testRequestKeepAliveEchoEndToEnd", testRequestKeepAliveEchoEndToEnd), +/* ("testRequestLargeEchoEndToEnd", testRequestLargeEchoEndToEnd), ("testExplicitCloseConnections", testExplicitCloseConnections), ("testRequestLargePostHelloWorld", testRequestLargePostHelloWorld), + */ ] } From e1425d0447aa0fa58339b882ca48c7e2c8b6a871 Mon Sep 17 00:00:00 2001 From: Helge Hess Date: Fri, 10 Nov 2017 17:45:36 +0100 Subject: [PATCH 07/14] Fix Linux compile Import Dispatch explicitly, some SOCK_STREAM weirdness. --- .gitignore | 2 ++ Sources/HTTP/HTTPCommon.swift | 2 +- Sources/HTTP/HTTPConnection.swift | 2 +- Sources/HTTP/HTTPServer.swift | 7 +++++-- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 4e1432a..6712861 100644 --- a/.gitignore +++ b/.gitignore @@ -74,3 +74,5 @@ fastlane/test_output /.idea /Package.pins docs +.build-linux + diff --git a/Sources/HTTP/HTTPCommon.swift b/Sources/HTTP/HTTPCommon.swift index 10e051f..526f975 100644 --- a/Sources/HTTP/HTTPCommon.swift +++ b/Sources/HTTP/HTTPCommon.swift @@ -6,7 +6,7 @@ // See http://swift.org/LICENSE.txt for license information // -import Foundation +import Dispatch /// Typealias for a closure that handles an incoming HTTP request /// The following is an example of an echo `HTTPRequestHandler` that returns the request it receives as a response: diff --git a/Sources/HTTP/HTTPConnection.swift b/Sources/HTTP/HTTPConnection.swift index 8696468..42b7743 100644 --- a/Sources/HTTP/HTTPConnection.swift +++ b/Sources/HTTP/HTTPConnection.swift @@ -89,7 +89,7 @@ internal class HTTPConnection : CustomStringConvertible { private func closeReadSide() { guard isReadSideOpen else { return } isReadSideOpen = false - shutdown(channel.fileDescriptor, SHUT_RD) + shutdown(channel.fileDescriptor, Int32(SHUT_RD)) } private var isSuspended = true // started in suspended state diff --git a/Sources/HTTP/HTTPServer.swift b/Sources/HTTP/HTTPServer.swift index bec0091..51d84a8 100644 --- a/Sources/HTTP/HTTPServer.swift +++ b/Sources/HTTP/HTTPServer.swift @@ -231,8 +231,11 @@ public class HTTPServer { // TODO: We should constraint T to a protocol adopted by sockaddr_in/in6/un // like in Noze.io. // FIXME: This doesn't work right for AF_LOCAL/sockaddr_un + #if os(Linux) + let SOCK_STREAM = Glibc.SOCK_STREAM.rawValue + #endif - let fd = socket(AF_INET, SOCK_STREAM, 0) + let fd = socket(AF_INET, Int32(SOCK_STREAM), 0) var error = errno if fd == -1 { throw Error.socketError(.setupFailed(error)) } @@ -307,7 +310,7 @@ fileprivate extension sockaddr_in { var port : Int { #if os(Linux) - return Int(ntohs(addr_in.sin_port)) + return Int(ntohs(sin_port)) #else return Int(Int(OSHostByteOrder()) == OSLittleEndian ? sin_port.bigEndian : sin_port) From ee4b2171e1e411f8a73a4e3f1b1d9dcf6126055e Mon Sep 17 00:00:00 2001 From: Helge Hess Date: Fri, 10 Nov 2017 17:48:40 +0100 Subject: [PATCH 08/14] Add queue to README demo handlers --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a4a6143..4d9b315 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ The following code implements a very simple "Hello World!" server: import Foundation import HTTP -func hello(request: HTTPRequest, response: HTTPResponseWriter ) -> HTTPBodyProcessing { +func hello(request: HTTPRequest, response: HTTPResponseWriter, queue: DispatchQueue ) -> HTTPBodyProcessing { response.writeHeader(status: .ok) response.writeBody("Hello, World!") response.done() @@ -36,7 +36,7 @@ The following code implements a very simple Echo server that responds with the c import Foundation import HTTP -func echo(request: HTTPRequest, response: HTTPResponseWriter ) -> HTTPBodyProcessing { +func echo(request: HTTPRequest, response: HTTPResponseWriter, queue: DispatchQueue ) -> HTTPBodyProcessing { response.writeHeader(status: .ok) return .processBody { (chunk, stop) in switch chunk { From 57ff29916ac15f1a1256d71992545db722bf4c47 Mon Sep 17 00:00:00 2001 From: Helge Hess Date: Fri, 10 Nov 2017 18:03:18 +0100 Subject: [PATCH 09/14] Drop `print` in start ... debug log, remove. --- Sources/HTTP/HTTPServer.swift | 3 --- 1 file changed, 3 deletions(-) diff --git a/Sources/HTTP/HTTPServer.swift b/Sources/HTTP/HTTPServer.swift index 51d84a8..57a63af 100644 --- a/Sources/HTTP/HTTPServer.swift +++ b/Sources/HTTP/HTTPServer.swift @@ -101,9 +101,6 @@ public class HTTPServer { close(fd) throw Error.socketError(.listenFailed(error)) } - - // TBD: - print("Listening on:", boundAddress ?? "-") } public func stop() { From 6565e6ec366b0a42a501ea3d50a6c8cbc0c1447d Mon Sep 17 00:00:00 2001 From: Carl Brown Date: Wed, 15 Nov 2017 15:34:27 -0600 Subject: [PATCH 10/14] Gets the simple end-to-end tests running on Mac These tests pass on Linux if run one at a time, but there's some issue so if you run them all, the first test passes and the second one hangs. Still need to fix that. --- Sources/HTTP/HTTPServer.swift | 2 +- .../Helpers/AbortAndSendHelloHandler.swift | 1 + Tests/HTTPTests/Helpers/EchoHandler.swift | 1 + .../HTTPTests/Helpers/HelloWorldHandler.swift | 1 + .../Helpers/HelloWorldKeepAliveHandler.swift | 1 + Tests/HTTPTests/Helpers/OkHandler.swift | 2 + .../Helpers/SimpleResponseCreator.swift | 1 + .../Helpers/TestResponseResolver.swift | 118 ------------------ .../Helpers/UnchunkedHelloWorldHandler.swift | 1 + Tests/HTTPTests/ServerTests.swift | 80 +++--------- 10 files changed, 24 insertions(+), 184 deletions(-) delete mode 100644 Tests/HTTPTests/Helpers/TestResponseResolver.swift diff --git a/Sources/HTTP/HTTPServer.swift b/Sources/HTTP/HTTPServer.swift index 57a63af..9bc1da2 100644 --- a/Sources/HTTP/HTTPServer.swift +++ b/Sources/HTTP/HTTPServer.swift @@ -122,7 +122,7 @@ public class HTTPServer { public var port : Int { guard let boundAddress = boundAddress else { return -1 } - return Int(boundAddress.sin_port.littleEndian) + return Int(boundAddress.port) } diff --git a/Tests/HTTPTests/Helpers/AbortAndSendHelloHandler.swift b/Tests/HTTPTests/Helpers/AbortAndSendHelloHandler.swift index 717b7fe..e0e97c2 100644 --- a/Tests/HTTPTests/Helpers/AbortAndSendHelloHandler.swift +++ b/Tests/HTTPTests/Helpers/AbortAndSendHelloHandler.swift @@ -7,6 +7,7 @@ // import Foundation +import Dispatch import HTTP /// Simple `HTTPRequestHandler` that prints "Hello, World" as per K&R diff --git a/Tests/HTTPTests/Helpers/EchoHandler.swift b/Tests/HTTPTests/Helpers/EchoHandler.swift index fd3b1d0..cabe234 100644 --- a/Tests/HTTPTests/Helpers/EchoHandler.swift +++ b/Tests/HTTPTests/Helpers/EchoHandler.swift @@ -7,6 +7,7 @@ // import Foundation +import Dispatch import HTTP /// Simple `HTTPRequestHandler` that just echoes back whatever input it gets diff --git a/Tests/HTTPTests/Helpers/HelloWorldHandler.swift b/Tests/HTTPTests/Helpers/HelloWorldHandler.swift index 01ff0d5..e98ebb3 100644 --- a/Tests/HTTPTests/Helpers/HelloWorldHandler.swift +++ b/Tests/HTTPTests/Helpers/HelloWorldHandler.swift @@ -7,6 +7,7 @@ // import Foundation +import Dispatch import HTTP /// Simple `HTTPRequestHandler` that prints "Hello, World" as per K&R diff --git a/Tests/HTTPTests/Helpers/HelloWorldKeepAliveHandler.swift b/Tests/HTTPTests/Helpers/HelloWorldKeepAliveHandler.swift index 0b5933f..83b5ad8 100644 --- a/Tests/HTTPTests/Helpers/HelloWorldKeepAliveHandler.swift +++ b/Tests/HTTPTests/Helpers/HelloWorldKeepAliveHandler.swift @@ -7,6 +7,7 @@ // import Foundation +import Dispatch import HTTP /// `HelloWorldRequestHandler` that sets the keep alive header for XCTest purposes diff --git a/Tests/HTTPTests/Helpers/OkHandler.swift b/Tests/HTTPTests/Helpers/OkHandler.swift index e83f2a1..279d8db 100644 --- a/Tests/HTTPTests/Helpers/OkHandler.swift +++ b/Tests/HTTPTests/Helpers/OkHandler.swift @@ -7,6 +7,7 @@ // import Foundation +import Dispatch import HTTP /// Simple `HTTPRequestHandler` that returns 200: OK without a body @@ -14,6 +15,7 @@ class OkHandler: HTTPRequestHandling { func handle(request: HTTPRequest, response: HTTPResponseWriter, queue: DispatchQueue ) -> HTTPBodyProcessing { //Assume the router gave us the right request - at least for now response.writeHeader(status: .ok, headers: ["Transfer-Encoding": "chunked", "X-foo": "bar"]) + response.done() return .discardBody } } diff --git a/Tests/HTTPTests/Helpers/SimpleResponseCreator.swift b/Tests/HTTPTests/Helpers/SimpleResponseCreator.swift index c5785eb..a7f61e2 100644 --- a/Tests/HTTPTests/Helpers/SimpleResponseCreator.swift +++ b/Tests/HTTPTests/Helpers/SimpleResponseCreator.swift @@ -12,6 +12,7 @@ */ import Foundation +import Dispatch import HTTP /// Simple block-based wrapper to create a `HTTPRequestHandler`. Normally used during XCTests diff --git a/Tests/HTTPTests/Helpers/TestResponseResolver.swift b/Tests/HTTPTests/Helpers/TestResponseResolver.swift deleted file mode 100644 index 3d6f1f7..0000000 --- a/Tests/HTTPTests/Helpers/TestResponseResolver.swift +++ /dev/null @@ -1,118 +0,0 @@ -// This source file is part of the Swift.org Server APIs open source project -// -// Copyright (c) 2017 Swift Server API project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See http://swift.org/LICENSE.txt for license information -// - -import Foundation -import Dispatch -import HTTP - -#if true - // FIXME: `HTTPResponseWriter` is not a protocol anymore, hence we can't - // mock it. - class TestResponseResolver { - // Just to please the compiler. Doesn't actually work. - - var response: (status: HTTPResponseStatus, headers: HTTPHeaders)? - var responseBody: HTTPResponseBody? - - init(request: HTTPRequest, requestBody: Data) { - } - - func resolveHandler(_ handler: HTTPRequestHandler) { - } - - } - -#else - -/// Acts as a fake/mock `HTTPServer` so we can write XCTests without having to worry about Sockets and such -class TestResponseResolver: HTTPResponseWriter { - let request: HTTPRequest - let requestBody: DispatchData - - var response: (status: HTTPResponseStatus, headers: HTTPHeaders)? - var responseBody: HTTPResponseBody? - - ///Flag to track whether our handler has told us not to call it anymore - private let _shouldStopProcessingBodyLock = DispatchSemaphore(value: 1) - private var _shouldStopProcessingBody: Bool = false - private var shouldStopProcessingBody: Bool { - get { - _shouldStopProcessingBodyLock.wait() - defer { - _shouldStopProcessingBodyLock.signal() - } - return _shouldStopProcessingBody - } - set { - _shouldStopProcessingBodyLock.wait() - defer { - _shouldStopProcessingBodyLock.signal() - } - _shouldStopProcessingBody = newValue - } - } - - init(request: HTTPRequest, requestBody: Data) { - self.request = request - self.requestBody = requestBody.withUnsafeBytes { (ptr: UnsafePointer) -> DispatchData in - #if swift(>=4.0) - return DispatchData(bytes: UnsafeRawBufferPointer(start: ptr, count: requestBody.count)) - #else - return DispatchData(bytes: UnsafeBufferPointer(start: ptr, count: requestBody.count)) - #endif - } - } - - func resolveHandler(_ handler: HTTPRequestHandler) { - let chunkHandler = handler(request, self) - if shouldStopProcessingBody { - return - } - switch chunkHandler { - case .processBody(let handler): - _shouldStopProcessingBodyLock.wait() - handler(.chunk(data: self.requestBody, finishedProcessing: {self._shouldStopProcessingBodyLock.signal()}), &_shouldStopProcessingBody) - var dummy = false - handler(.end, &dummy) - case .discardBody: - break - } - } - - func writeHeader(status: HTTPResponseStatus, headers: HTTPHeaders, completion: @escaping (Result) -> Void) { - self.response = (status: status, headers: headers) - completion(.ok) - } - - func writeTrailer(_ trailers: HTTPHeaders, completion: @escaping (Result) -> Void) { - fatalError("Not implemented") - } - - func writeBody(_ data: UnsafeHTTPResponseBody, completion: @escaping (Result) -> Void) { - if let data = data as? HTTPResponseBody { - self.responseBody = data - } else { - self.responseBody = data.withUnsafeBytes { Data($0) } - } - completion(.ok) - } - - func done(completion: @escaping (Result) -> Void) { - completion(.ok) - } - func done() /* convenience */ { - done { _ in - } - } - - func abort() { - fatalError("abort called, not sure what to do with it") - } -} -#endif - diff --git a/Tests/HTTPTests/Helpers/UnchunkedHelloWorldHandler.swift b/Tests/HTTPTests/Helpers/UnchunkedHelloWorldHandler.swift index ef6df93..f14a6f0 100644 --- a/Tests/HTTPTests/Helpers/UnchunkedHelloWorldHandler.swift +++ b/Tests/HTTPTests/Helpers/UnchunkedHelloWorldHandler.swift @@ -7,6 +7,7 @@ // import Foundation +import Dispatch import HTTP /// Simple `HTTPRequestHandler` that prints "Hello, World" as per K&R diff --git a/Tests/HTTPTests/ServerTests.swift b/Tests/HTTPTests/ServerTests.swift index 7fd748b..1abc7e0 100644 --- a/Tests/HTTPTests/ServerTests.swift +++ b/Tests/HTTPTests/ServerTests.swift @@ -12,59 +12,14 @@ import Dispatch @testable import HTTP class ServerTests: XCTestCase { - func testResponseOK() { - let request = HTTPRequest(method: .get, target: "/echo", httpVersion: HTTPVersion(major: 1, minor: 1), headers: ["X-foo": "bar"]) - let resolver = TestResponseResolver(request: request, requestBody: Data()) - resolver.resolveHandler(EchoHandler().handle) - XCTAssertNotNil(resolver.response) - XCTAssertNotNil(resolver.responseBody) - XCTAssertEqual(HTTPResponseStatus.ok.code, resolver.response?.status.code ?? 0) - } - - func testEcho() { - let testString = "This is a test" - let request = HTTPRequest(method: .post, target: "/echo", httpVersion: HTTPVersion(major: 1, minor: 1), headers: ["X-foo": "bar"]) - let resolver = TestResponseResolver(request: request, requestBody: testString.data(using: .utf8)!) - resolver.resolveHandler(EchoHandler().handle) - XCTAssertNotNil(resolver.response) - XCTAssertNotNil(resolver.responseBody) - XCTAssertEqual(HTTPResponseStatus.ok.code, resolver.response?.status.code ?? 0) - XCTAssertEqual(testString, resolver.responseBody?.withUnsafeBytes { String(bytes: $0, encoding: .utf8) } ?? "Nil") - } - - func testHello() { - let request = HTTPRequest(method: .get, target: "/helloworld", httpVersion: HTTPVersion(major: 1, minor: 1), headers: ["X-foo": "bar"]) - let resolver = TestResponseResolver(request: request, requestBody: Data()) - resolver.resolveHandler(HelloWorldHandler().handle) - XCTAssertNotNil(resolver.response) - XCTAssertNotNil(resolver.responseBody) - XCTAssertEqual(HTTPResponseStatus.ok.code, resolver.response?.status.code ?? 0) - XCTAssertEqual("Hello, World!", resolver.responseBody?.withUnsafeBytes { String(bytes: $0, encoding: .utf8) } ?? "Nil") - } - - func testSimpleHello() { - let request = HTTPRequest(method: .get, target: "/helloworld", httpVersion: HTTPVersion(major: 1, minor: 1), headers: ["X-foo": "bar"]) - let resolver = TestResponseResolver(request: request, requestBody: Data()) - let simpleHelloWebApp = SimpleResponseCreator { (_, body) -> SimpleResponseCreator.Response in - return SimpleResponseCreator.Response( - status: .ok, - headers: ["X-foo": "bar"], - body: "Hello, World!".data(using: .utf8)! - ) - } - resolver.resolveHandler(simpleHelloWebApp.handle) - XCTAssertNotNil(resolver.response) - XCTAssertNotNil(resolver.responseBody) - XCTAssertEqual(HTTPResponseStatus.ok.code, resolver.response?.status.code ?? 0) - XCTAssertEqual("Hello, World!", resolver.responseBody?.withUnsafeBytes { String(bytes: $0, encoding: .utf8) } ?? "Nil") - } - + func testOkEndToEnd() { let receivedExpectation = self.expectation(description: "Received web response \(#function)") let server = HTTPServer() do { try server.start(port: 0, handler: OkHandler().handle) + XCTAssertNotEqual(0, server.port) let session = URLSession(configuration: .default) let url = URL(string: "http://localhost:\(server.port)/")! print("Test \(#function) on port \(server.port)") @@ -368,12 +323,11 @@ class ServerTests: XCTestCase { } } - #if false // we have no PoCSocketSimpleServer func testRequestLargeEchoEndToEnd() { let receivedExpectation = self.expectation(description: "Received web response \(#function)") //Use a small chunk size to make sure that we're testing multiple HTTPBodyHandler calls - let chunkSize = 1024 + //FIXME: Make this settable for testing //let chunkSize = 1024 // Get a file we know exists let executableURL = URL(fileURLWithPath: CommandLine.arguments[0]) @@ -396,9 +350,9 @@ class ServerTests: XCTestCase { let testData = Data(testDataLong) - let server = PoCSocketSimpleServer() + let server = HTTPServer() do { - try server.start(port: 0, maxReadLength: chunkSize, handler: EchoHandler().handle) + try server.start(port: 0, handler: EchoHandler().handle) let session = URLSession(configuration: .default) let url = URL(string: "http://localhost:\(server.port)/echo")! print("Test \(#function) on port \(server.port)") @@ -425,10 +379,9 @@ class ServerTests: XCTestCase { XCTFail("Error listening on port \(0): \(error). Use server.failed(callback:) to handle") } } - #endif - #if false // we have no PoCSocketSimpleServer - func testRequestLargePostHelloWorld() { + #if false //FIXME: Abort & Stop handling + func NOtestRequestLargePostHelloWorld() { let receivedExpectation = self.expectation(description: "Received web response \(#function)") //Use a small chunk size to make sure that we stop after one HTTPBodyHandler call @@ -450,10 +403,10 @@ class ServerTests: XCTestCase { let executableLength = testExecutableData.count - let server = PoCSocketSimpleServer() + let server = HTTPServer() do { let testHandler = AbortAndSendHelloHandler() - try server.start(port: 0, maxReadLength: chunkSize, handler: testHandler.handle) + try server.start(port: 0, handler: testHandler.handle) let session = URLSession(configuration: .default) let url = URL(string: "http://localhost:\(server.port)/echo")! print("Test \(#function) on port \(server.port)") @@ -482,15 +435,16 @@ class ServerTests: XCTestCase { XCTFail("Error listening on port \(0): \(error). Use server.failed(callback:) to handle") } } + #endif - - func testExplicitCloseConnections() { + #if false //No timeout right now, so can't run this test + func NOtestExplicitCloseConnections() { let expectation = self.expectation(description: "0 Open Connection") - let server = PoCSocketSimpleServer() + let server = HTTPServer() let keepAliveTimeout = 0.1 do { - try server.start(port: 0, keepAliveTimeout: keepAliveTimeout, handler: OkHandler().handle) + try server.start(port: 0, handler: OkHandler().handle) let session = URLSession(configuration: .default) let url1 = URL(string: "http://localhost:\(server.port)")! @@ -527,17 +481,13 @@ class ServerTests: XCTestCase { #endif static var allTests = [ - ("testEcho", testEcho), - ("testHello", testHello), - ("testSimpleHello", testSimpleHello), - ("testResponseOK", testResponseOK), ("testOkEndToEnd", testOkEndToEnd), ("testHelloEndToEnd", testHelloEndToEnd), ("testSimpleHelloEndToEnd", testSimpleHelloEndToEnd), ("testRequestEchoEndToEnd", testRequestEchoEndToEnd), ("testRequestKeepAliveEchoEndToEnd", testRequestKeepAliveEchoEndToEnd), -/* ("testRequestLargeEchoEndToEnd", testRequestLargeEchoEndToEnd), +/* These tests aren't working, yet ("testExplicitCloseConnections", testExplicitCloseConnections), ("testRequestLargePostHelloWorld", testRequestLargePostHelloWorld), */ From 69302c398f12343b16aecd163ff8afac0401e707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Helge=20He=C3=9F?= Date: Thu, 7 Dec 2017 09:30:43 +0100 Subject: [PATCH 11/14] Do not use seperate/concurrent queue for accept MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ... this was utter non-sense, no idea why I did this :-) The listen socket is already non-blocking and it already deals w/ nio accept. 🤦‍♀️ --- Sources/HTTP/HTTPServer.swift | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/Sources/HTTP/HTTPServer.swift b/Sources/HTTP/HTTPServer.swift index 9bc1da2..b3ac8bd 100644 --- a/Sources/HTTP/HTTPServer.swift +++ b/Sources/HTTP/HTTPServer.swift @@ -45,13 +45,6 @@ public class HTTPServer { } return queues }() - - /// Queue used to run `accept()` on. - /// `accept()` can take time (AFAIK there is no NIO variant of it), run it - /// multithreaded. GCD will decide the thread count and such for us. - private let acceptQueue = - DispatchQueue(label: "de.zeezide.swift.server.http.accept", - attributes: .concurrent) public init() { } @@ -83,10 +76,9 @@ public class HTTPServer { /* Setup Listen Source */ listenSource = DispatchSource.makeReadSource(fileDescriptor: fd, - queue: acceptQueue) + queue: queue) listenSource?.setEventHandler { - // if we don't pass it over, we need to synchronize access to the fd! - self.handleListenEvent(on: fd, handler: handler, localAddress: address) + self.handleListenEvent(on: fd, handler: handler) } listenSource?.resume() @@ -129,11 +121,9 @@ public class HTTPServer { private(set) public var connectionCount : Int32 = 0 - private func handleListenEvent(on fd : Int32, - handler : @escaping HTTPRequestHandler, - localAddress : sockaddr_in) + private func handleListenEvent(on fd : Int32, + handler : @escaping HTTPRequestHandler) { - // Running in the accept-queue (concurrent) // TBD: // - what are we doing with accept errors?? // - do we need a 'shutdown' mode? I don't think so, the accept will just @@ -169,16 +159,12 @@ public class HTTPServer { } #endif - // We need to synchronize on the queue to grab a new base queue, and to - // register out connection w/ the server (though that has dubious value). - queue.async { - self.handleAcceptedSocket(newFD, handler: handler) - } + self.handleAcceptedSocket(on: newFD, handler: handler) } while true // we break when we would block or on error } - private func handleAcceptedSocket(_ fd : Int32, + private func handleAcceptedSocket(on fd : Int32, handler : @escaping HTTPRequestHandler) { if acceptCount == Int.max { acceptCount = 0 } // wow, this is stable code! From 633a4130b8b5f93403d113adac17862bf2c27224 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Helge=20He=C3=9F?= Date: Thu, 7 Dec 2017 09:45:18 +0100 Subject: [PATCH 12/14] Move base-queue selection to optional closure Remove the specific algorithm for selecting a base queue and use an optional closure to return one, as suggested by Johannes. By default this will not assign a target queue and let GCD deal with this. See related thread on PR #96. --- Sources/HTTP/HTTPServer.swift | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/Sources/HTTP/HTTPServer.swift b/Sources/HTTP/HTTPServer.swift index b3ac8bd..bf7f473 100644 --- a/Sources/HTTP/HTTPServer.swift +++ b/Sources/HTTP/HTTPServer.swift @@ -27,24 +27,26 @@ public class HTTPServer { case socketError(SocketError) } + /// The address ther server was bound to locally. If no port was passed in, + /// this will hold the kernel selected port of the server. private var boundAddress : sockaddr_in? = nil + /// The source connected to the server socket. This notifies the server if new + /// connections come in. The server will then accept the socket and spawn a + /// new connection. private var listenSource : DispatchSourceRead? internal let queue = DispatchQueue(label: "de.zeezide.swift.server.http.server") private var connections = [ HTTPConnection ]() - private var acceptCount = 0 - private let handlerBaseQueues : [ DispatchQueue ] = { - var queues = [ DispatchQueue ]() - for i in 0.. DispatchQueue)? = nil + // TODO: move to Options public init() { } @@ -107,7 +109,7 @@ public class HTTPServer { } boundAddress = nil - queue.async { + queue.async { // TBD: this may not be necessary self.connections.forEach { $0.serverWillStop() } } } @@ -170,20 +172,17 @@ public class HTTPServer { if acceptCount == Int.max { acceptCount = 0 } // wow, this is stable code! else { acceptCount += 1 } - // Simple round robin. - let baseQueue = handlerBaseQueues[acceptCount % handlerBaseQueues.count] - // Create a new queue, but share the base queue (and its thread). let connectionQueue = DispatchQueue(label : "de.zeezide.swift.server.http.con\(acceptCount)", - target : baseQueue) + target : selectBaseQueue?() ?? nil) // Create connection, register and start it. let connection = HTTPConnection(fd: fd, queue: connectionQueue, requestHandler: handler, server: self) connections.append(connection) #if os(Linux) - // TODO + // TODO: maybe just use NSLock? Or pthread_mutex. #else OSAtomicIncrement32(&connectionCount) #endif @@ -193,6 +192,7 @@ public class HTTPServer { } internal func _connectionIsDone(_ connection: HTTPConnection) { + // Called from arbitrary queue (i.e. the connection queue) queue.async { guard let idx = self.connections.index(where: { $0 === connection }) else { assert(false, "did not find finished connection: \(connection)") From 033e235dc2eaf541da2e2ca480ea70b471ba88e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Helge=20He=C3=9F?= Date: Mon, 18 Dec 2017 16:07:54 +0100 Subject: [PATCH 13/14] Replace ugly type dependency w/ closure The connection doesn't need to know about the server anymore. Instead we pass in a closure to be called when the connection tears down. Note: there is an intentional cycle between server and connection. It will be broken when the connection is done. --- Sources/HTTP/HTTPConnection.swift | 13 +++++++------ Sources/HTTP/HTTPServer.swift | 6 ++++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/Sources/HTTP/HTTPConnection.swift b/Sources/HTTP/HTTPConnection.swift index 42b7743..d961873 100644 --- a/Sources/HTTP/HTTPConnection.swift +++ b/Sources/HTTP/HTTPConnection.swift @@ -27,9 +27,9 @@ import CHTTPParser /// internal class HTTPConnection : CustomStringConvertible { - /// The `HTTPServer` which created the connection. If the connection shuts - /// down, it will unregister from that server. - private let server : HTTPServer + /// A closure that gets called if the connection is finished and tears down. + /// The Server will add this to unregister the connection. + private var doneCB : (( HTTPConnection ) -> ())? /// Serialized access to the state of the connection. Everything needs to /// happen on this queue. It is the `DispatchQueue.main` for a handler. @@ -60,9 +60,9 @@ internal class HTTPConnection : CustomStringConvertible { internal init(fd : Int32, queue : DispatchQueue, requestHandler : @escaping HTTPRequestHandler, - server : HTTPServer) + done doneCB : @escaping ( HTTPConnection ) -> ()) { - self.server = server + self.doneCB = doneCB self.queue = queue self.requestHandler = requestHandler @@ -274,7 +274,8 @@ internal class HTTPConnection : CustomStringConvertible { private func _connectionHasFinished() { guard !didFinish else { return } didFinish = true - server._connectionIsDone(self) + doneCB?(self) + doneCB = nil // free retain cycle } diff --git a/Sources/HTTP/HTTPServer.swift b/Sources/HTTP/HTTPServer.swift index a993c08..bf2d335 100644 --- a/Sources/HTTP/HTTPServer.swift +++ b/Sources/HTTP/HTTPServer.swift @@ -197,7 +197,9 @@ public class HTTPServer { // Create connection, register and start it. let connection = HTTPConnection(fd: fd, queue: connectionQueue, - requestHandler: handler, server: self) + requestHandler: handler) { + self._connectionIsDone($0) + } connections.append(connection) #if os(Linux) @@ -211,7 +213,7 @@ public class HTTPServer { connection.resume() // start reading from socket } - internal func _connectionIsDone(_ connection: HTTPConnection) { + private func _connectionIsDone(_ connection: HTTPConnection) { // Called from arbitrary queue (i.e. the connection queue) queue.async { guard let idx = self.connections.index(where: { $0 === connection }) else { From 1dd9992715ca7df876944b6e7ebfd57052ad6416 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Helge=20He=C3=9F?= Date: Mon, 18 Dec 2017 16:40:12 +0100 Subject: [PATCH 14/14] Use assertionFailure() instead of assert(false) ... a little better I guess. --- Sources/HTTP/HTTPConnection.swift | 2 +- Sources/HTTP/HTTPServer.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/HTTP/HTTPConnection.swift b/Sources/HTTP/HTTPConnection.swift index d961873..c64d915 100644 --- a/Sources/HTTP/HTTPConnection.swift +++ b/Sources/HTTP/HTTPConnection.swift @@ -255,7 +255,7 @@ internal class HTTPConnection : CustomStringConvertible { writers.remove(at: idx) } else { - assert(false, "did not find writer which went done: \(self)") + assertionFailure("did not find writer which went done: \(self)") } // activate next writer in the pipeline diff --git a/Sources/HTTP/HTTPServer.swift b/Sources/HTTP/HTTPServer.swift index bf2d335..dca44ee 100644 --- a/Sources/HTTP/HTTPServer.swift +++ b/Sources/HTTP/HTTPServer.swift @@ -217,7 +217,7 @@ public class HTTPServer { // Called from arbitrary queue (i.e. the connection queue) queue.async { guard let idx = self.connections.index(where: { $0 === connection }) else { - assert(false, "did not find finished connection: \(connection)") + assertionFailure("did not find finished connection: \(connection)") return }