/
redis_check.go
117 lines (98 loc) · 3.66 KB
/
redis_check.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fixtures
import (
"context"
"time"
)
// Retry checking redis every 5 seconds.
const retryInterval = time.Second * 5
// RedisNotContains verifies that there is no occurrence of targetStr in redis that is written by pipelineName, sinkName.
func RedisNotContains(ctx context.Context, pipelineName, sinkName, targetStr string, opts ...SinkCheckOption) bool {
o := defaultRedisCheckOptions()
for _, opt := range opts {
if opt != nil {
opt(o)
}
}
ctx, cancel := context.WithTimeout(ctx, o.timeout)
defer cancel()
return runChecks(ctx, func() bool {
return !redisContains(pipelineName, sinkName, targetStr, 1)
})
}
// RedisContains verifies that there are targetStr in redis written by pipelineName, sinkName.
func RedisContains(ctx context.Context, pipelineName, sinkName, targetStr string, opts ...SinkCheckOption) bool {
o := defaultRedisCheckOptions()
for _, opt := range opts {
if opt != nil {
opt(o)
}
}
ctx, cancel := context.WithTimeout(ctx, o.timeout)
defer cancel()
return runChecks(ctx, func() bool {
return redisContains(pipelineName, sinkName, targetStr, o.count)
})
}
func redisContains(pipelineName, sinkName, targetStr string, expectedCount int) bool {
// If number of matches is higher than expected, we treat it as passing the check.
return GetMsgCountContains(pipelineName, sinkName, targetStr) >= expectedCount
}
type redisCheckOptions struct {
count int
timeout time.Duration
}
func defaultRedisCheckOptions() *redisCheckOptions {
return &redisCheckOptions{
count: 1,
timeout: defaultTimeout,
}
}
type SinkCheckOption func(*redisCheckOptions)
// SinkCheckWithContainCount updates the redisCheckOptions to specify count.
// The count is the expected number of matches for the check.
func SinkCheckWithContainCount(c int) SinkCheckOption {
return func(o *redisCheckOptions) {
o.count = c
}
}
// SinkCheckWithTimeout updates the redisCheckOptions to specify timeout.
// The timeout specifies how long the redis check will wait for expected data to be ready in redis.
func SinkCheckWithTimeout(t time.Duration) SinkCheckOption {
return func(o *redisCheckOptions) {
o.timeout = t
}
}
type CheckFunc func() bool
// runChecks executes a performChecks function with retry strategy (retryInterval with timeout).
// If performChecks doesn't pass within timeout, runChecks returns false indicating the checks have failed.
// This is to mitigate the problem that we don't know exactly when a numaflow pipeline finishes processing our test data.
// Please notice such approach is not strictly accurate as there can be case where runChecks passes before pipeline finishes processing data.
// Which could result in false positive test results. e.g. checking data doesn't exist can pass before data gets persisted to redis.
func runChecks(ctx context.Context, performChecks CheckFunc) bool {
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
// Timeout reached, meaning checks did not pass within timeout, return false
return false
case <-ticker.C:
if performChecks() {
// All checks passed, return true
return true
}
}
}
}