-
Notifications
You must be signed in to change notification settings - Fork 1
/
job.go
244 lines (209 loc) · 5.62 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package sqsjkr
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
"syscall"
"time"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/kayac/sqsjkr/lock"
)
// Duration struct
type Duration struct {
time.Duration
}
// UnmarshalJSON Duration field to decode json
func (d *Duration) UnmarshalJSON(b []byte) (err error) {
if b[0] == '"' {
sd := string(b[1 : len(b)-1])
d.Duration, err = time.ParseDuration(sd)
return
}
var sec int64
sec, err = json.Number(string(b)).Int64()
d.Duration = time.Duration(sec) * time.Second
return
}
// DefaultJob is created by one SQS message's body
type DefaultJob struct {
jobID string // JobID is created by sqs messageID
environment map[string]string
command string
eventID string
lifeTime time.Duration
sentTimestamp time.Time
abortIfLocked bool
lockID string
trigger string
}
func (j *DefaultJob) String() string {
return fmt.Sprintf("%#v", j)
}
// Job is sqsjkr job struct
type Job interface {
Execute(lock.Locker) ([]byte, error)
JobID() string
EventID() string
Command() string
String() string
}
// MessageBody for decoding json
type MessageBody struct {
Command string `json:"command"`
Environments map[string]string `json:"envs"`
EventID string `json:"event_id"`
LifeTime Duration `json:"life_time"`
LockID string `json:"lock_id"`
AbortIfLocked bool `json:"abort_if_locked"`
DisableLifeTimeTrigger bool `json:"disable_life_time_trigger"`
}
func (m MessageBody) String() string {
var b strings.Builder
json.NewEncoder(&b).Encode(m)
return strings.TrimSuffix(b.String(), "\n")
}
// Execute executes command
func (j *DefaultJob) Execute(lkr lock.Locker) ([]byte, error) {
// 1. Checks job's lifetime.
if j.isOverLifeTime() {
if j.trigger == "" {
// trigger is disabled or not defined
return nil, ErrOverLifeTime
}
msg := fmt.Sprintf("job_id:%s, event_id:%s, command:%s, life_time:%s, sent_timestamp:%s",
j.jobID, j.eventID, j.command, j.lifeTime.String(), j.sentTimestamp.String())
out, err := invokeTrigger(j.trigger, msg)
logger.Debugf("trigger output: %s", string(out))
if err != nil {
return nil, err
}
return nil, ErrOverLifeTime
}
// 2. Locks Job (if job's lockID have been locked already, retry to Execute() after 5sec).
if j.lockID != "" && j.eventID != "" && lkr != nil {
err := lkr.Lock(j.lockID, j.eventID)
if err != nil {
logger.Errorf(err.Error())
if j.abortIfLocked {
return nil, err
}
time.Sleep(JobRetryInterval)
return j.Execute(lkr)
}
}
// 3. Validation.
if err := j.validate(); err != nil {
return nil, err
}
// 4. Executes job.
env := os.Environ()
for key, val := range j.environment {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
cmd := exec.Command("sh", "-c", j.command)
cmd.Env = env
output, err := cmd.CombinedOutput()
// 5. Unlocks job.
if j.lockID != "" && lkr != nil {
if derr := lkr.Unlock(j.lockID); derr != nil {
// TODO: should implement notification
logger.Errorf(derr.Error())
}
}
return output, err
}
// JobID return job's id which is unique (sqs message id).
func (j DefaultJob) JobID() string {
return j.jobID
}
// Command return job's command
func (j DefaultJob) Command() string {
return j.command
}
// EventID return event_id
func (j DefaultJob) EventID() string {
return j.eventID
}
func (j DefaultJob) isOverLifeTime() bool {
diffTime := time.Now().Sub(j.sentTimestamp)
if j.lifeTime == 0 {
return false
}
if diffTime > j.lifeTime {
logger.Warnf("over life time: life_time:%v, diffTime:%v, overTime:%v",
j.lifeTime, diffTime, diffTime-j.lifeTime)
return true
}
return false
}
// Validates job can exec command
func (j *DefaultJob) validate() error {
if j.command == "" {
return fmt.Errorf("Job command undefined.")
}
if j.jobID == "" {
return fmt.Errorf("JobID undefined.")
}
return nil
}
// NewJob create job
func NewJob(msg *sqs.Message, trigger string) (Job, error) {
var body MessageBody
if err := json.Unmarshal([]byte(*msg.Body), &body); err != nil {
logger.Errorf("Cannot parse message body: %s", err.Error())
return nil, err
}
sentTimestamp, err := strconv.ParseInt(*msg.Attributes["SentTimestamp"], 10, 64)
if err != nil {
logger.Errorf("Cannot parse attribute SentTimestamp: %s", err.Error())
return nil, err
}
sentTime := time.Unix(sentTimestamp/1000, sentTimestamp%1000*int64(time.Millisecond))
logger.Infof(
"new job by message id:%s body:%s sentTimestamp:%s",
*msg.MessageId,
body.String(),
sentTime,
)
dj := &DefaultJob{
jobID: *msg.MessageId,
command: body.Command,
environment: body.Environments,
eventID: body.EventID,
lockID: body.LockID,
abortIfLocked: body.AbortIfLocked,
lifeTime: body.LifeTime.Duration,
sentTimestamp: sentTime,
}
if !body.DisableLifeTimeTrigger {
dj.trigger = trigger
}
if dj.eventID == "" {
dj.eventID = *msg.MessageId
}
return dj, nil
}
// invokeTrigger execute trigger command
func invokeTrigger(command, msg string) ([]byte, error) {
cmd := exec.Command("sh", "-c", command)
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("failed: %v %s", cmd, err.Error())
}
input := []byte(msg)
src := bytes.NewReader(input)
_, err = io.Copy(stdin, src)
if e, ok := err.(*os.PathError); ok && e.Err == syscall.EPIPE {
logger.Errorf(err.Error())
} else if err != nil {
logger.Errorf(err.Error())
logger.Errorf(fmt.Errorf("failed to write STDIN").Error())
}
stdin.Close()
return cmd.CombinedOutput()
}