Ori Pekelman edited this page May 11, 2026 · 1 revision

Tep::Job

A Sidekiq-shaped job queue backed by SQLite. Use it when work must outlive the request that triggered it: slow LLM calls, follow-up emails, periodic cache refreshes.

For synchronous fan-out within a single request, use Tep::Parallel instead.

Storage

One table, created on demand by init_schema:

CREATE TABLE tep_jobs (
  id          INTEGER PRIMARY KEY,
  job_name    TEXT,      -- registered class identifier
  arg         TEXT,      -- single string payload
  status      TEXT,      -- queued|running|done|failed
  created_at  INTEGER,
  finished_at INTEGER,
  result      TEXT       -- written by user code; see notes below
)

The single string payload (arg) is intentional: structured data goes through Tep::Json. The equivalent of Sidekiq's perform_async(a, b, c) is to encode the tuple as a JSON string at enqueue time and decode it inside perform.
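As a rough sketch of that translation, the snippet below packs three arguments into one string and unpacks them again. It uses Ruby's standard json library as a stand-in, because the Tep::Json method names are not shown on this page; pack_args and unpack_args are hypothetical helper names, and the code targets stock Ruby rather than the spinel subset.

```ruby
require "json"

# Hypothetical helpers; stdlib JSON stands in for Tep::Json here.
def pack_args(*args)
  JSON.generate(args)   # several values -> one String payload
end

def unpack_args(arg)
  JSON.parse(arg)       # one String payload -> Array of values
end

# Enqueue side: build the single string that goes into the arg column.
payload = pack_args("a@example.com", 3, true)

# Perform side: recover the original tuple.
email, retries, urgent = unpack_args(payload)
```

The payload string is what you would pass as the second argument to Tep::Job.enqueue.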

API

Method                                   What it does
Tep::Job.init_schema(db_path)            Idempotent. Creates the table if missing.
Tep::Job.enqueue(name, arg, db_path)     Append a queued row. Returns the new id.
Tep::Job.fetch_next(db_path)             Claim the oldest queued row → running. Returns "row_id|name|arg" or "".
Tep::Job.mark_done(db_path, row_id)      Flip the row to done + record finished_at.
Tep::Job.mark_failed(db_path, row_id)    Flip the row to failed + record finished_at.

The user writes the perform body and the dispatch ladder; the framework owns the row-state transitions.
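The claim string returned by fetch_next has to be taken apart by user code. A minimal sketch in stock Ruby, assuming the "row_id|name|arg" shape from the table above; parse_claim is a hypothetical helper, not part of Tep::Job. Splitting with a limit of 3 keeps any "|" characters inside arg intact, which also means a job name must not contain "|".

```ruby
# Hypothetical helper: split a fetch_next claim into its three fields.
def parse_claim(claim)
  return nil if claim.empty?               # "" means the queue is idle
  row_id, name, arg = claim.split("|", 3)  # limit 3: arg keeps its own pipes
  [row_id.to_i, name, arg]
end

claimed = parse_claim("7|WelcomeEmail|a|b@example.com")
idle    = parse_claim("")
```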

Walkthrough

DB_PATH = "/var/lib/tep/jobs.db"

on_start do
  Tep::Job.init_schema(DB_PATH)
end

# Enqueue from anywhere -- a route handler, a filter, an on_start hook.
post '/users' do
  Tep::Job.enqueue("WelcomeEmail", params[:email], DB_PATH)
  status 202
  "queued"
end

# Worker route -- you can also call this from a tmux'd loop.
get '/work/drain_one' do
  claim = Tep::Job.fetch_next(DB_PATH)
  if claim.length == 0
    "idle"
  else
    parts  = claim.split("|", 3)
    row_id = parts[0].to_i
    name   = parts[1]
    arg    = parts[2]

    ok = false
    if name == "WelcomeEmail"
      ok = WelcomeEmail.new.perform(arg)
    end

    if ok
      Tep::Job.mark_done(DB_PATH, row_id)
    else
      Tep::Job.mark_failed(DB_PATH, row_id)
    end
    name + "=" + (ok ? "ok" : "fail")
  end
end

class WelcomeEmail
  def perform(email)
    # ... do the work; return true/false.
    true
  end
end

Why the user-side dispatch ladder?

In a virtual-dispatch world, Tep::Job.process_one(db_path) would look up the handler by job_name and call handler.perform(arg) through a base-class slot. Spinel's PtrArray is homogeneously typed and doesn't carry per-element class-id tags, so an array of mixed Tep::Job subclasses falls through to the base class's perform — the user's override never runs.

The fix here is the same one routes use: dispatch by class-id at the compile-time-known set of subclasses. For routes, bin/tep auto-emits the class-id switch (one TepRoute_N < Tep::Handler per get '/x' do ... end block). A future bin/tep pass could do the same for Tep::Job subclasses; until then, the user writes the if name == "..." ladder.
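With more than one job class, the ladder can be pulled into a helper so the worker body stays short. This is only a sketch in stock Ruby: CacheRefresh and the dispatch helper are hypothetical names, and every branch names a concrete class so that no call goes through a base-class slot.

```ruby
class WelcomeEmail
  def perform(email)
    !email.empty?       # placeholder work: succeed for a non-empty address
  end
end

class CacheRefresh      # hypothetical second job class
  def perform(key)
    true
  end
end

# Hand-written ladder: one branch per registered job name.
def dispatch(name, arg)
  if name == "WelcomeEmail"
    WelcomeEmail.new.perform(arg)
  elsif name == "CacheRefresh"
    CacheRefresh.new.perform(arg)
  else
    false               # unknown name: report failure so the row is marked failed
  end
end
```

The worker then calls dispatch(name, arg) and passes the boolean on to mark_done or mark_failed.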

Result storage

Tep::Job.mark_done only flips the row's status. To persist a job result string, write the result column yourself with a separate UPDATE and db.bind_str before calling mark_done (or use a side channel, as the tep test suite does):

# Inside your worker handler, after running the job:
db = Tep::SQLite.new
if db.open(DB_PATH)
  db.prepare("UPDATE tep_jobs SET result = ? WHERE id = ?")
  db.bind_str(1, result_string)
  db.bind_int(2, row_id)
  db.step
  db.finalize
  db.close
end
Tep::Job.mark_done(DB_PATH, row_id)

The reason: if the framework's mark_done itself bound a String parameter, spinel's analyzer would widen the cmeth's result param to poly (a spinel issue, a followup to #429, in which cross-class param widening cascades into bind_str callsites), and the FFI bind would lose its :str typing. Writing the result column from user code sidesteps that cascade.

Drain modes

One-shot per HTTP hit

The simplest worker is a route, used as a control endpoint or hit by a cron job running curl localhost:4567/work/drain_one. No prefork is required; call it repeatedly until the response is "idle".
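The "call until idle" loop can be sketched on the client side as follows. drain_until_idle is a hypothetical helper in stock Ruby; the block stands for one HTTP hit and returns the response body, and max_hits is an assumed safety cap so that a misbehaving endpoint cannot keep the loop running forever.

```ruby
# Hypothetical helper: call the drain endpoint until it reports "idle".
# The block performs one hit and returns the response body as a String.
def drain_until_idle(max_hits)
  hits = 0
  while hits < max_hits
    body = yield
    break if body == "idle"   # queue is empty, stop polling
    hits += 1                 # one job was claimed and processed
  end
  hits                        # number of jobs drained in this run
end

# Stubbed responses in place of real HTTP calls.
responses = ["WelcomeEmail=ok", "WelcomeEmail=fail", "idle"]
drained = drain_until_idle(10) { responses.shift }
```

Against a live server the block could be Net::HTTP.get(URI("http://localhost:4567/work/drain_one")) after require "net/http".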

Long-running worker process

Spawn a sidecar with Sock.sphttp_fork:

on_start do
  Tep::Job.init_schema(DB_PATH)

  pid = Sock.sphttp_fork
  if pid == 0
    # Child: drain forever.
    while true
      claim = Tep::Job.fetch_next(DB_PATH)
      if claim.length == 0
        Tep::Scheduler.pause(2)
      else
        # ... dispatch + mark_done ...
      end
    end
  end
end

Tep::Scheduler.pause(seconds) yields the current fiber; called from outside any fiber, it falls back to plain sleep.

Pitfalls

  • One worker per fetch_next. fetch_next takes no row lock; if two workers run against the same DB, both can claim the same row before the first UPDATE ... SET status='running' commits. Make the perform side idempotent, or single-thread the worker (--workers 1).
  • No retry shape. A failed row stays failed. Re-enqueue it yourself if you want exponential backoff; the schema's created_at column is the input to that policy.
  • String args only. Encode any structured payload with Tep::Json at enqueue time and decode it with Tep::Json in perform.
  • No scheduled jobs (yet). enqueue stamps the row with the current time and fetch_next is FIFO. For "run X at time T", add a run_at INTEGER column and a WHERE run_at <= ? clause to your own fetch_next; there is no magic involved, only SQL you hand-roll.
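One possible shape for the hand-rolled retry policy, as a sketch in stock Ruby. Everything here is an assumption rather than part of Tep::Job: the helper names, the 30-second base, the one-hour cap, and the idea that the attempt count travels inside the job's own payload, since the schema has no column for it.

```ruby
# Hypothetical policy: the delay doubles with each attempt, up to a cap.
def backoff_seconds(attempt, base = 30, cap = 3600)
  delay = base * (2 ** attempt)
  delay > cap ? cap : delay
end

# A failed row is due for re-enqueue once its delay has elapsed.
# created_at is the row's created_at column (epoch seconds).
def due_for_retry?(now, created_at, attempt)
  now - created_at >= backoff_seconds(attempt)
end

first_delay  = backoff_seconds(0)
capped_delay = backoff_seconds(7)
```

Because each re-enqueue creates a fresh row with a fresh created_at, the delay is measured from the most recent enqueue of that job.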
