Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Lag-dependent implementation of Redis streams scaler #4592

Merged
merged 38 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
94af8c4
Running e2e tests on new scaler
mikelam-us-aixplain May 1, 2023
80ca2e7
First hopefully successful test
mikelam-us-aixplain May 1, 2023
0cb0cb8
Scaler works now
mikelam-us-aixplain May 3, 2023
d3210ff
Pull request
mikelam-us-aixplain May 4, 2023
0686070
Cleaned up redis streams scaler to support xlag, xpending, and xlength
mikelam-us-aixplain May 15, 2023
acba7ef
Added tests and changed 'lag' to 'lagCount'
mikelam-us-aixplain May 18, 2023
f33c76b
Updated E2E tests to reflect lag -> lagCount
mikelam-us-aixplain May 18, 2023
eeea1b8
Updated protoc version and DeleteKubernetes call
mikelam-us-aixplain May 18, 2023
b6e1144
Added pending entries unit tests, corrected lagFactor error message, …
mikelam-us-aixplain May 30, 2023
fd45427
Removed files that no longer exist
mikelam-us-aixplain May 30, 2023
824c17f
Remove unneeded grpc files
mikelam-us-aixplain May 30, 2023
0b8e64e
Added unit tests and version check in scaler
mikelam-us-aixplain May 30, 2023
bb4d922
Fixing extraneous files
mikelam-us-aixplain May 30, 2023
bd97535
Unit tests working
mikelam-us-aixplain May 30, 2023
24bad85
Final saves
mikelam-us-aixplain May 30, 2023
f908ff6
int -> int64
mikelam-us-aixplain May 31, 2023
ac738c2
Fixed all int64 problems
mikelam-us-aixplain May 31, 2023
32b0b49
Moved test directories
mikelam-us-aixplain May 31, 2023
8f1c1ab
Use XINFO server to check version number
mikelam-us-aixplain Jun 1, 2023
3db35a1
Fixing style errors and docker repository
mikelam-us-aixplain Jun 1, 2023
2809d18
Fixed indentation error
mikelam-us-aixplain Jun 1, 2023
bbee9eb
Still fixing indentation erro
mikelam-us-aixplain Jun 1, 2023
e9b888c
Update redis_cluster_streams_pending_entries_test.go
mikelam-us Jun 1, 2023
eb982fb
Indentation should be fixed
mikelam-us-aixplain Jun 1, 2023
c700f85
Updated CHANGELOG and fixed Redis unit test
mikelam-us-aixplain Jun 1, 2023
a32a3ba
More flexible Redis version check
mikelam-us-aixplain Jun 1, 2023
f01dc41
Fixed version check
mikelam-us-aixplain Jun 1, 2023
66c7e51
Added activation value check andmade namespaces unique across tests
mikelam-us-aixplain Jun 6, 2023
8b76e88
Correcting stylistic errors
mikelam-us-aixplain Jun 6, 2023
4d27605
Fixed unnecessary leading newline
mikelam-us-aixplain Jun 6, 2023
6151763
Fixing formatting
mikelam-us-aixplain Jun 6, 2023
e0e771e
Streamlined e2e test
mikelam-us-aixplain Jun 7, 2023
0dcb105
Problem pushing commits
mikelam-us-aixplain Jun 7, 2023
ec2ce46
Fixed redis activation value bug and added standalone tests
mikelam-us-aixplain Jun 19, 2023
d6f442c
Updated in-line redis scaler documentation
mikelam-us-aixplain Jun 19, 2023
2120ecf
Merge branch 'main' into main
JorTurFer Jun 20, 2023
801b55d
Changed activationTargetLag -> activationLagCount
mikelam-us-aixplain Jun 20, 2023
d8b448a
Merge branch 'main' of https://github.com/mikelam-us/keda
mikelam-us-aixplain Jun 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

