Skip to content

Commit

Permalink
Event Hub Scaler: Remove or replace usages of Event Hub offsets
Browse files Browse the repository at this point in the history
Signed-off-by: Walker Crouse <walker.crouse@acuitybrands.com>
  • Loading branch information
Walker Crouse committed Mar 30, 2024
1 parent f2d86a8 commit 5b3cd97
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 102 deletions.
29 changes: 8 additions & 21 deletions pkg/scalers/azure/azure_eventhub_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,15 @@ import (
// goCheckpoint struct to adapt goSdk Checkpoint
type goCheckpoint struct {
Checkpoint struct {
SequenceNumber int64 `json:"sequenceNumber"`
Offset string `json:"offset"`
SequenceNumber int64 `json:"sequenceNumber"`
} `json:"checkpoint"`
PartitionID string `json:"partitionId"`
}

type baseCheckpoint struct {
Epoch int64 `json:"Epoch"`
Offset string `json:"Offset"`
Owner string `json:"Owner"`
Token string `json:"Token"`
Epoch int64 `json:"Epoch"`
Owner string `json:"Owner"`
Token string `json:"Token"`
}

// Checkpoint in a common format
Expand Down Expand Up @@ -92,8 +90,8 @@ type defaultCheckpointer struct {
containerName string
}

func NewCheckpoint(offset string, sequenceNumber int64) Checkpoint {
return Checkpoint{baseCheckpoint: baseCheckpoint{Offset: offset}, SequenceNumber: sequenceNumber}
func NewCheckpoint(sequenceNumber int64) Checkpoint {
return Checkpoint{baseCheckpoint: baseCheckpoint{}, SequenceNumber: sequenceNumber}
}

// GetCheckpointFromBlobStorage reads depending of the CheckpointStrategy the checkpoint from a azure storage
Expand Down Expand Up @@ -222,10 +220,8 @@ func newGoSdkCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {

return Checkpoint{
SequenceNumber: checkpoint.Checkpoint.SequenceNumber,
baseCheckpoint: baseCheckpoint{
Offset: checkpoint.Checkpoint.Offset,
},
PartitionID: checkpoint.PartitionID,
baseCheckpoint: baseCheckpoint{},
PartitionID: checkpoint.PartitionID,
}, nil
}

Expand Down Expand Up @@ -318,15 +314,6 @@ func getCheckpointFromStorageMetadata(get *azblob.DownloadResponse, partitionID
}
}

if offset, ok := metadata["offset"]; ok {
if !ok {
if offset, ok = metadata["Offset"]; !ok {
return Checkpoint{}, fmt.Errorf("offset on blob not found")
}
}
checkpoint.Offset = offset
}

return checkpoint, nil
}

Expand Down
63 changes: 16 additions & 47 deletions pkg/scalers/azure/azure_eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,20 @@ func TestCheckpointFromBlobStorageAzureFunction(t *testing.T) {
}

partitionID := "0"
offset := "1001"
consumerGroup := "$Default1"

sequencenumber := int64(1)

containerName := "azure-webjobs-eventhub"
checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
checkpointFormat := "{\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID)
urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/", consumerGroup)

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
baseCheckpoint: baseCheckpoint{},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
Expand All @@ -54,8 +51,6 @@ func TestCheckpointFromBlobStorageAzureFunction(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

Expand All @@ -65,23 +60,20 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) {
}

partitionID := "1"
offset := "1005"
consumerGroup := "$Default2"

sequencenumber := int64(1)

containerName := "defaultcontainer"
checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
checkpointFormat := "{\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID)
urlPath := fmt.Sprintf("%s/", consumerGroup)

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
baseCheckpoint: baseCheckpoint{},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
Expand All @@ -95,8 +87,6 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

Expand All @@ -106,23 +96,20 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T
}

partitionID := "2"
offset := "1006"
consumerGroup := "$Default3"

sequencenumber := int64(1)

containerName := "defaultcontainerpython"
checkpointFormat := "{\"Offset\":\"%s\",\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
checkpointFormat := "{\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID)
urlPath := fmt.Sprintf("%s/", consumerGroup)

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
baseCheckpoint: baseCheckpoint{},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
Expand All @@ -136,8 +123,6 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

Expand All @@ -147,13 +132,11 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
}

partitionID := "4"
offset := "1002"
consumerGroup := "$default"

sequencenumber := int64(1)

metadata := map[string]string{
"offset": offset,
"sequencenumber": strconv.FormatInt(sequencenumber, 10),
}

Expand All @@ -164,9 +147,7 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
baseCheckpoint: baseCheckpoint{},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
Expand All @@ -181,8 +162,6 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

Expand All @@ -192,23 +171,20 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) {
}

partitionID := "0"
offset := "1003"

sequencenumber := int64(1)

containerName := "gosdkcontainer"
checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"offset\":\"%s\",\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}"
checkpoint := fmt.Sprintf(checkpointFormat, partitionID, offset, sequencenumber)
checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}"
checkpoint := fmt.Sprintf(checkpointFormat, partitionID, sequencenumber)

