This repository has been archived by the owner on Nov 16, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 58
/
entity_handler.go
183 lines (149 loc) · 5.17 KB
/
entity_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
///////////////////////////////////////////////////////////////////////
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
///////////////////////////////////////////////////////////////////////
package drivers
import (
"context"
"reflect"
"strings"
"time"
ewrapper "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/vmware/dispatch/pkg/controller"
"github.com/vmware/dispatch/pkg/entity-store"
"github.com/vmware/dispatch/pkg/event-manager/drivers/entities"
"github.com/vmware/dispatch/pkg/trace"
)
// EntityHandler handles driver entity operations
type EntityHandler struct {
store entitystore.EntityStore
backend Backend
}
// NewEntityHandler creates new instance of EntityHandler
func NewEntityHandler(store entitystore.EntityStore, backend Backend) *EntityHandler {
return &EntityHandler{
store: store,
backend: backend,
}
}
// Type returns Entity Handler type
func (h *EntityHandler) Type() reflect.Type {
return reflect.TypeOf(&entities.Driver{})
}
// Add adds new driver to the store, and executes its deployment.
func (h *EntityHandler) Add(ctx context.Context, obj entitystore.Entity) (err error) {
span, ctx := trace.Trace(ctx, "")
defer span.Finish()
driver := obj.(*entities.Driver)
defer func() { h.store.Update(ctx, driver.GetRevision(), driver) }()
log.Infof("Adding driver %s - expose: %v", driver.Name, driver.Expose)
if err := h.backend.Deploy(ctx, driver); err != nil {
translateErrorToEntityState(driver, err)
return ewrapper.Wrap(err, "error deploying driver")
}
driver.Status = entitystore.StatusREADY
log.Infof("%s-driver %s has been deployed", driver.Type, driver.Name)
return nil
}
// Update updates the driver by updating the deployment
func (h *EntityHandler) Update(ctx context.Context, obj entitystore.Entity) (err error) {
span, ctx := trace.Trace(ctx, "")
defer span.Finish()
driver := obj.(*entities.Driver)
defer func() { h.store.Update(ctx, driver.GetRevision(), driver) }()
if err := h.backend.Update(ctx, driver); err != nil {
translateErrorToEntityState(driver, err)
return ewrapper.Wrap(err, "error updating driver")
}
driver.Status = entitystore.StatusREADY
driver.SetReason([]string{})
log.Infof("%s-driver %s has been updated", driver.Type, driver.Name)
return nil
}
// Delete deletes the driver from the backend
func (h *EntityHandler) Delete(ctx context.Context, obj entitystore.Entity) error {
span, ctx := trace.Trace(ctx, "")
defer span.Finish()
driver := obj.(*entities.Driver)
// delete the deployment from backend
err := h.backend.Delete(ctx, driver)
driver.SetDelete(true)
if err != nil {
translateErrorToEntityState(driver, err)
h.store.Update(ctx, driver.GetRevision(), driver)
return h.Error(ctx, driver)
}
if err := h.store.Delete(ctx, driver.OrganizationID, driver.Name, driver); err != nil {
return ewrapper.Wrap(err, "store error when deleting driver")
}
log.Infof("driver %s deleted from backend and the entity store", driver.Name)
return nil
}
// Sync Executes sync loop
func (h *EntityHandler) Sync(ctx context.Context, resyncPeriod time.Duration) ([]entitystore.Entity, error) {
span, ctx := trace.Trace(ctx, "")
defer span.Finish()
// list entity filter
filter := entitystore.FilterEverything().Add(
entitystore.FilterStat{
Scope: entitystore.FilterScopeField,
Subject: "ModifiedTime",
Verb: entitystore.FilterVerbBefore,
Object: time.Now().Add(-resyncPeriod),
})
syncingEntities, err := controller.DefaultSync(ctx, h.store, h.Type(), resyncPeriod, filter)
if err != nil {
return nil, err
}
return syncingEntities, nil
}
// Error handles error state
func (h *EntityHandler) Error(ctx context.Context, obj entitystore.Entity) error {
span, ctx := trace.Trace(ctx, "")
defer span.Finish()
driver := obj.(*entities.Driver)
var err error
defer func() { h.store.UpdateWithError(ctx, driver, err) }()
if len(driver.GetReason()) == 0 {
return ewrapper.Errorf("%s without error reason", driver.GetName())
}
log.Debugf("%s error: reasons are: %s", driver.GetName(), driver.GetReason())
// TODO: recover/handle error state
recover := false
switch driver.GetReason()[0] {
case errReasonDeploymentNotFound:
if driver.GetDelete() {
// in DELETE status, delete driver entity
log.Debugf("%s in delete state, deployment not found, delete entity", driver.GetName())
h.store.Delete(ctx, driver.OrganizationID, driver.Name, driver)
recover = true
}
case errReasonDeploymentAlreadyExists, errReasonDeploymentNotAvaialble:
// do update
h.Update(ctx, driver)
default:
log.Debug("other error")
}
if recover {
log.Debugf("%s recovered", driver.Name)
} else {
log.Debugf("%s failed to recover: %s", driver.Name, strings.Join(driver.GetReason(), ", "))
}
return err
}
func translateErrorToEntityState(driver *entities.Driver, e error) {
if e == nil {
return
}
log.Debugf("put driver to error state %s: %s", driver.GetName(), e)
reason := []string{
e.Error(),
}
if c, ok := e.(Causer); ok {
log.Debugf("%s -- underlying error reason: %s", driver.GetName(), c.Cause().Error())
reason = append(reason, c.Cause().Error())
}
driver.SetReason(reason)
driver.SetStatus(entitystore.StatusERROR)
}