diff --git a/apps/website/docs/api-reference/tasks/classes/task-driver-manager.mdx b/apps/website/docs/api-reference/tasks/classes/task-driver-manager.mdx index 2f7f6a44..c59ee1bb 100644 --- a/apps/website/docs/api-reference/tasks/classes/task-driver-manager.mdx +++ b/apps/website/docs/api-reference/tasks/classes/task-driver-manager.mdx @@ -41,6 +41,7 @@ const taskId = await manager.createTask({ ```ts title="Signature" class TaskDriverManager { + public driver: TaskDriver | null = null; setDriver(driver: TaskDriver) => void; createTask(task: TaskData) => Promise; deleteTask(identifier: string) => Promise; @@ -50,6 +51,11 @@ class TaskDriverManager {
+### driver + +TaskDriver | null`} /> + + ### setDriver TaskDriver) => void`} /> diff --git a/apps/website/docs/api-reference/tasks/classes/tasks-plugin.mdx b/apps/website/docs/api-reference/tasks/classes/tasks-plugin.mdx index ba47785c..03e624d2 100644 --- a/apps/website/docs/api-reference/tasks/classes/tasks-plugin.mdx +++ b/apps/website/docs/api-reference/tasks/classes/tasks-plugin.mdx @@ -13,7 +13,7 @@ import MemberDescription from '@site/src/components/MemberDescription'; ## TasksPlugin - + CommandKit plugin that provides task management capabilities. diff --git a/apps/website/docs/api-reference/tasks/interfaces/tasks-plugin-options.mdx b/apps/website/docs/api-reference/tasks/interfaces/tasks-plugin-options.mdx index 3e247ec8..4b505e67 100644 --- a/apps/website/docs/api-reference/tasks/interfaces/tasks-plugin-options.mdx +++ b/apps/website/docs/api-reference/tasks/interfaces/tasks-plugin-options.mdx @@ -13,7 +13,7 @@ import MemberDescription from '@site/src/components/MemberDescription'; ## TasksPluginOptions - + Configuration options for the tasks plugin. @@ -22,6 +22,20 @@ Future versions may support customizing the tasks directory path and HMR behavio ```ts title="Signature" interface TasksPluginOptions { - + initializeDefaultDriver?: boolean; } ``` + +
+ +### initializeDefaultDriver + + + +Whether to initialize the default driver. + +If true, the plugin will initialize the default driver. +If false, the plugin will not initialize the default driver. + + +
diff --git a/apps/website/docs/guide/17-tasks/01-getting-started.mdx b/apps/website/docs/guide/17-tasks/01-getting-started.mdx index 530f551d..689acf63 100644 --- a/apps/website/docs/guide/17-tasks/01-getting-started.mdx +++ b/apps/website/docs/guide/17-tasks/01-getting-started.mdx @@ -28,9 +28,10 @@ export default defineConfig({ }); ``` -## Setting Up a Driver +## Setting Up a Driver (optional) -Configure a driver before the tasks plugin loads: +By default, the plugin will initialize the sqlite driver. You can set up a different driver by calling `setDriver` function from the `@commandkit/tasks` package. +If you want to disable the default driver initialization behavior, you can pass `initializeDefaultDriver: false` to the `tasks()` options in your commandkit config. ```ts import { setDriver } from '@commandkit/tasks'; diff --git a/apps/website/docs/guide/17-tasks/02-task-drivers.mdx b/apps/website/docs/guide/17-tasks/02-task-drivers.mdx index cf9c7d44..43768d65 100644 --- a/apps/website/docs/guide/17-tasks/02-task-drivers.mdx +++ b/apps/website/docs/guide/17-tasks/02-task-drivers.mdx @@ -173,6 +173,10 @@ class CustomDriver implements TaskDriver { async create(task: TaskData): Promise { // Implement your scheduling logic const id = await this.scheduler.schedule(task); + + // invoke the runner function to execute the task (normally, this would be invoked by the scheduler) + await this.runner?.(task); + return id; } diff --git a/packages/tasks/README.md b/packages/tasks/README.md index f9711d32..8446cfe2 100644 --- a/packages/tasks/README.md +++ b/packages/tasks/README.md @@ -29,7 +29,9 @@ export default defineConfig({ }); ``` -### 2. Set up a driver +### 2. Set up a driver (optional) + +By default, the plugin will initialize the sqlite driver. You can set up a different driver by calling `setDriver` function from the `@commandkit/tasks` package. If you want to disable the default driver initialization behavior, you can pass `initializeDefaultDriver: false` to the `tasks()` options in your commandkit config. ```ts import { setDriver } from '@commandkit/tasks'; diff --git a/packages/tasks/src/driver-manager.ts b/packages/tasks/src/driver-manager.ts index 613b7675..3fe63259 100644 --- a/packages/tasks/src/driver-manager.ts +++ b/packages/tasks/src/driver-manager.ts @@ -24,7 +24,7 @@ import { PartialTaskData, TaskData } from './types'; * ``` */ export class TaskDriverManager { - private driver: TaskDriver | null = null; + public driver: TaskDriver | null = null; /** * Sets the active task driver. diff --git a/packages/tasks/src/drivers/sqlite.ts b/packages/tasks/src/drivers/sqlite.ts index 26078576..b9daa48d 100644 --- a/packages/tasks/src/drivers/sqlite.ts +++ b/packages/tasks/src/drivers/sqlite.ts @@ -1,6 +1,6 @@ import { TaskDriver, TaskRunner } from '../driver'; import { TaskData } from '../types'; -import { DatabaseSync } from 'node:sqlite'; +import { DatabaseSync, StatementSync } from 'node:sqlite'; import cronParser from 'cron-parser'; /** @@ -21,6 +21,14 @@ export class SQLiteDriver implements TaskDriver { private runner: TaskRunner | null = null; private db: DatabaseSync; private interval: NodeJS.Timeout | null = null; + private statements!: { + count: StatementSync; + select: StatementSync; + insert: StatementSync; + delete: StatementSync; + updateNextRun: StatementSync; + updateCompleted: StatementSync; + }; /** * Create a new SQLiteDriver instance. @@ -43,7 +51,7 @@ export class SQLiteDriver implements TaskDriver { * Initialize the jobs table and start the polling loop. */ private init() { - this.db.exec(`CREATE TABLE IF NOT EXISTS jobs ( + this.db.exec(/* sql */ `CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, data TEXT, @@ -55,6 +63,26 @@ export class SQLiteDriver implements TaskDriver { created_at INTEGER NOT NULL, last_run INTEGER )`); + + this.statements = { + count: this.db.prepare( + /* sql */ `SELECT COUNT(*) as count FROM jobs WHERE status = 'pending' AND next_run <= ?`, + ), + select: this.db.prepare( + /* sql */ `SELECT * FROM jobs WHERE status = 'pending' AND next_run <= ? ORDER BY next_run ASC LIMIT ? OFFSET ?`, + ), + insert: this.db.prepare( + /* sql */ `INSERT INTO jobs (name, data, schedule_type, schedule_value, timezone, next_run, status, created_at) VALUES (?, ?, ?, ?, ?, ?, 'pending', ?)`, + ), + delete: this.db.prepare(/* sql */ `DELETE FROM jobs WHERE id = ?`), + updateNextRun: this.db.prepare( + /* sql */ `UPDATE jobs SET next_run = ?, last_run = ? WHERE id = ?`, + ), + updateCompleted: this.db.prepare( + /* sql */ `UPDATE jobs SET status = 'completed', last_run = ? WHERE id = ?`, + ), + }; + this.startPolling(); } @@ -82,10 +110,7 @@ export class SQLiteDriver implements TaskDriver { nextRun = typeof schedule === 'number' ? schedule : schedule.getTime(); } - const stmt = this.db.prepare( - `INSERT INTO jobs (name, data, schedule_type, schedule_value, timezone, next_run, status, created_at) VALUES (?, ?, ?, ?, ?, ?, 'pending', ?)`, - ); - const result = stmt.run( + const result = this.statements.insert.run( name, JSON.stringify(data ?? {}), scheduleType, @@ -111,8 +136,7 @@ export class SQLiteDriver implements TaskDriver { * @param identifier Job ID */ async delete(identifier: string): Promise { - const stmt = this.db.prepare(`DELETE FROM jobs WHERE id = ?`); - stmt.run(identifier); + this.statements.delete.run(identifier); } /** @@ -129,7 +153,7 @@ export class SQLiteDriver implements TaskDriver { */ private startPolling() { if (this.interval) clearInterval(this.interval); - this.interval = setInterval(() => this.pollJobs(), 1000); + this.interval = setInterval(() => this.pollJobs(), 1000).unref(); // Run immediately on startup this.pollJobs(); } @@ -140,24 +164,30 @@ export class SQLiteDriver implements TaskDriver { private pollJobs() { if (!this.runner) return; const now = Date.now(); - const stmt = this.db.prepare( - `SELECT * FROM jobs WHERE status = 'pending' AND next_run <= ?`, - ); - const rows = stmt.all(now) as Array<{ - id: number; - name: string; - data: string; - schedule_type: string; - schedule_value: string; - timezone: string | null; - next_run: number; - status: string; - created_at: number; - last_run: number | null; - }>; - - for (const job of rows) { - this.executeJob(job); + const chunkSize = 10; + + const countResult = this.statements.count.get(now) as { count: number }; + const totalJobs = countResult.count; + + if (totalJobs === 0) return; + + for (let offset = 0; offset < totalJobs; offset += chunkSize) { + const rows = this.statements.select.all(now, chunkSize, offset) as Array<{ + id: number; + name: string; + data: string; + schedule_type: string; + schedule_value: string; + timezone: string | null; + next_run: number; + status: string; + created_at: number; + last_run: number | null; + }>; + + for (const job of rows) { + this.executeJob(job); + } } } @@ -202,21 +232,12 @@ export class SQLiteDriver implements TaskDriver { nextRun = null; } if (nextRun) { - const stmt = this.db.prepare( - `UPDATE jobs SET next_run = ?, last_run = ? WHERE id = ?`, - ); - stmt.run(nextRun, now, job.id); + this.statements.updateNextRun.run(nextRun, now, job.id); } else { - const stmt = this.db.prepare( - `UPDATE jobs SET status = 'completed', last_run = ? WHERE id = ?`, - ); - stmt.run(now, job.id); + this.statements.updateCompleted.run(now, job.id); } } else { - const stmt = this.db.prepare( - `UPDATE jobs SET status = 'completed', last_run = ? WHERE id = ?`, - ); - stmt.run(now, job.id); + this.statements.updateCompleted.run(now, job.id); } } } diff --git a/packages/tasks/src/index.ts b/packages/tasks/src/index.ts index 8ebbc55e..42e2c897 100644 --- a/packages/tasks/src/index.ts +++ b/packages/tasks/src/index.ts @@ -27,7 +27,10 @@ import { TasksPlugin, TasksPluginOptions } from './plugin'; * @returns A configured tasks plugin instance */ export function tasks(options?: TasksPluginOptions) { - return new TasksPlugin(options ?? {}); + return new TasksPlugin({ + initializeDefaultDriver: true, + ...options, + }); } export * from './plugin'; diff --git a/packages/tasks/src/plugin.ts b/packages/tasks/src/plugin.ts index 86ed9f28..ae795f2d 100644 --- a/packages/tasks/src/plugin.ts +++ b/packages/tasks/src/plugin.ts @@ -1,5 +1,4 @@ import { - CommandKit, CommandKitPluginRuntime, Logger, RuntimePlugin, @@ -23,9 +22,15 @@ import { existsSync } from 'node:fs'; * Future versions may support customizing the tasks directory path and HMR behavior. */ export interface TasksPluginOptions { - // Future options may include: - // tasksPath?: string; - // enableHMR?: boolean; + /** + * Whether to initialize the default driver. + * + * If true, the plugin will initialize the default driver. + * If false, the plugin will not initialize the default driver. + * + * @default true + */ + initializeDefaultDriver?: boolean; } /** @@ -64,6 +69,19 @@ export class TasksPlugin extends RuntimePlugin { * @param ctx - The CommandKit plugin runtime context */ public async activate(ctx: CommandKitPluginRuntime): Promise { + if (this.options.initializeDefaultDriver && !taskDriverManager.driver) { + try { + const { SQLiteDriver } = + require('./drivers/sqlite') as typeof import('./drivers/sqlite'); + + taskDriverManager.setDriver(new SQLiteDriver()); + } catch (e: any) { + Logger.error( + `Failed to initialize the default driver for tasks plugin: ${e?.stack || e}`, + ); + } + } + taskDriverManager.setTaskRunner(async (task) => { try { const taskInstance = this.tasks.get(task.name);