Skip to content

Commit

Permalink
Add missing platform specifiers (#121)
Browse files Browse the repository at this point in the history
* Add missing platform specifiers

* Fix imports

* Remove outdated concurrency conditionals, replace deprecated NIOAtomic with ManagedAtomic
  • Loading branch information
gwynne committed Mar 22, 2023
1 parent 89e98b8 commit f1adaf4
Show file tree
Hide file tree
Showing 17 changed files with 43 additions and 38 deletions.
5 changes: 4 additions & 1 deletion Package.swift
Expand Up @@ -4,7 +4,10 @@ import PackageDescription
let package = Package(
name: "queues",
platforms: [
.macOS(.v10_15)
.macOS(.v10_15),
.iOS(.v13),
.tvOS(.v13),
.watchOS(.v6),
],
products: [
.library(name: "Queues", targets: ["Queues"]),
Expand Down
1 change: 1 addition & 0 deletions Sources/Queues/Application+Queues.swift
@@ -1,4 +1,5 @@
import Foundation
import Logging
import Vapor
import NIO

Expand Down
5 changes: 0 additions & 5 deletions Sources/Queues/AsyncJob.swift
Expand Up @@ -2,9 +2,7 @@ import Vapor
import NIOCore
import Foundation

#if compiler(>=5.5) && canImport(_Concurrency)
/// A task that can be queued for future execution.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public protocol AsyncJob: Job {
/// The data associated with a job
associatedtype Payload
Expand Down Expand Up @@ -41,7 +39,6 @@ public protocol AsyncJob: Job {
static func parsePayload(_ bytes: [UInt8]) throws -> Payload
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension AsyncJob where Payload: Codable {

/// Serialize a payload into Data
Expand All @@ -57,7 +54,6 @@ extension AsyncJob where Payload: Codable {
}
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension AsyncJob {
/// The jobName of the Job
public static var name: String {
Expand Down Expand Up @@ -93,4 +89,3 @@ extension AsyncJob {
return
}
}
#endif
4 changes: 0 additions & 4 deletions Sources/Queues/AsyncJobEventDelegate.swift
@@ -1,8 +1,6 @@
#if compiler(>=5.5) && canImport(_Concurrency)
import NIOCore

/// Represents an object that can receive notifications about job statuses
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public protocol AsyncJobEventDelegate: JobEventDelegate {
/// Called when the job is first dispatched
/// - Parameters:
Expand All @@ -26,7 +24,6 @@ public protocol AsyncJobEventDelegate: JobEventDelegate {
func error(jobId: String, error: Error) async throws
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension AsyncJobEventDelegate {
public func dispatched(job: JobEventData) async throws { }
public func didDequeue(jobId: String) async throws { }
Expand Down Expand Up @@ -57,4 +54,3 @@ extension AsyncJobEventDelegate {
}
}
}
#endif
4 changes: 0 additions & 4 deletions Sources/Queues/AsyncScheduledJob.swift
Expand Up @@ -2,9 +2,7 @@ import Vapor
import NIOCore
import Foundation

#if compiler(>=5.5) && canImport(_Concurrency)
/// Describes a job that can be scheduled and repeated
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public protocol AsyncScheduledJob: ScheduledJob {
var name: String { get }

Expand All @@ -13,7 +11,6 @@ public protocol AsyncScheduledJob: ScheduledJob {
func run(context: QueueContext) async throws
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension AsyncScheduledJob {
public var name: String { "\(Self.self)" }

Expand All @@ -25,4 +22,3 @@ extension AsyncScheduledJob {
return promise.futureResult
}
}
#endif
3 changes: 2 additions & 1 deletion Sources/Queues/Job.swift
@@ -1,5 +1,6 @@
import NIO
import NIOCore
import Foundation
import Logging
import Vapor

/// A task that can be queued for future execution.
Expand Down
4 changes: 1 addition & 3 deletions Sources/Queues/Queue+Async.swift
@@ -1,15 +1,14 @@
import Foundation
import Vapor
import NIOCore

#if compiler(>=5.5) && canImport(_Concurrency)
extension Queue {
/// Dispatch a job into the queue for processing
/// - Parameters:
/// - job: The Job type
/// - payload: The payload data to be dispatched
/// - maxRetryCount: Number of times to retry this job on failure
/// - delayUntil: Delay the processing of this job until a certain date
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public func dispatch<J>(
_ job: J.Type,
_ payload: J.Payload,
Expand All @@ -20,4 +19,3 @@ extension Queue {
try await self.dispatch(job, payload, maxRetryCount: maxRetryCount, delayUntil: delayUntil, id: id).get()
}
}
#endif
2 changes: 2 additions & 0 deletions Sources/Queues/QueueContext.swift
@@ -1,3 +1,5 @@
import Logging
import NIOCore
import Vapor

/// The context for a queue.
Expand Down
18 changes: 13 additions & 5 deletions Sources/Queues/QueuesCommand.swift
@@ -1,6 +1,14 @@
import ConsoleKit
import Dispatch
import Vapor
import NIOConcurrencyHelpers
import NIOCore
import Atomics
#if os(Linux)
import Glibc
#else
import Darwin.C
#endif

/// The command to start the Queue job
public final class QueuesCommand: Command {
Expand Down Expand Up @@ -30,7 +38,7 @@ public final class QueuesCommand: Command {
private var signalSources: [DispatchSourceSignal]
private var didShutdown: Bool

private let isShuttingDown: NIOAtomic<Bool>
private let isShuttingDown: ManagedAtomic<Bool>

private var eventLoopGroup: EventLoopGroup {
self.application.eventLoopGroup
Expand All @@ -41,7 +49,7 @@ public final class QueuesCommand: Command {
self.application = application
self.jobTasks = []
self.scheduledTasks = [:]
self.isShuttingDown = .makeAtomic(value: false)
self.isShuttingDown = .init(false)
self.signalSources = []
self.didShutdown = false
self.lock = .init()
Expand Down Expand Up @@ -114,7 +122,7 @@ public final class QueuesCommand: Command {
return worker.run().map {
self.application.logger.trace("Worker ran the task successfully")
//Check if shutting down
if self.isShuttingDown.load() {
if self.isShuttingDown.load(ordering: .relaxed) {
self.application.logger.trace("Shutting down, cancelling the task")
task.cancel()
}
Expand Down Expand Up @@ -145,7 +153,7 @@ public final class QueuesCommand: Command {
}

private func schedule(_ job: AnyScheduledJob) {
if self.isShuttingDown.load() {
if self.isShuttingDown.load(ordering: .relaxed) {
self.application.logger.trace("Application is shutting down, cancelling scheduling \(job.job.name)")
return
}
Expand Down Expand Up @@ -180,7 +188,7 @@ public final class QueuesCommand: Command {
self.lock.lock()
defer { self.lock.unlock() }

self.isShuttingDown.store(true)
self.isShuttingDown.store(true, ordering: .relaxed)
self.didShutdown = true

// stop running in case shutting downf rom signal
Expand Down
2 changes: 1 addition & 1 deletion Sources/Queues/QueuesEventLoopPreference.swift
Expand Up @@ -6,7 +6,7 @@
//

import Foundation
import NIO
import NIOCore

/// Determines which event loop the jobs worker uses while executing jobs.
public enum QueuesEventLoopPreference {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Queues/Request+Queues.swift
@@ -1,6 +1,6 @@
import Foundation
import Vapor
import NIO
import NIOCore

extension Request {
/// Returns the default job `Queue`
Expand Down
2 changes: 1 addition & 1 deletion Sources/Queues/ScheduleBuilder.swift
Expand Up @@ -24,7 +24,7 @@ public final class ScheduleBuilder {
case first
case last
case exact(Int)

public init(integerLiteral value: Int) {
self = .exact(value)
}
Expand Down
2 changes: 2 additions & 0 deletions Sources/Queues/ScheduledJob.swift
@@ -1,4 +1,6 @@
import NIOCore
import Foundation
import Logging

/// Describes a job that can be scheduled and repeated
public protocol ScheduledJob {
Expand Down
1 change: 1 addition & 0 deletions Sources/XCTQueues/TestQueueDriver.swift
@@ -1,5 +1,6 @@
import Queues
import Vapor
import NIOCore
import NIOConcurrencyHelpers

extension Application.Queues.Provider {
Expand Down
9 changes: 4 additions & 5 deletions Tests/QueuesTests/AsyncQueueTests.swift
@@ -1,19 +1,20 @@
#if compiler(>=5.5) && canImport(_Concurrency)
import Queues
import Foundation
import Vapor
import XCTest
import XCTVapor
import XCTQueues
@testable import Vapor
import NIOCore
import NIOConcurrencyHelpers

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
final class AsyncQueueTests: XCTestCase {
func testAsyncJob() throws {
let app = Application(.testing)
defer { app.shutdown() }
app.queues.use(.test)

let promise = app.eventLoopGroup.next().makePromise(of: Void.self)
let promise = app.eventLoopGroup.any().makePromise(of: Void.self)
app.queues.add(MyAsyncJob(promise: promise))

app.get("foo") { req in
Expand Down Expand Up @@ -41,7 +42,6 @@ final class AsyncQueueTests: XCTestCase {
}
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
struct MyAsyncJob: AsyncJob {
let promise: EventLoopPromise<Void>

Expand All @@ -54,4 +54,3 @@ struct MyAsyncJob: AsyncJob {
return
}
}
#endif
14 changes: 9 additions & 5 deletions Tests/QueuesTests/QueueTests.swift
@@ -1,9 +1,13 @@
import Atomics
import Queues
import Vapor
import Foundation
import XCTest
import XCTVapor
import XCTQueues
@testable import Vapor
import NIOCore
import NIOConcurrencyHelpers
@testable import Vapor

final class QueueTests: XCTestCase {
func testVaporIntegrationWithInProcessJob() throws {
Expand Down Expand Up @@ -137,13 +141,13 @@ final class QueueTests: XCTestCase {
let app = Application(.testing)
defer { app.shutdown() }

XCTAssertEqual(TestingScheduledJob.count.load(), 0)
XCTAssertEqual(TestingScheduledJob.count.load(ordering: .relaxed), 0)
app.queues.schedule(TestingScheduledJob()).everySecond()
try app.queues.startScheduledJobs()

let promise = app.eventLoopGroup.next().makePromise(of: Void.self)
app.eventLoopGroup.next().scheduleTask(in: .seconds(5)) { () -> Void in
XCTAssert(TestingScheduledJob.count.load() > 4)
XCTAssert(TestingScheduledJob.count.load(ordering: .relaxed) > 4)
promise.succeed(())
}

Expand Down Expand Up @@ -408,10 +412,10 @@ struct FailingScheduledJob: ScheduledJob {
}

struct TestingScheduledJob: ScheduledJob {
static var count = NIOAtomic<Int>.makeAtomic(value: 0)
static var count = ManagedAtomic<Int>(0)

func run(context: QueueContext) -> EventLoopFuture<Void> {
TestingScheduledJob.count.add(1)
TestingScheduledJob.count.wrappingIncrement(ordering: .relaxed)
return context.eventLoop.future()
}
}
Expand Down
3 changes: 1 addition & 2 deletions Tests/QueuesTests/ScheduleBuilderTests.swift
@@ -1,3 +1,4 @@
import Foundation
import Queues
import XCTest
import NIOCore
Expand Down Expand Up @@ -157,8 +158,6 @@ final class ScheduleBuilderTests: XCTestCase {

}



final class Cleanup: ScheduledJob {
func run(context: QueueContext) -> EventLoopFuture<Void> {
return context.eventLoop.makeSucceededFuture(())
Expand Down

0 comments on commit f1adaf4

Please sign in to comment.