From cdbbd04f8d1f7d1cfb6a764480a56a611d0abdda Mon Sep 17 00:00:00 2001 From: Tim Condon <0xTim@users.noreply.github.com> Date: Tue, 7 May 2024 18:57:19 -0500 Subject: [PATCH] Provide AsyncFileStreaming API (#3184) * Provide async stream implemetation * Add AsyncFileTests * Fix double/int bug * Allow the stream writer to complete the writer promise for async streams while cascading errors thrown from the stream callback to the promise. * Use the same ordering for both stores to the isComplete atomic in ChannelResponseBodyStream * Actually call the onCompleted() callback in asyncStreamFile() * Make sure the promise is never dropped on the floor in release builds (in which assertions like the one in ChannelResponseBodyStream's deinit aren't checked). * Make completion handler async * Use NIOFilesytemFoundationCompat * Fix imports * Make sure the NIO FileHandle is always closed on error. * Make sure the onCompleted() callback is always invoked in the error case even if trying to write the error indication to the stream fails * Migrate old ELF stream file function to use async version under the hood * Migrate FileMiddleware to async * Heavily revise ChannelResponseBodyStream's logic to eliminate races, atomics, and to handle errors and the promise completions more consistently. Especially affects async streaming, but eliminates error handling issues in non-async streaming as well. * Fix off-by-one error in FileIO.readFile(at:chunkSize:offset:byteCount:) async * Use XCTAssertNoThrow() to simplify several of the AsyncFileTests * Fix testSimpleETagHeaders test * Add async versions for XCTVapor and mark wait() calls with noasync * Fix XCTVapor error messages * Try and make tests async * Fix some warnings in tests * One less test running on 8080 * Fix the tests * Revert "Fix the tests" This reverts commit c98f0bf1516787ddc639f59a786e2f859cb492f9. * Hook up the response body callback for async streams * Remove a couple of instances of FileManager * Remove FileManager from AsyncFileTests * Fix default file * Fix the tests * Rework it to reduce all the returns * Update Sources/Vapor/Utilities/FileIO.swift Co-authored-by: Gwynne Raskind * PR Reviews * Fix merge issue: * Test the correct behaviour --------- Co-authored-by: Gwynne Raskind --- Package.swift | 4 +- Package@swift-5.9.swift | 4 +- .../Vapor/HTTP/Server/HTTPServerHandler.swift | 15 +- .../Server/HTTPServerResponseEncoder.swift | 96 +++-- Sources/Vapor/Middleware/FileMiddleware.swift | 72 ++-- Sources/Vapor/Response/Response+Body.swift | 16 +- Sources/Vapor/Utilities/FileIO.swift | 292 +++++++------ Sources/XCTVapor/XCTApplication.swift | 88 +++- Tests/VaporTests/AsyncFileTests.swift | 389 ++++++++++++++++++ Tests/VaporTests/ContentTests.swift | 4 +- Tests/VaporTests/FileTests.swift | 28 +- Tests/VaporTests/MiddlewareTests.swift | 8 +- Tests/VaporTests/PipelineTests.swift | 4 +- Tests/VaporTests/WebSocketTests.swift | 1 + 14 files changed, 795 insertions(+), 226 deletions(-) create mode 100644 Tests/VaporTests/AsyncFileTests.swift diff --git a/Package.swift b/Package.swift index 080c9aa4f7..aa763c6e28 100644 --- a/Package.swift +++ b/Package.swift @@ -34,7 +34,7 @@ let package = Package( .package(url: "https://github.com/swift-server/swift-backtrace.git", from: "1.1.1"), // Event-driven network application framework for high performance protocol servers & clients, non-blocking. - .package(url: "https://github.com/apple/swift-nio.git", from: "2.63.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), // Bindings to OpenSSL-compatible libraries for TLS support in SwiftNIO .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"), @@ -92,8 +92,8 @@ let package = Package( .product(name: "WebSocketKit", package: "websocket-kit"), .product(name: "MultipartKit", package: "multipart-kit"), .product(name: "Atomics", package: "swift-atomics"), - .product(name: "_NIOFileSystem", package: "swift-nio"), + .product(name: "_NIOFileSystemFoundationCompat", package: "swift-nio"), ]), // Development diff --git a/Package@swift-5.9.swift b/Package@swift-5.9.swift index 39a222f00d..15b022d9c7 100644 --- a/Package@swift-5.9.swift +++ b/Package@swift-5.9.swift @@ -31,7 +31,7 @@ let package = Package( .package(url: "https://github.com/vapor/routing-kit.git", from: "4.9.0"), // Event-driven network application framework for high performance protocol servers & clients, non-blocking. - .package(url: "https://github.com/apple/swift-nio.git", from: "2.63.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), // Bindings to OpenSSL-compatible libraries for TLS support in SwiftNIO .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"), @@ -90,8 +90,8 @@ let package = Package( .product(name: "WebSocketKit", package: "websocket-kit"), .product(name: "MultipartKit", package: "multipart-kit"), .product(name: "Atomics", package: "swift-atomics"), - .product(name: "_NIOFileSystem", package: "swift-nio"), + .product(name: "_NIOFileSystemFoundationCompat", package: "swift-nio"), ], swiftSettings: [.enableExperimentalFeature("StrictConcurrency=complete")] ), diff --git a/Sources/Vapor/HTTP/Server/HTTPServerHandler.swift b/Sources/Vapor/HTTP/Server/HTTPServerHandler.swift index d1dc30a0e4..57873cad6a 100644 --- a/Sources/Vapor/HTTP/Server/HTTPServerHandler.swift +++ b/Sources/Vapor/HTTP/Server/HTTPServerHandler.swift @@ -59,6 +59,10 @@ final class HTTPServerHandler: ChannelInboundHandler, RemovableChannelHandler { case .failure(let error): if case .stream(let stream) = response.body.storage { stream.callback(ErrorBodyStreamWriter(eventLoop: request.eventLoop, error: error)) + } else if case .asyncStream(let stream) = response.body.storage { + Task { + try? await stream.callback(ErrorBodyStreamWriter(eventLoop: request.eventLoop, error: error)) + } } handler.errorCaught(context: context, error: error) } @@ -77,10 +81,15 @@ final class HTTPServerHandler: ChannelInboundHandler, RemovableChannelHandler { } } -struct ErrorBodyStreamWriter: BodyStreamWriter { - var eventLoop: EventLoop - var error: Error +fileprivate struct ErrorBodyStreamWriter: BodyStreamWriter, AsyncBodyStreamWriter { + let eventLoop: EventLoop + let error: Error + func write(_ result: BodyStreamResult, promise: EventLoopPromise?) { promise?.fail(error) } + + func write(_ result: BodyStreamResult) async throws { + throw error + } } diff --git a/Sources/Vapor/HTTP/Server/HTTPServerResponseEncoder.swift b/Sources/Vapor/HTTP/Server/HTTPServerResponseEncoder.swift index bba4d8b9d0..76992b3d3e 100644 --- a/Sources/Vapor/HTTP/Server/HTTPServerResponseEncoder.swift +++ b/Sources/Vapor/HTTP/Server/HTTPServerResponseEncoder.swift @@ -1,7 +1,6 @@ import NIOCore import NIOHTTP1 import NIOConcurrencyHelpers -import Atomics final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelHandler { typealias OutboundIn = Response @@ -74,12 +73,21 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH let channelStream = ChannelResponseBodyStream( context: context, handler: self, - promise: nil, + promise: promise, count: stream.count == -1 ? nil : stream.count ) - promise?.completeWithTask { - try await stream.callback(channelStream) + Task { + do { + try await stream.callback(channelStream) + // We assert in ChannelResponseBodyStream that either .end or .error gets sent, so once we + // get here the promise can be assumed to already be completed. However, just in case, succeed + // it here anyway. This guarantees we never leave the callback without completing the promise + // one way or the other in release builds. + promise?.succeed() + } catch { + promise?.fail(error) + } } } } @@ -100,13 +108,14 @@ private final class ChannelResponseBodyStream: BodyStreamWriter, AsyncBodyStream let handlerBox: NIOLoopBound let promise: EventLoopPromise? let count: Int? - let currentCount: ManagedAtomic - let isComplete: ManagedAtomic + let currentCount: NIOLoopBoundBox + let isComplete: NIOLockedValueBox let eventLoop: EventLoop enum Error: Swift.Error { case tooManyBytes case notEnoughBytes + case apiMisuse // tried to send a buffer or end indication after already ending or erroring the stream } init( @@ -115,52 +124,79 @@ private final class ChannelResponseBodyStream: BodyStreamWriter, AsyncBodyStream promise: EventLoopPromise?, count: Int? ) { + context.eventLoop.assertInEventLoop() + self.contextBox = .init(context, eventLoop: context.eventLoop) self.handlerBox = .init(handler, eventLoop: context.eventLoop) self.promise = promise self.count = count - self.currentCount = .init(0) + self.currentCount = .init(0, eventLoop: context.eventLoop) self.isComplete = .init(false) self.eventLoop = context.eventLoop } func write(_ result: BodyStreamResult) async throws { - // Explicitly adds the ELF because Swift 5.6 fails to infer the return type - try await self.eventLoop.flatSubmit { () -> EventLoopFuture in - let promise = self.eventLoop.makePromise(of: Void.self) - self.write(result, promise: promise) - return promise.futureResult - }.get() + let promise = self.eventLoop.makePromise(of: Void.self) + + self.eventLoop.execute { self.write(result, promise: promise) } + try await promise.futureResult.get() } + /// > Note: `self.promise` is the promise that completes the original write to `HTTPServerResponseEncoder` that + /// > triggers the streaming response; it should only be succeeded when the stream ends. The `promise` parameter + /// > of this method is specific to the particular invocation and signals that a buffer has finished writing or + /// > that the stream has been fully completed, and should always be completed or pending completion by the time + /// > this method returns. Both promises should be failed when an error occurs, unless otherwise specifically noted. func write(_ result: BodyStreamResult, promise: EventLoopPromise?) { + self.eventLoop.assertInEventLoop() // Only check in debug, just in case... + + func finishStream() { + self.isComplete.withLockedValue { $0 = true } + self.contextBox.value.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent()) + // Don't forward the current promise (if any) to the write completion of the end-response signal, as we + // will be notified of errors through other paths and can get spurious I/O errors from this write that + // ought to be ignored. + self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.end(nil)), promise: nil) + } + + // See https://github.com/vapor/vapor/issues/2976 for why we do some of these checks. switch result { case .buffer(let buffer): - // See: https://github.com/vapor/vapor/issues/2976 - self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise) - if let count = self.count, self.currentCount.wrappingIncrementThenLoad(by: buffer.readableBytes, ordering: .sequentiallyConsistent) > count { + guard !self.isComplete.withLockedValue({ $0 }) else { // Don't try to send data if we already ended + return promise?.fail(Error.apiMisuse) ?? () // self.promise is already completed, so fail the local one and bail + } + if let count = self.count, (self.currentCount.value + buffer.readableBytes) > count { self.promise?.fail(Error.tooManyBytes) - promise?.fail(Error.notEnoughBytes) + promise?.fail(Error.tooManyBytes) + } else { + self.currentCount.value += buffer.readableBytes + // Cascade the completion of the buffer write to the local promise (if any). + self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise) } case .end: - // See: https://github.com/vapor/vapor/issues/2976 - self.isComplete.store(true, ordering: .sequentiallyConsistent) - if let count = self.count, self.currentCount.load(ordering: .sequentiallyConsistent) < count { - self.promise?.fail(Error.notEnoughBytes) - promise?.fail(Error.notEnoughBytes) + if !self.isComplete.withLockedValue({ $0 }) { // Don't send the response end events more than once. + finishStream() + // check this only after sending the stream end; we want to make send that regardless + if let count = self.count, self.currentCount.value < count { + self.promise?.fail(Error.notEnoughBytes) + promise?.fail(Error.notEnoughBytes) + } else { + self.promise?.succeed() + promise?.succeed() + } + } else { + promise?.fail(Error.apiMisuse) // If we already ended, fail the local promise with API misuse } - self.contextBox.value.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent()) - self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.end(nil)), promise: promise) - self.promise?.succeed(()) case .error(let error): - self.isComplete.store(true, ordering: .relaxed) - self.contextBox.value.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent()) - self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.end(nil)), promise: promise) - self.promise?.fail(error) + if !self.isComplete.withLockedValue({ $0 }) { // Don't send the response end events more than once. + finishStream() + self.promise?.fail(error) + } + promise?.fail(error) // We want to fail the local promise regardless. Echo the error back. } } deinit { - assert(self.isComplete.load(ordering: .sequentiallyConsistent), "Response body stream writer deinitialized before .end or .error was sent.") + assert(self.isComplete.withLockedValue { $0 }, "Response body stream writer deinitialized before .end or .error was sent.") } } diff --git a/Sources/Vapor/Middleware/FileMiddleware.swift b/Sources/Vapor/Middleware/FileMiddleware.swift index f7a45b8ec7..d6e0880bcd 100644 --- a/Sources/Vapor/Middleware/FileMiddleware.swift +++ b/Sources/Vapor/Middleware/FileMiddleware.swift @@ -1,10 +1,11 @@ import Foundation import NIOCore +import _NIOFileSystem /// Serves static files from a public directory. /// /// `FileMiddleware` will default to `DirectoryConfig`'s working directory with `"/Public"` appended. -public final class FileMiddleware: Middleware { +public final class FileMiddleware: AsyncMiddleware { /// The public directory. Guaranteed to end with a slash. private let publicDirectory: String private let defaultFile: String? @@ -47,10 +48,10 @@ public final class FileMiddleware: Middleware { self.advancedETagComparison = advancedETagComparison } - public func respond(to request: Request, chainingTo next: Responder) -> EventLoopFuture { + public func respond(to request: Request, chainingTo next: any AsyncResponder) async throws -> Response { // make a copy of the percent-decoded path guard var path = request.url.path.removingPercentEncoding else { - return request.eventLoop.makeFailedFuture(Abort(.badRequest)) + throw Abort(.badRequest) } // path must be relative. @@ -58,51 +59,44 @@ public final class FileMiddleware: Middleware { // protect against relative paths guard !path.contains("../") else { - return request.eventLoop.makeFailedFuture(Abort(.forbidden)) + throw Abort(.forbidden) } // create absolute path var absPath = self.publicDirectory + path - - // check if path exists and whether it is a directory - var isDir: ObjCBool = false - guard FileManager.default.fileExists(atPath: absPath, isDirectory: &isDir) else { - return next.respond(to: request) - } - if isDir.boolValue { - guard absPath.hasSuffix("/") else { - switch directoryAction.kind { - case .redirect: - var redirectUrl = request.url - redirectUrl.path += "/" - return request.eventLoop.future( - request.redirect(to: redirectUrl.string, redirectType: .permanent) - ) - case .none: - return next.respond(to: request) + if let fileInfo = try await FileSystem.shared.info(forFileAt: .init(absPath)) { + // path exists, check for directory or file + if fileInfo.type == .directory { + // directory exists, see if we can return a file + if absPath.hasSuffix("/") { + // If a directory, check for the default file + if let defaultFile = defaultFile { + if defaultFile.isAbsolute() { + absPath = self.publicDirectory + defaultFile.removeLeadingSlashes() + } else { + absPath = absPath + defaultFile + } + + if try await FileSystem.shared.info(forFileAt: .init(absPath)) != nil { + // If the default file exists, stream it + return try await request.fileio.asyncStreamFile(at: absPath, advancedETagComparison: advancedETagComparison) + } + } + } else { + if directoryAction.kind == .redirect { + var redirectUrl = request.url + redirectUrl.path += "/" + return request.redirect(to: redirectUrl.string, redirectType: .permanent) + } } - } - - // If a directory, check for the default file - guard let defaultFile = defaultFile else { - return next.respond(to: request) - } - - if defaultFile.isAbsolute() { - absPath = self.publicDirectory + defaultFile.removeLeadingSlashes() } else { - absPath = absPath + defaultFile - } - - // If the default file doesn't exist, pass on request - guard FileManager.default.fileExists(atPath: absPath) else { - return next.respond(to: request) + // file exists, stream it + return try await request.fileio.asyncStreamFile(at: absPath, advancedETagComparison: advancedETagComparison) } } - - // stream the file - return request.fileio.streamFile(at: absPath, advancedETagComparison: advancedETagComparison) + + return try await next.respond(to: request) } /// Creates a new `FileMiddleware` for a server contained in an Xcode Project. diff --git a/Sources/Vapor/Response/Response+Body.swift b/Sources/Vapor/Response/Response+Body.swift index 4258167d7c..f4c4bc9353 100644 --- a/Sources/Vapor/Response/Response+Body.swift +++ b/Sources/Vapor/Response/Response+Body.swift @@ -108,6 +108,13 @@ extension Response { stream.callback(collector) return collector.promise.futureResult .map { $0 } + case .asyncStream(let stream): + let collector = ResponseBodyCollector(eventLoop: eventLoop, byteBufferAllocator: self.byteBufferAllocator) + return eventLoop.makeFutureWithTask { + try await stream.callback(collector) + }.flatMap { + collector.promise.futureResult.map { $0 } + } default: return eventLoop.makeSucceededFuture(self.buffer) } @@ -242,7 +249,7 @@ extension Response { // Since all buffer mutation is done on the event loop, we can be unchecked here. // This removes the need for a lock and performance hits from that // Any changes to this type need to be carefully considered -private final class ResponseBodyCollector: BodyStreamWriter, @unchecked Sendable { +private final class ResponseBodyCollector: BodyStreamWriter, AsyncBodyStreamWriter, @unchecked Sendable { var buffer: ByteBuffer let eventLoop: EventLoop let promise: EventLoopPromise @@ -268,4 +275,11 @@ private final class ResponseBodyCollector: BodyStreamWriter, @unchecked Sendable // Fixes an issue where errors in the stream should fail the individual write promise. if let promise { future.cascade(to: promise) } } + + func write(_ result: BodyStreamResult) async throws { + let promise = self.eventLoop.makePromise(of: Void.self) + + self.eventLoop.execute { self.write(result, promise: promise) } + try await promise.futureResult.get() + } } diff --git a/Sources/Vapor/Utilities/FileIO.swift b/Sources/Vapor/Utilities/FileIO.swift index eeb431c116..073c801322 100644 --- a/Sources/Vapor/Utilities/FileIO.swift +++ b/Sources/Vapor/Utilities/FileIO.swift @@ -1,11 +1,12 @@ import Foundation import NIOCore -import NIOFileSystem +import _NIOFileSystem import NIOHTTP1 import NIOPosix import Logging import Crypto import NIOConcurrencyHelpers +import _NIOFileSystemFoundationCompat extension Request { public var fileio: FileIO { @@ -93,20 +94,18 @@ public struct FileIO: Sendable { chunkSize: Int = NonBlockingFileIO.defaultChunkSize, onRead: @Sendable @escaping (ByteBuffer) -> EventLoopFuture ) -> EventLoopFuture { - guard - let attributes = try? FileManager.default.attributesOfItem(atPath: path), - let fileSize = attributes[.size] as? NSNumber - else { - return self.request.eventLoop.makeFailedFuture(Abort(.internalServerError)) + self.request.eventLoop.makeFutureWithTask { + guard let fileSize = try await FileSystem.shared.info(forFileAt: .init(path))?.size else { + throw Abort(.internalServerError) + } + try await self.read( + path: path, + fromOffset: 0, + byteCount: Int(fileSize), + chunkSize: chunkSize, + onRead: onRead + ).get() } - return self.read( - path: path, - fromOffset: 0, - byteCount: - fileSize.intValue, - chunkSize: chunkSize, - onRead: onRead - ) } /// Generates a chunked `Response` for the specified file. This method respects values in @@ -231,7 +230,7 @@ public struct FileIO: Sendable { /// will be used. This method will also set the `"Content-Type"` header /// automatically if an appropriate `MediaType` can be found for the file's suffix. /// - /// router.get("file-stream") { req in + /// app.get("file-stream") { req in /// return req.fileio.streamFile(at: "/path/to/file.txt") /// } /// @@ -250,104 +249,8 @@ public struct FileIO: Sendable { onCompleted: @escaping @Sendable (Result) -> () = { _ in } ) -> EventLoopFuture { // Get file attributes for this file. - guard - let attributes = try? FileManager.default.attributesOfItem(atPath: path), - let modifiedAt = attributes[.modificationDate] as? Date, - let fileSize = (attributes[.size] as? NSNumber)?.intValue - else { - return request.eventLoop.makeSucceededFuture(Response(status: .internalServerError)) - } - - let contentRange: HTTPHeaders.Range? - if let rangeFromHeaders = request.headers.range { - if rangeFromHeaders.unit == .bytes && rangeFromHeaders.ranges.count == 1 { - contentRange = rangeFromHeaders - } else { - contentRange = nil - } - } else if request.headers.contains(name: .range) { - // Range header was supplied but could not be parsed i.e. it was invalid - request.logger.debug("Range header was provided in request but was invalid") - let response = Response(status: .badRequest) - return request.eventLoop.makeSucceededFuture(response) - } else { - contentRange = nil - } - - var eTagFuture: EventLoopFuture - - if advancedETagComparison { - eTagFuture = generateETagHash(path: path, lastModified: modifiedAt) - } else { - // Generate ETag value, "last modified date in epoch time" + "-" + "file size" - eTagFuture = request.eventLoop.makeSucceededFuture("\"\(modifiedAt.timeIntervalSince1970)-\(fileSize)\"") - } - - return eTagFuture.map { fileETag in - // Create empty headers array. - var headers: HTTPHeaders = [:] - - // Respond with lastModified header - headers.lastModified = HTTPHeaders.LastModified(value: modifiedAt) - - headers.replaceOrAdd(name: .eTag, value: fileETag) - - // Check if file has been cached already and return NotModified response if the etags match - if fileETag == request.headers.first(name: .ifNoneMatch) { - // Per RFC 9110 here: https://www.rfc-editor.org/rfc/rfc9110.html#status.304 - // and here: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-encoding - // A 304 response MUST include the ETag header and a Content-Length header matching what the original resource's content length would have been were this a 200 response. - headers.replaceOrAdd(name: .contentLength, value: fileSize.description) - return Response(status: .notModified, version: .http1_1, headersNoUpdate: headers, body: .empty) - } - - // Create the HTTP response. - let response = Response(status: .ok, headers: headers) - let offset: Int64 - let byteCount: Int - if let contentRange = contentRange { - response.status = .partialContent - response.headers.add(name: .accept, value: contentRange.unit.serialize()) - if let firstRange = contentRange.ranges.first { - do { - let range = try firstRange.asResponseContentRange(limit: fileSize) - response.headers.contentRange = HTTPHeaders.ContentRange(unit: contentRange.unit, range: range) - (offset, byteCount) = try firstRange.asByteBufferBounds(withMaxSize: fileSize, logger: request.logger) - } catch { - let response = Response(status: .badRequest) - return response - } - } else { - offset = 0 - byteCount = fileSize - } - } else { - offset = 0 - byteCount = fileSize - } - // Set Content-Type header based on the media type - // Only set Content-Type if file not modified and returned above. - if - let fileExtension = path.components(separatedBy: ".").last, - let type = mediaType ?? HTTPMediaType.fileExtension(fileExtension) - { - response.headers.contentType = type - } - response.body = .init(stream: { stream in - self.read(path: path, fromOffset: offset, byteCount: byteCount, chunkSize: chunkSize) { chunk in - return stream.write(.buffer(chunk)) - }.whenComplete { result in - switch result { - case .failure(let error): - stream.write(.error(error), promise: nil) - case .success: - stream.write(.end, promise: nil) - } - onCompleted(result) - } - }, count: byteCount, byteBufferAllocator: request.byteBufferAllocator) - - return response + self.request.eventLoop.makeFutureWithTask { + try await self.asyncStreamFile(at: path, chunkSize: chunkSize, mediaType: mediaType, advancedETagComparison: advancedETagComparison, onCompleted: onCompleted) } } @@ -462,19 +365,19 @@ public struct FileIO: Sendable { /// This can be removed once `NIOFileSystem` reaches a stable API. public struct FileChunks: AsyncSequence { public typealias Element = ByteBuffer - private let fileHandle: NIOFileSystem.FileHandleProtocol - private let fileChunks: NIOFileSystem.FileChunks + private let fileHandle: _NIOFileSystem.FileHandleProtocol + private let fileChunks: _NIOFileSystem.FileChunks - init(fileChunks: NIOFileSystem.FileChunks, fileHandle: some NIOFileSystem.FileHandleProtocol) { + init(fileChunks: _NIOFileSystem.FileChunks, fileHandle: some _NIOFileSystem.FileHandleProtocol) { self.fileChunks = fileChunks self.fileHandle = fileHandle } public struct FileChunksIterator: AsyncIteratorProtocol { - private var iterator: NIOFileSystem.FileChunks.AsyncIterator - private let fileHandle: NIOFileSystem.FileHandleProtocol + private var iterator: _NIOFileSystem.FileChunks.AsyncIterator + private let fileHandle: _NIOFileSystem.FileHandleProtocol - fileprivate init(wrapping iterator: NIOFileSystem.FileChunks.AsyncIterator, fileHandle: some NIOFileSystem.FileHandleProtocol) { + fileprivate init(wrapping iterator: _NIOFileSystem.FileChunks.AsyncIterator, fileHandle: some _NIOFileSystem.FileHandleProtocol) { self.iterator = iterator self.fileHandle = fileHandle } @@ -482,11 +385,16 @@ public struct FileIO: Sendable { public mutating func next() async throws -> ByteBuffer? { let chunk = try await iterator.next() if chunk == nil { + // For convenience's sake, close when we hit EOF. Closing on error is left up to the caller. try await fileHandle.close() } return chunk } } + + public func closeHandle() async throws { + try await self.fileHandle.close() + } public func makeAsyncIterator() -> FileChunksIterator { FileChunksIterator(wrapping: fileChunks.makeAsyncIterator(), fileHandle: fileHandle) @@ -505,12 +413,25 @@ public struct FileIO: Sendable { /// - returns: `FileChunks` containing the file data chunks. public func readFile( at path: String, - chunkSize: Int = NonBlockingFileIO.defaultChunkSize + chunkSize: Int = NonBlockingFileIO.defaultChunkSize, + offset: Int64? = nil, + byteCount: Int? = nil ) async throws -> FileChunks { let filePath = FilePath(path) let readHandle = try await fileSystem.openFile(forReadingAt: filePath) - let chunks = readHandle.readChunks(chunkLength: .bytes(Int64(chunkSize))) + + let chunks: _NIOFileSystem.FileChunks + + if let offset { + if let byteCount { + chunks = readHandle.readChunks(in: offset..<(offset+Int64(byteCount)), chunkLength: .bytes(Int64(chunkSize))) + } else { + chunks = readHandle.readChunks(in: offset..., chunkLength: .bytes(Int64(chunkSize))) + } + } else { + chunks = readHandle.readChunks(chunkLength: .bytes(Int64(chunkSize))) + } return FileChunks(fileChunks: chunks, fileHandle: readHandle) } @@ -531,6 +452,135 @@ public struct FileIO: Sendable { } try await self.io.write(fileHandle: fd, buffer: buffer) } + + /// Generates a chunked `Response` for the specified file. This method respects values in + /// the `"ETag"` header and is capable of responding `304 Not Modified` if the file in question + /// has not been modified since last served. If `advancedETagComparison` is set to true, + /// the response will have its ETag field set to a byte-by-byte hash of the requested file. If set to false, a simple ETag consisting of the last modified date and file size + /// will be used. This method will also set the `"Content-Type"` header + /// automatically if an appropriate `MediaType` can be found for the file's suffix. + /// + /// app.get("file-stream") { req in + /// return req.fileio.asyncStreamFile(at: "/path/to/file.txt") + /// } + /// + /// Async equivalent of ``streamFile(at:chunkSize:mediaType:advancedETagComparison:onCompleted:)`` using Swift Concurrency + /// functions under the hood + /// + /// - parameters: + /// - path: Path to file on the disk. + /// - chunkSize: Maximum size for the file data chunks. + /// - mediaType: HTTPMediaType, if not specified, will be created from file extension. + /// - advancedETagComparison: The method used when ETags are generated. If true, a byte-by-byte hash is created (and cached), otherwise a simple comparison based on the file's last modified date and size. + /// - onCompleted: Closure to be run on completion of stream. + /// - returns: A `200 OK` response containing the file stream and appropriate headers. + public func asyncStreamFile( + at path: String, + chunkSize: Int = NonBlockingFileIO.defaultChunkSize, + mediaType: HTTPMediaType? = nil, + advancedETagComparison: Bool = false, + onCompleted: @escaping @Sendable (Result) async throws -> () = { _ in } + ) async throws -> Response { + // Get file attributes for this file. + guard let fileInfo = try await FileSystem.shared.info(forFileAt: .init(path)) else { + throw Abort(.internalServerError) + } + + let contentRange: HTTPHeaders.Range? + if let rangeFromHeaders = request.headers.range { + if rangeFromHeaders.unit == .bytes && rangeFromHeaders.ranges.count == 1 { + contentRange = rangeFromHeaders + } else { + contentRange = nil + } + } else if request.headers.contains(name: .range) { + // Range header was supplied but could not be parsed i.e. it was invalid + request.logger.debug("Range header was provided in request but was invalid") + throw Abort(.badRequest) + } else { + contentRange = nil + } + + let eTag: String + + if advancedETagComparison { + eTag = try await generateETagHash(path: path, lastModified: fileInfo.lastDataModificationTime.date).get() + } else { + // Generate ETag value, "last modified date in epoch time" + "-" + "file size" + eTag = "\"\(fileInfo.lastDataModificationTime.seconds)-\(fileInfo.size)\"" + } + + // Create empty headers array. + var headers: HTTPHeaders = [:] + + // Respond with lastModified header + headers.lastModified = HTTPHeaders.LastModified(value: fileInfo.lastDataModificationTime.date) + + headers.replaceOrAdd(name: .eTag, value: eTag) + + // Check if file has been cached already and return NotModified response if the etags match + if eTag == request.headers.first(name: .ifNoneMatch) { + // Per RFC 9110 here: https://www.rfc-editor.org/rfc/rfc9110.html#status.304 + // and here: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-encoding + // A 304 response MUST include the ETag header and a Content-Length header matching what the original resource's content length would have been were this a 200 response. + headers.replaceOrAdd(name: .contentLength, value: fileInfo.size.description) + return Response(status: .notModified, version: .http1_1, headersNoUpdate: headers, body: .empty) + } + + // Create the HTTP response. + let response = Response(status: .ok, headers: headers) + let offset: Int64 + let byteCount: Int + if let contentRange = contentRange { + response.status = .partialContent + response.headers.add(name: .accept, value: contentRange.unit.serialize()) + if let firstRange = contentRange.ranges.first { + do { + let range = try firstRange.asResponseContentRange(limit: Int(fileInfo.size)) + response.headers.contentRange = HTTPHeaders.ContentRange(unit: contentRange.unit, range: range) + (offset, byteCount) = try firstRange.asByteBufferBounds(withMaxSize: Int(fileInfo.size), logger: request.logger) + } catch { + throw Abort(.badRequest) + } + } else { + offset = 0 + byteCount = Int(fileInfo.size) + } + } else { + offset = 0 + byteCount = Int(fileInfo.size) + } + // Set Content-Type header based on the media type + // Only set Content-Type if file not modified and returned above. + if + let fileExtension = path.components(separatedBy: ".").last, + let type = mediaType ?? HTTPMediaType.fileExtension(fileExtension) + { + response.headers.contentType = type + } + + response.body = .init(asyncStream: { stream in + do { + let chunks = try await self.readFile(at: path, chunkSize: chunkSize, offset: offset, byteCount: byteCount) + do { + for try await chunk in chunks { + try await stream.writeBuffer(chunk) + } + try? await chunks.closeHandle() + } catch { + try? await chunks.closeHandle() + throw error + } + try await stream.write(.end) + try await onCompleted(.success(())) + } catch { + try? await stream.write(.error(error)) + try await onCompleted(.failure(error)) + } + }, count: byteCount, byteBufferAllocator: request.byteBufferAllocator) + + return response + } } extension HTTPHeaders.Range.Value { diff --git a/Sources/XCTVapor/XCTApplication.swift b/Sources/XCTVapor/XCTApplication.swift index e8ff4b0449..32103ccf40 100644 --- a/Sources/XCTVapor/XCTApplication.swift +++ b/Sources/XCTVapor/XCTApplication.swift @@ -5,9 +5,14 @@ import XCTest import Vapor extension Application: XCTApplicationTester { + @available(*, noasync, message: "Use the async method instead.") public func performTest(request: XCTHTTPRequest) throws -> XCTHTTPResponse { try self.testable().performTest(request: request) } + + public func performTest(request: XCTHTTPRequest) async throws -> XCTHTTPResponse { + try await self.testable().performTest(request: request) + } } extension Application { @@ -44,6 +49,7 @@ extension Application { self.port = port } + @available(*, noasync, message: "Use the async method instead.") func performTest(request: XCTHTTPRequest) throws -> XCTHTTPResponse { try app.server.start(address: .hostname(self.hostname, port: self.port)) defer { app.server.shutdown() } @@ -81,6 +87,44 @@ extension Application { body: response.body ?? ByteBufferAllocator().buffer(capacity: 0) ) } + + func performTest(request: XCTHTTPRequest) async throws -> XCTHTTPResponse { + try app.server.start(address: .hostname(self.hostname, port: self.port)) + defer { app.server.shutdown() } + + let client = HTTPClient(eventLoopGroup: MultiThreadedEventLoopGroup.singleton) + defer { try! client.syncShutdown() } + var path = request.url.path + path = path.hasPrefix("/") ? path : "/\(path)" + + let actualPort: Int + + if self.port == 0 { + guard let portAllocated = app.http.server.shared.localAddress?.port else { + throw Abort(.internalServerError, reason: "Failed to get port from local address") + } + actualPort = portAllocated + } else { + actualPort = self.port + } + + var url = "http://\(self.hostname):\(actualPort)\(path)" + if let query = request.url.query { + url += "?\(query)" + } + var clientRequest = try HTTPClient.Request( + url: url, + method: request.method, + headers: request.headers + ) + clientRequest.body = .byteBuffer(request.body) + let response = try await client.execute(request: clientRequest).get() + return XCTHTTPResponse( + status: response.status, + headers: response.headers, + body: response.body ?? ByteBufferAllocator().buffer(capacity: 0) + ) + } } private struct InMemory: XCTApplicationTester { @@ -89,6 +133,7 @@ extension Application { self.app = app } + @available(*, noasync, message: "Use the async method instead.") @discardableResult public func performTest( request: XCTHTTPRequest @@ -114,11 +159,39 @@ extension Application { body: res.body.collect(on: request.eventLoop).wait() ?? ByteBufferAllocator().buffer(capacity: 0) ) } + + @discardableResult + public func performTest( + request: XCTHTTPRequest + ) async throws -> XCTHTTPResponse { + var headers = request.headers + headers.replaceOrAdd( + name: .contentLength, + value: request.body.readableBytes.description + ) + let request = Request( + application: app, + method: request.method, + url: request.url, + headers: headers, + collectedBody: request.body.readableBytes == 0 ? nil : request.body, + remoteAddress: nil, + on: self.app.eventLoopGroup.next() + ) + let res = try await self.app.responder.respond(to: request).get() + return try await XCTHTTPResponse( + status: res.status, + headers: res.headers, + body: res.body.collect(on: request.eventLoop).get() ?? ByteBufferAllocator().buffer(capacity: 0) + ) + } } } public protocol XCTApplicationTester { + @available(*, noasync, message: "Use the async method instead.") func performTest(request: XCTHTTPRequest) throws -> XCTHTTPResponse + func performTest(request: XCTHTTPRequest) async throws -> XCTHTTPResponse } extension XCTApplicationTester { @@ -144,6 +217,7 @@ extension XCTApplicationTester { ) } + @available(*, noasync, message: "Use the async method instead.") @discardableResult public func test( _ method: HTTPMethod, @@ -185,15 +259,16 @@ extension XCTApplicationTester { ) try await beforeRequest(&request) do { - let response = try self.performTest(request: request) + let response = try await self.performTest(request: request) try await afterResponse(response) } catch { - XCTFail("\(error)", file: file, line: line) + XCTFail("\(String(reflecting: error))", file: file, line: line) throw error } return self } + @available(*, noasync, message: "Use the async method instead.") @discardableResult public func test( _ method: HTTPMethod, @@ -216,7 +291,7 @@ extension XCTApplicationTester { let response = try self.performTest(request: request) try afterResponse(response) } catch { - XCTFail("\(error)", file: file, line: line) + XCTFail("\(String(reflecting: error))", file: file, line: line) throw error } return self @@ -239,13 +314,14 @@ extension XCTApplicationTester { ) try await beforeRequest(&request) do { - return try self.performTest(request: request) + return try await self.performTest(request: request) } catch { - XCTFail("\(error)", file: file, line: line) + XCTFail("\(String(reflecting: error))", file: file, line: line) throw error } } + @available(*, noasync, message: "Use the async method instead.") public func sendRequest( _ method: HTTPMethod, _ path: String, @@ -265,7 +341,7 @@ extension XCTApplicationTester { do { return try self.performTest(request: request) } catch { - XCTFail("\(error)", file: file, line: line) + XCTFail("\(String(reflecting: error))", file: file, line: line) throw error } } diff --git a/Tests/VaporTests/AsyncFileTests.swift b/Tests/VaporTests/AsyncFileTests.swift new file mode 100644 index 0000000000..c1085e78df --- /dev/null +++ b/Tests/VaporTests/AsyncFileTests.swift @@ -0,0 +1,389 @@ +import XCTVapor +import XCTest +import Vapor +import NIOCore +import NIOHTTP1 +import _NIOFileSystem +import Crypto + +final class AsyncFileTests: XCTestCase, @unchecked Sendable { + func testStreamFile() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + return try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: true) { result in + do { + try result.get() + } catch { + XCTFail("File Stream should have succeeded") + } + } + } + + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream") { res in + let test = "the quick brown fox" + XCTAssertNotNil(res.headers.first(name: .eTag)) + XCTAssertContains(res.body.string, test) + } + } + + func testStreamFileConnectionClose() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + return try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: true) + } + + var headers = HTTPHeaders() + headers.replaceOrAdd(name: .connection, value: "close") + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res in + let test = "the quick brown fox" + XCTAssertNotNil(res.headers.first(name: .eTag)) + XCTAssertContains(res.body.string, test) + } + } + + func testStreamFileNull() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + var tmpPath: String + repeat { + tmpPath = try await FileSystem.shared.temporaryDirectory.appending(UUID().uuidString).string + } while try await self.fileExists(at: tmpPath) + + return try await req.fileio.asyncStreamFile(at: tmpPath, advancedETagComparison: true) { result in + do { + try result.get() + XCTFail("File Stream should have failed") + } catch { + } + } + } + + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream") { res in + XCTAssertEqual(res.status, .internalServerError) + } + } + + private func fileExists(at path: String) async throws -> Bool { + return try await FileSystem.shared.info(forFileAt: .init(path)) != nil + } + + func testAdvancedETagHeaders() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + return try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: true) { result in + do { + try result.get() + } catch { + XCTFail("File Stream should have succeeded") + } + } + } + + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream") { res in + let fileData = try Data(contentsOf: URL(fileURLWithPath: #file)) + let digest = SHA256.hash(data: fileData) + let eTag = res.headers.first(name: "etag") + XCTAssertEqual(eTag, digest.hex) + } + } + + func testSimpleETagHeaders() async throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + return try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: false) { result in + do { + try result.get() + } catch { + XCTFail("File Stream should have succeeded") + } + } + } + + try await app.testable(method: .running(port: 0)).test(.GET, "/file-stream") { res in + guard let fileInfo = try await FileSystem.shared.info(forFileAt: .init(#file)) else { + XCTFail("Missing File Info") + return + } + let fileETag = "\"\(Int(fileInfo.lastDataModificationTime.date.timeIntervalSince1970))-\(fileInfo.size)\"" + XCTAssertEqual(res.headers.first(name: .eTag), fileETag) + } + } + + func testStreamFileContentHeaderTail() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + return try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: true) { result in + do { + try result.get() + } catch { + XCTFail("File Stream should have succeeded") + } + } + } + + var headerRequest = HTTPHeaders() + headerRequest.range = .init(unit: .bytes, ranges: [.tail(value: 20)]) + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headerRequest) { res in + + let contentRange = res.headers.first(name: "content-range") + let contentLength = res.headers.first(name: "content-length") + + let lowerRange = Int((contentRange?.split(separator: "-")[0].split(separator: " ")[1])!)! + let upperRange = Int((contentRange?.split(separator: "-")[1].split(separator: "/")[0])!)! + + let range = upperRange - lowerRange + 1 + let length = Int(contentLength!)! + + XCTAssertTrue(range == length) + } + } + + func testStreamFileContentHeaderStart() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + return try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: true) { result in + do { + try result.get() + } catch { + XCTFail("File Stream should have succeeded") + } + } + } + + var headerRequest = HTTPHeaders() + headerRequest.range = .init(unit: .bytes, ranges: [.start(value: 20)]) + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headerRequest) { res in + + let contentRange = res.headers.first(name: "content-range") + let contentLength = res.headers.first(name: "content-length") + + let lowerRange = Int((contentRange?.split(separator: "-")[0].split(separator: " ")[1])!)! + let upperRange = Int((contentRange?.split(separator: "-")[1].split(separator: "/")[0])!)! + + let range = upperRange - lowerRange + 1 + let length = Int(contentLength!)! + + XCTAssertTrue(range == length) + } + } + + func testStreamFileContentHeadersWithin() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: true) { result in + XCTAssertNoThrow(try result.get()) + } + } + + var headerRequest = HTTPHeaders() + headerRequest.range = .init(unit: .bytes, ranges: [.within(start: 20, end: 25)]) + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headerRequest) { res in + + let contentRange = res.headers.first(name: "content-range") + let contentLength = res.headers.first(name: "content-length") + + let lowerRange = Int((contentRange?.split(separator: "-")[0].split(separator: " ")[1])!)! + let upperRange = Int((contentRange?.split(separator: "-")[1].split(separator: "/")[0])!)! + + let range = upperRange - lowerRange + 1 + let length = Int(contentLength!)! + + XCTAssertTrue(range == length) + } + } + + func testStreamFileContentHeadersOnlyFirstByte() async throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req in + try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: true) { result in + XCTAssertNoThrow(try result.get()) + } + } + + var headers = HTTPHeaders() + headers.range = .init(unit: .bytes, ranges: [.within(start: 0, end: 0)]) + try await app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res async in + XCTAssertEqual(res.status, .partialContent) + + XCTAssertEqual(res.headers.first(name: .contentLength), "1") + let range = res.headers.first(name: .contentRange)!.split(separator: "/").first!.split(separator: " ").last! + XCTAssertEqual(range, "0-0") + + XCTAssertEqual(res.body.readableBytes, 1) + } + } + + func testStreamFileContentHeadersWithinFail() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: true) { result in + XCTAssertNoThrow(try result.get()) + } + } + + var headerRequest = HTTPHeaders() + headerRequest.range = .init(unit: .bytes, ranges: [.within(start: -20, end: 25)]) + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headerRequest) { res in + XCTAssertEqual(res.status, .badRequest) + } + + headerRequest.range = .init(unit: .bytes, ranges: [.within(start: 10, end: 100000000)]) + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headerRequest) { res in + XCTAssertEqual(res.status, .badRequest) + } + } + + func testStreamFileContentHeadersStartFail() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: true) { result in + XCTAssertNoThrow(try result.get()) + } + } + + var headerRequest = HTTPHeaders() + headerRequest.range = .init(unit: .bytes, ranges: [.start(value: -20)]) + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headerRequest) { res in + XCTAssertEqual(res.status, .badRequest) + } + + headerRequest.range = .init(unit: .bytes, ranges: [.start(value: 100000000)]) + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headerRequest) { res in + XCTAssertEqual(res.status, .badRequest) + } + } + + func testStreamFileContentHeadersTailFail() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: true) { result in + XCTAssertNoThrow(try result.get()) + } + } + + var headerRequest = HTTPHeaders() + headerRequest.range = .init(unit: .bytes, ranges: [.tail(value: -20)]) + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headerRequest) { res in + XCTAssertEqual(res.status, .badRequest) + } + + headerRequest.range = .init(unit: .bytes, ranges: [.tail(value: 100000000)]) + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headerRequest) { res in + XCTAssertEqual(res.status, .badRequest) + } + } + + func testFileWrite() async throws { + let data = "Hello" + let path = "/tmp/fileio_write.txt" + + do { + let app = Application(.testing) + defer { app.shutdown() } + + let request = Request(application: app, on: app.eventLoopGroup.next()) + + try await request.fileio.writeFile(ByteBuffer(string: data), at: path) + + let result = try String(contentsOfFile: path) + XCTAssertEqual(result, data) + } catch { + try await FileSystem.shared.removeItem(at: .init(path)) + throw error + } + } + + // https://github.com/vapor/vapor/security/advisories/GHSA-vj2m-9f5j-mpr5 + func testInvalidRangeHeaderDoesNotCrash() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.get("file-stream") { req -> Response in + try await req.fileio.asyncStreamFile(at: #file, advancedETagComparison: true) + } + + var headers = HTTPHeaders() + headers.replaceOrAdd(name: .range, value: "bytes=0-9223372036854775807") + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res in + XCTAssertEqual(res.status, .badRequest) + } + + headers.replaceOrAdd(name: .range, value: "bytes=-1-10") + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res in + XCTAssertEqual(res.status, .badRequest) + } + + headers.replaceOrAdd(name: .range, value: "bytes=100-10") + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res in + XCTAssertEqual(res.status, .badRequest) + } + + headers.replaceOrAdd(name: .range, value: "bytes=10--100") + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res in + XCTAssertEqual(res.status, .badRequest) + } + + headers.replaceOrAdd(name: .range, value: "bytes=9223372036854775808-") + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res in + XCTAssertEqual(res.status, .badRequest) + } + + headers.replaceOrAdd(name: .range, value: "bytes=922337203-") + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res in + XCTAssertEqual(res.status, .badRequest) + } + + headers.replaceOrAdd(name: .range, value: "bytes=-922337203") + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res in + XCTAssertEqual(res.status, .badRequest) + } + + headers.replaceOrAdd(name: .range, value: "bytes=-9223372036854775808") + try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res in + XCTAssertEqual(res.status, .badRequest) + } + } + + func testAsyncFileRead() async throws { + let app = Application(.testing) + defer { app.shutdown() } + + let request = Request(application: app, on: app.eventLoopGroup.next()) + + let path = "/" + #filePath.split(separator: "/").dropLast().joined(separator: "/") + "/Utilities/long-test-file.txt" + + let content = try String(contentsOfFile: path) + + var readContent = "" + let file = try await request.fileio.readFile(at: path, chunkSize: 16 * 1024) // 32Kb, ~5 chunks + for try await chunk in file { + readContent += String(buffer: chunk) + } + + XCTAssertEqual(readContent, content, "The content read from the file does not match the expected content.") + } +} diff --git a/Tests/VaporTests/ContentTests.swift b/Tests/VaporTests/ContentTests.swift index c17c3943d7..5015a60152 100644 --- a/Tests/VaporTests/ContentTests.swift +++ b/Tests/VaporTests/ContentTests.swift @@ -303,7 +303,7 @@ final class ContentTests: XCTestCase { func testMultipartEncode() throws { struct User: Content { - static var defaultContentType: HTTPMediaType = .formData + static let defaultContentType: HTTPMediaType = .formData var name: String var age: Int var image: File @@ -331,7 +331,7 @@ final class ContentTests: XCTestCase { func testMultiPartEncodeUnicode() throws { struct User: Content { - static var defaultContentType: HTTPMediaType = .formData + static let defaultContentType: HTTPMediaType = .formData var name: String var age: Int var image: File diff --git a/Tests/VaporTests/FileTests.swift b/Tests/VaporTests/FileTests.swift index 03fc367507..628dc4f2f6 100644 --- a/Tests/VaporTests/FileTests.swift +++ b/Tests/VaporTests/FileTests.swift @@ -86,7 +86,7 @@ final class FileTests: XCTestCase { } try app.testable(method: .running(port: 0)).test(.GET, "/file-stream") { res in - XCTAssertTrue(res.body.string.isEmpty) + XCTAssertEqual(res.status, .internalServerError) } } @@ -130,7 +130,7 @@ final class FileTests: XCTestCase { let attributes = try FileManager.default.attributesOfItem(atPath: #file) let modifiedAt = attributes[.modificationDate] as! Date let fileSize = (attributes[.size] as? NSNumber)!.intValue - let fileETag = "\"\(modifiedAt.timeIntervalSince1970)-\(fileSize)\"" + let fileETag = "\"\(Int(modifiedAt.timeIntervalSince1970))-\(fileSize)\"" XCTAssertEqual(res.headers.first(name: .eTag), fileETag) } @@ -245,7 +245,7 @@ final class FileTests: XCTestCase { var headers = HTTPHeaders() headers.range = .init(unit: .bytes, ranges: [.within(start: 0, end: 0)]) - try app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res in + try await app.testable(method: .running(port: 0)).test(.GET, "/file-stream", headers: headers) { res async in XCTAssertEqual(res.status, .partialContent) XCTAssertEqual(res.headers.first(name: .contentLength), "1") @@ -350,61 +350,61 @@ final class FileTests: XCTestCase { XCTAssertEqual(result, data) } - func testPercentDecodedFilePath() throws { + func testPercentDecodedFilePath() async throws { let app = Application(.testing) defer { app.shutdown() } let path = #filePath.split(separator: "/").dropLast().joined(separator: "/") app.middleware.use(FileMiddleware(publicDirectory: "/" + path)) - try app.test(.GET, "/Utilities/foo%20bar.html") { res in + try await app.test(.GET, "/Utilities/foo%20bar.html") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "

Hello

\n") } } - func testPercentDecodedRelativePath() throws { + func testPercentDecodedRelativePath() async throws { let app = Application(.testing) defer { app.shutdown() } let path = #filePath.split(separator: "/").dropLast().joined(separator: "/") app.middleware.use(FileMiddleware(publicDirectory: "/" + path)) - try app.test(.GET, "%2e%2e/VaporTests/Utilities/foo.txt") { res in + try await app.test(.GET, "%2e%2e/VaporTests/Utilities/foo.txt") { res async in XCTAssertEqual(res.status, .forbidden) - }.test(.GET, "Utilities/foo.txt") { res in + }.test(.GET, "Utilities/foo.txt") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "bar\n") } } - func testDefaultFileRelative() throws { + func testDefaultFileRelative() async throws { let app = Application(.testing) defer { app.shutdown() } let path = #filePath.split(separator: "/").dropLast().joined(separator: "/") app.middleware.use(FileMiddleware(publicDirectory: "/" + path, defaultFile: "index.html")) - try app.test(.GET, "Utilities/") { res in + try await app.test(.GET, "Utilities/") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "

Root Default

\n") - }.test(.GET, "Utilities/SubUtilities/") { res in + }.test(.GET, "Utilities/SubUtilities/") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "

Subdirectory Default

\n") } } - func testDefaultFileAbsolute() throws { + func testDefaultFileAbsolute() async throws { let app = Application(.testing) defer { app.shutdown() } let path = #filePath.split(separator: "/").dropLast().joined(separator: "/") app.middleware.use(FileMiddleware(publicDirectory: "/" + path, defaultFile: "/Utilities/index.html")) - try app.test(.GET, "Utilities/") { res in + try await app.test(.GET, "Utilities/") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "

Root Default

\n") - }.test(.GET, "Utilities/SubUtilities/") { res in + }.test(.GET, "Utilities/SubUtilities/") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "

Root Default

\n") } diff --git a/Tests/VaporTests/MiddlewareTests.swift b/Tests/VaporTests/MiddlewareTests.swift index 80fef5776b..7d8ae57574 100644 --- a/Tests/VaporTests/MiddlewareTests.swift +++ b/Tests/VaporTests/MiddlewareTests.swift @@ -111,7 +111,7 @@ final class MiddlewareTests: XCTestCase { } } - func testFileMiddlewareFromBundle() throws { + func testFileMiddlewareFromBundle() async throws { var fileMiddleware: FileMiddleware! XCTAssertNoThrow(fileMiddleware = try FileMiddleware(bundle: .module, publicDirectory: "/"), "FileMiddleware instantiation from Bundle should not fail") @@ -120,13 +120,13 @@ final class MiddlewareTests: XCTestCase { defer { app.shutdown() } app.middleware.use(fileMiddleware) - try app.testable().test(.GET, "/foo.txt") { result in + try await app.testable().test(.GET, "/foo.txt") { result in XCTAssertEqual(result.status, .ok) XCTAssertEqual(result.body.string, "bar\n") } } - func testFileMiddlewareFromBundleSubfolder() throws { + func testFileMiddlewareFromBundleSubfolder() async throws { var fileMiddleware: FileMiddleware! XCTAssertNoThrow(fileMiddleware = try FileMiddleware(bundle: .module, publicDirectory: "SubUtilities"), "FileMiddleware instantiation from Bundle should not fail") @@ -135,7 +135,7 @@ final class MiddlewareTests: XCTestCase { defer { app.shutdown() } app.middleware.use(fileMiddleware) - try app.testable().test(.GET, "/index.html") { result in + try await app.testable().test(.GET, "/index.html") { result in XCTAssertEqual(result.status, .ok) XCTAssertEqual(result.body.string, "

Subdirectory Default

\n") } diff --git a/Tests/VaporTests/PipelineTests.swift b/Tests/VaporTests/PipelineTests.swift index c000b477c1..aa3a78f6c3 100644 --- a/Tests/VaporTests/PipelineTests.swift +++ b/Tests/VaporTests/PipelineTests.swift @@ -224,7 +224,7 @@ final class PipelineTests: XCTestCase { return ResponseThing(eventLoop: eventLoop) } - try app.test(.GET, "dont-crash") { res in + try await app.test(.GET, "dont-crash") { res async in XCTAssertEqual(res.status, .ok) } @@ -254,7 +254,7 @@ final class PipelineTests: XCTestCase { return "OK" } - try app.test(.GET, "dont-crash") { res in + try await app.test(.GET, "dont-crash") { res async in XCTAssertEqual(res.status, .ok) } diff --git a/Tests/VaporTests/WebSocketTests.swift b/Tests/VaporTests/WebSocketTests.swift index be02a2f55d..771c096b1a 100644 --- a/Tests/VaporTests/WebSocketTests.swift +++ b/Tests/VaporTests/WebSocketTests.swift @@ -56,6 +56,7 @@ final class WebSocketTests: XCTestCase { ws.close(promise: nil) } + app.http.server.configuration.port = 0 app.environment.arguments = ["serve"] try app.start()