Skip to content

Commit

Permalink
chore: collect less sample_events for reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
satishrudderstack committed Apr 24, 2024
1 parent ba98beb commit 44d2319
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 0 deletions.
48 changes: 48 additions & 0 deletions enterprise/reporting/event_sampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package reporting

import (
"sync"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
)

// EventSampler keeps an in-memory set of grouping keys for which a sample
// event has been marked as collected. The set is swapped for an empty one by
// the background loop started in StartResetLoop.
type EventSampler struct {
	// collectedSamples holds every key passed to MarkSampleEventAsCollected
	// since the last reset.
	collectedSamples map[string]bool
	// mu is held around every read and write of collectedSamples.
	mu sync.Mutex
	// resetDuration is loaded on each iteration of the reset loop and used as
	// the sleep interval before the next reset.
	resetDuration config.ValueLoader[time.Duration]
}

// NewEventSampler returns an EventSampler with an empty set of collected keys.
// resetDuration is stored as-is and is only consulted by StartResetLoop; no
// background work is started here.
func NewEventSampler(resetDuration config.ValueLoader[time.Duration]) *EventSampler {
	sampler := new(EventSampler)
	sampler.resetDuration = resetDuration
	sampler.collectedSamples = map[string]bool{}
	return sampler
}

// IsSampleEventCollected reports whether groupingColumns is present in the set
// of collected keys. Presence of the key is what counts, not the stored value.
func (es *EventSampler) IsSampleEventCollected(groupingColumns string) bool {
	// A map lookup cannot panic, so the lock is released explicitly rather
	// than through defer.
	es.mu.Lock()
	_, seen := es.collectedSamples[groupingColumns]
	es.mu.Unlock()

	return seen
}

// MarkSampleEventAsCollected records groupingColumns in the set of collected
// keys. Marking the same key more than once has no additional effect.
func (es *EventSampler) MarkSampleEventAsCollected(groupingColumns string) {
	es.mu.Lock()
	// Deferred so the mutex is released even if the map write below panics
	// (e.g. on an EventSampler that was not built through NewEventSampler).
	defer es.mu.Unlock()

	es.collectedSamples[groupingColumns] = true
}

// StartResetLoop launches a background goroutine that, forever, sleeps for the
// currently configured reset duration and then replaces the set of collected
// keys with a fresh empty map. It returns nil immediately after starting the
// goroutine; it does not block.
//
// NOTE(review): the goroutine has no stop signal, so it lives for the rest of
// the process, and each call starts another one. Consider accepting a
// context.Context in a follow-up.
// NOTE(review): a non-positive reset duration makes time.Sleep return
// immediately, turning this into a busy loop — confirm the config cannot
// produce such a value.
func (es *EventSampler) StartResetLoop() error {
	resetCollected := func() {
		es.mu.Lock()
		defer es.mu.Unlock()
		es.collectedSamples = make(map[string]bool)
	}

	go func() {
		for {
			// The duration is re-read on every iteration.
			interval := es.resetDuration.Load()
			time.Sleep(interval)
			resetCollected()
		}
	}()

	return nil
}
22 changes: 22 additions & 0 deletions enterprise/reporting/event_sampler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package reporting

import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/stretchr/testify/assert"
)

func TestEventSampler(t *testing.T) {
resetDuration := config.Default.GetReloadableDurationVar(10, time.Second)
es := NewEventSampler(resetDuration)

t.Run("MarkSampleEventAsCollected and IsSampleEventCollected", func(t *testing.T) {
groupingColumnsHash := "test"
assert.False(t, es.IsSampleEventCollected(groupingColumnsHash))
es.MarkSampleEventAsCollected(groupingColumnsHash)
assert.True(t, es.IsSampleEventCollected(groupingColumnsHash))
assert.False(t, es.IsSampleEventCollected("new hash"))
})
}
69 changes: 69 additions & 0 deletions enterprise/reporting/grouping_columns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package reporting

