Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MKP-329 Sync changes back to upstream #7

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ There are benefits of using Que to route jobs:
## Install

```bash
go get github.com/tnclong/go-que
go get github.com/theplant/go-que
```

## Doc

https://godoc.org/github.com/tnclong/go-que
https://godoc.org/github.com/theplant/go-que

## Quickstart

Expand Down
6 changes: 3 additions & 3 deletions bm/go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module github.com/tnclong/go-que/bm
module github.com/theplant/go-que/bm

go 1.13

replace github.com/tnclong/go-que => ../
replace github.com/theplant/go-que => ../

require (
github.com/lib/pq v1.3.0
github.com/tnclong/go-que v0.0.0-00010101000000-000000000000
github.com/theplant/go-que v0.0.0-00010101000000-000000000000
gopkg.in/yaml.v2 v2.3.0
)
4 changes: 2 additions & 2 deletions bm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"time"

_ "github.com/lib/pq"
"github.com/tnclong/go-que"
"github.com/tnclong/go-que/pg"
"github.com/theplant/go-que"
"github.com/theplant/go-que/pg"
"gopkg.in/yaml.v2"
)

Expand Down
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"time"

"github.com/tnclong/go-que"
"github.com/theplant/go-que"
)

func ExampleWorker() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/tnclong/go-que
module github.com/theplant/go-que

go 1.13

Expand Down
4 changes: 2 additions & 2 deletions mock/queue.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 12 additions & 5 deletions pg/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"database/sql"
"time"

"github.com/tnclong/go-que"
"github.com/theplant/go-que"
)

type job struct {
Expand Down Expand Up @@ -51,13 +51,15 @@ func (j *job) In(tx *sql.Tx) {
}

const doneJob = `UPDATE goque_jobs
SET done_at = now()
WHERE id = $1::bigint`
SET done_at = now(),
result = $1::jsonb
WHERE id = $2::bigint`

const doneUniqueIDJob = `UPDATE goque_jobs
SET done_at = now(),
result = $1::jsonb,
unique_id = null
WHERE id = $1::bigint`
WHERE id = $2::bigint`

func (j *job) Done(ctx context.Context) error {
var execSQL string
Expand All @@ -66,7 +68,12 @@ func (j *job) Done(ctx context.Context) error {
} else {
execSQL = doneJob
}
_, err := j.exec(j.tx)(ctx, execSQL, j.id)
resultStr := "{}"
if result, ok := ctx.Value("result").(string); ok {
resultStr = result
}

_, err := j.exec(j.tx)(ctx, execSQL, resultStr, j.id)
return err
}

Expand Down
10 changes: 6 additions & 4 deletions pg/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"sync"
"time"

"github.com/tnclong/go-que"
"github.com/theplant/go-que"
)

type mutex struct {
Expand Down Expand Up @@ -214,18 +214,20 @@ func (m *mutex) lockInMux(ctx context.Context, conn *sql.Conn, queue string, cou
if err != nil {
return nil, nil, err
}
var doneAt, exipredAt sql.NullTime
var createdAt, doneAt, exipredAt sql.NullTime
var locked bool
var remaining int
var pushNotification string
var result []byte
defer rows.Close()
for rows.Next() {
var jb job
rp := (*jsonRetryPolicy)(&jb.plan.RetryPolicy)
var uniqueID sql.NullString
err = rows.Scan(
&jb.id, &jb.plan.Queue, &jb.plan.Args, &jb.plan.RunAt, rp, &doneAt, &exipredAt,
&jb.id, &jb.plan.Queue, &jb.plan.Args, &createdAt, &jb.plan.RunAt, rp, &doneAt, &exipredAt, &pushNotification,
&jb.retryCount, &jb.lastErrMsg, &jb.lastErrStack,
&uniqueID, &jb.plan.UniqueLifecycle,
&uniqueID, &jb.plan.UniqueLifecycle, &result,
&locked, &remaining,
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pg/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"fmt"
"strings"

"github.com/tnclong/go-que"
"github.com/theplant/go-que"
)

func New(db *sql.DB) (que.Queue, error) {
Expand Down
7 changes: 6 additions & 1 deletion pg/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ create table if not exists goque_jobs
args jsonb default '[]'::jsonb not null
constraint valid_args
check (jsonb_typeof(args) = 'array'::text),

created_at timestamp with time zone default now() not null,
run_at timestamp with time zone default now() not null,
retry_policy jsonb not null,
done_at timestamp with time zone,
expired_at timestamp with time zone,
push_notification text default '',

retry_count integer default 0 not null,
last_err_msg text,
Expand All @@ -27,6 +28,10 @@ create table if not exists goque_jobs
constraint valid_unique_lifecycle
check(unique_lifecycle>=0 AND unique_lifecycle<=3),

result jsonb default '{}'::jsonb not null
constraint result
check (jsonb_typeof(result) = 'object'::text),

constraint err_length
check ((char_length(last_err_msg) <= 512) AND (char_length(last_err_stack) <= 8192))
);
Expand Down
4 changes: 2 additions & 2 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"time"

_ "github.com/lib/pq"
"github.com/tnclong/go-que"
"github.com/tnclong/go-que/pg"
"github.com/theplant/go-que"
"github.com/theplant/go-que/pg"
)

func TestEnqueueLockUnlock(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"testing"
"time"

"github.com/tnclong/go-que"
"github.com/theplant/go-que"
)

var wantSchedule = Schedule{
Expand Down
2 changes: 1 addition & 1 deletion scheduler/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"errors"

"github.com/robfig/cron/v3"
"github.com/tnclong/go-que"
"github.com/theplant/go-que"
)

// Schedule is a set of named items.
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"log"
"time"

"github.com/tnclong/go-que"
"github.com/theplant/go-que"
)

// Scheduler enqueue jobs according to provided schedule.
Expand Down
4 changes: 2 additions & 2 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (

"github.com/golang/mock/gomock"
_ "github.com/lib/pq"
"github.com/tnclong/go-que"
"github.com/tnclong/go-que/mock"
"github.com/theplant/go-que"
"github.com/theplant/go-que/mock"
)

func TestDecodeArgs(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions stack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func TestStack(t *testing.T) {
t.Log(stack)

stacks := []string{
"github.com/tnclong/go-que/stack_test.go:36",
"github.com/tnclong/go-que/stack_test.go:17",
"github.com/tnclong/go-que/stack_test.go:31",
"github.com/theplant/go-que/stack_test.go:36",
"github.com/theplant/go-que/stack_test.go:17",
"github.com/theplant/go-que/stack_test.go:31",
"tnclong/go-que/stack_test.go:25",
"tnclong/go-que/stack_test.go:14",
}
Expand Down