From db3649f9f50185ccacfde4a204e641db9060c0a1 Mon Sep 17 00:00:00 2001 From: Henrique Rodrigues Date: Thu, 8 Aug 2019 20:24:58 -0300 Subject: [PATCH] generic worker --- worker/worker.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/worker/worker.go b/worker/worker.go index 9e499241..c310ef72 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -110,6 +110,24 @@ func (w *Worker) EnqueueRPCWithOptions( }, w.enqueueOptions(opts)) } +// EnqueueJob enqueues generic job to worker +func (w *Worker) EnqueueJob( + queue string, + payload interface{}, +) (jid string, err error) { + opts := w.enqueueOptions(w.opts) + return workers.EnqueueWithOptions(queue, class, payload, opts) +} + +// EnqueueJobWithOptions enqueues generic job to worker +func (w *Worker) EnqueueJobWithOptions( + queue string, + payload interface{}, + opts *EnqueueOpts, +) (jid string, err error) { + return workers.EnqueueWithOptions(queue, class, payload, w.enqueueOptions(opts)) +} + // RegisterRPCJob registers a RPC job func (w *Worker) RegisterRPCJob(rpcJob RPCJob) error { if w.registered { @@ -122,8 +140,21 @@ func (w *Worker) RegisterRPCJob(rpcJob RPCJob) error { return nil } +// RegisterJob registers a generic job +func (w *Worker) RegisterJob( + queue string, + callback func(interface{}), + concurrency int, +) { + workers.Process(queue, func(payload *workers.Msg) { + callback(payload.Interface()) + }, concurrency) +} + func (w *Worker) parsedRPCJob(rpcJob RPCJob) func(*workers.Msg) { return func(jobArg *workers.Msg) { + jobArg.Interface() + logger.Log.Debug("executing rpc job") bts, rpcRoute, err := w.unmarshalRouteMetadata(jobArg) if err != nil {