Installation


In [17]:
%%bash
# Needs pyproject.toml and queuectl/ to exist already: on a fresh runtime, run the
# file-generation cell below first (the package is installed again after that cell).
python -m pip install -e .


Obtaining file:///content
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Checking if build backend supports build_editable: started
  Checking if build backend supports build_editable: finished with status 'done'
  Getting requirements to build editable: started
  Getting requirements to build editable: finished with status 'done'
  Preparing editable metadata (pyproject.toml): started
  Preparing editable metadata (pyproject.toml): finished with status 'done'
Building wheels for collected packages: queuectl
  Building editable for queuectl (pyproject.toml): started
  Building editable for queuectl (pyproject.toml): finished with status 'done'
  Created wheel for queuectl: filename=queuectl-0.1.0-0.editable-py3-none-any.whl size=3038 sha256=c12edab00bf188c9339a4c5575692bd56ddc4394ff1de652c6eddf8ec3a91b28
  Stored in directory: /tmp/pip-ephem-wheel-cache-471z63r3/wheels/bd/e2/ad/6557ae2989fbf3d2351bffa42147f9477243538a6ea9803db9
S

In [22]:
%%bash
# Package layout: queuectl/ holds the modules, queuectl/scripts/ the demo script.
mkdir -p queuectl queuectl/scripts

# pyproject.toml
# [build-system] is declared explicitly: PEP 621 [project] metadata needs
# setuptools>=61, newer than the setuptools>=40.8 pip assumes when the table is absent.
cat > pyproject.toml <<'PY'
[build-system]
requires = ["setuptools>=61"]
build-backend = "setuptools.build_meta"

[project]
name = "queuectl"
version = "0.1.0"
description = "CLI-based background job queue with workers, retries, DLQ"
authors = [{name = "Your Name"}]
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
  "typer>=0.12",
  "rich>=13.7",
]

[project.scripts]
queuectl = "queuectl.cli:app"

[tool.setuptools]
packages = ["queuectl"]
PY

# requirements.txt (mirrors [project].dependencies for plain `pip install -r`)
cat > requirements.txt <<'REQ'
typer>=0.12
rich>=13.7
REQ

# README.md — required: pyproject.toml points `readme` at this file, so it must
# exist before `pip install -e .`.
cat > README.md <<'MD'
queuectl – CLI background job queue (Python + SQLite)
MD

# queuectl/__init__.py
cat > queuectl/__init__.py <<'PY'
__all__ = ["db", "worker", "cli"]
PY

# queuectl/db.py
cat > queuectl/db.py <<'PY'
from __future__ import annotations
import os
import sqlite3
from pathlib import Path
from typing import Optional

# Location of the SQLite database. Read once, at import time, from the
# QUEUECTL_DB_PATH environment variable (default "queue.db") and resolved to an
# absolute path against the working directory current at that moment.
DB_PATH = Path(os.getenv("QUEUECTL_DB_PATH", "queue.db")).resolve()

# Schema applied by init_db() through executescript(). Every statement is
# idempotent (IF NOT EXISTS / a persistent PRAGMA), so it can run on every CLI call.
# - jobs: one row per job. The worker writes the states 'pending', 'processing',
#   'completed' and 'dead'. Timestamps are ISO-8601 text, and due_at is compared
#   as a string when a job is claimed.
# - workers: one row per worker registration. Rows stay after a worker stops
#   (is_active is set to 0).
#   NOTE: the UNIQUE index on workers.pid also covers those inactive rows, so a
#   PID recycled by the OS collides with an old row on INSERT.
# - config: string key/value settings (seeded from DEFAULTS).
SCHEMA = """
PRAGMA journal_mode=WAL;

CREATE TABLE IF NOT EXISTS jobs (
  id TEXT PRIMARY KEY,
  command TEXT NOT NULL,
  state TEXT NOT NULL DEFAULT 'pending',
  attempts INTEGER NOT NULL DEFAULT 0,
  max_retries INTEGER NOT NULL DEFAULT 3,
  created_at TEXT NOT NULL,
  updated_at TEXT NOT NULL,
  due_at TEXT NOT NULL,
  last_error TEXT
);

CREATE TABLE IF NOT EXISTS workers (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  pid INTEGER NOT NULL,
  started_at TEXT NOT NULL,
  last_heartbeat TEXT NOT NULL,
  active_job_id TEXT,
  is_active INTEGER NOT NULL DEFAULT 1
);

CREATE UNIQUE INDEX IF NOT EXISTS idx_workers_pid ON workers(pid);
CREATE INDEX IF NOT EXISTS idx_jobs_state_due ON jobs(state, due_at);

CREATE TABLE IF NOT EXISTS config (
  key TEXT PRIMARY KEY,
  value TEXT NOT NULL
);
"""

