-
Notifications
You must be signed in to change notification settings - Fork 1
/
statusChanges.go
114 lines (94 loc) · 3.86 KB
/
statusChanges.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package job
import (
"context"
"github.com/pixlise/core/v4/api/dbCollections"
"github.com/pixlise/core/v4/core/logger"
"github.com/pixlise/core/v4/core/timestamper"
protos "github.com/pixlise/core/v4/generated-protos"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// UpdateJob writes a new status for jobId into the job status collection.
//
// It is expected to be called from whatever is running the job. Per the
// original author's note, the goroutine started by AddJob is hoped to notice
// the DB change and fire off an update.
//
// The stored document with _id == jobId is replaced wholesale by a freshly
// built JobStatus holding the supplied status, message and logId, with
// LastUpdateUnixTimeSec taken from ts. The only value carried over from the
// previously stored status is StartUnixTimeSec; if that status cannot be read
// the failure is logged and the replace still goes ahead with the start time
// left at its zero value.
//
// Returns the error from ReplaceOne, if any. A result whose matched and
// upserted counts are both not 1 is only logged as an error; nil is still
// returned in that case.
func UpdateJob(jobId string, status protos.JobStatus_Status, message string, logId string, db *mongo.Database, ts timestamper.ITimeStamper, logger logger.ILogger) error {
	statusColl := db.Collection(dbCollections.JobStatusName)

	updated := &protos.JobStatus{
		JobId:                 jobId,
		Status:                status,
		Message:               message,
		LogId:                 logId,
		LastUpdateUnixTimeSec: uint32(ts.GetTimeNowSec()),
	}

	// Best effort: keep the original start time if the stored status is readable
	if previous, readErr := readJobStatus(jobId, statusColl); readErr == nil {
		updated.StartUnixTimeSec = previous.StartUnixTimeSec
	} else {
		logger.Errorf("Failed to read existing job status when writing UpdateJob %v: %v", jobId, readErr)
	}

	result, err := statusColl.ReplaceOne(
		context.TODO(),
		bson.D{{Key: "_id", Value: jobId}},
		updated,
		options.Replace(),
	)
	if err != nil {
		logger.Errorf("UpdateJob %v: %v", jobId, err)
		return err
	}

	if result.MatchedCount == 1 || result.UpsertedCount == 1 {
		logger.Infof("UpdateJob: %v with status %v, message: %v", jobId, protos.JobStatus_Status_name[int32(status.Number())], message)
	} else {
		// Not treated as a failure for the caller, only reported
		logger.Errorf("UpdateJob result had unexpected counts %+v id: %v", result, jobId)
	}

	return nil
}
// CompleteJob records the final state of jobId and allows setting some output
// fields. It is expected to be called from whatever is running the job.
//
// success selects the final status: JobStatus_COMPLETE when true,
// JobStatus_ERROR otherwise. The stored document with _id == jobId is replaced
// wholesale by a new JobStatus carrying message, outputFilePath and
// otherLogFiles, with both LastUpdateUnixTimeSec and EndUnixTimeSec set to the
// current time from ts. LogId and StartUnixTimeSec are copied from the
// previously stored status when it can be read; otherwise the read failure is
// logged and they stay at "" and 0.
//
// After a successful DB write, if this process has an entry for jobId in
// activeJobs, that entry is set to false while holding activeJobLock.
//
// Returns the error from ReplaceOne, if any. A result whose matched and
// upserted counts are both not 1 is only logged as an error.
func CompleteJob(jobId string, success bool, message string, outputFilePath string, otherLogFiles []string, db *mongo.Database, ts timestamper.ITimeStamper, logger logger.ILogger) error {
	status := protos.JobStatus_COMPLETE
	if !success {
		status = protos.JobStatus_ERROR
	}

	logger.Infof("Job: %v completed with status: %v, message: %v", jobId, status.String(), message)

	now := uint32(ts.GetTimeNowSec())

	ctx := context.TODO()
	coll := db.Collection(dbCollections.JobStatusName)

	filter := bson.D{{Key: "_id", Value: jobId}}
	opt := options.Replace()

	jobStatus := &protos.JobStatus{
		JobId:                 jobId,
		Status:                status,
		Message:               message,
		LogId:                 "",
		StartUnixTimeSec:      0,
		LastUpdateUnixTimeSec: now,
		EndUnixTimeSec:        now,
		OutputFilePath:        outputFilePath,
		OtherLogFiles:         otherLogFiles,
	}

	// Best effort: carry over the log id and start time from the stored status
	existingStatus, err := readJobStatus(jobId, coll)
	if err != nil {
		logger.Errorf("Failed to read existing job status when writing CompleteJob %v: %v", jobId, err)
	} else {
		jobStatus.LogId = existingStatus.LogId
		jobStatus.StartUnixTimeSec = existingStatus.StartUnixTimeSec
	}

	replaceResult, err := coll.ReplaceOne(ctx, filter, jobStatus, opt)
	if err != nil {
		// NOTE(review): returning here leaves any activeJobs entry for this job
		// untouched - confirm that is the intended behaviour on DB failure.
		logger.Errorf("CompleteJob %v: %v", jobId, err)
		return err
	}

	if replaceResult.MatchedCount != 1 && replaceResult.UpsertedCount != 1 {
		logger.Errorf("CompleteJob result had unexpected counts %+v id: %v", replaceResult, jobId)
	} else {
		// status is always COMPLETE or ERROR here, so String() yields its enum name
		logger.Infof("CompleteJob: %v with status %v, message: %v", jobId, status.String(), message)
	}

	// Acquire the lock first, then schedule its release
	activeJobLock.Lock()
	defer activeJobLock.Unlock()

	// Only update the job status if we have an entry for this job
	// HINT: If we don't this code may be running in say a Lambda function and
	//       not a part of the API instance, so nothing in our memory space cares
	//       about the state of this job, we're just notifying out via the DB above!
	if _, ok := activeJobs[jobId]; ok {
		activeJobs[jobId] = false
	}

	return nil
}