-
Notifications
You must be signed in to change notification settings - Fork 12
/
postgres.go
105 lines (95 loc) · 2.95 KB
/
postgres.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package postgres
import (
"github.com/pkg/errors"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/logger"
"github.com/machinefi/sprout/types"
)
type projectProcessedTask struct {
gorm.Model
TaskID uint64 `gorm:"not null"`
ProjectID uint64 `gorm:"uniqueIndex:project_id,not null"`
}
type taskStateLog struct {
gorm.Model
TaskID uint64 `gorm:"index:state_fetch,not null"`
ProjectID uint64 `gorm:"index:state_fetch,not null"`
ProjectVersion string `gorm:"index:state_fetch,not null"`
State types.TaskState `gorm:"not null"`
Comment string
Result []byte
}
type Postgres struct {
db *gorm.DB
}
func (p *Postgres) ProcessedTaskID(projectID uint64) (uint64, error) {
t := projectProcessedTask{}
if err := p.db.Where("project_id = ?", projectID).First(&t).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return 0, nil
}
return 0, errors.Wrapf(err, "failed to query project processed task_id, project_id %v", projectID)
}
return t.TaskID, nil
}
func (p *Postgres) UpsertProcessedTask(projectID, taskID uint64) error {
t := projectProcessedTask{
ProjectID: projectID,
TaskID: taskID,
}
if err := p.db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "project_id"}},
DoUpdates: clause.AssignmentColumns([]string{"task_id"}),
}).Create(&t).Error; err != nil {
return errors.Wrapf(err, "failed to upsert project processed task, project_id %v, task_id %v", projectID, taskID)
}
return nil
}
func (p *Postgres) Create(tl *types.TaskStateLog, t *types.Task) error {
l := &taskStateLog{
TaskID: tl.TaskID,
ProjectID: t.ProjectID,
ProjectVersion: t.ProjectVersion,
State: tl.State,
Comment: tl.Comment,
Result: tl.Result,
Model: gorm.Model{
CreatedAt: tl.CreatedAt,
},
}
if err := p.db.Create(l).Error; err != nil {
return errors.Wrap(err, "failed to create task state log")
}
return nil
}
func (p *Postgres) Fetch(taskID, projectID uint64) ([]*types.TaskStateLog, error) {
ls := []*taskStateLog{}
if err := p.db.Order("created_at").Where("task_id = ? AND project_id = ?", taskID, projectID).Find(&ls).Error; err != nil {
return nil, errors.Wrapf(err, "failed to query task state log, task_id %v, project_id %v", taskID, projectID)
}
tls := []*types.TaskStateLog{}
for _, l := range ls {
tls = append(tls, &types.TaskStateLog{
TaskID: taskID,
State: l.State,
Comment: l.Comment,
Result: l.Result,
CreatedAt: l.CreatedAt,
})
}
return tls, nil
}
func New(pgEndpoint string) (*Postgres, error) {
db, err := gorm.Open(postgres.Open(pgEndpoint), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return nil, errors.Wrap(err, "failed to connect postgres")
}
if err := db.AutoMigrate(&taskStateLog{}, &projectProcessedTask{}); err != nil {
return nil, errors.Wrap(err, "failed to migrate model")
}
return &Postgres{db}, nil
}