-
Notifications
You must be signed in to change notification settings - Fork 577
/
receiver.go
191 lines (159 loc) · 5.42 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package receiver
import (
"context"
"fmt"
"log"
"net"
"runtime"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/golang/protobuf/ptypes/timestamp"
"google.golang.org/protobuf/types/known/timestamppb"
"knative.dev/eventing/test/performance/infra/common"
pb "knative.dev/eventing/test/performance/infra/event_state"
)
// shutdownWaitTime is the delay between receiving the end event and signalling
// the end channel, so that events still on the wire can be processed first.
const shutdownWaitTime = time.Second * 5
// Receiver records the received events and sends them to the aggregator.
// Since sender implementations can put the id and type of an event inside the
// event payload, the Receiver uses an IdExtractor and a TypeExtractor to
// extract them.
type Receiver struct {
	// typeExtractor returns the type used to dispatch each incoming event.
	typeExtractor TypeExtractor
	// idExtractor returns the id under which a measured event is recorded.
	idExtractor IdExtractor
	// timeout is the longest Run waits for the end signal before tearing down.
	timeout time.Duration
	// receivedCh carries one timestamped entry per measured event from the
	// CloudEvents handler to processEvents.
	receivedCh chan common.EventTimestamp
	// endCh tells processEvents to stop; it is signalled by the timeout timer
	// in Run and by the end-event handler.
	endCh chan struct{}
	// receivedEvents accumulates event id -> receive timestamp and is published
	// to the aggregator at the end of Run.
	receivedEvents *pb.EventsRecord
	// aggregatorClient is the gRPC client used to publish receivedEvents.
	aggregatorClient *pb.AggregatorClient
}
// NewReceiver builds a Receiver for the pace described by paceFlag and connects
// it to the aggregator at aggregAddr.
//
// warmupSeconds is only used to size the receiver timeout (0 adds no warmup
// time). typeExtractor and idExtractor are stored as-is and later used to read
// the type and id of each incoming event.
//
// An error is returned when the pace spec cannot be parsed or the aggregator
// client cannot be created.
func NewReceiver(paceFlag string, aggregAddr string, warmupSeconds uint, typeExtractor TypeExtractor, idExtractor IdExtractor) (common.Executor, error) {
	paceSpecs, err := common.ParsePaceSpec(paceFlag)
	if err != nil {
		return nil, err
	}

	// Open the connection to the aggregator.
	client, err := pb.NewAggregatorClient(aggregAddr)
	if err != nil {
		return nil, err
	}

	chanSize, expectedEvents := common.CalculateMemoryConstraintsForPaceSpecs(paceSpecs)

	// Receiver timeout: the warmup phase (plus its settle time, when there is a
	// warmup at all) followed by every pace step with its flush and GC waits.
	timeout := time.Duration(warmupSeconds) * time.Second
	if timeout != 0 {
		timeout += common.WaitAfterWarmup
	}
	var paceBudget time.Duration
	for _, spec := range paceSpecs {
		paceBudget += spec.Duration + common.WaitForFlush + common.WaitForReceiverGC
	}

	// The budget is doubled because, under high load, the SUT slows the sender
	// down and the test takes longer than the nominal duration.
	// The coefficient of 2 is based on experimental evidence.
	// More: https://github.com/knative/eventing/pull/2195#discussion_r348368914
	timeout = 2 * (timeout + paceBudget)

	record := &pb.EventsRecord{
		Type:   pb.EventsRecord_RECEIVED,
		Events: make(map[string]*timestamp.Timestamp, expectedEvents),
	}

	return &Receiver{
		typeExtractor:    typeExtractor,
		idExtractor:      idExtractor,
		timeout:          timeout,
		receivedCh:       make(chan common.EventTimestamp, chanSize),
		endCh:            make(chan struct{}, 1),
		receivedEvents:   record,
		aggregatorClient: client,
	}, nil
}
// Run starts the CloudEvents receiver, records events until the end signal or
// the timeout arrives, and then publishes the collected record to the
// aggregator.
//
// The process is terminated through log.Fatalf when the receiver port never
// becomes free, when the CloudEvents receiver returns an error, or when
// publishing the record fails.
func (r *Receiver) Run(ctx context.Context) {
	// The CloudEvents receiver is only started once its port is free.
	waitForPortAvailable(common.CEReceiverPort)

	ceCtx, stopCE := context.WithCancel(ctx)
	go func() {
		err := r.startCloudEventsReceiver(ceCtx)
		if err != nil {
			log.Fatalf("Failed to start CloudEvents receiver: %v", err)
		}
	}()

	// When the service under test is degraded the end message may never
	// arrive; this timer then signals endCh so that event processing stops and
	// the receiver is torn down anyway.
	onTimeout := func() {
		log.Printf("Receiver timeout")
		r.endCh <- struct{}{}
	}
	watchdog := time.AfterFunc(r.timeout, onTimeout)
	log.Printf("Started receiver timeout timer of duration %v", r.timeout)

	r.processEvents()

	// The timer is no longer needed if the end message triggered the tear down.
	watchdog.Stop()

	stopCE()
	log.Println("Receiver closed")

	log.Printf("%-15s: %d", "Received count", len(r.receivedEvents.Events))

	records := &pb.EventsRecordList{Items: []*pb.EventsRecord{r.receivedEvents}}
	if err := r.aggregatorClient.Publish(records); err != nil {
		log.Fatalf("Failed to send events record: %v\n", err)
	}

	close(r.receivedCh)
}
// processEvents stores the timestamp of every entry arriving on receivedCh,
// keyed by event id, and returns as soon as a signal arrives on endCh or
// receivedCh is closed.
func (r *Receiver) processEvents() {
	for {
		select {
		case <-r.endCh:
			return
		case received, open := <-r.receivedCh:
			if !open {
				return
			}
			r.receivedEvents.Events[received.EventId] = received.At
		}
	}
}
// startCloudEventsReceiver creates an HTTP CloudEvents client and starts it
// with ctx, handing every incoming event to processReceiveEvent.
//
// It returns the client-creation error (wrapped) or whatever StartReceiver
// returns. Note that the "started" message is logged just before
// StartReceiver is called.
func (r *Receiver) startCloudEventsReceiver(ctx context.Context) error {
	cli, err := cloudevents.NewClientHTTP()
	if err != nil {
		// %w keeps the underlying error reachable through errors.Is/errors.As;
		// the rendered message is the same as with %v.
		return fmt.Errorf("failed to create CloudEvents client: %w", err)
	}

	log.Printf("CloudEvents receiver started")
	return cli.StartReceiver(ctx, r.processReceiveEvent)
}
// processReceiveEvent processes one event received by the CloudEvents
// receiver, dispatching on the type returned by typeExtractor:
//
//   - common.MeasureEventType: the event id and the current time are queued on
//     receivedCh;
//   - common.GCEventType: a garbage collection is forced;
//   - common.EndEventType: the end signal is scheduled after shutdownWaitTime.
//
// Events of any other type are ignored.
func (r *Receiver) processReceiveEvent(event cloudevents.Event) {
	switch r.typeExtractor(event) {
	case common.MeasureEventType:
		r.receivedCh <- common.EventTimestamp{EventId: r.idExtractor(event), At: timestamppb.Now()}
	case common.GCEventType:
		runtime.GC()
	case common.EndEventType:
		log.Printf("End message received correctly")
		// Wait a bit so all messages on wire are processed
		time.AfterFunc(shutdownWaitTime, func() {
			// Non-blocking send: endCh is created with a buffer of one and is
			// read at most once, so a signal that is already pending is
			// enough. Dropping the extra one keeps repeated end events (or an
			// end event racing the timeout) from leaving this goroutine
			// blocked forever after event processing has stopped.
			select {
			case r.endCh <- struct{}{}:
			default:
			}
		})
	}
}
// waitForPortAvailable blocks until nothing accepts connections on the given
// local TCP port. It dials the port up to 30 times, 10ms apart; a dial that
// fails with a *net.OpError means the port is free. If every attempt connects,
// the process is terminated through log.Fatalf.
func waitForPortAvailable(port string) {
	const (
		maxAttempts = 30
		retryDelay  = 10 * time.Millisecond
	)

	addr := ":" + port
	for attempt := 0; attempt < maxAttempts; attempt++ {
		conn, err := net.Dial("tcp", addr)
		if _, dialFailed := err.(*net.OpError); dialFailed {
			// Nobody is listening: the port is available.
			return
		}
		// Something accepted the connection; drop it and try again shortly.
		_ = conn.Close()
		time.Sleep(retryDelay)
	}

	log.Fatalf("Timeout waiting for TCP port %s to become available\n", port)
}