- **CPU/Memory scaler**: Add support for scale to zero if there are multiple triggers([#4269](https://github.com/kedacore/keda/issues/4269))
- **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277))
- **General**: Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234))
- **Redis Scalers**: Allow scaling using consumer group lag ([#3127](https://github.com/kedacore/keda/issues/3127))
- **General:** Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234))

### Improvements

Expand Down
146 changes: 124 additions & 22 deletions pkg/scalers/redis_streams_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/go-logr/logr"
"github.com/redis/go-redis/v9"
Expand All @@ -19,22 +20,27 @@ type scaleFactor int8
const (
xPendingFactor scaleFactor = iota + 1
xLengthFactor
lagFactor
)

const (
// defaults
defaultDBIndex = 0
defaultTargetEntries = 5
defaultDBIndex = 0
defaultTargetEntries = 5
defaultTargetLag = 5
defaultActivationTargetLag = 0

// metadata names
pendingEntriesCountMetadata = "pendingEntriesCount"
streamLengthMetadata = "streamLength"
streamNameMetadata = "stream"
consumerGroupNameMetadata = "consumerGroup"
usernameMetadata = "username"
passwordMetadata = "password"
databaseIndexMetadata = "databaseIndex"
enableTLSMetadata = "enableTLS"
lagMetadata = "lagCount"
pendingEntriesCountMetadata = "pendingEntriesCount"
streamLengthMetadata = "streamLength"
streamNameMetadata = "stream"
consumerGroupNameMetadata = "consumerGroup"
usernameMetadata = "username"
passwordMetadata = "password"
databaseIndexMetadata = "databaseIndex"
enableTLSMetadata = "enableTLS"
activationValueTriggerConfigName = "activationTargetLag"
mikelam-us marked this conversation as resolved.
Show resolved Hide resolved
)

type redisStreamsScaler struct {
Expand All @@ -49,11 +55,13 @@ type redisStreamsMetadata struct {
scaleFactor scaleFactor
targetPendingEntriesCount int64
targetStreamLength int64
targetLag int64
streamName string
consumerGroupName string
databaseIndex int
connectionInfo redisConnectionInfo
scalerIndex int
activationTargetLag int64
}

// NewRedisStreamsScaler creates a new redisStreamsScaler
Expand Down Expand Up @@ -87,6 +95,7 @@ func NewRedisStreamsScaler(ctx context.Context, isClustered, isSentinel bool, co

func createClusteredRedisStreamsScaler(ctx context.Context, meta *redisStreamsMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) {
client, err := getRedisClusterClient(ctx, meta.connectionInfo)

if err != nil {
return nil, fmt.Errorf("connection to redis cluster failed: %w", err)
}
Expand Down Expand Up @@ -166,6 +175,76 @@ func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (ent
}
return entriesLength, nil
}
case lagFactor:
entriesCountFn = func(ctx context.Context) (int64, error) {
// Make sure that redis is version 7+, which is required for xinfo lag
info, err := client.Info(ctx).Result()
if err != nil {
err := errors.New("could not find Redis version")
return -1, err
}
infoLines := strings.Split(info, "\n")
versionFound := false
for i := 0; i < len(infoLines); i++ {
line := infoLines[i]
lineSplit := strings.Split(line, ":")
if len(lineSplit) > 1 {
fieldName := lineSplit[0]
fieldValue := lineSplit[1]
if fieldName == "redis_version" {
versionFound = true
versionNumString := strings.Split(fieldValue, ".")[0]
versionNum, err := strconv.ParseInt(versionNumString, 10, 64)
if err != nil {
err := errors.New("redis version could not be converted to number")
return -1, err
}
if versionNum < int64(7) {
err := errors.New("redis version 7+ required for lag")
return -1, err
}
break
}
}
}
if !versionFound {
err := errors.New("could not find Redis version number")
return -1, err
}
groups, err := client.XInfoGroups(ctx, meta.streamName).Result()

// If XINFO GROUPS can't find the stream key, it hasn't been created
// yet. In that case, we return a lag of 0.
if fmt.Sprint(err) == "ERR no such key" {
return 0, nil
}

// If the stream has been created, then we find the consumer group
// associated with this scaler and return its lag.
numGroups := len(groups)
for i := 0; i < numGroups; i++ {
group := groups[i]
if group.Name == meta.consumerGroupName {
return int64(group.Lag), nil
}
}

// There is an edge case where the Redis producer has set up the
// stream [meta.streamName], but the consumer group [meta.consumerGroupName]
// for that stream isn't registered with Redis. In other words, the
// producer has created messages for the stream, but the consumer group
// hasn't yet registered itself on Redis because scaling starts with 0
// consumers. In this case, it's necessary to use XLEN to return what
// the lag would have been if the consumer group had been created since
// it's not possible to obtain the lag for a nonexistent consumer
// group. From here, the consumer group gets instantiated, and scaling
// again occurs according to XINFO GROUP lag.
entriesLength, err := client.XLen(ctx, meta.streamName).Result()
if err != nil {
return -1, err
}
return entriesLength, nil
}
default:
err = fmt.Errorf("unrecognized scale factor %v", meta.scaleFactor)
}
Expand Down Expand Up @@ -210,16 +289,38 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser)
return nil, ErrRedisMissingStreamName
}

