Parallel
Fork-based fan-out, shaped after the grosser/parallel gem's
Parallel.map(items) { ... } surface. Spinel can't lower closures,
so the per-item body lifts into a Worker class instead.
Concurrent within one request lifetime:
- Probe N upstream services in parallel.
- Render N small templates against N different contexts.
- Compute a digest over N files.
For work that should survive the request, see Tep::Job.
For a single one-off background task, just fork() directly (see
Sock.sphttp_fork / sphttp_exit in lib/tep/sphttp.c).
Result-collecting fan-out:
results = Tep::Parallel.new(MyWorker.new).map_processes(items)
# => [String, String, ...] one entry per input, in input order
Fire-and-forget (no result capture):
Tep::Parallel.new(MyWorker.new).each_process(items)
# => 0 once every child has exited
Both calls fork one child per item and wait for every child to exit before returning. Fixed pools aren't supported in v1. That is fine up to a few dozen items; for larger fan-outs the caller chunks the input beforehand.
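Chunking in the caller can be a plain loop that runs one fan-out per slice. The sketch below is ordinary Ruby meant to show the shape only: `FakeParallel`, `map_in_chunks`, and the chunk size of 24 are hypothetical, and `FakeParallel` runs items in-process as a stand-in for `Tep::Parallel` so the example executes without tep.

```ruby
# Stand-in for Tep::Parallel so the sketch runs under plain Ruby.
# It calls the worker in-process; the real class forks per item.
class FakeParallel
  def initialize(worker)
    @worker = worker
  end

  def map_processes(items)
    out = []
    i = 0
    while i < items.length
      out << @worker.process(items[i])
      i += 1
    end
    out
  end
end

class Doubler
  def process(item)
    (item.to_i * 2).to_s
  end
end

# Hypothetical helper: one fan-out per slice, so at most chunk_size
# children would exist at a time. Each map_processes call blocks
# until it finishes, so slices run in sequence and order is kept.
def map_in_chunks(par, items, chunk_size)
  results = []
  start = 0
  while start < items.length
    chunk = items[start, chunk_size]
    results.concat(par.map_processes(chunk))
    start += chunk_size
  end
  results
end

items = []
n = 1
while n <= 100
  items << n.to_s
  n += 1
end

# Assumption: 24 is one reading of "a few dozen".
results = map_in_chunks(FakeParallel.new(Doubler.new), items, 24)
```

With 100 inputs and a chunk size of 24 this makes five sequential fan-outs (24, 24, 24, 24, 4) and still returns one result per input in input order.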
Subclass Tep::ParallelWorker and override process(item). The
return type must be String (anything you want to surface to the
parent goes through the file-based IPC channel as bytes; structured
data is JSON in / JSON out).
class Doubler < Tep::ParallelWorker
  def process(item)
    n = item.to_i
    (n * 2).to_s
  end
end

class HttpProbe < Tep::ParallelWorker
  def process(url)
    res = Tep::Http.get(url)
    res.status.to_s
  end
end
The method is named process rather than the more natural run
because Tep::Server#run(port, workers, quiet) already exists in
the same binary and spinel unifies same-named imeths across
unrelated classes for type-inference purposes.
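For the "JSON in / JSON out" convention mentioned with the String return type, a worker might decode its item and encode its result as in the sketch below. This is plain MRI Ruby using the `json` standard library; whichever JSON helper is available under spinel may look different. `ParallelWorkerStandIn` and `Area` are hypothetical names, with the stand-in taking the place of `Tep::ParallelWorker` so the example runs on its own.

```ruby
require "json"

# Stand-in base class so the sketch runs under plain Ruby;
# in a tep program this would be Tep::ParallelWorker.
class ParallelWorkerStandIn
  def process(item)
    ""
  end
end

# Hypothetical worker: the item is a JSON string and so is the result,
# which keeps the value crossing the fork boundary a plain String.
class Area < ParallelWorkerStandIn
  def process(item)
    rect = JSON.parse(item)
    JSON.generate({ "id" => rect["id"], "area" => rect["w"] * rect["h"] })
  end
end

items = [
  JSON.generate({ "id" => "a", "w" => 2, "h" => 3 }),
  JSON.generate({ "id" => "b", "w" => 4, "h" => 5 }),
]

# Calling process directly here; under tep the same strings would be
# passed to map_processes and decoded by the parent afterwards.
worker = Area.new
decoded = items.map { |item| JSON.parse(worker.process(item)) }
```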
The expected shape is:
Tep::Parallel.map_processes(items, MyWorker.new)      # not this
Tep::Parallel.new(MyWorker.new).map_processes(items)  # this
Class-method (cmeth) parameters in spinel unify across all call
sites; multiple distinct Worker subclasses pushed through one
cmeth arg widen it to poly, which then disables virtual dispatch
inside the cmeth body. Storing the worker in an instance field of
Tep::Parallel (typed-slot imeth dispatch) keeps the dispatch
concrete — the same trick Tep::App#set_before(Filter) uses.
- Strings only across fork. Children write String returns to per-index files under /tmp/tep_par_<pid>_<ts>/<idx>; the parent reads them back. Non-string returns would need a Marshal-ish shim spinel doesn't have.
- One child per item. A 100-item input forks 100 children. Chunk in the caller if the per-item work is too small to justify the fork.
- No thread mode. Spinel can't lower MRI's Thread reliably, so the gem's in_threads: option is absent.
- Per-call scratch dir. The dir is keyed on pid + monotonic timestamp so concurrent map_processes calls in different workers don't trample each other. Cleaned up at the end of every call.
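The mechanism these points describe (fork per item, one result file per index, parent reads back in order, scratch dir removed afterwards) can be sketched in plain MRI Ruby. This is an illustration, not tep's implementation: `fan_out` is a hypothetical name, `Dir.mktmpdir` stands in for the pid + timestamp directory, and the blocks used here would not lower under spinel.

```ruby
require "tmpdir"
require "fileutils"

# Hypothetical sketch of the mechanism; not tep's implementation.
# worker is any object whose process(item) returns a String.
def fan_out(worker, items)
  # Assumption: mktmpdir stands in for /tmp/tep_par_<pid>_<ts>.
  dir = Dir.mktmpdir("par_sketch_")
  pids = []
  items.each_with_index do |item, idx|
    pids << fork do
      # Child: write the String result to its own index file, then
      # leave immediately without running the parent's exit hooks.
      File.write(File.join(dir, idx.to_s), worker.process(item))
      exit!(0)
    end
  end

  # Parent: block until every child has exited.
  pids.each { |pid| Process.wait(pid) }

  # Read results back in input order; a child that died before
  # writing its file contributes an empty string in this sketch.
  results = []
  items.each_index do |idx|
    path = File.join(dir, idx.to_s)
    results << (File.exist?(path) ? File.read(path) : "")
  end
  results
ensure
  # Per-call cleanup of the scratch dir.
  FileUtils.remove_entry(dir) if dir
end

class Doubler
  def process(item)
    (item.to_i * 2).to_s
  end
end

results = fan_out(Doubler.new, ["1", "2", "3"])
```

Because each child writes to a file named after its index, the order in which children finish does not affect the order of the results.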
class Probe < Tep::ParallelWorker
  def process(url)
    res = Tep::Http.get(url)
    url + " -> " + res.status.to_s
  end
end
services = [
  "http://svc-a.local/healthz",
  "http://svc-b.local/healthz",
  "http://svc-c.local/healthz",
]
out = Tep::Parallel.new(Probe.new).map_processes(services)
out.join("\n")

class Sha256 < Tep::ParallelWorker
  def process(path)
    bytes = Tep::Shell.read(path)
    path + " " + Sock.sphttp_hmac_sha256_hex("", bytes)
  end
end
paths = ["/etc/hosts", "/etc/passwd", "/etc/resolv.conf"]
puts Tep::Parallel.new(Sha256.new).map_processes(paths).join("\n")

class Notify < Tep::ParallelWorker
  def process(item)
    Tep::Http.post("http://hook.local/event", item)
    "" # return value is discarded
  end
end
Tep::Parallel.new(Notify.new).each_process(events)
- map_processes blocks until every child exits. If any child hangs (e.g. an unbounded HTTP request), the parent hangs too. Use system-level constraints (timeouts inside process, request caps in your code) to bound the work.
- Worker subclasses must be defined before Parallel.new. The worker's class is set at construction time and spinel pins the slot type then.
- Each child re-enters the program from scratch up to process. Don't rely on the parent's in-memory state inside process; pass every input through item.
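One way to bound the work inside process, as the first caveat suggests, is a monotonic-clock deadline around a retry loop, with a String returned even when the budget runs out so the parent still gets one entry per input. The sketch below is plain Ruby and avoids threads. `BoundedProbe`, `try_once`, `clock`, and the 0.2 second budget are hypothetical, and `try_once` stands in for a single upstream call that is assumed to be bounded on its own (the source does not say whether Tep::Http exposes a timeout option).

```ruby
# Hypothetical worker that gives up after a fixed time budget
# instead of retrying forever.
class BoundedProbe
  BUDGET_SECONDS = 0.2  # assumption: choose a budget that fits the request

  def process(item)
    deadline = clock + BUDGET_SECONDS
    while clock < deadline
      status = try_once(item)
      return item + " -> " + status if status != ""
      sleep(0.02)  # brief pause before the next attempt
    end
    # Out of budget: still return a String for this input.
    item + " -> timeout"
  end

  # Monotonic clock, unaffected by wall-clock adjustments.
  def clock
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Stand-in for one bounded upstream call; "" means no answer yet.
  def try_once(item)
    item == "down" ? "" : "200"
  end
end

probe = BoundedProbe.new
up = probe.process("up")
down = probe.process("down")
```

The deadline only caps the loop as a whole; a single call that never returns would still hang the child, so each attempt needs its own limit as well.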