# Initial config values. init_db() inserts them with INSERT OR IGNORE, so any
# value already stored is left unchanged. All values are strings; readers convert
# them as needed (int() for the numeric keys, .lower() == "true" for "shutdown").
DEFAULTS = {
    "max_retries": "3",
    "backoff_base": "2",
    "shutdown": "false",
}

def connect() -> sqlite3.Connection:
    """Open a fresh connection to ``DB_PATH``.

    * ``isolation_level=None``: the sqlite3 module never issues an implicit
      BEGIN. Callers that need a multi-statement transaction issue their own
      (e.g. ``BEGIN IMMEDIATE``).
    * ``timeout=30``: wait up to 30 seconds for a lock held by another connection.
    * Rows are returned as ``sqlite3.Row`` (addressable by index or column name).
    """
    connection = sqlite3.connect(DB_PATH, isolation_level=None, timeout=30)
    connection.row_factory = sqlite3.Row
    return connection

def init_db() -> None:
    """Create missing tables/indexes and seed DEFAULTS without overwriting existing keys."""
    conn = connect()
    with conn:
        conn.executescript(SCHEMA)
        # OR IGNORE keeps any value a user has already configured.
        conn.executemany(
            "INSERT OR IGNORE INTO config(key, value) VALUES(?, ?)",
            DEFAULTS.items(),
        )
    conn.close()

def get_config(key: str, default: Optional[str] = None) -> str:
    """Return the stored value for ``key``.

    When the key is absent, return ``default``, or ``""`` if no default was given.
    """
    conn = connect()
    found = conn.execute("SELECT value FROM config WHERE key=?", (key,)).fetchone()
    conn.close()
    if found is not None:
        return found[0]
    return "" if default is None else default

def set_config(key: str, value: str) -> None:
    """Insert ``key`` or overwrite its current value (SQLite upsert on the primary key)."""
    upsert_sql = (
        "INSERT INTO config(key, value) VALUES(?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value=excluded.value"
    )
    conn = connect()
    with conn:
        conn.execute(upsert_sql, (key, value))
    conn.close()
PY

# queuectl/worker.py
cat > queuectl/worker.py <<'PY'
from __future__ import annotations
import os
import subprocess
import time
from datetime import datetime, timedelta, timezone
from typing import Optional
import sqlite3

from .db import connect, get_config

UTC = timezone.utc  # shorthand for the UTC tzinfo used throughout this module

def utcnow() -> str:
    """Return the current time as a timezone-aware ISO-8601 string in UTC."""
    stamp = datetime.now(tz=UTC)
    return stamp.isoformat()

def read_shutdown() -> bool:
    """Return True when the persisted ``shutdown`` config flag equals "true" (case-insensitive)."""
    flag = get_config("shutdown", "false")
    return flag.lower() == "true"

