-
Notifications
You must be signed in to change notification settings - Fork 88
/
monitor.go
138 lines (121 loc) · 4.26 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package embeddedcluster
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/replicatedhq/embedded-cluster-operator/api/v1beta1"
appstatetypes "github.com/replicatedhq/kots/pkg/appstate/types"
"github.com/replicatedhq/kots/pkg/logger"
"github.com/replicatedhq/kots/pkg/store"
"github.com/replicatedhq/kots/pkg/util"
"k8s.io/client-go/kubernetes"
)
var stateMut = sync.Mutex{}
// MaybeStartClusterUpgrade checks if the embedded cluster is in a state that requires an upgrade. If so,
// it starts the upgrade process. We only start an upgrade if the following conditions are met:
// - The app has an embedded cluster configuration.
// - The app embedded cluster configuration differs from the current embedded cluster config.
// - The current cluster config (as part of the Installation object) already exists in the cluster.
func MaybeStartClusterUpgrade(ctx context.Context, store store.Store, conf *v1beta1.Config, appID string) error {
if conf == nil {
return nil
}
if !util.IsEmbeddedCluster() {
return nil
}
spec := conf.Spec
if upgrade, err := RequiresUpgrade(ctx, spec); err != nil {
// if there is no installation object we can't start an upgrade. this is a valid
// scenario specially during cluster bootstrap. as we do not need to upgrade the
// cluster just after its installation we can return nil here.
// (the cluster in the first kots version will match the cluster installed during bootstrap)
if errors.Is(err, ErrNoInstallations) {
return nil
}
return fmt.Errorf("failed to check if upgrade is required: %w", err)
} else if !upgrade {
return nil
}
// we need to wait for the application to be ready before we can start the upgrade.
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
}
appStatus, err := store.GetAppStatus(appID)
if err != nil {
return fmt.Errorf("failed to get app status: %w", err)
}
if appStatus.State != appstatetypes.StateReady {
logger.Infof("waiting for app to be ready before starting cluster upgrade. current state: %s", appStatus.State)
continue
}
if err := startClusterUpgrade(ctx, spec); err != nil {
return fmt.Errorf("failed to start cluster upgrade: %w", err)
}
logger.Info("started cluster upgrade")
go watchClusterState(ctx, store)
return nil
}
}
// InitClusterState initializes the cluster state in the database. This should be called when the
// server launches.
func InitClusterState(ctx context.Context, client kubernetes.Interface, store store.Store) error {
if util.IsEmbeddedCluster() {
go watchClusterState(ctx, store)
return nil
}
return nil
}
// watchClusterState checks the status of the installation object and updates the cluster state
// after the cluster state has been 'installed' for 30 seconds, it will exit the loop.
// this function is blocking and should be run in a goroutine.
// if it is called multiple times, only one instance will run.
func watchClusterState(ctx context.Context, store store.Store) {
stateMut.Lock()
defer stateMut.Unlock()
numReady := 0
lastState := ""
for numReady < 6 {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 5):
}
state, err := updateClusterState(ctx, store, lastState)
if err != nil {
logger.Errorf("embeddedcluster monitor: fail updating state: %v", err)
}
if state == v1beta1.InstallationStateInstalled {
numReady++
} else {
numReady = 0
}
lastState = state
}
}
// updateClusterState updates the cluster state in the database. Gets the state from the cluster
// by reading the latest embedded cluster installation CRD.
// If the lastState is the same as the current state, it will not update the database.
func updateClusterState(ctx context.Context, store store.Store, lastState string) (string, error) {
installation, err := GetCurrentInstallation(ctx)
if err != nil {
return "", fmt.Errorf("failed to get current installation: %w", err)
}
state := v1beta1.InstallationStateUnknown
if installation.Status.State != "" {
state = installation.Status.State
}
// only update the state if it has changed
if state != lastState {
if err := store.SetEmbeddedClusterState(state); err != nil {
return "", fmt.Errorf("failed to update embedded cluster state: %w", err)
}
}
return state, nil
}