-
Notifications
You must be signed in to change notification settings - Fork 1
Scheduler
Tiny fiber-based cooperative scheduler. Lets a long-running response (SSE stream, long-poll, slow batch) yield back to the worker so the process doesn't get pinned for the connection's whole lifetime.
-
Timer parking:
Tep::Scheduler.pause(seconds)suspends the current fiber until at-or-aftersecondsfrom now. -
I/O parking:
Tep::Scheduler.io_wait(fd, mode, timeout)suspends until the fd is ready (mode 1=READ, 2=WRITE, 3=both) or the timeout fires. -
Cooperation: a per-app fiber registry,
tickto resume the next-ready fiber,run_until_emptyto drain.
What it's NOT:
- Not a green-threads implementation. Fibers yield explicitly; there's no preemption.
- Not an
EventMachine-style reactor. Sockets the framework's HTTP server uses don't auto-park on read/write — only the fibers you spawn from inside a handler do.
Tep::Scheduler.spawn_fiber(Fiber.new {
Tep::Logger.new.info("background tick")
Tep::Scheduler.pause(60)
Tep::Logger.new.info("background tick after 60s")
})spawn_fiber(f) registers the fiber in the per-process scheduler.
The fiber runs on the next tick / run_for / run_until_empty
call.
You generally don't have to call tick yourself; the worker's
event loop drives it. But for tests, batch jobs, or one-off scripts,
the building blocks are:
Tep::Scheduler.tick(poll_timeout_ms) # one pass
Tep::Scheduler.run_until_empty # drain all ready fibers
Tep::Scheduler.run_for(seconds) # drain for N secondsrun_for blocks in poll(2) between empty passes, so it doesn't
busy-spin while waiting for the next timer / I/O event.
Inside a fiber:
Tep::Scheduler.spawn_fiber(Fiber.new {
i = 0
while i < 10
Tep::Logger.new.info("step " + i.to_s)
Tep::Scheduler.pause(1)
i += 1
end
})pause(1) yields the fiber for at least 1 second. Other fibers
ready in that window run first. From outside any fiber (the parent
scheduler), pause falls back to plain sleep.
The method is named pause rather than sleep because spinel
fixed a Kernel.cmeth-style call routing bug (#428) — the framework
side was renamed defensively and the name stuck. They're the same
thing if you're not coming from sidekiq's API.
Tep::Scheduler.spawn_fiber(Fiber.new {
fd = Sock.sphttp_connect("api.local", 8080)
Sock.sphttp_set_nonblock(fd)
Sock.sphttp_write_str(fd, "GET / HTTP/1.0\r\n\r\n")
Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, 5)
bytes = Sock.sphttp_recv_all(fd, 0)
Sock.sphttp_close(fd)
})Mode constants:
| Constant | Value |
|---|---|
Tep::Scheduler::READ |
1 |
Tep::Scheduler::WRITE |
2 |
Pass 1 | 2 == 3 for both.
io_wait returns the observed-ready bits (0 on timeout, the
mode bits OR'd together on ready). The parent scheduler runs the
underlying poll(2); the fiber is resumed when its fd is ready.
- SSE streams. Park between event sends so the request doesn't pin the worker for the whole connection.
-
Long-poll endpoints.
io_waiton an upstream socket; resume on its readiness. - Batch refresh inside an HTTP handler. Run a tick during a blocking step; other fibers (e.g. the LLM commentary refresh) make progress.
-
pausedoesn't yield from outside a fiber. A top-levelTep::Scheduler.pause(5)is justsleep(5). Wrap in a fiber if you want cooperative behaviour. -
io_wait's fd must be non-blocking. Otherwise the recv after it blocks the whole worker.Sock.sphttp_set_nonblock(fd)before the firstio_wait. -
Worker scheduler is per-process. Under
--workers Nprefork, each worker has its own registry. Fibers don't migrate. (This is usually the right thing — fibers are tied to the request that spawned them.) - No explicit cancellation. Killing a fiber means letting its body fall through. The cooperative model assumes the body has a way to check for "should I keep running?" — typically a wake-at guard or an external file/atomic.