Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: "1.17"
go-version: "1.18"
- uses: actions/checkout@v2
with:
fetch-depth: 0
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
fetch-depth: 0
- uses: actions/setup-go@v2
with:
go-version: "1.17"
go-version: "1.18"
- name: Login to DockerHub
uses: docker/login-action@v1
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "1.17"
go-version: "1.18"
- name: run tests
run: make test
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ output:
linters:
enable-all: true
disable:
- exhaustruct
- cyclop
- exhaustive
- exhaustivestruct
Expand All @@ -28,6 +29,9 @@ linters:
- scopelint
- tagliatelle
- testpackage
- paralleltest
- tparallel
- containedctx
- varnamelen
- wrapcheck
- wsl
Expand Down
47 changes: 29 additions & 18 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,43 @@ COVERAGE_DIR=coverage
BUILD_DIR=dist
EXE=entropy

.PHONY: all build clean
.PHONY: all build clean tidy format test test-coverage

all: clean test build format lint

build:
mkdir -p ${BUILD_DIR}
CGO_ENABLED=0 go build -ldflags '-X "${NAME}/pkg/version.Version=${VERSION}" -X "${NAME}/pkg/version.Commit=${COMMIT}" -X "${NAME}/pkg/version.BuildTime=${BUILD_TIME}"' -o ${BUILD_DIR}/${EXE}
tidy:
@echo "Tidy up go.mod..."
@go mod tidy -v

clean:
rm -rf ${COVERAGE_DIR} ${BUILD_DIR}
format:
@echo "Running gofumpt..."
@gofumpt -l -w .

download:
go mod download
lint:
@echo "Running lint checks using golangci-lint..."
@golangci-lint run

clean: tidy
@echo "Cleaning up build directories..."
@rm -rf ${COVERAGE_DIR} ${BUILD_DIR}
@echo "Running go-generate..."
@go generate ./...

test:
mkdir -p ${COVERAGE_DIR}
go test ./... -coverprofile=${COVERAGE_DIR}/coverage.out
test: tidy
@mkdir -p ${COVERAGE_DIR}
@echo "Running unit tests..."
@go test ./... -coverprofile=${COVERAGE_DIR}/coverage.out

test-coverage: test
go tool cover -html=${COVERAGE_DIR}/coverage.out
@echo "Generating coverage report..."
@go tool cover -html=${COVERAGE_DIR}/coverage.out

generate:
go generate ./...
build: clean
@mkdir -p ${BUILD_DIR}
@echo "Running build for '${VERSION}' in '${BUILD_DIR}/'..."
@CGO_ENABLED=0 go build -ldflags '-X "${NAME}/pkg/version.Version=${VERSION}" -X "${NAME}/pkg/version.Commit=${COMMIT}" -X "${NAME}/pkg/version.BuildTime=${BUILD_TIME}"' -o ${BUILD_DIR}/${EXE}

download:
go mod download

format:
gofumpt -l -w .

lint:
golangci-lint run
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ module github.com/odpf/entropy
go 1.18

