Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const taskId = await manager.createTask({

```ts title="Signature"
class TaskDriverManager {
public driver: TaskDriver | null = null;
setDriver(driver: TaskDriver) => void;
createTask(task: TaskData) => Promise<string>;
deleteTask(identifier: string) => Promise<void>;
Expand All @@ -50,6 +51,11 @@ class TaskDriverManager {

<div className="members-wrapper">

### driver

<MemberInfo kind="property" type={`<a href='/docs/next/api-reference/tasks/interfaces/task-driver#taskdriver'>TaskDriver</a> | null`} />


### setDriver

<MemberInfo kind="method" type={`(driver: <a href='/docs/next/api-reference/tasks/interfaces/task-driver#taskdriver'>TaskDriver</a>) => void`} />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import MemberDescription from '@site/src/components/MemberDescription';

## TasksPlugin

<GenerationInfo sourceFile="packages/tasks/src/plugin.ts" sourceLine="49" packageName="@commandkit/tasks" />
<GenerationInfo sourceFile="packages/tasks/src/plugin.ts" sourceLine="54" packageName="@commandkit/tasks" />

CommandKit plugin that provides task management capabilities.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import MemberDescription from '@site/src/components/MemberDescription';

## TasksPluginOptions

<GenerationInfo sourceFile="packages/tasks/src/plugin.ts" sourceLine="25" packageName="@commandkit/tasks" />
<GenerationInfo sourceFile="packages/tasks/src/plugin.ts" sourceLine="24" packageName="@commandkit/tasks" />

Configuration options for the tasks plugin.

Expand All @@ -22,6 +22,20 @@ Future versions may support customizing the tasks directory path and HMR behavio

```ts title="Signature"
interface TasksPluginOptions {

initializeDefaultDriver?: boolean;
}
```

<div className="members-wrapper">

### initializeDefaultDriver

<MemberInfo kind="property" type={`boolean`} default={`true`} />

Whether to initialize the default driver.

If true, the plugin will initialize the default driver.
If false, the plugin will not initialize the default driver.


</div>
5 changes: 3 additions & 2 deletions apps/website/docs/guide/17-tasks/01-getting-started.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ export default defineConfig({
});
```

## Setting Up a Driver
## Setting Up a Driver (optional)

Configure a driver before the tasks plugin loads:
By default, the plugin will initialize the sqlite driver. You can set up a different driver by calling `setDriver` function from the `@commandkit/tasks` package.
If you want to disable the default driver initialization behavior, you can pass `initializeDefaultDriver: false` to the `tasks()` options in your commandkit config.

```ts
import { setDriver } from '@commandkit/tasks';
Expand Down
4 changes: 4 additions & 0 deletions apps/website/docs/guide/17-tasks/02-task-drivers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ class CustomDriver implements TaskDriver {
async create(task: TaskData): Promise<string> {
// Implement your scheduling logic
const id = await this.scheduler.schedule(task);

// invoke the runner function to execute the task (normally, this would be invoked by the scheduler)
await this.runner?.(task);

return id;
}

Expand Down
4 changes: 3 additions & 1 deletion packages/tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ export default defineConfig({
});
```

### 2. Set up a driver
### 2. Set up a driver (optional)

By default, the plugin will initialize the sqlite driver. You can set up a different driver by calling `setDriver` function from the `@commandkit/tasks` package. If you want to disable the default driver initialization behavior, you can pass `initializeDefaultDriver: false` to the `tasks()` options in your commandkit config.

```ts
import { setDriver } from '@commandkit/tasks';
Expand Down
2 changes: 1 addition & 1 deletion packages/tasks/src/driver-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { PartialTaskData, TaskData } from './types';
* ```
*/
export class TaskDriverManager {
private driver: TaskDriver | null = null;
public driver: TaskDriver | null = null;

/**
* Sets the active task driver.
Expand Down
99 changes: 60 additions & 39 deletions packages/tasks/src/drivers/sqlite.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { TaskDriver, TaskRunner } from '../driver';
import { TaskData } from '../types';
import { DatabaseSync } from 'node:sqlite';
import { DatabaseSync, StatementSync } from 'node:sqlite';
import cronParser from 'cron-parser';

/**
Expand All @@ -21,6 +21,14 @@ export class SQLiteDriver implements TaskDriver {
private runner: TaskRunner | null = null;
private db: DatabaseSync;
private interval: NodeJS.Timeout | null = null;
private statements!: {
count: StatementSync;
select: StatementSync;
insert: StatementSync;
delete: StatementSync;
updateNextRun: StatementSync;
updateCompleted: StatementSync;
};

/**
* Create a new SQLiteDriver instance.
Expand All @@ -43,7 +51,7 @@ export class SQLiteDriver implements TaskDriver {
* Initialize the jobs table and start the polling loop.
*/
private init() {
this.db.exec(`CREATE TABLE IF NOT EXISTS jobs (
this.db.exec(/* sql */ `CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
data TEXT,
Expand All @@ -55,6 +63,26 @@ export class SQLiteDriver implements TaskDriver {
created_at INTEGER NOT NULL,
last_run INTEGER
)`);

this.statements = {
count: this.db.prepare(
/* sql */ `SELECT COUNT(*) as count FROM jobs WHERE status = 'pending' AND next_run <= ?`,
),
select: this.db.prepare(
/* sql */ `SELECT * FROM jobs WHERE status = 'pending' AND next_run <= ? ORDER BY next_run ASC LIMIT ? OFFSET ?`,
),
insert: this.db.prepare(
/* sql */ `INSERT INTO jobs (name, data, schedule_type, schedule_value, timezone, next_run, status, created_at) VALUES (?, ?, ?, ?, ?, ?, 'pending', ?)`,
),
delete: this.db.prepare(/* sql */ `DELETE FROM jobs WHERE id = ?`),
updateNextRun: this.db.prepare(
/* sql */ `UPDATE jobs SET next_run = ?, last_run = ? WHERE id = ?`,
),
updateCompleted: this.db.prepare(
/* sql */ `UPDATE jobs SET status = 'completed', last_run = ? WHERE id = ?`,
),
};

this.startPolling();
}

Expand Down Expand Up @@ -82,10 +110,7 @@ export class SQLiteDriver implements TaskDriver {
nextRun = typeof schedule === 'number' ? schedule : schedule.getTime();
}

const stmt = this.db.prepare(
`INSERT INTO jobs (name, data, schedule_type, schedule_value, timezone, next_run, status, created_at) VALUES (?, ?, ?, ?, ?, ?, 'pending', ?)`,
);
const result = stmt.run(
const result = this.statements.insert.run(
name,
JSON.stringify(data ?? {}),
scheduleType,
Expand All @@ -111,8 +136,7 @@ export class SQLiteDriver implements TaskDriver {
* @param identifier Job ID
*/
async delete(identifier: string): Promise<void> {
const stmt = this.db.prepare(`DELETE FROM jobs WHERE id = ?`);
stmt.run(identifier);
this.statements.delete.run(identifier);
}

/**
Expand All @@ -129,7 +153,7 @@ export class SQLiteDriver implements TaskDriver {
*/
private startPolling() {
if (this.interval) clearInterval(this.interval);
this.interval = setInterval(() => this.pollJobs(), 1000);
this.interval = setInterval(() => this.pollJobs(), 1000).unref();
// Run immediately on startup
this.pollJobs();
}
Expand All @@ -140,24 +164,30 @@ export class SQLiteDriver implements TaskDriver {
private pollJobs() {
if (!this.runner) return;
const now = Date.now();
const stmt = this.db.prepare(
`SELECT * FROM jobs WHERE status = 'pending' AND next_run <= ?`,
);
const rows = stmt.all(now) as Array<{
id: number;
name: string;
data: string;
schedule_type: string;
schedule_value: string;
timezone: string | null;
next_run: number;
status: string;
created_at: number;
last_run: number | null;
}>;

for (const job of rows) {
this.executeJob(job);
const chunkSize = 10;

const countResult = this.statements.count.get(now) as { count: number };
const totalJobs = countResult.count;

if (totalJobs === 0) return;

for (let offset = 0; offset < totalJobs; offset += chunkSize) {
const rows = this.statements.select.all(now, chunkSize, offset) as Array<{
id: number;
name: string;
data: string;
schedule_type: string;
schedule_value: string;
timezone: string | null;
next_run: number;
status: string;
created_at: number;
last_run: number | null;
}>;

for (const job of rows) {
this.executeJob(job);
}
}
}

Expand Down Expand Up @@ -202,21 +232,12 @@ export class SQLiteDriver implements TaskDriver {
nextRun = null;
}
if (nextRun) {
const stmt = this.db.prepare(
`UPDATE jobs SET next_run = ?, last_run = ? WHERE id = ?`,
);
stmt.run(nextRun, now, job.id);
this.statements.updateNextRun.run(nextRun, now, job.id);
} else {
const stmt = this.db.prepare(
`UPDATE jobs SET status = 'completed', last_run = ? WHERE id = ?`,
);
stmt.run(now, job.id);
this.statements.updateCompleted.run(now, job.id);
}
} else {
const stmt = this.db.prepare(
`UPDATE jobs SET status = 'completed', last_run = ? WHERE id = ?`,
);
stmt.run(now, job.id);
this.statements.updateCompleted.run(now, job.id);
}
}
}
5 changes: 4 additions & 1 deletion packages/tasks/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import { TasksPlugin, TasksPluginOptions } from './plugin';
* @returns A configured tasks plugin instance
*/
export function tasks(options?: TasksPluginOptions) {
return new TasksPlugin(options ?? {});
return new TasksPlugin({
initializeDefaultDriver: true,
...options,
});
}

export * from './plugin';
Expand Down
26 changes: 22 additions & 4 deletions packages/tasks/src/plugin.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {
CommandKit,
CommandKitPluginRuntime,
Logger,
RuntimePlugin,
Expand All @@ -23,9 +22,15 @@ import { existsSync } from 'node:fs';
* Future versions may support customizing the tasks directory path and HMR behavior.
*/
export interface TasksPluginOptions {
// Future options may include:
// tasksPath?: string;
// enableHMR?: boolean;
/**
* Whether to initialize the default driver.
*
* If true, the plugin will initialize the default driver.
* If false, the plugin will not initialize the default driver.
*
* @default true
*/
initializeDefaultDriver?: boolean;
}

/**
Expand Down Expand Up @@ -64,6 +69,19 @@ export class TasksPlugin extends RuntimePlugin<TasksPluginOptions> {
* @param ctx - The CommandKit plugin runtime context
*/
public async activate(ctx: CommandKitPluginRuntime): Promise<void> {
if (this.options.initializeDefaultDriver && !taskDriverManager.driver) {
try {
const { SQLiteDriver } =
require('./drivers/sqlite') as typeof import('./drivers/sqlite');

taskDriverManager.setDriver(new SQLiteDriver());
} catch (e: any) {
Logger.error(
`Failed to initialize the default driver for tasks plugin: ${e?.stack || e}`,
);
}
}

taskDriverManager.setTaskRunner(async (task) => {
try {
const taskInstance = this.tasks.get(task.name);
Expand Down