/
broker.go
145 lines (118 loc) · 3.36 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package broker
import (
"context"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/kyma-project/kyma/tests/function-controller/pkg/helpers"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"github.com/kyma-project/kyma/tests/function-controller/pkg/resource"
"github.com/kyma-project/kyma/tests/function-controller/pkg/shared"
"github.com/pkg/errors"
)
// DefaultName is the Broker object name that New assigns to every Broker helper.
const DefaultName = "default"
// Broker is a test helper for a single "brokers" resource
// (eventing.knative.dev/v1alpha1) identified by name within one namespace.
type Broker struct {
// resCli is the resource client used to get, delete and watch the Broker.
resCli *resource.Resource
// name is the name of the Broker object this helper operates on.
name string
// namespace is the namespace reported in error messages; it is the same
// namespace that New passes to the resource client.
namespace string
// waitTimeout bounds how long WaitForStatusRunning waits for readiness.
waitTimeout time.Duration
// log receives informational and warning messages.
log *logrus.Entry
// verbose is forwarded to shared.LogReadiness when readiness is logged.
verbose bool
}
// New returns a Broker helper bound to the "brokers" resource of
// eventing.knative.dev/v1alpha1 in the container's namespace. The helper
// always targets the Broker named DefaultName and takes its timeout, logger
// and verbosity from c.
func New(c shared.Container) *Broker {
	brokersGVR := schema.GroupVersionResource{
		Group:    "eventing.knative.dev",
		Version:  "v1alpha1",
		Resource: "brokers",
	}
	client := resource.New(c.DynamicCli, brokersGVR, c.Namespace, c.Log, c.Verbose)

	return &Broker{
		resCli:      client,
		name:        DefaultName,
		namespace:   c.Namespace,
		waitTimeout: c.WaitTimeout,
		log:         c.Log,
		verbose:     c.Verbose,
	}
}
// Get fetches the Broker object by name through the resource client.
//
// On failure it returns a non-nil, empty Unstructured together with the
// underlying error wrapped with the Broker's name and namespace.
func (b *Broker) Get() (*unstructured.Unstructured, error) {
	obj, getErr := b.resCli.Get(b.name)
	if getErr == nil {
		return obj, nil
	}
	return &unstructured.Unstructured{}, errors.Wrapf(getErr, "while getting Broker %s in namespace %s", b.name, b.namespace)
}
// Delete removes the Broker object by name through the resource client.
// Any failure is returned wrapped with the Broker's name and namespace.
func (b *Broker) Delete() error {
	if delErr := b.resCli.Delete(b.name); delErr != nil {
		return errors.Wrapf(delErr, "while deleting Broker %s in namespace %s", b.name, b.namespace)
	}
	return nil
}
// LogResource fetches the Broker, pretty-prints it with
// helpers.PrettyMarshall and writes the result to the info log.
// It returns the first error from fetching or marshalling; nothing is logged
// in that case.
func (b *Broker) LogResource() error {
	obj, err := b.Get()
	if err != nil {
		return err
	}

	pretty, err := helpers.PrettyMarshall(obj)
	if err == nil {
		b.log.Infof("Broker resource: %s", pretty)
	}
	return err
}
// WaitForStatusRunning returns nil once the Broker is ready.
//
// It first fetches the Broker: a fetch error is returned immediately, and an
// already-ready Broker short-circuits with nil. Otherwise it hands the
// readiness condition to resource.WaitUntilConditionSatisfied under a context
// that expires after waitTimeout, and returns whatever that call returns.
func (b *Broker) WaitForStatusRunning() error {
	current, err := b.Get()
	switch {
	case err != nil:
		return err
	case b.isStateReady(*current):
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout)
	defer cancel()

	return resource.WaitUntilConditionSatisfied(ctx, b.resCli.ResCli, b.isBrokerReady(b.name))
}
// isBrokerReady builds a watch-event predicate for the Broker called name.
//
// The returned function yields:
//   - (false, shared.ErrInvalidDataType) when the event object is not an
//     *unstructured.Unstructured;
//   - (false, nil) when the event concerns an object with a different name
//     (the mismatch is logged at info level);
//   - (isStateReady(object), nil) otherwise.
func (b *Broker) isBrokerReady(name string) func(event watch.Event) (bool, error) {
	check := func(event watch.Event) (bool, error) {
		obj, isUnstructured := event.Object.(*unstructured.Unstructured)
		if !isUnstructured {
			return false, shared.ErrInvalidDataType
		}

		if obj.GetName() == name {
			return b.isStateReady(*obj), nil
		}

		b.log.Infof("names mismatch, object's name %s, supplied %s", obj.GetName(), name)
		return false, nil
	}
	return check
}
// isStateReady reports whether the given Broker object has a condition of
// type "Ready" whose status is "True".
//
// It reads status.conditions from the object and inspects the first condition
// whose type is "Ready". Missing or malformed data is logged as a warning and
// treated as "not ready" rather than causing a panic. Whenever
// status.conditions is present and is a slice, the outcome is reported through
// shared.LogReadiness before returning.
func (b Broker) isStateReady(broker unstructured.Unstructured) bool {
	conditions, found, err := unstructured.NestedSlice(broker.Object, "status", "conditions")
	if err != nil {
		// NestedSlice errors when the field exists but is not a slice.
		b.log.Warnf("Broker status.conditions is malformed: %v", err)
		return false
	}
	if !found {
		// status.conditions may not have been added by the eventing controller yet.
		b.log.Warn("Broker does not have status.conditions")
		return false
	}

	ready := false
	for _, item := range conditions {
		// casting to map[string]string here doesn't work, ok is false
		cond, ok := item.(map[string]interface{})
		if !ok {
			b.log.Warn("couldn't cast broker's condition to map[string]interface{}")
			break
		}

		// Checked assertion: a condition without a string "type" cannot be
		// the Ready condition, so skip it instead of panicking.
		condType, ok := cond["type"].(string)
		if !ok {
			b.log.Warn("broker's condition has no string \"type\" field")
			continue
		}
		if condType != "Ready" {
			continue
		}

		// Checked assertion: a Ready condition whose status is absent or not
		// a string is treated as not ready.
		status, ok := cond["status"].(string)
		if !ok {
			b.log.Warn("broker's Ready condition has no string \"status\" field")
		}
		ready = ok && status == "True"
		break
	}

	shared.LogReadiness(ready, b.verbose, b.name, b.log, broker)
	return ready
}