def claim_job(conn: sqlite3.Connection, worker_rowid: int) -> Optional[sqlite3.Row]:
    """Atomically claim the oldest due 'pending' job for a worker.

    Within one ``BEGIN IMMEDIATE`` transaction this picks the oldest job (by
    ``created_at``) whose ``due_at`` has passed, marks it 'processing', and
    records it as the worker's ``active_job_id``.

    Args:
        conn: connection opened in autocommit mode (``isolation_level=None``),
            so the explicit BEGIN/COMMIT here control the transaction.
        worker_rowid: ``workers.id`` of the claiming worker.

    Returns:
        The claimed job row (already showing state 'processing'), or None when
        no job is due.

    Raises:
        sqlite3.Error: propagated after the transaction has been rolled back.
    """
    # Same ISO-8601 UTC format as utcnow(); due_at is compared as text.
    now = datetime.now(timezone.utc).isoformat()
    # BEGIN IMMEDIATE takes the write lock up front, so two workers cannot both
    # SELECT the same pending row before one of them UPDATEs it.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id FROM jobs WHERE state='pending' AND due_at<=? ORDER BY created_at LIMIT 1",
            (now,),
        ).fetchone()
        if row is None:
            conn.execute("COMMIT")
            return None
        job_id = row[0]
        conn.execute(
            "UPDATE jobs SET state='processing', updated_at=? WHERE id=?",
            (now, job_id),
        )
        conn.execute(
            "UPDATE workers SET active_job_id=?, last_heartbeat=? WHERE id=?",
            (job_id, now, worker_rowid),
        )
        job = conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
        conn.execute("COMMIT")
        return job
    except BaseException:
        # Never leave the connection inside an open transaction. It would keep
        # holding the write lock and block every other worker and CLI writer.
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise

def finish_job(conn: sqlite3.Connection, worker_rowid: int):
    """Clear the worker's ``active_job_id`` and refresh its heartbeat.

    Called after the job's outcome for this attempt has been written.
    """
    beat = utcnow()
    clear_sql = "UPDATE workers SET active_job_id=NULL, last_heartbeat=? WHERE id=?"
    with conn:
        conn.execute(clear_sql, (beat, worker_rowid))

def compute_backoff(base: int, attempts: int) -> int:
    """Return the retry delay in seconds: ``base ** attempts``, truncated to an int."""
    return int(base ** attempts)

def run_command(command: str, timeout: Optional[int] = None) -> tuple[int, str]:
    """Run ``command`` through the shell and capture its output.

    Args:
        command: shell command line (executed with ``shell=True``).
        timeout: seconds before the command is killed; None means no limit.

    Returns:
        ``(returncode, output)``, where output is stdout followed by stderr.
        Any exception raised while launching or waiting (including a timeout)
        is reported as ``(1, "exception: <message>")``.
    """
    try:
        # errors="replace": a command that succeeds but prints bytes the locale
        # cannot decode must not be turned into a failure by UnicodeDecodeError.
        cp = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=timeout,
        )
        out = (cp.stdout or "") + (cp.stderr or "")
        return cp.returncode, out
    except Exception as e:
        return 1, f"exception: {e}"