require (
github.com/davecgh/go-spew v1.1.1
github.com/Masterminds/squirrel v1.5.2
github.com/google/go-cmp v0.5.8
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/lib/pq v1.10.4
github.com/mcuadros/go-defaults v1.2.0
github.com/newrelic/go-agent/v3 v3.15.2
github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.3.1
Expand Down Expand Up @@ -33,7 +34,6 @@ require (
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver/v3 v3.1.1 // indirect
github.com/Masterminds/sprig/v3 v3.2.2 // indirect
github.com/Masterminds/squirrel v1.5.2 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/alecthomas/chroma v0.8.2 // indirect
Expand All @@ -47,6 +47,7 @@ require (
github.com/containerd/containerd v1.6.3 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dlclark/regexp2 v1.2.0 // indirect
github.com/docker/cli v20.10.11+incompatible // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
Expand Down Expand Up @@ -92,7 +93,6 @@ require (
github.com/klauspost/compress v1.13.6 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/lib/pq v1.10.4 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/magiconair/properties v1.8.5 // indirect
Expand Down
159 changes: 159 additions & 0 deletions pkg/worker/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package worker

//go:generate mockery --name=JobQueue -r --case underscore --with-expecter --structname JobQueue --filename=job_queue.go --output=./mocks

import (
"context"
"errors"
"fmt"
"strings"
"time"
)

const minRetryBackoff = 5 * time.Second

const (
// StatusDone indicates the Job is successfully finished.
StatusDone = "DONE"

// StatusPanic indicates there was a panic during job-execution.
// This is a terminal status and will NOT be retried.
StatusPanic = "PANIC"

// StatusFailed indicates job failed to succeed even after retries.
// This is a terminal status and will NOT be retried.
StatusFailed = "FAILED"

// StatusPending indicates at-least 1 attempt is still pending.
StatusPending = "PENDING"
)

var (
ErrInvalidJob = errors.New("job is not valid")
ErrKindExists = errors.New("handler for given kind exists")
ErrUnknownKind = errors.New("job kind is invalid")
)

// Job represents the specification for async processing and also maintains
// the progress so far.
type Job struct {
// Specification of the job.
ID string `json:"id"`
Kind string `json:"kind"`
RunAt time.Time `json:"run_at"`
Payload []byte `json:"payload"`

// Internal metadata.
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`

// Execution information.
Result []byte `json:"result,omitempty"`
AttemptsDone int64 `json:"attempts_done"`
LastAttemptAt time.Time `json:"last_attempt_at,omitempty"`
LastError string `json:"last_error,omitempty"`
}

// JobQueue represents a special queue that holds jobs and releases them via
// Dequeue() only after their RunAt time.
type JobQueue interface {
// Enqueue all jobs. Enqueue must ensure all-or-nothing behaviour.
// Jobs with zero-value or historical value for ReadyAt must be
// executed immediately.
Enqueue(ctx context.Context, jobs ...Job) error

// Dequeue one job having one of the given kinds and invoke `fn`.
// The job should be 'locked' until `fn` returns. Refer DequeueFn.
Dequeue(ctx context.Context, kinds []string, fn DequeueFn) error
}

// DequeueFn is invoked by the JobQueue for ready jobs. It is responsible for
// handling a ready job and returning the updated version after the attempt.
type DequeueFn func(ctx context.Context, j Job) (*Job, error)

// RetryableError can be returned by a JobFn to instruct the worker to attempt
// retry after time specified by the RetryAfter field. RetryAfter can have min
// of 5 seconds.
type RetryableError struct {
Cause error
RetryAfter time.Duration
}

func (j *Job) Sanitise() error {
now := time.Now()

j.ID = strings.TrimSpace(j.ID)
j.Kind = strings.TrimSpace(strings.ToLower(j.Kind))

if j.ID == "" {
return fmt.Errorf("%w: job id must be set", ErrInvalidJob)
}

if j.Kind == "" {
return fmt.Errorf("%w: job kind must be set", ErrInvalidJob)
}

j.Status = StatusPending
j.CreatedAt = now
j.UpdatedAt = now

if j.RunAt.IsZero() {
j.RunAt = now
}

j.AttemptsDone = 0
j.LastAttemptAt = time.Time{}
j.LastError = ""
return nil
}

// Attempt attempts to safely invoke `fn` for this job. Handles success, failure
// and panic scenarios and updates the job with result in-place.
func (j *Job) Attempt(ctx context.Context, now time.Time, fn JobFn) {
defer func() {
if v := recover(); v != nil {
j.LastError = fmt.Sprintf("panic: %v", v)
j.Status = StatusPanic
}

j.AttemptsDone++
j.LastAttemptAt = now
j.UpdatedAt = now
}()

select {
case <-ctx.Done():
j.Status = StatusPending
j.RunAt = now.Add(minRetryBackoff)
j.LastError = fmt.Sprintf("cancelled: %v", ctx.Err())

default:
res, err := fn(ctx, *j)
if err != nil {
re := &RetryableError{}
if errors.As(err, &re) {
j.RunAt = now.Add(re.backoff())
j.LastError = re.Error()
j.Status = StatusPending
} else {
j.LastError = err.Error()
j.Status = StatusFailed
}
} else {
j.Result = res
j.Status = StatusDone
}
}
}

func (re *RetryableError) Error() string {
return fmt.Sprintf("retryable-error: %v", re.Cause)
}

func (re RetryableError) backoff() time.Duration {
if re.RetryAfter <= minRetryBackoff {
return minRetryBackoff
}
return re.RetryAfter
}
Loading