meta.activationTargetLag = defaultActivationTargetLag

if val, ok := config.TriggerMetadata[consumerGroupNameMetadata]; ok {
meta.consumerGroupName = val
meta.scaleFactor = xPendingFactor
meta.targetPendingEntriesCount = defaultTargetEntries
if val, ok := config.TriggerMetadata[pendingEntriesCountMetadata]; ok {
pendingEntriesCount, err := strconv.ParseInt(val, 10, 64)
if val, ok := config.TriggerMetadata[lagMetadata]; ok {
meta.scaleFactor = lagFactor
lag, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing pending entries count: %w", err)
return nil, fmt.Errorf("error parsing lag: %w", err)
}
meta.targetLag = lag

if val, ok := config.TriggerMetadata[activationValueTriggerConfigName]; ok {
activationVal, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, errors.New("error while parsing activation lag value")
}
meta.activationTargetLag = activationVal
} else {
err := errors.New("activationTargetLag required for Redis lag")
return nil, err
}
} else {
meta.scaleFactor = xPendingFactor
meta.targetPendingEntriesCount = defaultTargetEntries
if val, ok := config.TriggerMetadata[pendingEntriesCountMetadata]; ok {
pendingEntriesCount, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing pending entries count: %w", err)
}
meta.targetPendingEntriesCount = pendingEntriesCount
}
meta.targetPendingEntriesCount = pendingEntriesCount
}
} else {
meta.scaleFactor = xLengthFactor
Expand Down Expand Up @@ -259,6 +360,8 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2.Metri
metricValue = s.metadata.targetPendingEntriesCount
case xLengthFactor:
metricValue = s.metadata.targetStreamLength
case lagFactor:
metricValue = s.metadata.targetLag
}

externalMetric := &v2.ExternalMetricSource{
Expand All @@ -271,16 +374,15 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2.Metri
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity fetches the number of pending entries for a consumer group in a stream
// GetMetricsAndActivity fetches the metric value for a consumer group in a stream
func (s *redisStreamsScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
pendingEntriesCount, err := s.getEntriesCountFn(ctx)
metricCount, err := s.getEntriesCountFn(ctx)

if err != nil {
s.logger.Error(err, "error fetching pending entries count")
s.logger.Error(err, "error fetching metric count")
return []external_metrics.ExternalMetricValue{}, false, err
}

metric := GenerateMetricInMili(metricName, float64(pendingEntriesCount))

return []external_metrics.ExternalMetricValue{metric}, pendingEntriesCount > 0, nil
metric := GenerateMetricInMili(metricName, float64(metricCount))
return []external_metrics.ExternalMetricValue{metric}, int64(metricCount) > s.metadata.activationTargetLag, nil
}