diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index e613c653..be32c474 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -7,7 +7,7 @@ jobs: steps: - uses: actions/setup-go@v2 with: - go-version: "1.17" + go-version: "1.18" - uses: actions/checkout@v2 with: fetch-depth: 0 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e905cb8b..3bd2d80d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: fetch-depth: 0 - uses: actions/setup-go@v2 with: - go-version: "1.17" + go-version: "1.18" - name: Login to DockerHub uses: docker/login-action@v1 with: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index aee85572..642fbb6a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,6 +8,6 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-go@v2 with: - go-version: "1.17" + go-version: "1.18" - name: run tests run: make test diff --git a/.golangci.yml b/.golangci.yml index cdd96998..6ea09073 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -6,6 +6,7 @@ output: linters: enable-all: true disable: + - exhaustruct - cyclop - exhaustive - exhaustivestruct @@ -28,6 +29,9 @@ linters: - scopelint - tagliatelle - testpackage + - paralleltest + - tparallel + - containedctx - varnamelen - wrapcheck - wsl diff --git a/Makefile b/Makefile index eb037b47..b53fd077 100644 --- a/Makefile +++ b/Makefile @@ -6,32 +6,43 @@ COVERAGE_DIR=coverage BUILD_DIR=dist EXE=entropy -.PHONY: all build clean +.PHONY: all build clean tidy format test test-coverage all: clean test build format lint -build: - mkdir -p ${BUILD_DIR} - CGO_ENABLED=0 go build -ldflags '-X "${NAME}/pkg/version.Version=${VERSION}" -X "${NAME}/pkg/version.Commit=${COMMIT}" -X "${NAME}/pkg/version.BuildTime=${BUILD_TIME}"' -o ${BUILD_DIR}/${EXE} +tidy: + @echo "Tidy up go.mod..." + @go mod tidy -v -clean: - rm -rf ${COVERAGE_DIR} ${BUILD_DIR} +format: + @echo "Running gofumpt..." + @gofumpt -l -w . -download: - go mod download +lint: + @echo "Running lint checks using golangci-lint..." + @golangci-lint run + +clean: tidy + @echo "Cleaning up build directories..." + @rm -rf ${COVERAGE_DIR} ${BUILD_DIR} + @echo "Running go-generate..." + @go generate ./... -test: - mkdir -p ${COVERAGE_DIR} - go test ./... -coverprofile=${COVERAGE_DIR}/coverage.out +test: tidy + @mkdir -p ${COVERAGE_DIR} + @echo "Running unit tests..." + @go test ./... -coverprofile=${COVERAGE_DIR}/coverage.out test-coverage: test - go tool cover -html=${COVERAGE_DIR}/coverage.out + @echo "Generating coverage report..." + @go tool cover -html=${COVERAGE_DIR}/coverage.out -generate: - go generate ./... +build: clean + @mkdir -p ${BUILD_DIR} + @echo "Running build for '${VERSION}' in '${BUILD_DIR}/'..." + @CGO_ENABLED=0 go build -ldflags '-X "${NAME}/pkg/version.Version=${VERSION}" -X "${NAME}/pkg/version.Commit=${COMMIT}" -X "${NAME}/pkg/version.BuildTime=${BUILD_TIME}"' -o ${BUILD_DIR}/${EXE} + +download: + go mod download -format: - gofumpt -l -w . -lint: - golangci-lint run diff --git a/go.mod b/go.mod index ddd74f69..6b4d11db 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,10 @@ module github.com/odpf/entropy go 1.18 require ( - github.com/davecgh/go-spew v1.1.1 + github.com/Masterminds/squirrel v1.5.2 github.com/google/go-cmp v0.5.8 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 + github.com/lib/pq v1.10.4 github.com/mcuadros/go-defaults v1.2.0 github.com/newrelic/go-agent/v3 v3.15.2 github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.3.1 @@ -33,7 +34,6 @@ require ( github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver/v3 v3.1.1 // indirect github.com/Masterminds/sprig/v3 v3.2.2 // indirect - github.com/Masterminds/squirrel v1.5.2 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/alecthomas/chroma v0.8.2 // indirect @@ -47,6 +47,7 @@ require ( github.com/containerd/containerd v1.6.3 // indirect github.com/cyphar/filepath-securejoin v0.2.3 // indirect github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dlclark/regexp2 v1.2.0 // indirect github.com/docker/cli v20.10.11+incompatible // indirect github.com/docker/distribution v2.8.1+incompatible // indirect @@ -92,7 +93,6 @@ require ( github.com/klauspost/compress v1.13.6 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect - github.com/lib/pq v1.10.4 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/magiconair/properties v1.8.5 // indirect diff --git a/pkg/worker/job.go b/pkg/worker/job.go new file mode 100644 index 00000000..4b82d393 --- /dev/null +++ b/pkg/worker/job.go @@ -0,0 +1,159 @@ +package worker + +//go:generate mockery --name=JobQueue -r --case underscore --with-expecter --structname JobQueue --filename=job_queue.go --output=./mocks + +import ( + "context" + "errors" + "fmt" + "strings" + "time" +) + +const minRetryBackoff = 5 * time.Second + +const ( + // StatusDone indicates the Job is successfully finished. + StatusDone = "DONE" + + // StatusPanic indicates there was a panic during job-execution. + // This is a terminal status and will NOT be retried. + StatusPanic = "PANIC" + + // StatusFailed indicates job failed to succeed even after retries. + // This is a terminal status and will NOT be retried. + StatusFailed = "FAILED" + + // StatusPending indicates at-least 1 attempt is still pending. + StatusPending = "PENDING" +) + +var ( + ErrInvalidJob = errors.New("job is not valid") + ErrKindExists = errors.New("handler for given kind exists") + ErrUnknownKind = errors.New("job kind is invalid") +) + +// Job represents the specification for async processing and also maintains +// the progress so far. +type Job struct { + // Specification of the job. + ID string `json:"id"` + Kind string `json:"kind"` + RunAt time.Time `json:"run_at"` + Payload []byte `json:"payload"` + + // Internal metadata. + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + + // Execution information. + Result []byte `json:"result,omitempty"` + AttemptsDone int64 `json:"attempts_done"` + LastAttemptAt time.Time `json:"last_attempt_at,omitempty"` + LastError string `json:"last_error,omitempty"` +} + +// JobQueue represents a special queue that holds jobs and releases them via +// Dequeue() only after their RunAt time. +type JobQueue interface { + // Enqueue all jobs. Enqueue must ensure all-or-nothing behaviour. + // Jobs with zero-value or historical value for ReadyAt must be + // executed immediately. + Enqueue(ctx context.Context, jobs ...Job) error + + // Dequeue one job having one of the given kinds and invoke `fn`. + // The job should be 'locked' until `fn` returns. Refer DequeueFn. + Dequeue(ctx context.Context, kinds []string, fn DequeueFn) error +} + +// DequeueFn is invoked by the JobQueue for ready jobs. It is responsible for +// handling a ready job and returning the updated version after the attempt. +type DequeueFn func(ctx context.Context, j Job) (*Job, error) + +// RetryableError can be returned by a JobFn to instruct the worker to attempt +// retry after time specified by the RetryAfter field. RetryAfter can have min +// of 5 seconds. +type RetryableError struct { + Cause error + RetryAfter time.Duration +} + +func (j *Job) Sanitise() error { + now := time.Now() + + j.ID = strings.TrimSpace(j.ID) + j.Kind = strings.TrimSpace(strings.ToLower(j.Kind)) + + if j.ID == "" { + return fmt.Errorf("%w: job id must be set", ErrInvalidJob) + } + + if j.Kind == "" { + return fmt.Errorf("%w: job kind must be set", ErrInvalidJob) + } + + j.Status = StatusPending + j.CreatedAt = now + j.UpdatedAt = now + + if j.RunAt.IsZero() { + j.RunAt = now + } + + j.AttemptsDone = 0 + j.LastAttemptAt = time.Time{} + j.LastError = "" + return nil +} + +// Attempt attempts to safely invoke `fn` for this job. Handles success, failure +// and panic scenarios and updates the job with result in-place. +func (j *Job) Attempt(ctx context.Context, now time.Time, fn JobFn) { + defer func() { + if v := recover(); v != nil { + j.LastError = fmt.Sprintf("panic: %v", v) + j.Status = StatusPanic + } + + j.AttemptsDone++ + j.LastAttemptAt = now + j.UpdatedAt = now + }() + + select { + case <-ctx.Done(): + j.Status = StatusPending + j.RunAt = now.Add(minRetryBackoff) + j.LastError = fmt.Sprintf("cancelled: %v", ctx.Err()) + + default: + res, err := fn(ctx, *j) + if err != nil { + re := &RetryableError{} + if errors.As(err, &re) { + j.RunAt = now.Add(re.backoff()) + j.LastError = re.Error() + j.Status = StatusPending + } else { + j.LastError = err.Error() + j.Status = StatusFailed + } + } else { + j.Result = res + j.Status = StatusDone + } + } +} + +func (re *RetryableError) Error() string { + return fmt.Sprintf("retryable-error: %v", re.Cause) +} + +func (re RetryableError) backoff() time.Duration { + if re.RetryAfter <= minRetryBackoff { + return minRetryBackoff + } + return re.RetryAfter +} diff --git a/pkg/worker/job_test.go b/pkg/worker/job_test.go new file mode 100644 index 00000000..70ecf616 --- /dev/null +++ b/pkg/worker/job_test.go @@ -0,0 +1,174 @@ +package worker_test + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + + "github.com/odpf/entropy/pkg/errors" + "github.com/odpf/entropy/pkg/worker" +) + +func TestJob_Attempt(t *testing.T) { + cancelledCtx, cancel := context.WithDeadline(context.Background(), time.Unix(0, 0)) + defer cancel() + + createdAt := time.Unix(1654081526, 0) + frozenTime := time.Unix(1654082526, 0) + + table := []struct { + title string + ctx context.Context + job worker.Job + fn worker.JobFn + want worker.Job + }{ + { + title: "ContextCancelled", + ctx: cancelledCtx, + job: worker.Job{ + UpdatedAt: createdAt, + AttemptsDone: 0, + LastAttemptAt: frozenTime, + Result: nil, + LastError: "", + }, + fn: func(ctx context.Context, job worker.Job) ([]byte, error) { + return nil, nil + }, + want: worker.Job{ + Status: worker.StatusPending, + RunAt: frozenTime.Add(5 * time.Second), + UpdatedAt: frozenTime, + AttemptsDone: 1, + LastAttemptAt: frozenTime, + Result: nil, + LastError: "cancelled: context deadline exceeded", + }, + }, + { + title: "Panic", + job: worker.Job{ + UpdatedAt: createdAt, + AttemptsDone: 0, + LastAttemptAt: frozenTime, + Result: nil, + LastError: "", + }, + fn: func(ctx context.Context, job worker.Job) ([]byte, error) { + panic("blown up") + }, + want: worker.Job{ + Status: worker.StatusPanic, + UpdatedAt: frozenTime, + AttemptsDone: 1, + LastAttemptAt: frozenTime, + Result: nil, + LastError: "panic: blown up", + }, + }, + { + title: "NonRetryableError", + job: worker.Job{ + UpdatedAt: createdAt, + AttemptsDone: 0, + LastAttemptAt: frozenTime, + Result: nil, + LastError: "", + }, + fn: func(ctx context.Context, job worker.Job) ([]byte, error) { + return nil, errors.New("a non-retryable error occurred") + }, + want: worker.Job{ + Status: worker.StatusFailed, + UpdatedAt: frozenTime, + AttemptsDone: 1, + LastAttemptAt: frozenTime, + Result: nil, + LastError: "a non-retryable error occurred", + }, + }, + { + title: "RetryableError", + job: worker.Job{ + UpdatedAt: createdAt, + AttemptsDone: 0, + LastAttemptAt: frozenTime, + Result: nil, + LastError: "", + }, + fn: func(ctx context.Context, job worker.Job) ([]byte, error) { + return nil, &worker.RetryableError{ + Cause: errors.New("some retryable error occurred"), + RetryAfter: 10 * time.Second, + } + }, + want: worker.Job{ + Status: worker.StatusPending, + RunAt: frozenTime.Add(10 * time.Second), + UpdatedAt: frozenTime, + AttemptsDone: 1, + LastAttemptAt: frozenTime, + Result: nil, + LastError: "retryable-error: some retryable error occurred", + }, + }, + { + title: "Successful_FirstAttempt", + job: worker.Job{ + UpdatedAt: createdAt, + AttemptsDone: 0, + LastAttemptAt: frozenTime, + Result: nil, + LastError: "", + }, + fn: func(ctx context.Context, job worker.Job) ([]byte, error) { + return []byte("The answer to life is 42"), nil + }, + want: worker.Job{ + Status: worker.StatusDone, + UpdatedAt: frozenTime, + AttemptsDone: 1, + LastAttemptAt: frozenTime, + Result: []byte("The answer to life is 42"), + LastError: "", + }, + }, + { + title: "Successful_SecondAttempt", + job: worker.Job{ + UpdatedAt: createdAt, + AttemptsDone: 1, + LastAttemptAt: frozenTime, + Result: nil, + LastError: "attempt 1 failed with some retryable error", + }, + fn: func(ctx context.Context, job worker.Job) ([]byte, error) { + return []byte("The answer to life is 42"), nil + }, + want: worker.Job{ + Status: worker.StatusDone, + UpdatedAt: frozenTime, + AttemptsDone: 2, + LastAttemptAt: frozenTime, + Result: []byte("The answer to life is 42"), + LastError: "attempt 1 failed with some retryable error", + }, + }, + } + + for _, tt := range table { + t.Run(tt.title, func(t *testing.T) { + if tt.ctx == nil { + tt.ctx = context.Background() + } + + tt.job.Attempt(tt.ctx, frozenTime, tt.fn) + + assert.Truef(t, cmp.Equal(tt.want, tt.job), cmp.Diff(tt.want, tt.job)) + }) + } +} diff --git a/pkg/worker/mocks/job_queue.go b/pkg/worker/mocks/job_queue.go new file mode 100644 index 00000000..19e4bb79 --- /dev/null +++ b/pkg/worker/mocks/job_queue.go @@ -0,0 +1,114 @@ +// Code generated by mockery v2.10.4. DO NOT EDIT. + +package mocks + +import ( + context "context" + + worker "github.com/odpf/entropy/pkg/worker" + mock "github.com/stretchr/testify/mock" +) + +// JobQueue is an autogenerated mock type for the JobQueue type +type JobQueue struct { + mock.Mock +} + +type JobQueue_Expecter struct { + mock *mock.Mock +} + +func (_m *JobQueue) EXPECT() *JobQueue_Expecter { + return &JobQueue_Expecter{mock: &_m.Mock} +} + +// Dequeue provides a mock function with given fields: ctx, kinds, fn +func (_m *JobQueue) Dequeue(ctx context.Context, kinds []string, fn worker.DequeueFn) error { + ret := _m.Called(ctx, kinds, fn) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []string, worker.DequeueFn) error); ok { + r0 = rf(ctx, kinds, fn) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// JobQueue_Dequeue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Dequeue' +type JobQueue_Dequeue_Call struct { + *mock.Call +} + +// Dequeue is a helper method to define mock.On call +// - ctx context.Context +// - kinds []string +// - fn worker.DequeueFn +func (_e *JobQueue_Expecter) Dequeue(ctx interface{}, kinds interface{}, fn interface{}) *JobQueue_Dequeue_Call { + return &JobQueue_Dequeue_Call{Call: _e.mock.On("Dequeue", ctx, kinds, fn)} +} + +func (_c *JobQueue_Dequeue_Call) Run(run func(ctx context.Context, kinds []string, fn worker.DequeueFn)) *JobQueue_Dequeue_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]string), args[2].(worker.DequeueFn)) + }) + return _c +} + +func (_c *JobQueue_Dequeue_Call) Return(_a0 error) *JobQueue_Dequeue_Call { + _c.Call.Return(_a0) + return _c +} + +// Enqueue provides a mock function with given fields: ctx, jobs +func (_m *JobQueue) Enqueue(ctx context.Context, jobs ...worker.Job) error { + _va := make([]interface{}, len(jobs)) + for _i := range jobs { + _va[_i] = jobs[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, ...worker.Job) error); ok { + r0 = rf(ctx, jobs...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// JobQueue_Enqueue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Enqueue' +type JobQueue_Enqueue_Call struct { + *mock.Call +} + +// Enqueue is a helper method to define mock.On call +// - ctx context.Context +// - jobs ...worker.Job +func (_e *JobQueue_Expecter) Enqueue(ctx interface{}, jobs ...interface{}) *JobQueue_Enqueue_Call { + return &JobQueue_Enqueue_Call{Call: _e.mock.On("Enqueue", + append([]interface{}{ctx}, jobs...)...)} +} + +func (_c *JobQueue_Enqueue_Call) Run(run func(ctx context.Context, jobs ...worker.Job)) *JobQueue_Enqueue_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]worker.Job, len(args)-1) + for i, a := range args[1:] { + if a != nil { + variadicArgs[i] = a.(worker.Job) + } + } + run(args[0].(context.Context), variadicArgs...) + }) + return _c +} + +func (_c *JobQueue_Enqueue_Call) Return(_a0 error) *JobQueue_Enqueue_Call { + _c.Call.Return(_a0) + return _c +} diff --git a/pkg/worker/pgq/pgq.go b/pkg/worker/pgq/pgq.go new file mode 100644 index 00000000..f6e9ddaa --- /dev/null +++ b/pkg/worker/pgq/pgq.go @@ -0,0 +1,132 @@ +package pgq + +import ( + "context" + "database/sql" + _ "embed" + "fmt" + "regexp" + "strings" + "time" + + sq "github.com/Masterminds/squirrel" + _ "github.com/lib/pq" // postgres driver. + + "github.com/odpf/entropy/pkg/errors" + "github.com/odpf/entropy/pkg/worker" +) + +const ( + pgDriverName = "postgres" + tableNamePlaceholder = "__queueTable__" + + extendInterval = 30 * time.Second + refreshInterval = 20 * time.Second +) + +var ( + //go:embed schema.sql + sqlSchemaTemplate string + + queueNamePattern = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9_]*$`) + errInvalidQueueName = fmt.Errorf("queue name must match pattern '%s'", queueNamePattern) +) + +// Queue implements a JobQueue backed by PostgreSQL. Refer Open() for initialising. +type Queue struct { + db *sql.DB + queueName string + tableName string + extendInterval time.Duration + refreshInterval time.Duration +} + +// Open returns a JobQueue implementation backed by the PostgreSQL instance +// discovered by the conString. The table used for the queue will be based +// on the queueName. Necessary migrations will be done automatically. +func Open(conString, queueName string) (*Queue, error) { + if !queueNamePattern.MatchString(queueName) { + return nil, errInvalidQueueName + } + tableName := fmt.Sprintf("pgq_%s", queueName) + + db, err := sql.Open(pgDriverName, conString) + if err != nil { + return nil, err + } + + q := &Queue{ + db: db, + queueName: queueName, + tableName: tableName, + extendInterval: extendInterval, + refreshInterval: refreshInterval, + } + + if err := q.prepareDB(); err != nil { + _ = q.Close() + return nil, err + } + + return q, nil +} + +func (q *Queue) Enqueue(ctx context.Context, jobs ...worker.Job) error { + insertQuery := sq.Insert(q.tableName).Columns( + "id", "kind", "status", "run_at", + "payload", "created_at", "updated_at", + ) + + for _, job := range jobs { + insertQuery = insertQuery.Values( + job.ID, job.Kind, job.Status, job.RunAt, + job.Payload, job.CreatedAt, job.UpdatedAt, + ) + } + + _, err := insertQuery.RunWith(q.db).PlaceholderFormat(sq.Dollar).ExecContext(ctx) + if err != nil { + return err + } + return nil +} + +func (q *Queue) Dequeue(ctx context.Context, kinds []string, fn worker.DequeueFn) error { + job, err := q.pickupJob(ctx, kinds) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil + } + return err + } + + resultJob, err := q.handleDequeued(ctx, *job, fn) + if err != nil { + return err + } + + return q.saveJobResult(ctx, *resultJob) +} + +func (q *Queue) handleDequeued(baseCtx context.Context, job worker.Job, fn worker.DequeueFn) (*worker.Job, error) { + jobCtx, cancel := context.WithCancel(baseCtx) + defer cancel() + + go func() { + q.runHeartbeat(jobCtx, job.ID) + + // Heartbeat process stopped for some reason. job should be + // released as soon as possible. so cancel context. + cancel() + }() + + return fn(jobCtx, job) +} + +func (q *Queue) Close() error { return q.db.Close() } + +func (q *Queue) prepareDB() error { + sqlSchema := strings.ReplaceAll(sqlSchemaTemplate, tableNamePlaceholder, q.tableName) + _, execErr := q.db.Exec(sqlSchema) + return execErr +} diff --git a/pkg/worker/pgq/pgq_test.go b/pkg/worker/pgq/pgq_test.go new file mode 100644 index 00000000..4dc41e34 --- /dev/null +++ b/pkg/worker/pgq/pgq_test.go @@ -0,0 +1 @@ +package pgq diff --git a/pkg/worker/pgq/pgq_utils.go b/pkg/worker/pgq/pgq_utils.go new file mode 100644 index 00000000..66445df1 --- /dev/null +++ b/pkg/worker/pgq/pgq_utils.go @@ -0,0 +1,131 @@ +package pgq + +import ( + "context" + "database/sql" + "time" + + sq "github.com/Masterminds/squirrel" + + "github.com/odpf/entropy/pkg/worker" +) + +type txnFn func(ctx context.Context, tx *sql.Tx) error + +func (q *Queue) withTx(ctx context.Context, readOnly bool, fn txnFn) error { + opts := &sql.TxOptions{ReadOnly: readOnly} + + tx, err := q.db.BeginTx(ctx, opts) + if err != nil { + return err + } + + if fnErr := fn(ctx, tx); fnErr != nil { + _ = tx.Rollback() + return fnErr + } + + return tx.Commit() +} + +func (q *Queue) runHeartbeat(ctx context.Context, id string) { + tick := time.NewTicker(q.refreshInterval) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-tick.C: + if err := q.extendWaitTime(ctx, q.db, id); err != nil { + return + } + } + } +} + +func (q *Queue) pickupJob(ctx context.Context, kinds []string) (*worker.Job, error) { + var job worker.Job + + txErr := q.withTx(ctx, false, func(ctx context.Context, tx *sql.Tx) error { + j, err := fetchReadyJob(ctx, tx, q.tableName, kinds) + if err != nil { + return err + } else if extendErr := q.extendWaitTime(ctx, tx, j.ID); extendErr != nil { + return extendErr + } + job = *j + + return nil + }) + + return &job, txErr +} + +func (q *Queue) saveJobResult(ctx context.Context, job worker.Job) error { + updateQuery := sq.Update(q.tableName). + Where(sq.Eq{ + "id": job.ID, + "status": worker.StatusPending, + }). + Set("updated_at", job.UpdatedAt). + Set("status", job.Status). + Set("result", job.Result). + Set("attempts_done", sq.Expr("attempts_done + 1")). + Set("last_error", job.LastError). + Set("last_attempt_at", job.LastAttemptAt) + + _, err := updateQuery.PlaceholderFormat(sq.Dollar).RunWith(q.db).ExecContext(ctx) + return err +} + +func fetchReadyJob(ctx context.Context, r sq.BaseRunner, tableName string, kinds []string) (*worker.Job, error) { + selectQuery := sq.Select().From(tableName). + Columns("id", "kind", "status", "run_at", + "payload", "created_at", "updated_at", + "result", "attempts_done", "last_attempt_at", "last_error"). + Where(sq.Eq{ + "kind": kinds, + "status": worker.StatusPending, + }). + Where(sq.Expr("run_at < current_timestamp")). + Limit(1). + Suffix("FOR UPDATE SKIP LOCKED") + + row := selectQuery.PlaceholderFormat(sq.Dollar).RunWith(r).QueryRowContext(ctx) + return rowIntoJob(row) +} + +func (q *Queue) extendWaitTime(ctx context.Context, r sq.BaseRunner, id string) error { + extendTo := sq.Expr("current_timestamp + (? ||' seconds')::interval ", q.extendInterval.Seconds()) + extendQuery := sq.Update(q.tableName). + Set("run_at", extendTo). + Where(sq.Eq{"id": id}) + + _, err := extendQuery.PlaceholderFormat(sq.Dollar).RunWith(r).ExecContext(ctx) + return err +} + +func rowIntoJob(row sq.RowScanner) (*worker.Job, error) { + var job worker.Job + var lastErr sql.NullString + var lastAttemptAt sql.NullTime + + fieldPtrs := []interface{}{ + &job.ID, &job.Kind, &job.Status, &job.RunAt, + &job.Payload, &job.CreatedAt, &job.UpdatedAt, + + // execution results. + &job.Result, &job.AttemptsDone, &lastAttemptAt, &lastErr, + } + + if err := row.Scan(fieldPtrs...); err != nil { + return nil, err + } + + job.LastAttemptAt = lastAttemptAt.Time + job.LastError = lastErr.String + + return &job, nil +} diff --git a/pkg/worker/pgq/pgq_utils_test.go b/pkg/worker/pgq/pgq_utils_test.go new file mode 100644 index 00000000..4dc41e34 --- /dev/null +++ b/pkg/worker/pgq/pgq_utils_test.go @@ -0,0 +1 @@ +package pgq diff --git a/pkg/worker/pgq/schema.sql b/pkg/worker/pgq/schema.sql new file mode 100644 index 00000000..cbc87740 --- /dev/null +++ b/pkg/worker/pgq/schema.sql @@ -0,0 +1,21 @@ +CREATE TABLE IF NOT EXISTS __queueTable__ +( + -- Job specification. + id TEXT NOT NULL PRIMARY KEY, + kind TEXT NOT NULL, + status TEXT NOT NULL, + run_at TIMESTAMP NOT NULL, + payload bytea NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT current_timestamp, + updated_at TIMESTAMP NOT NULL DEFAULT current_timestamp, + + -- Result generated by execution. + result bytea, + attempts_done INTEGER NOT NULL DEFAULT 0, + last_attempt_at TIMESTAMP, + last_error TEXT +); + +CREATE INDEX IF NOT EXISTS idx___queueTable___kind ON __queueTable__ (kind); +CREATE INDEX IF NOT EXISTS idx___queueTable___status ON __queueTable__ (status); +CREATE INDEX IF NOT EXISTS idx___queueTable___run_at ON __queueTable__ (run_at); \ No newline at end of file diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go new file mode 100644 index 00000000..cb53dfad --- /dev/null +++ b/pkg/worker/worker.go @@ -0,0 +1,130 @@ +package worker + +import ( + "context" + "fmt" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/odpf/entropy/pkg/errors" +) + +// Worker provides asynchronous job processing using a job-queue. +type Worker struct { + workers int + pollInt time.Duration + + queue JobQueue + logger *zap.Logger + handlers map[string]JobFn +} + +// JobFn is invoked by the Worker for ready jobs. If it returns no error, +// job will be marked with StatusDone. If it returns RetryableError, the +// job will remain in StatusPending and will be enqueued for retry. If +// it returns any other error, job will be marked as StatusFailed. In case +// if a panic occurs, job will be marked as StatusPanic. +type JobFn func(ctx context.Context, job Job) ([]byte, error) + +type Option func(w *Worker) error + +func New(queue JobQueue, opts ...Option) (*Worker, error) { + w := &Worker{queue: queue} + for _, opt := range withDefaults(opts) { + if err := opt(w); err != nil { + return nil, err + } + } + + if len(w.handlers) == 0 { + return nil, errors.New("at-least one job handler must be registered") + } + return w, nil +} + +// Enqueue enqueues all jobs for processing. +func (w *Worker) Enqueue(ctx context.Context, jobs ...Job) error { + for i, job := range jobs { + if err := job.Sanitise(); err != nil { + return err + } else if _, knownKind := w.handlers[job.Kind]; !knownKind { + return fmt.Errorf("%w: kind '%s'", ErrUnknownKind, job.Kind) + } + jobs[i] = job + } + + return w.queue.Enqueue(ctx, jobs...) +} + +// Run starts the worker threads that dequeue and process ready jobs. Run blocks +// until all workers exit or context is cancelled. Context cancellation will do +// graceful shutdown of the worker threads. +func (w *Worker) Run(baseCtx context.Context) error { + ctx, cancel := context.WithCancel(baseCtx) + defer cancel() + + wg := &sync.WaitGroup{} + for i := 0; i < w.workers; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + w.runWorker(ctx) + w.logger.Info("worker exited", zap.Int("worker_id", id)) + }(i) + } + wg.Wait() + + w.logger.Info("all workers-threads exited") + return cleanupCtxErr(ctx.Err()) +} + +func (w *Worker) runWorker(ctx context.Context) { + timer := time.NewTimer(w.pollInt) + defer timer.Stop() + + var kinds []string + for kind := range w.handlers { + kinds = append(kinds, kind) + } + + for { + select { + case <-ctx.Done(): + return + + case <-timer.C: + timer.Reset(w.pollInt) + + w.logger.Info("looking for a job") + if err := w.queue.Dequeue(ctx, kinds, w.handleJob); err != nil { + w.logger.Error("dequeue failed", zap.Error(err)) + } + } + } +} + +func (w *Worker) handleJob(ctx context.Context, job Job) (*Job, error) { + const invalidKindBackoff = 5 * time.Minute + + fn, exists := w.handlers[job.Kind] + if !exists { + // Note: This should never happen since Dequeue() has `kinds` filter. + // It is only kept as a safety net to prevent nil-dereferences. + return nil, &RetryableError{ + Cause: errors.New("job kind is invalid"), + RetryAfter: invalidKindBackoff, + } + } + + job.Attempt(ctx, time.Now(), fn) + return &job, nil +} + +func cleanupCtxErr(err error) error { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil + } + return err +} diff --git a/pkg/worker/worker_option.go b/pkg/worker/worker_option.go new file mode 100644 index 00000000..98e9288f --- /dev/null +++ b/pkg/worker/worker_option.go @@ -0,0 +1,57 @@ +package worker + +import ( + "fmt" + "time" + + "go.uber.org/zap" +) + +func WithJobKind(kind string, fn JobFn) Option { + return func(w *Worker) error { + if w.handlers == nil { + w.handlers = map[string]JobFn{} + } + + if _, exists := w.handlers[kind]; exists { + return fmt.Errorf("%w: kind '%s'", ErrKindExists, kind) + } + + w.handlers[kind] = fn + return nil + } +} + +func WithLogger(l *zap.Logger) Option { + return func(w *Worker) error { + if l == nil { + l = zap.NewNop() + } + w.logger = l + return nil + } +} + +func WithRunConfig(workers int, pollInterval time.Duration) Option { + return func(w *Worker) error { + if workers == 0 { + workers = 1 + } + + const minPollInterval = 100 * time.Millisecond + if pollInterval < minPollInterval { + pollInterval = minPollInterval + } + + w.pollInt = pollInterval + w.workers = workers + return nil + } +} + +func withDefaults(opts []Option) []Option { + return append([]Option{ + WithLogger(nil), + WithRunConfig(1, 1*time.Second), + }, opts...) +} diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go new file mode 100644 index 00000000..b17dc77d --- /dev/null +++ b/pkg/worker/worker_test.go @@ -0,0 +1,240 @@ +package worker_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/odpf/entropy/pkg/errors" + "github.com/odpf/entropy/pkg/worker" + "github.com/odpf/entropy/pkg/worker/mocks" +) + +func Test_New(t *testing.T) { + t.Parallel() + + q := &mocks.JobQueue{} + + t.Run("NoJobKind", func(t *testing.T) { + w, err := worker.New(q) + assert.Error(t, err) + assert.EqualError(t, err, "at-least one job handler must be registered") + assert.Nil(t, w) + }) + + t.Run("DuplicateKind", func(t *testing.T) { + w, err := worker.New(q, + worker.WithJobKind("test", nil), + worker.WithJobKind("test", nil), + ) + assert.Error(t, err) + assert.EqualError(t, err, "handler for given kind exists: kind 'test'") + assert.Nil(t, w) + }) + + t.Run("Success", func(t *testing.T) { + w, err := worker.New(q, + worker.WithJobKind("test", nil), + worker.WithRunConfig(0, 0), + ) + assert.NoError(t, err) + assert.NotNil(t, w) + }) +} + +func TestWorker_Enqueue(t *testing.T) { + t.Parallel() + + table := []struct { + title string + queue func(t *testing.T) worker.JobQueue + opts []worker.Option + jobs []worker.Job + wantErr error + }{ + { + title: "InvalidJobID", + queue: func(t *testing.T) worker.JobQueue { + t.Helper() + return &mocks.JobQueue{} + }, + opts: []worker.Option{ + worker.WithJobKind("test", func(ctx context.Context, job worker.Job) ([]byte, error) { + return nil, nil + }), + }, + jobs: []worker.Job{ + {ID: "", Kind: "test"}, + }, + wantErr: worker.ErrInvalidJob, + }, + { + title: "UnknownJobKind", + queue: func(t *testing.T) worker.JobQueue { + t.Helper() + return &mocks.JobQueue{} + }, + opts: []worker.Option{ + worker.WithJobKind("test", func(ctx context.Context, job worker.Job) ([]byte, error) { + return nil, nil + }), + }, + jobs: []worker.Job{ + {ID: "foo1", Kind: "test"}, + {ID: "foo2", Kind: "unknown_kind"}, + }, + wantErr: worker.ErrUnknownKind, + }, + { + title: "Success", + queue: func(t *testing.T) worker.JobQueue { + t.Helper() + + q := &mocks.JobQueue{} + q.EXPECT(). + Enqueue(mock.Anything, mock.Anything, mock.Anything). + Run(func(ctx context.Context, jobs ...worker.Job) { + require.Len(t, jobs, 2) + assert.Equal(t, "foo1", jobs[0].ID) + assert.Equal(t, "foo2", jobs[1].ID) + }). + Return(nil). + Once() + return q + }, + opts: []worker.Option{ + worker.WithJobKind("test", func(ctx context.Context, job worker.Job) ([]byte, error) { + return nil, nil + }), + }, + jobs: []worker.Job{ + {ID: "foo1", Kind: "test"}, + {ID: "foo2", Kind: "test"}, + }, + wantErr: nil, + }, + } + + for _, tt := range table { + t.Run(tt.title, func(t *testing.T) { + w, err := worker.New(tt.queue(t), tt.opts...) + require.NoError(t, err) + require.NotNil(t, w) + + got := w.Enqueue(context.Background(), tt.jobs...) + if tt.wantErr != nil { + assert.Error(t, got) + assert.True(t, errors.Is(got, tt.wantErr)) + } else { + assert.NoError(t, got) + } + }) + } +} + +func TestWorker_Run(t *testing.T) { + t.Parallel() + + opts := []worker.Option{ + worker.WithJobKind("test", func(ctx context.Context, job worker.Job) ([]byte, error) { + return []byte("test_result"), nil + }), + worker.WithRunConfig(1, 10*time.Millisecond), + } + + t.Run("ContextCancelled", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // immediately cancel the context. + + q := &mocks.JobQueue{} + + w, err := worker.New(q, opts...) + require.NoError(t, err) + require.NotNil(t, w) + + got := w.Run(ctx) + assert.NoError(t, got) + }) + + t.Run("ContextDeadline", func(t *testing.T) { + ctx, cancel := context.WithDeadline(context.Background(), time.Unix(0, 0)) + defer cancel() + + q := &mocks.JobQueue{} + + w, err := worker.New(q, opts...) + require.NoError(t, err) + require.NotNil(t, w) + + got := w.Run(ctx) + assert.NoError(t, got) + }) + + t.Run("Dequeue_ReturnsUnknownKind", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dequeued := 0 + sampleJob := worker.Job{ + ID: "test1", + Kind: "unknown_kind", + } + + q := &mocks.JobQueue{} + q.EXPECT(). + Dequeue(mock.Anything, []string{"test"}, mock.Anything). + Run(func(ctx context.Context, kinds []string, fn worker.DequeueFn) { + _, err := fn(ctx, sampleJob) + assert.Error(t, err) + assert.EqualError(t, err, "retryable-error: job kind is invalid") + + dequeued++ + cancel() // cancel context to stop the worker. + }). + Return(nil) + + w, err := worker.New(q, opts...) + require.NoError(t, err) + require.NotNil(t, w) + + got := w.Run(ctx) + assert.NoError(t, got) + assert.Equal(t, 1, dequeued) + }) + + t.Run("Success", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dequeued := 0 + sampleJob := worker.Job{ + ID: "test1", + Kind: "test", + } + + q := &mocks.JobQueue{} + q.EXPECT(). + Dequeue(mock.Anything, []string{"test"}, mock.Anything). + Run(func(ctx context.Context, kinds []string, fn worker.DequeueFn) { + res, err := fn(ctx, sampleJob) + assert.NoError(t, err) + assert.Equal(t, []byte("test_result"), res.Result) + + dequeued++ + cancel() // cancel context to stop the worker. + }). + Return(nil) + + w, err := worker.New(q, opts...) + require.NoError(t, err) + require.NotNil(t, w) + + got := w.Run(ctx) + assert.NoError(t, got) + assert.Equal(t, 1, dequeued) + }) +}