diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index acc6159..48b8aa7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,4 +8,4 @@ jobs: uses: vapor/ci/.github/workflows/run-unit-tests.yml@reusable-workflows with: with_coverage: true - with_tsan: true + with_tsan: false diff --git a/Package.swift b/Package.swift index a52f090..aec0cdf 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:5.6 +// swift-tools-version:5.8 import PackageDescription let package = Package( @@ -14,7 +14,7 @@ let package = Package( .library(name: "XCTQueues", targets: ["XCTQueues"]) ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.76.2"), + .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.53.0"), ], targets: [ diff --git a/Sources/Queues/Application+Queues.swift b/Sources/Queues/Application+Queues.swift index b217eae..dc7d914 100644 --- a/Sources/Queues/Application+Queues.swift +++ b/Sources/Queues/Application+Queues.swift @@ -49,6 +49,15 @@ extension Application { driver.shutdown() } } + + func shutdownAsync(_ application: Application) async { + for command in application.queues.storage.commands { + await command.asyncShutdown() + } + if let driver = application.queues.storage.driver { + await driver.asyncShutdown() + } + } } /// The `QueuesConfiguration` object diff --git a/Sources/Queues/QueuesCommand.swift b/Sources/Queues/QueuesCommand.swift index e4af895..cd2cada 100644 --- a/Sources/Queues/QueuesCommand.swift +++ b/Sources/Queues/QueuesCommand.swift @@ -208,6 +208,32 @@ public final class QueuesCommand: Command { } } + public func asyncShutdown() async { + self.lock.lock() + + self.isShuttingDown.store(true, ordering: .relaxed) + self.didShutdown = true + + // stop running in case shutting downf rom signal + self.application.running?.stop() + + // clear signal sources + self.signalSources.forEach { $0.cancel() } // clear refs + self.signalSources = [] + + // Release the lock before we start any suspensions + self.lock.unlock() + + // stop all job queue workers + for jobTask in self.jobTasks { + await jobTask.asyncCancel(on: self.eventLoopGroup.any()) + } + // stop all scheduled jobs + for scheduledTask in self.scheduledTasks.values { + await scheduledTask.task.asyncCancel(on: self.eventLoopGroup.any()) + } + } + deinit { assert(self.didShutdown, "JobsCommand did not shutdown before deinit") } diff --git a/Sources/Queues/QueuesDriver.swift b/Sources/Queues/QueuesDriver.swift index 1853a74..02452d7 100644 --- a/Sources/Queues/QueuesDriver.swift +++ b/Sources/Queues/QueuesDriver.swift @@ -6,4 +6,13 @@ public protocol QueuesDriver { /// Shuts down the driver func shutdown() + + /// Shut down the driver asyncrhonously. Helps avoid calling `.wait()` + func asyncShutdown() async +} + +extension QueuesDriver { + public func asyncShutdown() async { + shutdown() + } } diff --git a/Sources/Queues/RepeatedTask+Cancel.swift b/Sources/Queues/RepeatedTask+Cancel.swift index 7f2ac0b..841fa0c 100644 --- a/Sources/Queues/RepeatedTask+Cancel.swift +++ b/Sources/Queues/RepeatedTask+Cancel.swift @@ -10,4 +10,14 @@ extension RepeatedTask { print("failed cancelling repeated task \(error)") } } -} \ No newline at end of file + + func asyncCancel(on eventLoop: EventLoop) async { + do { + let promise = eventLoop.makePromise(of: Void.self) + self.cancel(promise: promise) + try await promise.futureResult.get() + } catch { + print("failed cancelling repeated task \(error)") + } + } +} diff --git a/Tests/QueuesTests/AsyncQueueTests.swift b/Tests/QueuesTests/AsyncQueueTests.swift index 7c6c732..7c0f42e 100644 --- a/Tests/QueuesTests/AsyncQueueTests.swift +++ b/Tests/QueuesTests/AsyncQueueTests.swift @@ -9,9 +9,17 @@ import NIOCore import NIOConcurrencyHelpers final class AsyncQueueTests: XCTestCase { - func testAsyncJob() throws { - let app = Application(.testing) - defer { app.shutdown() } + var app: Application! + + override func setUp() async throws { + app = try await Application.make(.testing) + } + + override func tearDown() async throws { + try await app.asyncShutdown() + } + + func testAsyncJob() async throws { app.queues.use(.test) let promise = app.eventLoopGroup.any().makePromise(of: Void.self) @@ -22,7 +30,7 @@ final class AsyncQueueTests: XCTestCase { .map { _ in "done" } } - try app.testable().test(.GET, "foo") { res in + try await app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } @@ -34,7 +42,7 @@ final class AsyncQueueTests: XCTestCase { XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") - try app.queues.queue.worker.run().wait() + try await app.queues.queue.worker.run().get() XCTAssertEqual(app.queues.test.queue.count, 0) XCTAssertEqual(app.queues.test.jobs.count, 0)