urlPath := ""

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
baseCheckpoint: baseCheckpoint{},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
Expand All @@ -222,8 +198,6 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

Expand All @@ -233,25 +207,22 @@ func TestCheckpointFromBlobStorageDapr(t *testing.T) {
}

partitionID := "0"
offset := "1004"
consumerGroup := "$default"
eventhubName := "hub"

sequencenumber := int64(1)

containerName := fmt.Sprintf("dapr-%s-%s-%s", eventhubName, consumerGroup, partitionID)
checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"offset\":\"%s\",\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}"
checkpoint := fmt.Sprintf(checkpointFormat, partitionID, offset, sequencenumber)
checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}"
checkpoint := fmt.Sprintf(checkpointFormat, partitionID, sequencenumber)

urlPath := ""

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
baseCheckpoint: baseCheckpoint{},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
Expand All @@ -265,8 +236,6 @@ func TestCheckpointFromBlobStorageDapr(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

Expand Down
12 changes: 2 additions & 10 deletions pkg/scalers/azure_eventhub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *scaler

// GetUnprocessedEventCountInPartition gets number of unprocessed events in a given partition
func (s *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Context, partitionInfo *eventhub.HubPartitionRuntimeInformation) (newEventCount int64, checkpoint azure.Checkpoint, err error) {
// if partitionInfo.LastEnqueuedOffset = -1, that means event hub partition is empty
if partitionInfo == nil || partitionInfo.LastEnqueuedOffset == "-1" {
// if partitionInfo.LastEnqueuedSequenceNumber = -1, that means event hub partition is empty
if partitionInfo == nil || partitionInfo.LastEnqueuedSequenceNumber == -1 {
return 0, azure.Checkpoint{}, nil
}

Expand All @@ -306,14 +306,6 @@ func (s *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Co
func calculateUnprocessedEvents(partitionInfo *eventhub.HubPartitionRuntimeInformation, checkpoint azure.Checkpoint, stalePartitionInfoThreshold int64) int64 {
unprocessedEventCount := int64(0)

// If checkpoint.Offset is empty that means no messages has been processed from an event hub partition
// And since partitionInfo.LastSequenceNumber = 0 for the very first message hence
// total unprocessed message will be partitionInfo.LastSequenceNumber + 1
if checkpoint.Offset == "" {
unprocessedEventCount = partitionInfo.LastSequenceNumber + 1
return unprocessedEventCount
}

if partitionInfo.LastSequenceNumber >= checkpoint.SequenceNumber {
unprocessedEventCount = partitionInfo.LastSequenceNumber - checkpoint.SequenceNumber
} else {
Expand Down
36 changes: 18 additions & 18 deletions pkg/scalers/azure_eventhub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,51 +211,51 @@ var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestDat

var calculateUnprocessedEventsDataset = []calculateUnprocessedEventsTestData{
{
checkpoint: azure.NewCheckpoint("1", 5),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10, LastEnqueuedOffset: "2"},
checkpoint: azure.NewCheckpoint(5),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10},
unprocessedEvents: 5,
},
{
checkpoint: azure.NewCheckpoint("1002", 4611686018427387903),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "1000"},
checkpoint: azure.NewCheckpoint(4611686018427387903),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905},
unprocessedEvents: 2,
},
{
checkpoint: azure.NewCheckpoint("900", 4611686018427387900),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "1000"},
checkpoint: azure.NewCheckpoint(4611686018427387900),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905},
unprocessedEvents: 5,
},
{
checkpoint: azure.NewCheckpoint("800", 4000000000000200000),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4000000000000000000, LastEnqueuedOffset: "750"},
checkpoint: azure.NewCheckpoint(4000000000000200000),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4000000000000000000},
unprocessedEvents: 9223372036854575807,
},
// Empty checkpoint
{
checkpoint: azure.NewCheckpoint("", 0),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 1, LastEnqueuedOffset: "1"},
checkpoint: azure.NewCheckpoint(0),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 1},
unprocessedEvents: 2,
},
// Stale PartitionInfo
{
checkpoint: azure.NewCheckpoint("5", 15),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10, LastEnqueuedOffset: "2"},
checkpoint: azure.NewCheckpoint(15),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10},
unprocessedEvents: 0,
},
{
checkpoint: azure.NewCheckpoint("1000", 4611686018427387910),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "900"},
checkpoint: azure.NewCheckpoint(4611686018427387910),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905},
unprocessedEvents: 0,
},
{
checkpoint: azure.NewCheckpoint("1", 5),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 9223372036854775797, LastEnqueuedOffset: "10000"},
checkpoint: azure.NewCheckpoint(5),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 9223372036854775797},
unprocessedEvents: 0,
},
// Circular buffer reset
{
checkpoint: azure.NewCheckpoint("100000", 9223372036854775797),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 5, LastEnqueuedOffset: "1"},
checkpoint: azure.NewCheckpoint(9223372036854775797),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 5},
unprocessedEvents: 15,
},
}
Expand Down
12 changes: 6 additions & 6 deletions vendor/github.com/Azure/azure-event-hubs-go/v3/amqp_mgmt.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5b3cd97

Please sign in to comment.