Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: batch event schema messages for faster processing #3406

Merged
merged 1 commit into from May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 35 additions & 0 deletions proto/event-schema/types.helper.go
@@ -1,7 +1,11 @@
package proto

import (
"crypto/md5"
"encoding/hex"
"fmt"
"sort"
"strings"

"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -30,3 +34,34 @@ func UnmarshalEventSchemaMessage(raw []byte) (*EventSchemaMessage, error) {
}
return p, nil
}

// SchemaHash returns a hash of the schema. Keys are sorted lexicographically during hashing.
func SchemaHash(schema map[string]string) string {
keys := make([]string, 0, len(schema))
for k := range schema {
keys = append(keys, k)
}
sort.Strings(keys)
var sb strings.Builder
for _, k := range keys {
sb.WriteString(k)
sb.WriteString(":")
sb.WriteString(schema[k])
sb.WriteString(",")
}
md5Sum := md5.Sum([]byte(sb.String()))
schemaHash := hex.EncodeToString(md5Sum[:])
return schemaHash
}

// Merge merges the other event schema message into this one.
func (sm *EventSchemaMessage) Merge(other *EventSchemaMessage) {
sm.BatchCount += other.BatchCount + 1
if len(other.Sample) < len(sm.Sample) { // keep the smallest sample
sm.Sample = other.Sample
}
sm.Sample = other.Sample
if other.ObservedAt.AsTime().After(sm.ObservedAt.AsTime()) { // keep the laters observed time
sm.ObservedAt = other.ObservedAt
}
}
35 changes: 27 additions & 8 deletions proto/event-schema/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/event-schema/types.proto
Expand Up @@ -16,4 +16,6 @@ message EventSchemaMessage {
google.protobuf.Timestamp observedAt = 4;
bytes sample = 5;
bytes correlationID = 6;
string hash = 7;
int64 batchCount = 8;
}
72 changes: 72 additions & 0 deletions schema-forwarder/internal/batcher/batcher.go
@@ -0,0 +1,72 @@
package batcher

import (
"github.com/rudderlabs/rudder-server/jobsdb"
proto "github.com/rudderlabs/rudder-server/proto/event-schema"
"github.com/rudderlabs/rudder-server/schema-forwarder/internal/transformer"
)

// A batch of jobs that share the same schema.
type EventSchemaMessageBatch struct {
Message *proto.EventSchemaMessage
Jobs []*jobsdb.JobT
}

// NewEventSchemaMessageBatcher creates a new batcher.
func NewEventSchemaMessageBatcher(transformer transformer.Transformer) *EventSchemaMessageBatcher {
return &EventSchemaMessageBatcher{
transformer: transformer,
batchIndex: make(map[batchKey]*EventSchemaMessageBatch),
}
}

// EventSchemaMessageBatcher batches jobs by their schema.
type EventSchemaMessageBatcher struct {
transformer transformer.Transformer

batchOrder []batchKey
batchIndex map[batchKey]*EventSchemaMessageBatch
}

// Add adds a job to the batcher after transforming it to an [EventSchemaMessage].
// If the message is already in the batcher, the two messages will be merged to one.
func (sb *EventSchemaMessageBatcher) Add(job *jobsdb.JobT) error {
msg, err := sb.transformer.Transform(job)
if err != nil {
return err
}
key := batchKey{
writeKey: msg.Key.WriteKey,
eventType: msg.Key.EventType,
eventIdentifier: msg.Key.EventIdentifier,
hash: msg.Hash,
}
if _, ok := sb.batchIndex[key]; !ok {
sb.batchOrder = append(sb.batchOrder, key)
sb.batchIndex[key] = &EventSchemaMessageBatch{
Message: msg,
Jobs: []*jobsdb.JobT{job},
}
} else {
sb.batchIndex[key].Jobs = append(sb.batchIndex[key].Jobs, job)
sb.batchIndex[key].Message.Merge(msg)
}
return nil
}

// GetMessageBatches returns the message batches in the order they were added.
func (sb *EventSchemaMessageBatcher) GetMessageBatches() []*EventSchemaMessageBatch {
batches := make([]*EventSchemaMessageBatch, len(sb.batchOrder))
for i, key := range sb.batchOrder {
batches[i] = sb.batchIndex[key]
}
return batches
}

// batchKey is the key used for batching.
type batchKey struct {
writeKey string
eventType string
eventIdentifier string
hash string
}
98 changes: 98 additions & 0 deletions schema-forwarder/internal/batcher/batcher_test.go
@@ -0,0 +1,98 @@
package batcher_test

import (
"errors"
"testing"

"github.com/rudderlabs/rudder-server/jobsdb"
proto "github.com/rudderlabs/rudder-server/proto/event-schema"
"github.com/rudderlabs/rudder-server/schema-forwarder/internal/batcher"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestEventSchemaMessageBatcher(t *testing.T) {
t.Run("Add same message twice", func(t *testing.T) {
mockTransformer := &mockTransformer{}
mockTransformer.msg = &proto.EventSchemaMessage{
Key: &proto.EventSchemaKey{
WriteKey: "write-key",
EventType: "event-type",
EventIdentifier: "event-identifier",
},
Hash: "hash",
ObservedAt: timestamppb.Now(),
}

b := batcher.NewEventSchemaMessageBatcher(mockTransformer)

require.NoError(t, b.Add(&jobsdb.JobT{JobID: 1}))
require.NoError(t, b.Add(&jobsdb.JobT{JobID: 2}))

batches := b.GetMessageBatches()
require.Len(t, batches, 1)
require.Len(t, batches[0].Jobs, 2)
require.EqualValues(t, 1, batches[0].Message.BatchCount)
})

t.Run("Add different hash", func(t *testing.T) {
mockTransformer := &mockTransformer{}
mockTransformer.msg = &proto.EventSchemaMessage{
Key: &proto.EventSchemaKey{
WriteKey: "write-key",
EventType: "event-type",
EventIdentifier: "event-identifier",
},
Hash: "hash-1",
ObservedAt: timestamppb.Now(),
}

b := batcher.NewEventSchemaMessageBatcher(mockTransformer)

require.NoError(t, b.Add(&jobsdb.JobT{JobID: 1}))

mockTransformer.msg = &proto.EventSchemaMessage{
Key: &proto.EventSchemaKey{
WriteKey: "write-key",
EventType: "event-type",
EventIdentifier: "event-identifier",
},
Hash: "hash-2",
ObservedAt: timestamppb.Now(),
}

require.NoError(t, b.Add(&jobsdb.JobT{JobID: 2}))

batches := b.GetMessageBatches()
require.Len(t, batches, 2)

require.Len(t, batches[0].Jobs, 1)
require.EqualValues(t, 1, batches[0].Jobs[0].JobID)
require.EqualValues(t, 0, batches[0].Message.BatchCount)

require.Len(t, batches[1].Jobs, 1)
require.EqualValues(t, 2, batches[1].Jobs[0].JobID)
require.EqualValues(t, 0, batches[1].Message.BatchCount)
})

t.Run("Transformer error", func(t *testing.T) {
mockTransformer := &mockTransformer{fail: true}
b := batcher.NewEventSchemaMessageBatcher(mockTransformer)
require.Error(t, b.Add(&jobsdb.JobT{JobID: 2}))
})
}

type mockTransformer struct {
fail bool
msg *proto.EventSchemaMessage
}

func (*mockTransformer) Start() {}
func (*mockTransformer) Stop() {}

func (mt *mockTransformer) Transform(job *jobsdb.JobT) (*proto.EventSchemaMessage, error) {
if mt.fail {
return nil, errors.New("failed")
}
return mt.msg, nil
}