import (
"crypto/md5"
"encoding/hex"
"strconv"

"github.com/rudderlabs/rudder-server/utils/types"
)

// GroupingColumns is the set of metric attributes that together identify one
// group of reported metrics. NewGroupingColumns fills it from a
// types.PUReportedMetric and generateHash folds every field into a single key.
type GroupingColumns struct {
	WorkspaceID string
	// Namespace string
	// InstanceID string

	// Connection-level attributes (copied from the metric's ConnectionDetails).
	SourceDefinitionID string
	SourceCategory string
	SourceID string
	DestinationDefinitionID string
	DestinationID string
	SourceTaskRunID string
	SourceJobID string
	SourceJobRunID string
	TransformationID string
	TransformationVersionID string
	TrackingPlanID string
	TrackingPlanVersion int

	// Processing-unit attributes (copied from the metric's PUDetails, except
	// Status which comes from StatusDetail).
	InPU string
	PU string
	Status string
	TerminalState bool
	InitialState bool

	// Status attributes (copied from the metric's StatusDetail).
	StatusCode int
	EventName string
	EventType string
	ErrorType string
}

// NewGroupingColumns copies the grouping-relevant attributes of metric into a
// GroupingColumns value.
//
// metric.StatusDetail is dereferenced without a nil check, so it must be
// non-nil.
func NewGroupingColumns(metric types.PUReportedMetric) GroupingColumns {
	conn := metric.ConnectionDetails
	pu := metric.PUDetails
	status := metric.StatusDetail

	var columns GroupingColumns

	// NOTE(review): WorkspaceID is filled from the source ID, not from a
	// workspace identifier — confirm this is intentional.
	columns.WorkspaceID = conn.SourceID
	columns.SourceDefinitionID = conn.SourceDefinitionId
	columns.SourceCategory = conn.SourceCategory
	columns.SourceID = conn.SourceID
	columns.DestinationDefinitionID = conn.DestinationDefinitionId
	columns.DestinationID = conn.DestinationID
	columns.SourceTaskRunID = conn.SourceTaskRunID
	columns.SourceJobID = conn.SourceJobID
	columns.SourceJobRunID = conn.SourceJobRunID
	columns.TransformationID = conn.TransformationID
	columns.TransformationVersionID = conn.TransformationVersionID
	columns.TrackingPlanID = conn.TrackingPlanID
	columns.TrackingPlanVersion = conn.TrackingPlanVersion

	columns.InPU = pu.InPU
	columns.PU = pu.PU
	columns.Status = status.Status
	columns.TerminalState = pu.TerminalPU
	columns.InitialState = pu.InitialPU

	columns.StatusCode = status.StatusCode
	columns.EventName = status.EventName
	columns.EventType = status.EventType
	columns.ErrorType = status.ErrorType

	return columns
}

// generateHash returns a hex-encoded MD5 digest that identifies this
// combination of grouping columns. Equal GroupingColumns values always yield
// the same digest.
//
// Every field is fed to the hash prefixed with its byte length. Plain
// concatenation would let different column sets collide whenever characters
// shift across a field boundary (e.g. "ab"+"c" and "a"+"bc"); the length
// prefix makes the encoding unambiguous. MD5 is used only as a compact
// in-memory key here, not for any security purpose.
func (groupingColumns GroupingColumns) generateHash() string {
	fields := [...]string{
		groupingColumns.WorkspaceID,
		groupingColumns.SourceDefinitionID,
		groupingColumns.SourceCategory,
		groupingColumns.SourceID,
		groupingColumns.DestinationDefinitionID,
		groupingColumns.DestinationID,
		groupingColumns.SourceTaskRunID,
		groupingColumns.SourceJobID,
		groupingColumns.SourceJobRunID,
		groupingColumns.TransformationID,
		groupingColumns.TransformationVersionID,
		groupingColumns.TrackingPlanID,
		strconv.Itoa(groupingColumns.TrackingPlanVersion),
		groupingColumns.InPU,
		groupingColumns.PU,
		groupingColumns.Status,
		strconv.FormatBool(groupingColumns.TerminalState),
		strconv.FormatBool(groupingColumns.InitialState),
		strconv.Itoa(groupingColumns.StatusCode),
		groupingColumns.EventName,
		groupingColumns.EventType,
		groupingColumns.ErrorType,
	}

	hasher := md5.New()
	for _, field := range fields {
		// hash.Hash.Write is documented to never return an error.
		_, _ = hasher.Write([]byte(strconv.Itoa(len(field)) + ":" + field))
	}
	return hex.EncodeToString(hasher.Sum(nil))
}
69 changes: 69 additions & 0 deletions enterprise/reporting/grouping_columns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package reporting

