Skip to content

Commit

Permalink
Add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Joannis committed Oct 21, 2023
1 parent 977ee39 commit a4b7f3e
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 16 deletions.
22 changes: 21 additions & 1 deletion Sources/MongoQueue/MongoQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,26 @@ import Foundation
import Meow

/// A MongoQueue is a queue that uses MongoDB as a backend for storing tasks. It is designed to be used in a distributed environment.
///
/// 1. First, connect to MongoDB and create the MongoQueue.
/// 2. Then, register your tasks _with_ ExecutionContext.
/// 3. Finally, start the job queue.
///
/// ```swift
/// import MongoKitten
///
/// let db = try await MongoDatabase.connect(to: "mongodb://localhost/my-db")
/// let queue = MongoQueue(db["job-queue"])
/// queue.registerTask(Reminder.self, context: executionContext)
/// // Run the queue until it's stopped or a cancellation is received
/// try await queue.run()
/// ```
///
/// To insert a new task into the queue:
///
/// ```swift
/// try await queue.queueTask(Reminder(username: "Joannis"))
/// ```
public final class MongoQueue {
internal let collection: MongoCollection
internal let logger = Logger(label: "org.openkitten.mongo-queues")
Expand Down Expand Up @@ -317,7 +337,7 @@ public final class MongoQueue {
}
}

