Skip to content

Commit

Permalink
generic worker
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed Aug 8, 2019
1 parent a2fab1a commit db3649f
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,24 @@ func (w *Worker) EnqueueRPCWithOptions(
}, w.enqueueOptions(opts))
}

// EnqueueJob enqueues generic job to worker
func (w *Worker) EnqueueJob(
queue string,
payload interface{},
) (jid string, err error) {
opts := w.enqueueOptions(w.opts)
return workers.EnqueueWithOptions(queue, class, payload, opts)
}

// EnqueueJobWithOptions enqueues generic job to worker
func (w *Worker) EnqueueJobWithOptions(
queue string,
payload interface{},
opts *EnqueueOpts,
) (jid string, err error) {
return workers.EnqueueWithOptions(queue, class, payload, w.enqueueOptions(opts))
}

// RegisterRPCJob registers a RPC job
func (w *Worker) RegisterRPCJob(rpcJob RPCJob) error {
if w.registered {
Expand All @@ -122,8 +140,21 @@ func (w *Worker) RegisterRPCJob(rpcJob RPCJob) error {
return nil
}

// RegisterJob registers a generic job
func (w *Worker) RegisterJob(
queue string,
callback func(interface{}),
concurrency int,
) {
workers.Process(queue, func(payload *workers.Msg) {
callback(payload.Interface())
}, concurrency)
}

func (w *Worker) parsedRPCJob(rpcJob RPCJob) func(*workers.Msg) {
return func(jobArg *workers.Msg) {
jobArg.Interface()

logger.Log.Debug("executing rpc job")
bts, rpcRoute, err := w.unmarshalRouteMetadata(jobArg)
if err != nil {
Expand Down

0 comments on commit db3649f

Please sign in to comment.