import (
"testing"

"github.com/rudderlabs/rudder-server/utils/types"
"github.com/stretchr/testify/assert"
)

// createMetricObject returns a fresh, fully independent metric fixture with
// fixed placeholder values. Each call allocates a new StatusDetail, so tests
// may mutate the result without affecting other fixtures.
func createMetricObject() types.PUReportedMetric {
	connection := types.ConnectionDetails{
		SourceID:      "some-source-id",
		DestinationID: "some-destination-id",
	}

	processingUnit := types.PUDetails{
		InPU: "some-in-pu",
		PU:   "some-pu",
	}

	status := &types.StatusDetail{
		Status:         "some-status",
		Count:          3,
		StatusCode:     0,
		SampleResponse: `{"some-sample-response-key": "some-sample-response-value"}`,
		SampleEvent:    []byte(`{"some-sample-event-key": "some-sample-event-value"}`),
		EventName:      "some-event-name",
		EventType:      "some-event-type",
	}

	return types.PUReportedMetric{
		ConnectionDetails: connection,
		PUDetails:         processingUnit,
		StatusDetail:      status,
	}
}

// TestNewGroupingColumns verifies that NewGroupingColumns copies each field
// populated by the fixture into the matching grouping column. Checking every
// populated field (not just two) catches copy-paste mistakes in the mapping.
func TestNewGroupingColumns(t *testing.T) {
	t.Run("should create the correct grouping columns object from types.PUReportedMetric passed", func(t *testing.T) {
		inputMetric := createMetricObject()
		groupingColumns := NewGroupingColumns(inputMetric)

		// Connection details.
		assert.Equal(t, "some-source-id", groupingColumns.SourceID)
		assert.Equal(t, "some-destination-id", groupingColumns.DestinationID)

		// Processing-unit details.
		assert.Equal(t, "some-in-pu", groupingColumns.InPU)
		assert.Equal(t, "some-pu", groupingColumns.PU)

		// Status details.
		assert.Equal(t, "some-status", groupingColumns.Status)
		assert.Equal(t, "some-event-name", groupingColumns.EventName)
		assert.Equal(t, "some-event-type", groupingColumns.EventType)
	})
}

func TestGenerateHash(t *testing.T) {
t.Run("same hash for same grouping columns", func(t *testing.T) {
inputMetric1 := createMetricObject()
groupingColumns1 := NewGroupingColumns(inputMetric1)

inputMetric2 := createMetricObject()
groupingColumns2 := NewGroupingColumns(inputMetric2)

hash1 := groupingColumns1.generateHash()
hash2 := groupingColumns2.generateHash()

assert.Equal(t, hash1, hash2)
})

t.Run("different hash for different grouping columns", func(t *testing.T) {
inputMetric1 := createMetricObject()
inputMetric1.StatusDetail.EventName = "some-event-name-1"
groupingColumns1 := NewGroupingColumns(inputMetric1)

inputMetric2 := createMetricObject()
inputMetric2.StatusDetail.EventName = "some-event-name-2"
groupingColumns2 := NewGroupingColumns(inputMetric2)

hash1 := groupingColumns1.generateHash()
hash2 := groupingColumns2.generateHash()

assert.NotEqual(t, hash1, hash2)
})
}
28 changes: 28 additions & 0 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type DefaultReporter struct {
getReportsQueryTime stats.Measurement
requestLatency stats.Measurement
stats stats.Stats

eventSamplingEnabled config.ValueLoader[bool]
eventSamplingDuration config.ValueLoader[time.Duration]
eventSampler *EventSampler
}

