diff --git a/worker/worker.go b/worker/worker.go index 9e499241..c310ef72 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -110,6 +110,24 @@ func (w *Worker) EnqueueRPCWithOptions( }, w.enqueueOptions(opts)) } +// EnqueueJob enqueues generic job to worker +func (w *Worker) EnqueueJob( + queue string, + payload interface{}, +) (jid string, err error) { + opts := w.enqueueOptions(w.opts) + return workers.EnqueueWithOptions(queue, class, payload, opts) +} + +// EnqueueJobWithOptions enqueues generic job to worker +func (w *Worker) EnqueueJobWithOptions( + queue string, + payload interface{}, + opts *EnqueueOpts, +) (jid string, err error) { + return workers.EnqueueWithOptions(queue, class, payload, w.enqueueOptions(opts)) +} + // RegisterRPCJob registers a RPC job func (w *Worker) RegisterRPCJob(rpcJob RPCJob) error { if w.registered { @@ -122,8 +140,21 @@ func (w *Worker) RegisterRPCJob(rpcJob RPCJob) error { return nil } +// RegisterJob registers a generic job +func (w *Worker) RegisterJob( + queue string, + callback func(interface{}), + concurrency int, +) { + workers.Process(queue, func(payload *workers.Msg) { + callback(payload.Interface()) + }, concurrency) +} + func (w *Worker) parsedRPCJob(rpcJob RPCJob) func(*workers.Msg) { return func(jobArg *workers.Msg) { + jobArg.Interface() + logger.Log.Debug("executing rpc job") bts, rpcRoute, err := w.unmarshalRouteMetadata(jobArg) if err != nil {