def worker_loop(worker_rowid: int, heartbeat_interval: int = 2):
    """Claim and execute due jobs until a shutdown is requested.

    Each iteration either claims one due job and runs it, or (when nothing is
    due, or a shutdown is pending) sleeps one second, refreshes this worker's
    heartbeat, and returns if the shutdown flag is set. A claimed job always
    runs to completion before the flag is checked again.

    Outcome of a run (``attempts`` counts executions, including this one):
      * exit status 0: 'completed', last_error cleared;
      * failure with attempts <= max_retries: back to 'pending', due again
        after ``compute_backoff(backoff_base, attempts)`` seconds;
      * failure beyond that: 'dead' (the dead-letter queue).
    Up to the last 5000 characters of output are kept in last_error on failure.

    Args:
        worker_rowid: ``workers.id`` of this process's registration row.
        heartbeat_interval: accepted for compatibility but currently unused;
            the idle sleep is fixed at one second.
    """
    conn = connect()
    try:
        while True:
            job = None if read_shutdown() else claim_job(conn, worker_rowid)

            if job is None:
                # Idle or draining: pause, prove liveness, then honour a shutdown.
                time.sleep(1)
                with conn:
                    conn.execute(
                        "UPDATE workers SET last_heartbeat=? WHERE id=?",
                        (utcnow(), worker_rowid),
                    )
                if read_shutdown():
                    break
                continue

            exit_status, combined_output = run_command(job["command"], timeout=None)
            finished_at = utcnow()
            if job["max_retries"] is None:
                retry_limit = int(get_config("max_retries", "3"))
            else:
                retry_limit = int(job["max_retries"])
            attempt_no = int(job["attempts"]) + 1
            error_tail = combined_output[-5000:]

            if exit_status == 0:
                sql = "UPDATE jobs SET state='completed', attempts=?, updated_at=?, last_error=NULL WHERE id=?"
                args = (attempt_no, finished_at, job["id"])
            else:
                base = int(get_config("backoff_base", "2"))
                if attempt_no <= retry_limit:
                    # Retry later: exponential backoff from now.
                    delay = compute_backoff(base, attempt_no)
                    due_at = (datetime.now(UTC) + timedelta(seconds=delay)).isoformat()
                    sql = "UPDATE jobs SET state='pending', attempts=?, updated_at=?, due_at=?, last_error=? WHERE id=?"
                    args = (attempt_no, finished_at, due_at, error_tail, job["id"])
                else:
                    # Retry budget exhausted: park the job in the dead-letter queue.
                    sql = "UPDATE jobs SET state='dead', attempts=?, updated_at=?, last_error=? WHERE id=?"
                    args = (attempt_no, finished_at, error_tail, job["id"])

            with conn:
                conn.execute(sql, args)
            finish_job(conn, worker_rowid)
    finally:
        conn.close()

def _register_worker(conn: sqlite3.Connection, pid: int, now: str) -> int:
    """Insert a fresh active ``workers`` row for ``pid`` and return its id."""
    with conn:
        conn.execute("BEGIN IMMEDIATE")
        # A previous worker that had this PID may have died mid-job: hand its
        # job back to the queue before its row is discarded.
        conn.execute(
            "UPDATE jobs SET state='pending', updated_at=? "
            "WHERE state='processing' AND id IN "
            "(SELECT active_job_id FROM workers WHERE pid=? AND active_job_id IS NOT NULL)",
            (now, pid),
        )
        # workers.pid is UNIQUE and old rows persist, so an OS-recycled PID would
        # otherwise make the INSERT below fail with IntegrityError.
        conn.execute("DELETE FROM workers WHERE pid=?", (pid,))
        cur = conn.execute(
            "INSERT INTO workers(pid, started_at, last_heartbeat, is_active) VALUES(?, ?, ?, 1)",
            (pid, now, now),
        )
    return cur.lastrowid


def _trap_sigterm():
    """Make SIGTERM raise SystemExit; return a callable that restores the old handler.

    Signal handlers can only be installed from the main thread. Elsewhere this
    is a no-op.
    """
    import signal

    def _raise_exit(signum, frame):
        raise SystemExit(128 + signum)

    try:
        previous = signal.signal(signal.SIGTERM, _raise_exit)
    except ValueError:  # not running in the main thread
        return lambda: None
    if previous is None:
        # The old handler was not installed from Python; SIG_DFL is the closest match.
        previous = signal.SIG_DFL
    return lambda: signal.signal(signal.SIGTERM, previous)


def _release_worker(worker_rowid: int, now: str) -> None:
    """Requeue this worker's in-flight job (if still 'processing') and mark it inactive."""
    conn = connect()
    try:
        with conn:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "UPDATE jobs SET state='pending', updated_at=? "
                "WHERE state='processing' AND id=(SELECT active_job_id FROM workers WHERE id=?)",
                (now, worker_rowid),
            )
            conn.execute(
                "UPDATE workers SET is_active=0, active_job_id=NULL, last_heartbeat=? WHERE id=?",
                (now, worker_rowid),
            )
    finally:
        conn.close()


