-
Notifications
You must be signed in to change notification settings - Fork 203
/
event-handlers.go
176 lines (149 loc) · 5.45 KB
/
event-handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package cmd
import (
"context"
"fmt"
"github.com/avast/retry-go"
"github.com/spf13/viper"
"net/http"
"time"
"github.com/cosmos/cosmos-sdk/client"
gateway "github.com/ovrclk/akash/provider/gateway/rest"
"github.com/ovrclk/akash/pubsub"
dtypes "github.com/ovrclk/akash/x/deployment/types"
mtypes "github.com/ovrclk/akash/x/market/types"
"github.com/pkg/errors"
)
// EventHandler is the signature of a function that handles a single event coming out of the
// event bus: it receives a pubsub.Event and returns an error. SendManifestHander and
// DeploymentDataUpdateHandler both return functions of this shape.
type EventHandler func(pubsub.Event) error
// SendManifestHander builds an event handler that submits the deployment manifest to the
// provider named in a lease-created event.
//
// The returned handler returns nil for every event that is not an mtypes.EventLeaseCreated whose
// owner equals clientCtx's from-address and whose DSeq equals dd.DeploymentID.DSeq. For a
// matching lease it sleeps for the FlagTick duration (read from viper once, when the handler is
// built), looks up the provider's gateway client in gClientDir and submits dd.Manifest through
// retry.Do, returning whatever error that yields.
//
// retryConfiguration is copied into a fresh slice and extended with a RetryIf predicate, so the
// caller's slice is never modified. Only errors accepted by retryIfGatewayClientResponseError
// are retried, and of those, HTTP 500 and HTTP 422 responses are treated as permanent failures.
func SendManifestHander(clientCtx client.Context, dd *DeploymentData, gClientDir *gateway.ClientDirectory, retryConfiguration []retry.Option) func(pubsub.Event) error {
	tick := viper.GetDuration(FlagTick)

	// shouldRetry decides whether a failed manifest submission is worth another attempt.
	shouldRetry := func(err error) bool {
		if !retryIfGatewayClientResponseError(err) {
			return false
		}
		switch err.(gateway.ClientResponseError).Status {
		case http.StatusInternalServerError:
			// don't retry, the provider can't use this manifest
			return false
		case http.StatusUnprocessableEntity:
			// don't retry, the manifest isn't well formed
			return false
		}
		return true
	}

	retryOpts := make([]retry.Option, 0, len(retryConfiguration)+1)
	retryOpts = append(retryOpts, retryConfiguration...)
	retryOpts = append(retryOpts, retry.RetryIf(shouldRetry))

	return func(ev pubsub.Event) error {
		owner := clientCtx.GetFromAddress()
		sendLog := logger.With("action", "send-manifest")

		lease, isLease := ev.(mtypes.EventLeaseCreated)
		if !isLease || owner.String() != lease.ID.Owner || lease.ID.DSeq != dd.DeploymentID.DSeq {
			// Not a lease for the deployment being tracked: nothing to send.
			return nil
		}

		// The provider responds to the same event to get ready for a deployment, so wait one
		// tick here to avoid racing the provider.
		time.Sleep(tick)

		sendLog.Info("sending manifest to provider", "provider", lease.ID.Provider, "dseq", lease.ID.DSeq)

		gwClient, err := gClientDir.GetClientFromBech32(lease.ID.Provider)
		if err != nil {
			return err
		}

		// submit performs one attempt; failures are logged at debug level and handed back to
		// retry.Do, which consults shouldRetry.
		submit := func() error {
			submitErr := gwClient.SubmitManifest(context.Background(), dd.DeploymentID.DSeq, dd.Manifest)
			if submitErr != nil {
				sendLog.Debug("send-manifest failed", "lease", lease.ID, "err", submitErr)
			}
			return submitErr
		}

		return retry.Do(submit, retryOpts...)
	}
}
// errUnexpectedEvent is the sentinel error that DeploymentDataUpdateHandler wraps (via %w) when
// the tracked deployment or one of its leases is closed, or when it receives an event type it
// has no case for. Because it is wrapped with %w, callers can match it with errors.Is.
var errUnexpectedEvent = errors.New("unexpected event")
// DeploymentDataUpdateHandler builds an event handler that keeps dd in sync with order and lease
// events for dd.DeploymentID and logs each relevant event.
//
// Events that belong to a different deployment are ignored and yield nil. For the tracked
// deployment:
//   - order created/closed events add/remove the order on dd;
//   - a lease created event adds the lease to dd and, once dd.ExpectedLeases() reports true,
//     attempts a non-blocking send on leasesReady;
//   - a deployment closed or lease closed event returns an error wrapping errUnexpectedEvent.
//
// Any event type without a case also returns an error wrapping errUnexpectedEvent.
func DeploymentDataUpdateHandler(dd *DeploymentData, leasesReady chan<- struct{}) func(pubsub.Event) error {
	return func(ev pubsub.Event) error {
		owner := dd.DeploymentID.Owner
		dseq := dd.DeploymentID.DSeq
		evLog := logger.With("addr", owner, "dseq", dseq)

		switch e := ev.(type) {
		// Deployment lifecycle events.
		case dtypes.EventDeploymentCreated:
			if e.ID.Equals(dd.DeploymentID) {
				evLog.Info("deployment created")
			}

		case dtypes.EventDeploymentUpdated:
			if e.ID.Equals(dd.DeploymentID) {
				evLog.Info("deployment updated")
			}

		case dtypes.EventDeploymentClosed:
			if !e.ID.Equals(dd.DeploymentID) {
				return nil
			}
			evLog.Error("deployment closed unexpectedly")
			// TODO - exit here
			return fmt.Errorf("%w: deployment closed", errUnexpectedEvent)

		case dtypes.EventGroupClosed:
			if e.ID.Owner == owner && e.ID.DSeq == dseq {
				// TODO: Maybe more housekeeping here?
				evLog.Info("deployment group closed")
			}

		// Order events: mirror them into dd.
		case mtypes.EventOrderCreated:
			if e.ID.Owner == owner && e.ID.DSeq == dseq {
				dd.AddOrder(e.ID)
				evLog.Info("order for deployment created", "oseq", e.ID.OSeq)
			}

		case mtypes.EventOrderClosed:
			if e.ID.Owner == owner && e.ID.DSeq == dseq {
				dd.RemoveOrder(e.ID)
				evLog.Info("order for deployment closed", "oseq", e.ID.OSeq)
			}

		// Bid events: logged only.
		case mtypes.EventBidCreated:
			if e.ID.Owner == owner && e.ID.DSeq == dseq {
				evLog.Info("bid for order created", "oseq", e.ID.OSeq, "price", e.Price)
			}

		case mtypes.EventBidClosed:
			if e.ID.Owner == owner && e.ID.DSeq == dseq {
				evLog.Info("bid for order closed", "oseq", e.ID.OSeq, "price", e.Price)
			}

		// Lease events.
		case mtypes.EventLeaseCreated:
			if e.ID.Owner == owner && e.ID.DSeq == dseq {
				dd.AddLease(e.ID)
				if dd.ExpectedLeases() {
					// Non-blocking send: if the channel cannot accept the signal right now,
					// the default branch drops it instead of stalling the handler.
					select {
					case leasesReady <- struct{}{}:
						evLog.Info("All expected leases created")
					default:
					}
				}
				evLog.Info("lease for order created", "oseq", e.ID.OSeq, "price", e.Price)
			}

		case mtypes.EventLeaseClosed:
			if e.ID.Owner != owner || e.ID.DSeq != dseq {
				return nil
			}
			evLog.Error("lease for order closed", "oseq", e.ID.OSeq, "price", e.Price)
			return fmt.Errorf("%w: lease closed oseq: %v", errUnexpectedEvent, e.ID.OSeq)

		// In any other case we should exit with error.
		default:
			return fmt.Errorf("%w: %T", errUnexpectedEvent, ev)
		}

		return nil
	}
}