/// Queues a task for execution. The task will be executed after the specified Date.
/// Queues a task for execution.
public func queueTask<T: _QueuedTask>(_ task: T) async throws {
assert(
knownTypes.contains(where: { $0.category == T.category }),
Expand Down
19 changes: 12 additions & 7 deletions Sources/MongoQueue/QueuedTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@ import Foundation
///
/// These tasks can be queued to MongoDB, and an available process built on MongoQueue will pick up the task and run it
///
/// You cannot implement `_QueuedTask` yourself, but instead need to implement one of the derived protocols
/// You cannot implement this protocol directly, but instead need to implement one of the derived protocols.
/// This can be either ``ScheduledTask`` or ``RecurringTask``, but not both at the same time.
public protocol _QueuedTask: Codable {
associatedtype ExecutionContext
associatedtype ExecutionContext = Void

/// The type of task being scheduled, defaults to your `Task.Type` name
/// The type of task being scheduled, defaults to your Type's name. This chosen value may only be associated with one type at a time.
static var category: String { get }

/// A group represents custom information that you can query for when disabling or deleting tasks.
///
/// For example, a user's ID can be associated with a `group`, so that all the tasks that provide information to this user can be cleaned up in bulk when a user is deleted from the system.
var group: String? { get }

/// The amount of urgency your task has. Tasks with higher priority take precedence over lower priorities.
/// When priorities are equal, the first-created task is executed fist.
/// When priorities are equal, the earlier-created task is executed first.
var priority: TaskPriority { get }

/// An internal configuration object that MongoQueue uses to pass around internal metadata
///
/// - Warning: Do not implement or use this yourself, if you need this hook let us know
var configuration: _TaskConfiguration { get }

/// The expected maximum duration of this task, defaults to 10 minutes
Expand All @@ -29,10 +34,10 @@ public protocol _QueuedTask: Codable {
/// Defaults to `false`
// var allowsParallelisation: Bool { get }

/// Executes the task using the available metadata stored in `self`
/// Executes the task using the stored properties in `self`. `ExecutionContext` can be any instance of your choosing, and is used as a means to execute the task. In the case of an newsletter task, this would be the email client.
mutating func execute(withContext context: ExecutionContext) async throws
/// Do not implement this method yourself, if you need this hook let us know

/// - Warning: Do not implement this method yourself, if you need this hook let us know
func _onDequeueTask(_ task: TaskModel, withContext context: ExecutionContext, inQueue queue: MongoQueue) async throws -> _DequeueResult

/// Called when the task failed to execute. Provides an opportunity to decide the fate of this task
Expand Down
50 changes: 47 additions & 3 deletions Sources/MongoQueue/RecurringTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,63 @@ import MongoCore
import Foundation
import Meow

/// A task that can be executed on a recurring basis (e.g. every day, every month, etc)
/// A protocol that describes a task that can be executed on a recurring basis (e.g. every day, every month, etc)
///
/// When conforming to this type, you're also conforming to `Codable`. Using this Codable conformance, all stored properties will be stored in and retrieved from MongoDB. Your task's `execute` function represents your business logic of how tasks are handled, whereas the stored properties of this type represent the input you to execute this work.
///
/// The context provided into ``execute`` can be any type of your choosing, and is used as a means to execute the task. In the case of an newsletter task, this would be the email client.
///
/// ```swift
/// struct DailyReminder: RecurringTask {
/// typealias ExecutionContext = SMTPClient
///
/// // Stored properties are encoded to MongoDB
/// // When the task runs, they'll be decodd into a new `Reminder` instance
/// // After which `execute` will be called
/// let username: String
///
/// // A mandatory property, allowing MongoQueue to set the initial execution date
/// // In this case, MongoQueue will set the execution date to "now".
/// // This causes the task to be ran as soon as possible.
/// // Because this is computed, the property is not stored in MongoDB.
/// var initialTaskExecutionDate: Date { Date() }
///
/// // Your business logic for this task, which can `mutate` self.
/// // This allow it to pass updated info into the next iteration.
/// mutating func execute(withContext: context: ExecutionContext) async throws {
/// print("I'm running! Wake up, \(username)")
/// }
///
/// // Calculate the next time when this task should be executed again
/// func getNextRecurringTaskDate(_ context: ExecutionContext) async throws -> Date? {
/// // Re-run again in 24 hours
/// return Date().addingTimeInterval(3600 * 24)
/// }
///
/// // What to do when `execute` throws an error
/// func onExecutionFailure(
/// failureContext: QueuedTaskFailure<MyTaskContext>
/// ) async throws -> TaskExecutionFailureAction {
/// // Removes the task from the queue without re-attempting
/// return .dequeue()
/// }
/// }
/// ```
public protocol RecurringTask: _QueuedTask {
/// The moment that you want this to be executed on (delay)
/// The moment that you want this to be first executed on (delay)
/// If you want it to be immediate, use `Date()`
var initialTaskExecutionDate: Date { get }

/// If you want only one task of this type to exist, use a static task key
/// If you want to have many tasks, but not duplicate the task, identify this task by the task key
/// If you don't want this task to be uniquely identified, and you want to spawn many of them, use `UUID().uuidString`
var uniqueTaskKey: String { get }

/// Tasks won't be executed after this moment
var taskExecutionDeadline: TimeInterval? { get }

/// Calculates the next moment that this task should be executed on (e.g. next month, next day, etc)
/// Calculates the next moment that this task should be executed on (e.g. next month, next day, etc).
/// This is called _after_ your `execute` function has successfully completed the work.
/// If you want to stop recurring, return `nil`.
/// - parameter context: The context that was used to execute the task.
func getNextRecurringTaskDate(_ context: ExecutionContext) async throws -> Date?
Expand Down
36 changes: 35 additions & 1 deletion Sources/MongoQueue/ScheduledTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,40 @@ import Meow

/// A task that is scheduled to be executed at a specific moment in time.
/// This task will be executed once, and then removed from the queue.
///
/// When conforming to this type, you're also conforming to `Codable`. Using this Codable conformance, all stored properties will be stored in and retrieved from MongoDB. Your task's `execute` function represents your business logic of how tasks are handled, whereas the stored properties of this type represent the input you to execute this work.
///
/// The context provided into ``execute`` can be any type of your choosing, and is used as a means to execute the task. In the case of an newsletter task, this would be the email client.
///
/// ```swift
/// struct Reminder: ScheduledTask {
/// typealias ExecutionContext = SMTPClient
///
/// // Stored properties are encoded to MongoDB
/// // When the task runs, they'll be decodd into a new `Reminder` instance
/// // After which `execute` will be called
/// let username: String
///
/// // A mandatory property, allowing MongoQueue to set the execution date
/// // In this case, MongoQueue will set the execution date to "now".
/// // This causes the task to be ran as soon as possible.
/// // Because this is computed, the property is not stored in MongoDB.
/// var taskExecutionDate: Date { Date() }
///
/// // Your business logic for this task
/// func execute(withContext: context: ExecutionContext) async throws {
/// print("I'm running! Hello, \(username)")
/// }
///
/// // What to do when `execute` throws an error
/// func onExecutionFailure(
/// failureContext: QueuedTaskFailure<ExecutionContext>
/// ) async throws -> TaskExecutionFailureAction {
/// // Removes the task from the queue without re-attempting
/// return .dequeue()
/// }
/// }
/// ```
public protocol ScheduledTask: _QueuedTask {
/// The date that you want this to be executed (delay)
/// If you want it to be immediate, use `Date()`
Expand All @@ -21,7 +55,7 @@ extension ScheduledTask {
public var taskExecutionDeadline: Date? { nil }
public var taskRemovalAction: TaskRemovalAction { .dequeue() }

public func _onDequeueTask(_ task: TaskModel, withContext context: ExecutionContext, inQueue queue: MongoQueue) async throws -> _DequeueResult{
public func _onDequeueTask(_ task: TaskModel, withContext context: ExecutionContext, inQueue queue: MongoQueue) async throws -> _DequeueResult {
do {
// TODO: We assume this succeeds, but what if it does not?
var concern = WriteConcern()
Expand Down
16 changes: 12 additions & 4 deletions Sources/MongoQueue/TaskModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ public struct TaskPriority {
/// Not as urgent as regular user actions, but please do not take all the time in the world
public static let lower = TaskPriority(raw: .lower)

/// Regular user actions
/// Regular user actions, this is the default value
public static let normal = TaskPriority(raw: .normal)

/// This is needed fast, think of real-time communication
/// This is needed faster than other items
public static let higher = TaskPriority(raw: .higher)

/// THIS SHOULD NOT WAIT
Expand Down Expand Up @@ -86,7 +86,7 @@ public struct TaskModel: Codable {

let _id: ObjectId

/// Contains `Task.name`
/// Contains `Task.name`, used to identify how to decode the `metadata`
let category: String
let group: String?
let uniqueKey: String?
Expand All @@ -97,10 +97,16 @@ public struct TaskModel: Codable {
var executeBefore: Date?
var attempts: Int
var status: TaskStatus._Raw

/// The Task's stored properties, created by encoding the task using BSONEncoder
var metadata: Document

/// When this is set in the database, this task is currently being executed
var execution: ExecutingContext?

/// The maximum time that this task is expected to take. If the task takes longer than this, `execution.lasUpdate` **must** be updated before the time expires. If the times expires, the task's runner is assumed to be killed, and the task will be re-queued for execution.
let maxTaskDuration: TimeInterval

// let allowsParallelisation: Bool

private enum ConfigurationType: String, Codable {
Expand Down Expand Up @@ -160,6 +166,8 @@ public struct TaskModel: Codable {
}

/// The configuration of a task, used to determine when the task should be executed. This is a wrapper around the actual configuration as to allow for future expansion.
///
/// - Warning: Do not interact with this type yourself. It exists as a means to discourage/prevent users from creating custom Task types. If you need a different Task type, open an issue instead!
public struct _TaskConfiguration {
internal enum _TaskConfiguration {
case scheduled(ScheduledTaskConfiguration)
Expand All @@ -179,7 +187,7 @@ struct RecurringTaskConfiguration: Codable {
let deadline: TimeInterval?
}

public struct ScheduledTaskConfiguration: Codable {
struct ScheduledTaskConfiguration: Codable {
let scheduledDate: Date
let executeBefore: Date?
}

0 comments on commit a4b7f3e

Please sign in to comment.