def run_worker_process():
    """Register this process as a worker, run the poll loop, then deregister.

    While the loop runs, SIGTERM (the default signal of ``kill`` and
    ``timeout``) is converted to SystemExit. Python's default SIGTERM action
    ends the process without running ``finally`` blocks, which would leave the
    worker marked active and its job stuck in 'processing'. On any exit, the
    job this worker was executing is returned to 'pending' (if it is still
    'processing') and the worker row is marked inactive.
    """
    conn = connect()
    try:
        worker_rowid = _register_worker(conn, os.getpid(), utcnow())
    finally:
        # Registration is done; the loop opens its own connection.
        conn.close()

    restore_sigterm = _trap_sigterm()
    try:
        worker_loop(worker_rowid)
    finally:
        try:
            _release_worker(worker_rowid, utcnow())
        finally:
            restore_sigterm()
PY

# queuectl/cli.py
cat > queuectl/cli.py <<'PY'
from __future__ import annotations
import json
import subprocess
import sys
from typing import Optional
import sqlite3

import typer
from rich.console import Console
from rich.table import Table

from .db import init_db, connect, get_config, set_config
from .worker import run_worker_process, utcnow

# Root Typer application (the `queuectl` console script points at it) and the
# shared Rich console that every command prints through.
app = typer.Typer(
    add_completion=False,
    help="queuectl - background job queue",
)
console = Console()

@app.callback(invoke_without_command=True)
def _ensure_db(ctx: typer.Context):
    # Runs before any subcommand (and when none is given), so the schema and
    # default config rows exist before a command touches the database.
    init_db()

@app.command()
def enqueue(job: str = typer.Argument(..., help="Job JSON with at least id, command")):
    """Enqueue a new job from JSON string."""
    # Payload: {"id": ..., "command": "...", "max_retries": int (optional)}.
    # The job is due immediately (due_at == created_at). Failures are reported
    # on the console rather than raised.
    jid = None
    try:
        j = json.loads(job)
        if not isinstance(j, dict):
            raise ValueError("job JSON must be an object")
        missing = [field for field in ("id", "command") if j.get(field) in (None, "")]
        if missing:
            raise ValueError(f"missing required field(s): {', '.join(missing)}")
        jid = j["id"]
        command = j["command"]
        now = utcnow()
        # A per-job max_retries overrides the configured default.
        max_retries = int(j.get("max_retries", get_config("max_retries", "3")))
        conn = connect()
        try:
            with conn:
                conn.execute(
                    "INSERT INTO jobs(id, command, state, attempts, max_retries, created_at, updated_at, due_at) "
                    "VALUES(?, ?, 'pending', 0, ?, ?, ?, ?)",
                    (jid, command, max_retries, now, now, now),
                )
        finally:
            conn.close()
        console.print(f"[green]Enqueued[/] {jid}")
    except sqlite3.IntegrityError as e:
        # Only a UNIQUE violation on the primary key means "duplicate id".
        if "UNIQUE" in str(e):
            console.print(f"[red]Job with id {jid!r} already exists[/]")
        else:
            console.print(f"[red]Failed to enqueue:[/] {e}")
    except Exception as e:
        console.print(f"[red]Failed to enqueue:[/] {e}")

# `queuectl worker ...`: start/stop detached workers or run one in the foreground.
worker_app = typer.Typer(help="Manage workers")
app.add_typer(worker_app, name="worker")

@worker_app.command("start")
def worker_start(count: int = typer.Option(1, help="Number of detached workers")):
    """Start N detached worker processes."""
    from .db import DB_PATH

    set_config("shutdown", "false")
    # Detached workers must not inherit this command's stdin/stdout/stderr. A
    # caller that reads our output until EOF (e.g. `$(queuectl worker start)`
    # or a notebook shell cell) would otherwise block until every worker exits.
    # Worker stderr (e.g. tracebacks) is appended to a log next to the database.
    log_path = DB_PATH.with_name(DB_PATH.name + ".workers.log")
    started = 0
    with open(log_path, "ab") as log:
        for _ in range(count):
            subprocess.Popen(
                [sys.executable, "-c", "import queuectl.worker as w; w.run_worker_process()"],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=log,
                start_new_session=True,
            )
            started += 1
    console.print(f"[green]Started[/] {started} worker(s)")
    if started:
        console.print(f"[dim]Worker stderr is appended to {log_path}[/]")