func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber *configSubscriber, stats stats.Stats) *DefaultReporter {
Expand All @@ -86,6 +90,8 @@ func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber
maxConcurrentRequests := config.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests")
maxOpenConnections := config.GetIntVar(32, 1, "Reporting.maxOpenConnections")
dbQueryTimeout = config.GetReloadableDurationVar(60, time.Second, "Reporting.dbQueryTimeout")
eventSamplingEnabled := config.GetReloadableBoolVar(false, "Reporting.eventSamplingEnabled")
eventSamplingDuration := config.GetReloadableDurationVar(1800, time.Second, "Reporting.eventSamplingDuration")
// only send reports for wh actions sources if whActionsOnly is configured
whActionsOnly := config.GetBool("REPORTING_WH_ACTIONS_ONLY", false)
if whActionsOnly {
Expand All @@ -112,6 +118,9 @@ func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber
maxConcurrentRequests: maxConcurrentRequests,
dbQueryTimeout: dbQueryTimeout,
stats: stats,
eventSamplingEnabled: eventSamplingEnabled,
eventSamplingDuration: eventSamplingDuration,
eventSampler: NewEventSampler(eventSamplingDuration),
}
}

Expand Down Expand Up @@ -372,6 +381,10 @@ func (r *DefaultReporter) mainLoop(ctx context.Context, c types.SyncerConfig) {

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
return r.eventSampler.StartResetLoop()
})

g.Go(func() error {
return r.emitLagMetric(ctx, c, &lastReportedAtTime)
})
Expand Down Expand Up @@ -573,6 +586,17 @@ func transformMetricForPII(metric types.PUReportedMetric, piiColumns []string) t
return metric
}

// resetSampleEventIfCollected keeps at most one sample event per group of
// grouping columns until the sampler is reset.
//
// If metric carries a non-empty sample event and a sample has already been
// collected for its group, the sample event is replaced with `{}`. Otherwise
// the group is marked as collected and the sample event is left untouched, so
// the first sample of each group is the one that gets reported. Metrics whose
// sample event is empty or already `{}` are ignored and do not mark the group.
//
// metric and metric.StatusDetail must be non-nil.
//
// NOTE(review): the check and the mark are two separate sampler calls, so two
// concurrent callers for the same group may both keep their sample.
func (r *DefaultReporter) resetSampleEventIfCollected(metric *types.PUReportedMetric) {
	sampleEvent := metric.StatusDetail.SampleEvent
	// len of a nil slice is 0, so this also covers a missing sample event.
	if len(sampleEvent) == 0 || string(sampleEvent) == "{}" {
		return
	}

	groupingColumnsHash := NewGroupingColumns(*metric).generateHash()
	if r.eventSampler.IsSampleEventCollected(groupingColumnsHash) {
		// A sample for this group was already kept; drop this one.
		metric.StatusDetail.SampleEvent = []byte(`{}`)
		return
	}
	r.eventSampler.MarkSampleEventAsCollected(groupingColumnsHash)
}

func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReportedMetric, txn *Tx) error {
if len(metrics) == 0 {
return nil
Expand Down Expand Up @@ -625,6 +649,10 @@ func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReporte
metric.StatusDetail.EventName = types.MaxLengthExceeded
}

if r.eventSamplingEnabled.Load() {
r.resetSampleEventIfCollected(&metric)
}

_, err = stmt.Exec(
workspaceID, r.namespace, r.instanceID,
metric.ConnectionDetails.SourceDefinitionId,
Expand Down

0 comments on commit 44d2319

Please sign in to comment.