/
run_with_contract_config.go
157 lines (136 loc) · 4.91 KB
/
run_with_contract_config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package managed
import (
"context"
"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/internal/loghelper"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/smartcontractkit/libocr/subprocesses"
)
// runWithContractConfig invokes fn with a contract config and keeps that
// invocation in step with config changes reported via contractConfigTracker.
// Contract configs are also saved to and restored from database.
//
// The arguments are bundled into a runWithContractConfigState whose run method
// does the actual work. The state starts with a zero ConfigDigest, a no-op
// cancel function and empty subprocess groups; offchainConfigDigester is
// wrapped in a prefixCheckConfigDigester before being stored.
func runWithContractConfig(
	ctx context.Context,
	contractConfigTracker types.ContractConfigTracker,
	database types.ConfigDatabase,
	fn func(context.Context, types.ContractConfig, loghelper.LoggerWithContext),
	localConfig types.LocalConfig,
	logger loghelper.LoggerWithContext,
	offchainConfigDigester types.OffchainConfigDigester,
) {
	state := runWithContractConfigState{
		ctx:                   ctx,
		configDigest:          types.ConfigDigest{},
		contractConfigTracker: contractConfigTracker,
		database:              database,
		fn:                    fn,
		localConfig:           localConfig,
		logger:                logger,
		configDigester:        prefixCheckConfigDigester{offchainConfigDigester},
		// Nothing is running yet, so cancelling is a no-op.
		fnCancel:  func() {},
		fnSubs:    subprocesses.Subprocesses{},
		otherSubs: subprocesses.Subprocesses{},
	}
	state.run()
}
// runWithContractConfigState holds everything runWithContractConfig needs
// while it runs: its inputs, the digest of the config currently in effect, and
// the handles used to stop and wait for background work.
//
// NOTE: runWithContractConfig builds this struct with a positional (unkeyed)
// literal, so the order of the fields below matters.
type runWithContractConfigState struct {
// ctx is the parent context; run returns once it is done.
ctx context.Context
// configDigest is the digest of the config most recently accepted by
// configChanged. It is the zero digest until a config has been accepted.
configDigest types.ConfigDigest
// contractConfigTracker is handed to TrackConfig by run.
contractConfigTracker types.ContractConfigTracker
// database is read by restoreFromDatabase and written by configChanged.
database types.ConfigDatabase
// fn is started by configChanged for each accepted config, with a context
// that is cancelled when the next config change arrives.
fn func(context.Context, types.ContractConfig, loghelper.LoggerWithContext)
// localConfig supplies DatabaseTimeout and is also passed to TrackConfig.
localConfig types.LocalConfig
logger loghelper.LoggerWithContext
// configDigester is passed to TrackConfig and used by configChanged to check
// a config before it is put into effect.
configDigester prefixCheckConfigDigester
// fnCancel cancels the context of the fn invocation started most recently.
fnCancel context.CancelFunc
// fnSubs runs the fn invocations; otherSubs runs everything else (TrackConfig
// and the database restore).
fnSubs subprocesses.Subprocesses
otherSubs subprocesses.Subprocesses
}
// run is the managed event loop. It first attempts to restore a config from
// the database, then starts TrackConfig in the background and applies every
// config that arrives on the channel handed to TrackConfig. When rwcc.ctx is
// done it waits for all subprocesses and returns.
func (rwcc *runWithContractConfigState) run() {
	// Try the database first, so that we can run even if the ethereum node
	// isn't working.
	rwcc.restoreFromDatabase()

	// Tracking starts only after the restore attempt; TrackConfig receives
	// whatever digest that attempt left in rwcc.configDigest.
	configUpdates := make(chan types.ContractConfig, 5)
	rwcc.otherSubs.Go(func() {
		TrackConfig(
			rwcc.ctx,
			rwcc.configDigester,
			rwcc.contractConfigTracker,
			rwcc.configDigest,
			rwcc.localConfig,
			rwcc.logger,
			configUpdates,
		)
	})

	done := rwcc.ctx.Done()
	for {
		select {
		case <-done:
			rwcc.logger.Info("runWithContractConfig: winding down", nil)
			rwcc.fnSubs.Wait()
			rwcc.otherSubs.Wait()
			rwcc.logger.Info("runWithContractConfig: exiting", nil)
			// Leave the managed event loop altogether.
			return

		case next := <-configUpdates:
			rwcc.logger.Info("runWithContractConfig: switching between configs", commontypes.LogFields{
				"oldConfigDigest": rwcc.configDigest.Hex(),
				"newConfigDigest": next.ConfigDigest.Hex(),
			})
			rwcc.configChanged(next)
		}
	}
}
// restoreFromDatabase loads a stored contract config via
// loadConfigFromDatabase and, if one is found, applies it with configChanged.
//
// The load runs through otherSubs.BlockForAtMost with
// localConfig.DatabaseTimeout. If that call reports failure, a database
// timeout error is logged and nothing is applied. Finding no stored config is
// logged at info level and is not an error.
func (rwcc *runWithContractConfigState) restoreFromDatabase() {
	timeout := rwcc.localConfig.DatabaseTimeout

	var restored *types.ContractConfig
	load := func(ctx context.Context) {
		restored = loadConfigFromDatabase(ctx, rwcc.database, rwcc.logger)
	}

	finished := rwcc.otherSubs.BlockForAtMost(rwcc.ctx, timeout, load)
	switch {
	case !finished:
		// restored is deliberately not read on this path.
		// NOTE(review): presumably the load may still be running here; confirm
		// against BlockForAtMost.
		rwcc.logger.Error("runWithContractConfig: database timed out while attempting to restore configuration", commontypes.LogFields{
			"timeout": timeout,
		})
	case restored == nil:
		rwcc.logger.Info("runWithContractConfig: found no configuration to restore", commontypes.LogFields{})
	default:
		rwcc.configChanged(*restored)
	}
}
// configChanged replaces whatever is currently running with contractConfig.
//
// Steps, in order:
//  1. Cancel the previous fn invocation and wait for it to return.
//  2. Check contractConfig with the config digester. On failure, log the error
//     and return; the previous invocation stays stopped and nothing new starts.
//  3. Record the new digest and start fn in fnSubs with a fresh cancellable
//     context derived from rwcc.ctx.
//  4. Write contractConfig to the database, bounded by
//     localConfig.DatabaseTimeout. A write error is only logged (via
//     ErrorIfNotCanceled); fn has already been started by then.
func (rwcc *runWithContractConfigState) configChanged(contractConfig types.ContractConfig) {
	newDigest := contractConfig.ConfigDigest

	// Stop all work belonging to the earlier config before going any further.
	rwcc.logger.Info("runWithContractConfig: winding down old configuration", commontypes.LogFields{
		"oldConfigDigest": rwcc.configDigest,
		"newConfigDigest": newDigest,
	})
	rwcc.fnCancel()
	rwcc.fnSubs.Wait()
	rwcc.logger.Info("runWithContractConfig: closed old configuration", commontypes.LogFields{
		"oldConfigDigest": rwcc.configDigest,
		"newConfigDigest": newDigest,
	})

	// TrackConfig performs an analogous check, so this should never trigger.
	checkErr := rwcc.configDigester.CheckContractConfig(contractConfig)
	if checkErr != nil {
		rwcc.logger.Error("runWithContractConfig: detected corruption while attempting to change configuration", commontypes.LogFields{
			"err":            checkErr,
			"contractConfig": contractConfig,
		})
		return
	}

	rwcc.configDigest = newDigest

	runCtx, stop := context.WithCancel(rwcc.ctx)
	rwcc.fnCancel = stop
	rwcc.fnSubs.Go(func() {
		// Release the context's resources even if fn returns on its own.
		defer stop()
		childLogger := rwcc.logger.MakeChild(commontypes.LogFields{"configDigest": newDigest})
		rwcc.fn(runCtx, contractConfig, childLogger)
	})

	persistCtx, persistCancel := context.WithTimeout(rwcc.ctx, rwcc.localConfig.DatabaseTimeout)
	defer persistCancel()

	writeErr := rwcc.database.WriteConfig(persistCtx, contractConfig)
	if writeErr == nil {
		return
	}
	rwcc.logger.ErrorIfNotCanceled("runWithContractConfig: error writing new config to database", persistCtx, commontypes.LogFields{
		"configDigest": newDigest,
		"config":       contractConfig,
		"error":        writeErr,
	})
}