@worker_app.command("stop")
def worker_stop():
    """Signal graceful shutdown. Workers exit once idle."""
    # Workers poll this flag: they finish the job in hand, then exit when idle.
    set_config("shutdown", "true")
    console.print("[yellow]Shutdown signaled[/]. Workers will finish current job and exit.")

@worker_app.command("run")
def worker_run():
    """Run a single worker in the foreground (useful for Colab)."""
    # Clear a previous stop request, otherwise the worker would exit right away.
    set_config("shutdown", "false")
    run_worker_process()

@app.command()
def status():
    """Show counts by job state and running workers."""
    with connect() as conn:
        # Seed the known states so zero counts still get a row; any other state
        # present in the table is appended after them.
        counts = dict.fromkeys(["pending", "processing", "completed", "failed", "dead"], 0)
        for row in conn.execute("SELECT state, COUNT(*) c FROM jobs GROUP BY state"):
            counts[row[0]] = row[1]
        jobs_table = Table(title="Jobs")
        for header in ("state", "count"):
            jobs_table.add_column(header)
        for state_name, total in counts.items():
            jobs_table.add_row(state_name, str(total))
        console.print(jobs_table)

        workers_table = Table(title="Workers")
        for header in ("id", "pid", "active", "active_job", "last_beat"):
            workers_table.add_column(header)
        worker_rows = conn.execute(
            "SELECT id, pid, is_active, active_job_id, last_heartbeat FROM workers ORDER BY id"
        )
        for row in worker_rows:
            active_label = "yes" if row[2] else "no"
            workers_table.add_row(str(row[0]), str(row[1]), active_label, str(row[3] or "-"), str(row[4]))
        console.print(workers_table)

@app.command("list")
def list_jobs(state: Optional[str] = typer.Option(None, "--state", help="Filter by state")):
    """List jobs (optionally by state)."""
    columns = ["id", "state", "attempts", "max_retries", "due_at", "updated_at", "err"]
    # An empty --state value is treated like no filter.
    where_clause, params = (" WHERE state=?", [state]) if state else ("", [])
    query = (
        "SELECT id, state, attempts, max_retries, due_at, updated_at, substr(last_error,1,120) AS err FROM jobs"
        + where_clause
        + " ORDER BY updated_at DESC LIMIT 200"
    )
    with connect() as conn:
        table = Table(title="Jobs")
        for name in columns:
            table.add_column(name)
        for row in conn.execute(query, params):
            table.add_row(*("" if row[name] is None else str(row[name]) for name in columns))
        console.print(table)

# `queuectl dlq ...`: inspect and re-queue jobs whose state is 'dead'.
dlq_app = typer.Typer(help="Dead Letter Queue")
app.add_typer(dlq_app, name="dlq")

@dlq_app.command("list")
def dlq_list():
    # Newest-first listing of dead jobs with the first 200 characters of their last error.
    with connect() as conn:
        table = Table(title="DLQ (dead)")
        for col in ("id", "attempts", "max_retries", "updated_at", "err"):
            table.add_column(col)
        dead_rows = conn.execute(
            "SELECT id, attempts, max_retries, updated_at, substr(last_error,1,200) AS err FROM jobs WHERE state='dead' ORDER BY updated_at DESC"
        )
        for row in dead_rows:
            table.add_row(*(str(value) for value in row))
        console.print(table)

@dlq_app.command("retry")
def dlq_retry(job_id: str = typer.Argument(...)):
    # Reset a dead job to 'pending' with attempts=0, due immediately; exit 1 if it is not dead.
    now = utcnow()
    with connect() as conn:
        found = conn.execute("SELECT id FROM jobs WHERE id=? AND state='dead'", (job_id,)).fetchone()
        if found is None:
            console.print("[red]No such dead job[/]")
            raise typer.Exit(code=1)
        conn.execute(
            "UPDATE jobs SET state='pending', attempts=0, updated_at=?, due_at=?, last_error=NULL WHERE id=?",
            (now, now, job_id),
        )
    console.print(f"[green]Re-queued[/] {job_id}")

