/
policy_syncer.go
197 lines (155 loc) · 5.74 KB
/
policy_syncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Copyright (c) 2021 Red Hat, Inc.
// Copyright Contributors to the Open Cluster Management project
package dbsyncers
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
"github.com/jackc/pgx/v4/pgxpool"
policiesv1 "github.com/open-cluster-management/governance-policy-propagator/api/v1"
"k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const (
dbEnumCompliant = "compliant"
dbEnumNonCompliant = "non_compliant"
)
type policyDBSyncer struct {
client client.Client
log logr.Logger
databaseConnectionPool *pgxpool.Pool
syncInterval time.Duration
tableName string
specTableName string
}
func (syncer *policyDBSyncer) Start(ctx context.Context) error {
ctxWithCancel, cancelContext := context.WithCancel(ctx)
defer cancelContext()
go syncer.periodicSync(ctxWithCancel)
<-ctx.Done() // blocking wait for stop event
syncer.log.Info("stop performing sync", "table", syncer.tableName)
return nil // context cancel is called before exiting this function
}
func (syncer *policyDBSyncer) periodicSync(ctx context.Context) {
ticker := time.NewTicker(syncer.syncInterval)
var (
cancelFunc context.CancelFunc
ctxWithTimeout context.Context
)
for {
select {
case <-ctx.Done(): // we have received a signal to stop
ticker.Stop()
if cancelFunc != nil {
cancelFunc()
}
return
case <-ticker.C:
if cancelFunc != nil {
cancelFunc()
}
ctxWithTimeout, cancelFunc = context.WithTimeout(ctx, syncer.syncInterval)
syncer.sync(ctxWithTimeout)
}
}
}
func (syncer *policyDBSyncer) sync(ctx context.Context) {
syncer.log.Info("performing sync", "table", syncer.tableName)
rows, err := syncer.databaseConnectionPool.Query(ctx,
fmt.Sprintf(`SELECT id, payload -> 'metadata' ->> 'name' as name, payload -> 'metadata' ->> 'namespace'
as namespace FROM spec.%s WHERE deleted = FALSE`, syncer.specTableName))
if err != nil {
syncer.log.Error(err, "error in getting policies spec")
return
}
for rows.Next() {
var id, name, namespace string
err := rows.Scan(&id, &name, &namespace)
if err != nil {
syncer.log.Error(err, "error in select", "table", syncer.specTableName)
continue
}
instance := &policiesv1.Policy{}
err = syncer.client.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, instance)
if err != nil {
syncer.log.Error(err, "error in getting CR", "name", name, "namespace", namespace)
continue
}
go syncer.handlePolicy(ctx, instance)
}
}
func (syncer *policyDBSyncer) handlePolicy(ctx context.Context, instance *policiesv1.Policy) {
syncer.log.Info("handling a policy", "policy", instance, "uid", string(instance.GetUID()))
rows, err := syncer.databaseConnectionPool.Query(ctx,
fmt.Sprintf(`SELECT cluster_name, leaf_hub_name, compliance FROM status.%s
WHERE id = '%s' ORDER BY leaf_hub_name, cluster_name`,
syncer.tableName, string(instance.GetUID())))
if err != nil {
syncer.log.Error(err, "error in getting policy statuses from DB")
}
compliancePerClusterStatuses := []*policiesv1.CompliancePerClusterStatus{}
hasNonCompliantClusters := false
dbEnumToPolicyComplianceStateMap := map[string]policiesv1.ComplianceState{
dbEnumCompliant: policiesv1.Compliant,
dbEnumNonCompliant: policiesv1.NonCompliant,
}
for rows.Next() {
var clusterName, leafHubName, complianceInDB string
err := rows.Scan(&clusterName, &leafHubName, &complianceInDB)
if err != nil {
syncer.log.Error(err, "error in select", "table", syncer.tableName)
continue
}
syncer.log.Info("handling a line in compliance table", "clusterName", clusterName,
"leafHubName", leafHubName, "compliance", complianceInDB)
var compliance policiesv1.ComplianceState
if mappedCompliance, ok := dbEnumToPolicyComplianceStateMap[complianceInDB]; ok {
compliance = mappedCompliance
}
if compliance == policiesv1.NonCompliant {
hasNonCompliantClusters = true
}
compliancePerClusterStatuses = append(compliancePerClusterStatuses, &policiesv1.CompliancePerClusterStatus{
ComplianceState: compliance,
ClusterName: clusterName,
ClusterNamespace: clusterName,
})
}
syncer.log.Info("calculated compliance", "compliancePerClusterStatuses", compliancePerClusterStatuses)
err = syncer.updateComplianceStatus(ctx, instance, instance.DeepCopy(), compliancePerClusterStatuses,
hasNonCompliantClusters)
if err != nil {
syncer.log.Error(err, "Failed to update compliance status")
}
}
func (syncer *policyDBSyncer) updateComplianceStatus(ctx context.Context, instance, originalInstance *policiesv1.Policy,
compliancePerClusterStatuses []*policiesv1.CompliancePerClusterStatus, hasNonCompliantClusters bool) error {
instance.Status.Status = compliancePerClusterStatuses
instance.Status.ComplianceState = ""
if hasNonCompliantClusters {
instance.Status.ComplianceState = policiesv1.NonCompliant
} else if len(compliancePerClusterStatuses) > 0 {
instance.Status.ComplianceState = policiesv1.Compliant
}
err := syncer.client.Status().Patch(ctx, instance, client.MergeFrom(originalInstance))
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to update policy CR: %w", err)
}
return nil
}
func addPolicyDBSyncer(mgr ctrl.Manager, databaseConnectionPool *pgxpool.Pool, syncInterval time.Duration) error {
err := mgr.Add(&policyDBSyncer{
client: mgr.GetClient(),
log: ctrl.Log.WithName("policy-db-syncer"),
databaseConnectionPool: databaseConnectionPool,
syncInterval: syncInterval,
tableName: "compliance",
specTableName: "policies",
})
if err != nil {
return fmt.Errorf("failed to add policy syncer to the manager: %w", err)
}
return nil
}