-
Notifications
You must be signed in to change notification settings - Fork 0
/
invest_operation_types.go
70 lines (56 loc) · 1.67 KB
/
invest_operation_types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package loaders
import (
"github.com/jfk9w/hoarder/internal/database"
"github.com/jfk9w/hoarder/internal/jobs"
. "github.com/jfk9w/hoarder/internal/jobs/tinkoff/internal/entities"
)
type InvestOperationTypes struct {
BatchSize int
}
func (l InvestOperationTypes) TableName() string {
return new(InvestOperationType).TableName()
}
func (l InvestOperationTypes) Load(ctx jobs.Context, client Client, db database.DB) (ls []Interface, errs error) {
out, err := client.InvestOperationTypes(ctx)
if ctx.Error(&errs, err, "failed to get data from api") {
return
}
if len(out.OperationsTypes) == 0 {
return
}
var (
uniqueTypes = make(map[string]bool)
entities []InvestOperationType
types []string
)
for _, out := range out.OperationsTypes {
ctx := ctx.With("operation_type", out.OperationType)
if uniqueTypes[out.OperationType] {
ctx.Debug("skipping duplicate")
continue
}
entity, err := database.ToViaJSON[InvestOperationType](out)
if ctx.Error(&errs, err, "entity conversion failed") {
return
}
uniqueTypes[out.OperationType] = true
entities = append(entities, entity)
types = append(types, out.OperationType)
}
if errs = db.WithContext(ctx).Transaction(func(tx database.DB) (errs error) {
if err := tx.UpsertInBatches(entities, l.BatchSize).Error; ctx.Error(&errs, err, "failed to update entities in db") {
return
}
if err := tx.Model(new(InvestOperationType)).
Where("operation_type not in ?", types).
Update("deleted", true).
Error; ctx.Error(&errs, err, "failed to mark deleted entities in db") {
return
}
return
}); errs != nil {
return
}
ctx.Info("updated entities in db", "count", len(entities))
return
}