-
Notifications
You must be signed in to change notification settings - Fork 0
/
job.go
132 lines (112 loc) · 3.68 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package queue
import (
cryptorand "crypto/rand"
"encoding/base64"
"encoding/json"
mathrand "math/rand"
"time"
)
type UniqueUntil string
var (
RetryPolicyDefault = 25
RetryPolicyEmphemeral = 0
RetryPolicyDirectToMorgue = -1
)
const (
UntilSuccess UniqueUntil = "success" // default
UntilStart UniqueUntil = "start"
)
type Failure struct {
RetryCount int `json:"retry_count"`
RetryRemaining int `json:"remaining"`
FailedAt string `json:"failed_at"`
NextAt string `json:"next_at,omitempty"`
ErrorMessage string `json:"message,omitempty"`
ErrorType string `json:"errtype,omitempty"`
Backtrace []string `json:"backtrace,omitempty"`
}
type Job struct {
// required
Jid string `json:"jid"`
Queue string `json:"queue"`
Type string `json:"jobtype"`
Args []interface{} `json:"args"`
// optional
CreatedAt string `json:"created_at,omitempty"`
EnqueuedAt string `json:"enqueued_at,omitempty"`
At string `json:"at,omitempty"`
ReserveFor int `json:"reserve_for,omitempty"`
Retry *int `json:"retry"`
Backtrace int `json:"backtrace,omitempty"`
Failure *Failure `json:"failure,omitempty"`
Custom map[string]interface{} `json:"custom,omitempty"`
}
// Clients should use this constructor to build a Job, not allocate
// a bare struct directly.
func NewJob(jobtype string, args ...interface{}) *Job {
return &Job{
Type: jobtype,
Queue: "default",
Args: args,
Jid: RandomJid(),
CreatedAt: time.Now().UTC().Format(time.RFC3339Nano),
Retry: &RetryPolicyDefault,
}
}
func RandomJid() string {
bytes := make([]byte, 12)
_, err := cryptorand.Read(bytes)
if err != nil {
//nolint:gosec
mathrand.Read(bytes)
}
return base64.RawURLEncoding.EncodeToString(bytes)
}
func (j *Job) GetCustom(name string) (interface{}, bool) {
if j.Custom == nil {
return nil, false
}
val, ok := j.Custom[name]
return val, ok
}
// Set custom metadata for this job. Faktory reserves all
// element names starting with "_" for internal use, e.g.
// SetCustom("_txid", "12345")
func (j *Job) SetCustom(name string, value interface{}) *Job {
if j.Custom == nil {
j.Custom = map[string]interface{}{}
}
j.Custom[name] = value
return j
}
////////////////////////////////////////////
// Faktory Enterprise helpers
//
// These helpers allow you to configure several Faktory Enterprise features.
// They will have no effect unless you are running Faktory Enterprise.
// Configure this job to be unique for +secs+ seconds or until the job
// has been successfully processed.
func (j *Job) SetUniqueFor(secs uint) *Job {
return j.SetCustom("unique_for", secs)
}
// Configure the uniqueness deadline for this job, legal values
// are:
//
// - "success" - the job will be considered unique until it has successfully processed
// or the +unique_for+ TTL has passed, this is the default value.
// - "start" - the job will be considered unique until it starts processing. Retries
// may lead to multiple copies of the job running.
func (j *Job) SetUniqueness(until UniqueUntil) *Job {
return j.SetCustom("unique_until", until)
}
// Configure the TTL for this job. After this point in time, the job will be
// discarded rather than executed.
func (j *Job) SetExpiresAt(expiresAt time.Time) *Job {
return j.SetCustom("expires_at", expiresAt.Format(time.RFC3339Nano))
}
func (j *Job) SetExpiresIn(expiresIn time.Duration) *Job {
return j.SetCustom("expires_at", time.Now().Add(expiresIn).Format(time.RFC3339Nano))
}
func (j *Job) JsonBytes() ([]byte, error) {
return json.Marshal(j)
}