# `queuectl config ...`: read and write the key/value settings table.
config_app = typer.Typer(help="Config management")
app.add_typer(config_app, name="config")

# Keys whose values are parsed with int() by enqueue and by the worker loop.
_INT_CONFIG_KEYS = ("max_retries", "backoff_base")

@config_app.command("get")
def config_get(key: Optional[str] = typer.Argument(None)):
    # With a key: print its value ("" if unset). Without one: print the whole table.
    if key:
        console.print(get_config(key, ""))
        return
    with connect() as conn:
        table = Table(title="config")
        table.add_column("key")
        table.add_column("value")
        for row in conn.execute("SELECT key, value FROM config ORDER BY key"):
            table.add_row(row[0], row[1])
        console.print(table)

@config_app.command("set")
def config_set(key: str = typer.Argument(...), value: str = typer.Argument(...)):
    # Reject values that would later make int() raise in enqueue or crash a
    # worker after it has already run a failing job.
    if key in _INT_CONFIG_KEYS:
        try:
            int(value)
        except ValueError:
            valid = False
        else:
            valid = True
        if not valid:
            console.print(f"[red]{key} must be an integer, got {value!r}[/]")
            raise typer.Exit(code=1)
    set_config(key, value)
    console.print(f"[green]set[/] {key}={value}")

if __name__ == "__main__":
    app()
PY

# A tiny demo script (optional)
cat > queuectl/scripts/demo.sh <<'SH'
#!/usr/bin/env bash
set -euo pipefail
queuectl config set max_retries 3 || true
queuectl config set backoff_base 2 || true
queuectl enqueue '{"id":"ok1","command":"echo ok1"}'
queuectl enqueue '{"id":"ok2","command":"sleep 1 && echo ok2"}'
queuectl enqueue '{"id":"bad","command":"not_a_cmd","max_retries":2}'
queuectl worker start --count 2
# "bad" runs 3 times before reaching the DLQ (attempts 3 > max_retries 2), with
# 2 s and then 4 s of backoff (backoff_base ** attempts) between runs, plus up to
# 1 s of idle polling per retry. Wait long enough for it to actually be dead.
sleep 10
queuectl status
queuectl list --state completed
queuectl list --state dead
queuectl dlq list
queuectl worker stop
SH
chmod +x queuectl/scripts/demo.sh


In [None]:
%%bash
# Install (editable) now that the files above exist. `python -m pip` matches the
# first install cell and installs into the interpreter that `python` resolves to.
python -m pip install -e .


Obtaining file:///content
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Checking if build backend supports build_editable: started
  Checking if build backend supports build_editable: finished with status 'done'
  Getting requirements to build editable: started
  Getting requirements to build editable: finished with status 'done'
  Preparing editable metadata (pyproject.toml): started
  Preparing editable metadata (pyproject.toml): finished with status 'done'
Building wheels for collected packages: queuectl
  Building editable for queuectl (pyproject.toml): started
  Building editable for queuectl (pyproject.toml): finished with status 'done'
  Created wheel for queuectl: filename=queuectl-0.1.0-0.editable-py3-none-any.whl size=3038 sha256=45705c21cd13ba1bad5ed46d5e9ae170a7da444739b44076c3253074b99832a8
  Stored in directory: /tmp/pip-ephem-wheel-cache-6x2omp9b/wheels/bd/e2/ad/6557ae2989fbf3d2351bffa42147f9477243538a6ea9803db9
S

In [23]:
%%bash
# Job counts per state plus every registered worker (active and inactive).
queuectl status


         Jobs         
