/
job.go
107 lines (92 loc) · 2.33 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package repository
import (
"context"
"github.com/resource-aware-jds/resource-aware-jds/models"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"time"
)
// JobCollectionName is the name of the MongoDB collection that ProvideJob
// binds the job repository to.
const JobCollectionName = "job"
// job is the MongoDB-backed implementation of IJob. It is constructed by
// ProvideJob.
type job struct {
// database is the handle that was passed to ProvideJob.
database *mongo.Database
// collection is what the job methods read from and write to; ProvideJob sets
// it to database.Collection(JobCollectionName).
collection *mongo.Collection
}
// IJob is the persistence interface for job documents. ProvideJob returns the
// MongoDB-backed implementation defined in this file; the per-method notes
// below describe that implementation.
type IJob interface {
// Insert stores job as a new document and returns the ID of the inserted
// document.
Insert(ctx context.Context, job models.Job) (insertedJobID *primitive.ObjectID, err error)
// FindAll returns every job document in the collection.
FindAll(ctx context.Context) ([]models.Job, error)
// FindOneByDocumentID returns the job whose "_id" equals id.
FindOneByDocumentID(ctx context.Context, id primitive.ObjectID) (*models.Job, error)
// FindJobToDistribute returns the jobs whose "status" is either
// models.DistributingJobStatus or models.ExperimentingJobStatus.
FindJobToDistribute(ctx context.Context) ([]models.Job, error)
// UpdateJobStatusByID writes job.Status and job.Logs, plus a fresh
// "updated_at" timestamp, to the document whose "_id" equals job.ID.
UpdateJobStatusByID(ctx context.Context, job models.Job) error
}
// ProvideJob builds the MongoDB-backed IJob. The returned repository keeps
// the database handle and operates on its JobCollectionName collection.
func ProvideJob(database *mongo.Database) IJob {
	repo := new(job)
	repo.database = database
	repo.collection = database.Collection(JobCollectionName)
	return repo
}
// jobInsertError is a string-backed error type, which lets failures detected
// by Insert itself (as opposed to driver errors) be declared as constants.
type jobInsertError string

// Error implements the error interface.
func (e jobInsertError) Error() string { return string(e) }

// errUnexpectedInsertedID is returned by Insert when the driver reports an
// inserted "_id" that is not a primitive.ObjectID.
const errUnexpectedInsertedID = jobInsertError("repository: inserted job id is not a primitive.ObjectID")

// Insert stores job as a new document in the job collection.
//
// On success it returns a pointer to the ObjectID of the inserted document.
// Errors from InsertOne are returned unchanged. If the inserted "_id" is not
// a primitive.ObjectID, errUnexpectedInsertedID is returned instead of
// panicking.
func (j *job) Insert(ctx context.Context, job models.Job) (insertedJobID *primitive.ObjectID, err error) {
	result, err := j.collection.InsertOne(ctx, job)
	if err != nil {
		return nil, err
	}

	// InsertedID is an untyped value, so use the comma-ok form: a bare
	// assertion would panic if the document's _id had any other type.
	objID, ok := result.InsertedID.(primitive.ObjectID)
	if !ok {
		return nil, errUnexpectedInsertedID
	}
	return &objID, nil
}
// FindAll queries the job collection with an empty filter and decodes every
// returned document into a models.Job.
//
// Any error from the query or from decoding is returned unchanged, together
// with a nil slice.
func (j *job) FindAll(ctx context.Context) ([]models.Job, error) {
	cursor, err := j.collection.Find(ctx, bson.M{})
	if err != nil {
		return nil, err
	}

	var jobs []models.Job
	if err := cursor.All(ctx, &jobs); err != nil {
		return nil, err
	}
	return jobs, nil
}
// FindOneByDocumentID looks up the job whose "_id" equals id.
//
// It returns a pointer to the decoded job on success. On any failure, whether
// from the lookup itself or from decoding the document, it returns a nil job
// and the driver's error unchanged. Per the driver's SingleResult
// documentation, a missing document is reported as mongo.ErrNoDocuments.
func (j *job) FindOneByDocumentID(ctx context.Context, id primitive.ObjectID) (*models.Job, error) {
	result := j.collection.FindOne(ctx, bson.M{"_id": id})
	if err := result.Err(); err != nil {
		return nil, err
	}

	var jobResult models.Job
	if err := result.Decode(&jobResult); err != nil {
		// Never hand back a partially decoded job alongside an error.
		return nil, err
	}
	return &jobResult, nil
}
// FindJobToDistribute returns the jobs whose "status" field is either
// models.DistributingJobStatus or models.ExperimentingJobStatus.
//
// Any error from the query or from decoding is returned unchanged together
// with a nil slice, matching FindAll.
func (j *job) FindJobToDistribute(ctx context.Context) ([]models.Job, error) {
	filter := bson.M{
		"status": bson.M{
			"$in": []models.JobStatus{
				models.DistributingJobStatus,
				models.ExperimentingJobStatus,
			},
		},
	}

	cursor, err := j.collection.Find(ctx, filter)
	if err != nil {
		return nil, err
	}

	var jobs []models.Job
	if err := cursor.All(ctx, &jobs); err != nil {
		// Discard whatever All may have left in the slice: a result returned
		// next to an error must not look usable.
		return nil, err
	}
	return jobs, nil
}
// UpdateJobStatusByID applies a "$set" to the document whose "_id" equals
// job.ID, writing job.Status to "status", job.Logs to "logs" and the current
// time to "updated_at".
//
// Only the error from UpdateOne is returned. The update result is discarded,
// so this method does not report whether any document matched job.ID.
func (j *job) UpdateJobStatusByID(ctx context.Context, job models.Job) error {
	filter := bson.M{"_id": job.ID}
	changes := bson.M{
		"status":     job.Status,
		"logs":       job.Logs,
		"updated_at": time.Now(),
	}

	_, err := j.collection.UpdateOne(ctx, filter, bson.M{"$set": changes})
	return err
}