-
Notifications
You must be signed in to change notification settings - Fork 84
/
data_product.go
137 lines (114 loc) · 5.58 KB
/
data_product.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package processor
import (
"context"
"time"
"github.com/kyma-incubator/compass/components/director/pkg/log"
"github.com/kyma-incubator/compass/components/director/internal/model"
"github.com/kyma-incubator/compass/components/director/pkg/persistence"
"github.com/kyma-incubator/compass/components/director/pkg/resource"
"github.com/kyma-incubator/compass/components/director/pkg/str"
"github.com/pkg/errors"
)
// DataProductService is the service-layer dependency that DataProductProcessor
// relies on for listing, creating and updating Data Products.
//
//go:generate mockery --name=DataProductService --output=automock --outpkg=automock --case=underscore --disable-version-string
type DataProductService interface {
	// ListByApplicationID is used by the processor to list the Data Products
	// of a resource of type resource.Application.
	ListByApplicationID(ctx context.Context, appID string) ([]*model.DataProduct, error)
	// ListByApplicationTemplateVersionID is used by the processor to list the
	// Data Products of a resource of type resource.ApplicationTemplateVersion.
	ListByApplicationTemplateVersionID(ctx context.Context, appTemplateVersionID string) ([]*model.DataProduct, error)
	// Create is called by the processor for an input whose ORD ID matches no
	// stored Data Product. The processor discards the returned string
	// (presumably the new Data Product ID - confirm against the implementation).
	Create(ctx context.Context, resourceType resource.Type, resourceID string, packageID *string, in model.DataProductInput, dataProductHash uint64) (string, error)
	// Update is called by the processor for an input whose ORD ID matches a
	// stored Data Product; id is the ID of that stored Data Product.
	Update(ctx context.Context, resourceType resource.Type, resourceID, id string, packageID *string, in model.DataProductInput, dataProductHash uint64) error
	// Delete is part of the service contract; presumably it removes the Data
	// Product with the given id - confirm against the implementation.
	Delete(ctx context.Context, resourceType resource.Type, id string) error
}
// DataProductProcessor re-syncs Data Products, running each service call
// inside a transaction that it opens itself.
type DataProductProcessor struct {
	// transact opens and rolls back the transactions used by the processor.
	transact persistence.Transactioner
	// dataProductSvc performs the Data Product list/create/update operations.
	dataProductSvc DataProductService
}
// NewDataProductProcessor returns a DataProductProcessor wired with the given
// transactioner and Data Product service.
func NewDataProductProcessor(transact persistence.Transactioner, dataProductSvc DataProductService) *DataProductProcessor {
	processor := new(DataProductProcessor)
	processor.transact = transact
	processor.dataProductSvc = dataProductSvc
	return processor
}
// Process re-syncs the given Data Product inputs for the resource identified
// by resourceType and resourceID.
//
// It lists the stored Data Products once, resyncs every input in its own
// transaction (stopping at the first failure), and then lists the stored Data
// Products again to return the state after the resync. The hash for each input
// is looked up in resourceHashes by the input's ORD ID.
func (id *DataProductProcessor) Process(ctx context.Context, resourceType resource.Type, resourceID string, packagesFromDB []*model.Package, dataProducts []*model.DataProductInput, resourceHashes map[string]uint64) ([]*model.DataProduct, error) {
	existing, err := id.listDataProductsInTx(ctx, resourceType, resourceID)
	if err != nil {
		return nil, err
	}

	for idx := range dataProducts {
		input := dataProducts[idx]
		hash := resourceHashes[str.PtrStrToStr(input.OrdID)]
		err = id.resyncDataProductInTx(ctx, resourceType, resourceID, existing, packagesFromDB, input, hash)
		if err != nil {
			return nil, err
		}
	}

	// The listing helper may return a slice together with a commit error, so
	// the result is checked explicitly and never returned alongside an error.
	refreshed, err := id.listDataProductsInTx(ctx, resourceType, resourceID)
	if err != nil {
		return nil, err
	}
	return refreshed, nil
}
// listDataProductsInTx lists the stored Data Products of the given resource
// inside a dedicated transaction.
//
// Only resource.Application and resource.ApplicationTemplateVersion trigger a
// service call; for any other resource type nothing is listed and the result
// is a nil slice plus whatever the commit returns.
func (id *DataProductProcessor) listDataProductsInTx(ctx context.Context, resourceType resource.Type, resourceID string) ([]*model.DataProduct, error) {
	tx, err := id.transact.Begin()
	if err != nil {
		return nil, err
	}
	// The rollback is registered with the caller's context, before the
	// transaction is stored in a derived one.
	defer id.transact.RollbackUnlessCommitted(ctx, tx)

	txCtx := persistence.SaveToContext(ctx, tx)

	var (
		listed  []*model.DataProduct
		listErr error
	)
	if resourceType == resource.Application {
		listed, listErr = id.dataProductSvc.ListByApplicationID(txCtx, resourceID)
	} else if resourceType == resource.ApplicationTemplateVersion {
		listed, listErr = id.dataProductSvc.ListByApplicationTemplateVersionID(txCtx, resourceID)
	}
	if listErr != nil {
		return nil, errors.Wrapf(listErr, "error while listing data products for %s with id %q", resourceType, resourceID)
	}

	return listed, tx.Commit()
}
// resyncDataProductInTx resyncs a single Data Product input inside a dedicated
// transaction: the transaction is committed when the resync succeeds and left
// to the deferred rollback otherwise. A resync failure is wrapped with the
// input's ORD ID.
func (id *DataProductProcessor) resyncDataProductInTx(ctx context.Context, resourceType resource.Type, resourceID string, dataProductsFromDB []*model.DataProduct, packagesFromDB []*model.Package, dataProduct *model.DataProductInput, dataProductHash uint64) error {
	tx, err := id.transact.Begin()
	if err != nil {
		return err
	}
	// The rollback is registered with the caller's context, before the
	// transaction is stored in a derived one.
	defer id.transact.RollbackUnlessCommitted(ctx, tx)

	txCtx := persistence.SaveToContext(ctx, tx)

	// The input is passed by value, so changes made during the resync do not
	// leak back into the caller's struct.
	err = id.resyncDataProduct(txCtx, resourceType, resourceID, dataProductsFromDB, packagesFromDB, *dataProduct, dataProductHash)
	if err != nil {
		return errors.Wrapf(err, "error while resyncing data product for resource with ORD ID %q", *dataProduct.OrdID)
	}

	return tx.Commit()
}
func (id *DataProductProcessor) resyncDataProduct(ctx context.Context, resourceType resource.Type, resourceID string, dataProductsFromDB []*model.DataProduct, packagesFromDB []*model.Package, dataProduct model.DataProductInput, dataProductHash uint64) error {
ctx = addFieldToLogger(ctx, "data_product_ord_id", *dataProduct.OrdID)
i, isDataProductFound := searchInSlice(len(dataProductsFromDB), func(i int) bool {
return equalStrings(dataProductsFromDB[i].OrdID, dataProduct.OrdID)
})
var packageID *string
if i, found := searchInSlice(len(packagesFromDB), func(i int) bool {
return equalStrings(&packagesFromDB[i].OrdID, dataProduct.OrdPackageID)
}); found {
packageID = &packagesFromDB[i].ID
}
if !isDataProductFound {
currentTime := time.Now().Format(time.RFC3339)
dataProduct.LastUpdate = ¤tTime
_, err := id.dataProductSvc.Create(ctx, resourceType, resourceID, packageID, dataProduct, dataProductHash)
if err != nil {
return err
}
return nil
}
log.C(ctx).Infof("Calculate the newest lastUpdate time for Data Product")
newestLastUpdateTime, err := NewestLastUpdateTimestamp(dataProduct.LastUpdate, dataProductsFromDB[i].LastUpdate, dataProductsFromDB[i].ResourceHash, dataProductHash)
if err != nil {
return errors.Wrap(err, "error while calculating the newest lastUpdate time for Data Product")
}
dataProduct.LastUpdate = newestLastUpdateTime
err = id.dataProductSvc.Update(ctx, resourceType, resourceID, dataProductsFromDB[i].ID, packageID, dataProduct, dataProductHash)
if err != nil {
return err
}
return nil
}