-
Notifications
You must be signed in to change notification settings - Fork 141
/
runner.go
138 lines (118 loc) · 3.66 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright 2023 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package platformprovidermigration
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
"github.com/pipe-cd/pipecd/pkg/datastore"
"github.com/pipe-cd/pipecd/pkg/model"
)
// migrationRunInterval is how long Migrate waits between attempts to resume
// the migration task after a run has failed.
const migrationRunInterval = 30 * time.Minute
// applicationStore is the narrow view of the application datastore that the
// migration runner depends on.
type applicationStore interface {
	// List returns the applications selected by options together with a
	// string that the runner feeds back as ListOptions.Cursor to request
	// the following page.
	List(ctx context.Context, options datastore.ListOptions) ([]*model.Application, string, error)
	// UpdatePlatformProvider stores platformProvider as the platform provider
	// of the application identified by appID.
	// NOTE(review): described from the call site in this file; confirm the
	// exact persistence semantics against the datastore implementation.
	UpdatePlatformProvider(ctx context.Context, appID, platformProvider string) error
}
// Runner performs the one-off migration that fills in the platform provider
// of applications which do not have one yet.
type Runner struct {
	// applicationStore is used to page through applications and to write
	// the migrated platform provider value back.
	applicationStore applicationStore
	// logger reports progress and failures of the migration task.
	logger *zap.Logger
}
// NewRunner builds a Runner whose application store is backed by ds and
// created with datastore.OpsCommander. The runner logs through a child of
// logger named "platform-provider-migrate-runner".
func NewRunner(ds datastore.DataStore, logger *zap.Logger) *Runner {
	store := datastore.NewApplicationStore(ds, datastore.OpsCommander)

	return &Runner{
		applicationStore: store,
		logger:           logger.Named("platform-provider-migrate-runner"),
	}
}
// Migrate runs the platform provider migration until it completes or ctx is
// done.
//
// The task is attempted immediately. If that first attempt fails, it is
// retried every migrationRunInterval, each time resuming from the cursor
// returned by the previous attempt. Migrate returns nil both when the
// migration finishes and when ctx is cancelled while waiting for a retry.
func (r *Runner) Migrate(ctx context.Context) error {
	const finishedMsg = "application migration task finished successfully"

	r.logger.Info("start running application migration task")

	// First attempt happens right away, starting from the first page.
	resumeFrom, err := r.migrate(ctx, "")
	if err != nil {
		r.logger.Error("unable to finish application platform provider migration task in first run", zap.Error(err))

		retryTicker := time.NewTicker(migrationRunInterval)
		defer retryTicker.Stop()

		// Keep retrying on every tick until an attempt succeeds.
		for err != nil {
			select {
			case <-ctx.Done():
				// Shutdown requested: stop quietly without reporting success.
				return nil
			case <-retryTicker.C:
				resumeFrom, err = r.migrate(ctx, resumeFrom)
			}
		}
	}

	r.logger.Info(finishedMsg)
	return nil
}
// migrate walks through all non-deleted applications, starting at cursor, and
// sets the platform provider of every application that has none to the value
// of its deprecated CloudProvider field.
//
// Applications are fetched in pages of 100, ordered by CreatedAt and then Id,
// both ascending. An empty cursor starts from the first page.
//
// On success it returns an empty cursor and a nil error. When listing or
// updating fails, it returns the error together with the cursor of the page
// that was being processed, so the caller can pass it back in to resume from
// that page instead of starting over.
func (r *Runner) migrate(ctx context.Context, cursor string) (string, error) {
	const pageSize = 100

	// optionsFor builds a fresh set of list options for the page at from.
	optionsFor := func(from string) datastore.ListOptions {
		return datastore.ListOptions{
			Filters: []datastore.ListFilter{
				{
					Field:    "Deleted",
					Operator: datastore.OperatorEqual,
					Value:    false,
				},
			},
			Orders: []datastore.Order{
				{
					Field:     "CreatedAt",
					Direction: datastore.Asc,
				},
				{
					Field:     "Id",
					Direction: datastore.Asc,
				},
			},
			Limit:  pageSize,
			Cursor: from,
		}
	}

	for {
		apps, next, err := r.applicationStore.List(ctx, optionsFor(cursor))
		switch {
		case err != nil:
			r.logger.Error("failed to fetch applications to run migrate task", zap.Error(err))
			return cursor, err
		case len(apps) == 0:
			// An empty page means every application has been visited.
			return "", nil
		}

		r.logger.Info(fmt.Sprintf("migrate platform provider value for %d application(s)", len(apps)))

		for _, app := range apps {
			// Applications that already have a platform provider are left as is.
			if app.PlatformProvider != "" {
				continue
			}

			//lint:ignore SA1019 app.CloudProvider is deprecated.
			legacyProvider := app.CloudProvider

			updateErr := r.applicationStore.UpdatePlatformProvider(ctx, app.Id, legacyProvider)
			if updateErr == nil {
				continue
			}

			r.logger.Error("failed to update application platform provider value",
				zap.String("id", app.Id),
				zap.String("provider", legacyProvider),
				zap.Error(updateErr),
			)
			// Hand back the current page's cursor so a retry resumes here.
			return cursor, updateErr
		}

		cursor = next
	}
}