/
ack.go
109 lines (99 loc) · 2.73 KB
/
ack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package handler
import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"io"
	"net/http"

	apiv1 "github.com/jeffrom/job-manager/mjob/api/v1"
	"github.com/jeffrom/job-manager/mjob/resource"
	jobv1 "github.com/jeffrom/job-manager/mjob/resource/job/v1"
	"github.com/jeffrom/job-manager/mjob/schema"
	"github.com/jeffrom/job-manager/pkg/backend"
)
func Ack(w http.ResponseWriter, r *http.Request) error {
ctx := r.Context()
be := backend.FromMiddleware(ctx)
var params apiv1.AckJobsRequest
if err := UnmarshalBody(r, ¶ms, true); err != nil {
return err
}
resources := &resource.Acks{Acks: make([]*resource.Ack, len(params.Acks))}
results := &jobv1.Acks{Acks: make([]*jobv1.Ack, len(params.Acks))}
for i, ackParam := range params.Acks {
id := ackParam.Id
jobData, err := be.GetJobByID(ctx, id, nil)
if err != nil {
return err
}
queue, err := be.GetQueue(ctx, jobData.Name, nil)
if err != nil {
return err
}
scm, err := schema.Parse(queue.SchemaRaw)
if err != nil {
return err
}
if err := scm.ValidateResult(ctx, ackParam.Data); err != nil {
return err
}
finalStatus := getFinalStatus(queue, jobData, jobv1.JobStatusFromProto(ackParam.Status))
ack := &jobv1.Ack{
Id: id,
Status: jobv1.JobStatusToProto(finalStatus),
Data: ackParam.Data,
Error: ackParam.Error,
}
results.Acks[i] = ack
ackRes := jobv1.AckFromProto(ack)
resources.Acks[i] = ackRes
}
if err := be.AckJobs(ctx, resources); err != nil {
return handleBackendErrors(err, "ack", "")
}
if err := deleteArgUniqueness(ctx, be, resources.Acks); err != nil {
return err
}
return nil
}
// getFinalStatus decides which status should be recorded for an acked job.
//
// The reported status is returned unchanged unless all of the following hold:
// it is non-nil and equal to resource.StatusFailed, the queue's Retries is
// greater than zero, and the job's Attempt is greater than the queue's
// Retries. In that case a new resource.StatusDead status is returned instead.
func getFinalStatus(queue *resource.Queue, jb *resource.Job, status *resource.Status) *resource.Status {
	// Only a failed ack can be escalated; everything else passes through.
	if status == nil || *status != resource.StatusFailed {
		return status
	}

	// A queue with Retries <= 0 never escalates to dead here.
	exhausted := queue.Retries > 0 && jb.Attempt > queue.Retries
	if !exhausted {
		return status
	}
	return resource.NewStatus(resource.StatusDead)
}
// deleteArgUniqueness removes the argument-uniqueness keys of every ack whose
// status is considered done by resource.StatusIsDone.
//
// For each such ack the job is loaded by ack.JobID, its ArgsRaw is
// canonicalized with canonicalizeArgBytes, and the uniqueness key is derived
// with uniquenessKeyFromArgs. All collected keys are deleted in a single
// DeleteJobUniqueArgs call. If no ack is done, the backend is not called and
// nil is returned.
//
// The first error from a lookup, canonicalization, or key derivation is
// returned as-is and no keys are deleted.
func deleteArgUniqueness(ctx context.Context, be backend.Interface, acks []*resource.Ack) error {
	keys := make([]string, 0, len(acks))
	for _, ack := range acks {
		if !resource.StatusIsDone(ack.Status) {
			continue
		}

		jobData, err := be.GetJobByID(ctx, ack.JobID, nil)
		if err != nil {
			return err
		}
		argsRaw, err := canonicalizeArgBytes(jobData.ArgsRaw)
		if err != nil {
			return err
		}
		ukey, err := uniquenessKeyFromArgs(argsRaw)
		if err != nil {
			return err
		}
		keys = append(keys, ukey)
	}

	// Nothing finished in this batch, so there is nothing to delete; skip the
	// backend call rather than issuing a delete with an empty key list.
	if len(keys) == 0 {
		return nil
	}
	return be.DeleteJobUniqueArgs(ctx, nil, keys)
}
// canonicalizeArgBytes re-encodes the JSON document in b into a canonical
// byte form: the value is decoded and marshalled again with json.Marshal,
// which drops insignificant whitespace and writes object keys in sorted order.
//
// Numbers are decoded as json.Number and written back using their original
// literal text, so integers that do not fit in a float64 (above 2^53) are
// preserved exactly instead of being rounded. As a consequence, different
// spellings of the same number (for example 1 and 1.0) stay distinct.
//
// An error is returned if b is empty, is not valid JSON, or contains anything
// other than whitespace after the first top-level value.
func canonicalizeArgBytes(b []byte) ([]byte, error) {
	dec := json.NewDecoder(bytes.NewReader(b))
	// Without UseNumber every number becomes a float64 and large integers
	// silently lose precision, making distinct arguments look identical.
	dec.UseNumber()

	var v interface{}
	if err := dec.Decode(&v); err != nil {
		if err == io.EOF {
			// Empty input is malformed here, not a clean end of stream.
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}

	// Unlike json.Unmarshal, a Decoder stops after the first value; reject
	// trailing data explicitly so malformed input is still an error.
	if _, err := dec.Token(); err != io.EOF {
		if err == nil {
			err = errors.New("unexpected data after top-level JSON value")
		}
		return nil, err
	}

	return json.Marshal(v)
}