From 26b96278677620450f21ba82af5660b23949d692 Mon Sep 17 00:00:00 2001 From: Vadim Inshakov Date: Wed, 1 Feb 2023 19:50:03 +0500 Subject: [PATCH] add fetcher and prioritySampler interfaces, change fetchJob --- worker.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 66 insertions(+), 6 deletions(-) diff --git a/worker.go b/worker.go index d66a0ba4..ac865a6c 100644 --- a/worker.go +++ b/worker.go @@ -11,6 +11,16 @@ import ( const fetchKeysPerJobType = 6 +type fetcher interface { + Do(c redis.Conn, keysAndArgs ...interface{}) (interface{}, error) +} + +type prioritySampler interface { + Add(priority uint, redisJobs string, redisJobsInProg string, redisJobsPaused string, redisJobsLock string, redisJobsLockInfo string, redisJobsMaxConcurrency string) + Sample() []sampleItem + GetSamples() []sampleItem +} + type worker struct { workerID string poolID string @@ -20,7 +30,7 @@ type worker struct { middleware []*middlewareHandler contextType reflect.Type - redisFetchScript *redis.Script + redisFetchScript fetcher sampler prioritySampler *observer @@ -64,9 +74,9 @@ func newWorker(namespace string, poolID string, pool Pool, contextType reflect.T // note: can't be called while the thing is started func (w *worker) updateMiddlewareAndJobTypes(middleware []*middlewareHandler, jobTypes map[string]*jobType) { w.middleware = middleware - sampler := prioritySampler{} + sampler := &prioritySamplerInPlaceImpl{} for _, jt := range jobTypes { - sampler.add(jt.Priority, + sampler.Add(jt.Priority, redisKeyJobs(w.namespace, jt.Name), redisKeyJobsInProgress(w.namespace, w.poolID, jt.Name), redisKeyJobsPaused(w.namespace, jt.Name), @@ -140,14 +150,64 @@ func (w *worker) loop() { } } +func (w *worker) fetchJobOld() (*Job, error) { + // resort queues + // NOTE: we could optimize this to only resort every second, or something. + w.sampler.Sample() + samples := w.sampler.GetSamples() + numKeys := len(samples) * fetchKeysPerJobType + var scriptArgs = make([]interface{}, 0, numKeys+1) + + for _, s := range samples { + scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg, s.redisJobsPaused, s.redisJobsLock, s.redisJobsLockInfo, s.redisJobsMaxConcurrency) // KEYS[1-6 * N] + } + scriptArgs = append(scriptArgs, w.poolID) // ARGV[1] + conn := w.pool.Get() + defer conn.Close() + + values, err := redis.Values(w.redisFetchScript.Do(conn, scriptArgs...)) + if err == redis.ErrNil { + return nil, nil + } else if err != nil { + return nil, err + } + + if len(values) != 3 { + return nil, fmt.Errorf("need 3 elements back") + } + + rawJSON, ok := values[0].([]byte) + if !ok { + return nil, fmt.Errorf("response msg not bytes") + } + + dequeuedFrom, ok := values[1].([]byte) + if !ok { + return nil, fmt.Errorf("response queue not bytes") + } + + inProgQueue, ok := values[2].([]byte) + if !ok { + return nil, fmt.Errorf("response in prog not bytes") + } + + job, err := newJobOld(rawJSON, dequeuedFrom, inProgQueue) + if err != nil { + return nil, err + } + + return job, nil +} + func (w *worker) fetchJob() (*Job, error) { // resort queues // NOTE: we could optimize this to only resort every second, or something. - w.sampler.sample() - numKeys := len(w.sampler.samples) * fetchKeysPerJobType + w.sampler.Sample() + samples := w.sampler.GetSamples() + numKeys := len(samples) * fetchKeysPerJobType var scriptArgs = make([]interface{}, 0, numKeys+1) - for _, s := range w.sampler.samples { + for _, s := range samples { scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg, s.redisJobsPaused, s.redisJobsLock, s.redisJobsLockInfo, s.redisJobsMaxConcurrency) // KEYS[1-6 * N] } scriptArgs = append(scriptArgs, w.poolID) // ARGV[1]