┏━━━━━━━━━━━━┳━━━━━━━┓
┃ state      ┃ count ┃
┡━━━━━━━━━━━━╇━━━━━━━┩
│ pending    │ 0     │
│ processing │ 0     │
│ completed  │ 2     │
│ failed     │ 0     │
│ dead       │ 1     │
└────────────┴───────┘
                               Workers                                
┏━━━━┳━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ id ┃ pid  ┃ active ┃ active_job ┃ last_beat                        ┃
┡━━━━╇━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ 1  │ 2773 │ no     │ -          │ 2025-11-10T05:20:00.911377+00:00 │
│ 2  │ 2774 │ no     │ -          │ 2025-11-10T05:20:00.927228+00:00 │
│ 3  │ 3019 │ yes    │ -          │ 2025-11-10T05:19:24.926012+00:00 │
└────┴──────┴────────┴────────────┴──────────────────────────────────┘


In [24]:
%%bash
# Retry policy: at most 3 retries per job by default, with a delay of 2 ** attempts seconds.
queuectl config set max_retries 3
queuectl config set backoff_base 2
# Without a key, `config get` prints the whole config table.
queuectl config get



set max_retries=3
set backoff_base=2
         config         
┏━━━━━━━━━━━━━━┳━━━━━━━┓
┃ key          ┃ value ┃
┡━━━━━━━━━━━━━━╇━━━━━━━┩
│ backoff_base │ 2     │
│ max_retries  │ 3     │
│ shutdown     │ true  │
└──────────────┴───────┘


In [25]:
%%bash
# Job ids are primary keys: re-running this cell reports each id as already existing.
queuectl enqueue '{"id":"job1","command":"echo Hello from job1"}'
queuectl enqueue '{"id":"job2","command":"sleep 2 && echo job2 done"}'
# Per-job max_retries overrides the configured default.
queuectl enqueue '{"id":"bad1","command":"does_not_exist","max_retries":2}'


Job with id already exists
Job with id already exists
Job with id already exists


In [27]:
%%bash
# Run one foreground worker for ~10 s. With SIGINT the worker gets a
# KeyboardInterrupt, so its `finally` cleanup runs and marks it inactive. Plain
# `timeout` sends SIGTERM, which ends Python without that cleanup. `-k 5s`
# escalates to SIGKILL if the worker is still alive 5 s after the SIGINT.
timeout -s INT -k 5s 10s queuectl worker run || true


In [29]:
%%bash
# Inspect the queue after the worker run: overall status, then jobs by state and the DLQ.
queuectl status
queuectl list --state pending
queuectl list --state processing
queuectl list --state completed
queuectl dlq list


         Jobs         
┏━━━━━━━━━━━━┳━━━━━━━┓
┃ state      ┃ count ┃
┡━━━━━━━━━━━━╇━━━━━━━┩
│ pending    │ 0     │
│ processing │ 0     │
│ completed  │ 2     │
│ failed     │ 0     │
│ dead       │ 1     │
└────────────┴───────┘
                                Workers                                
┏━━━━┳━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ id ┃ pid   ┃ active ┃ active_job ┃ last_beat                        ┃
┡━━━━╇━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ 1  │ 2773  │ no     │ -          │ 2025-11-10T05:20:00.911377+00:00 │
│ 2  │ 2774  │ no     │ -          │ 2025-11-10T05:20:00.927228+00:00 │
│ 3  │ 3019  │ yes    │ -          │ 2025-11-10T05:19:24.926012+00:00 │
│ 4  │ 30745 │ yes    │ -          │ 2025-11-10T07:15:31.064515+00:00 │
│ 5  │ 30746 │ yes    │ -          │ 2025-11-10T07:15:31.074000+00:00 │
│ 6  │ 30809 │ yes    │ -          │ 2025-11-10T07:15:00.126970+00:00 │
│ 7  │ 30852 │ yes    │ -          │ 2025-11-10T07

In [30]:
%%bash
# Set the shared shutdown flag; running workers exit once they are idle.
queuectl worker stop


Shutdown signaled. Workers will finish current job and exit.
