Skip to content

Commit

Permalink
feat: optimise reporting module queries (#4472)
Browse files Browse the repository at this point in the history
  • Loading branch information
itsmihir committed Apr 2, 2024
1 parent 1197b6d commit 0da9f28
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 35 deletions.
52 changes: 20 additions & 32 deletions enterprise/reporting/reporting.go
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/lib/pq"
"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
Expand Down Expand Up @@ -183,7 +182,7 @@ func (r *DefaultReporter) getDBHandle(syncerKey string) (*sql.DB, error) {
}

func (r *DefaultReporter) getReports(currentMs int64, syncerKey string) (reports []*types.ReportByStatus, reportedAt int64, err error) {
sqlStatement := fmt.Sprintf(`SELECT reported_at FROM %s WHERE reported_at < %d ORDER BY reported_at ASC LIMIT 1`, ReportsTable, currentMs)
sqlStatement := fmt.Sprintf(`SELECT min(reported_at) FROM %s WHERE reported_at < $1`, ReportsTable)
var queryMin sql.NullInt64
dbHandle, err := r.getDBHandle(syncerKey)
if err != nil {
Expand All @@ -193,7 +192,7 @@ func (r *DefaultReporter) getReports(currentMs int64, syncerKey string) (reports
queryStart := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), r.dbQueryTimeout.Load())
defer cancel()
err = dbHandle.QueryRowContext(ctx, sqlStatement).Scan(&queryMin)
err = dbHandle.QueryRowContext(ctx, sqlStatement, currentMs).Scan(&queryMin)

if err != nil && err != sql.ErrNoRows && ctx.Err() == nil {
panic(err)
Expand All @@ -207,10 +206,11 @@ func (r *DefaultReporter) getReports(currentMs int64, syncerKey string) (reports
return nil, 0, nil
}

sqlStatement = fmt.Sprintf(`SELECT workspace_id, namespace, instance_id, source_definition_id, source_category, source_id, destination_definition_id, destination_id, source_task_run_id, source_job_id, source_job_run_id, transformation_id, transformation_version_id, tracking_plan_id, tracking_plan_version, in_pu, pu, reported_at, status, count, violation_count, terminal_state, initial_state, status_code, sample_response, sample_event, event_name, event_type, error_type FROM %s WHERE reported_at = %d`, ReportsTable, queryMin.Int64)
groupByColumns := "workspace_id, namespace, instance_id, source_definition_id, source_category, source_id, destination_definition_id, destination_id, source_task_run_id, source_job_id, source_job_run_id, transformation_id, transformation_version_id, tracking_plan_id, tracking_plan_version, in_pu, pu, reported_at, status, terminal_state, initial_state, status_code, event_name, event_type, error_type"
sqlStatement = fmt.Sprintf(`SELECT %s, (ARRAY_AGG(sample_response order by id))[1], (ARRAY_AGG(sample_event order by id))[1], SUM(count), SUM(violation_count) FROM %s WHERE reported_at = $1 GROUP BY %s`, groupByColumns, ReportsTable, groupByColumns)
var rows *sql.Rows
queryStart = time.Now()
rows, err = dbHandle.Query(sqlStatement)
rows, err = dbHandle.Query(sqlStatement, queryMin.Int64)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -238,12 +238,12 @@ func (r *DefaultReporter) getReports(currentMs int64, syncerKey string) (reports
&metricReport.PUDetails.InPU, &metricReport.PUDetails.PU,
&metricReport.ReportedAt,
&metricReport.StatusDetail.Status,
&metricReport.StatusDetail.Count, &metricReport.StatusDetail.ViolationCount,
&metricReport.PUDetails.TerminalPU, &metricReport.PUDetails.InitialPU,
&metricReport.StatusDetail.StatusCode,
&metricReport.StatusDetail.SampleResponse, &metricReport.StatusDetail.SampleEvent,
&metricReport.StatusDetail.EventName, &metricReport.StatusDetail.EventType,
&metricReport.StatusDetail.ErrorType,
&metricReport.StatusDetail.SampleResponse, &metricReport.StatusDetail.SampleEvent,
&metricReport.StatusDetail.Count, &metricReport.StatusDetail.ViolationCount,
)
if err != nil {
panic(err)
Expand All @@ -260,6 +260,7 @@ func (r *DefaultReporter) getReports(currentMs int64, syncerKey string) (reports

func (*DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus) []*types.Metric {
metricsByGroup := map[string]*types.Metric{}
var values []*types.Metric

reportIdentifier := func(report *types.ReportByStatus) string {
groupingIdentifiers := []string{
Expand Down Expand Up @@ -314,35 +315,22 @@ func (*DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus) []
ReportedAt: report.ReportedAt * 60 * 1000, // send reportedAt in milliseconds
},
}
values = append(values, metricsByGroup[identifier])
}
statusDetailInterface, found := lo.Find(metricsByGroup[identifier].StatusDetails, func(i *types.StatusDetail) bool {
return i.Status == report.StatusDetail.Status && i.StatusCode == report.StatusDetail.StatusCode && i.ErrorType == report.StatusDetail.ErrorType

metricsByGroup[identifier].StatusDetails = append(metricsByGroup[identifier].StatusDetails, &types.StatusDetail{
Status: report.StatusDetail.Status,
StatusCode: report.StatusDetail.StatusCode,
Count: report.StatusDetail.Count,
ViolationCount: report.StatusDetail.ViolationCount,
SampleResponse: report.StatusDetail.SampleResponse,
SampleEvent: report.StatusDetail.SampleEvent,
EventName: report.StatusDetail.EventName,
EventType: report.StatusDetail.EventType,
ErrorType: report.StatusDetail.ErrorType,
})
if !found {
metricsByGroup[identifier].StatusDetails = append(metricsByGroup[identifier].StatusDetails, &types.StatusDetail{
Status: report.StatusDetail.Status,
StatusCode: report.StatusDetail.StatusCode,
Count: report.StatusDetail.Count,
ViolationCount: report.StatusDetail.ViolationCount,
SampleResponse: report.StatusDetail.SampleResponse,
SampleEvent: report.StatusDetail.SampleEvent,
EventName: report.StatusDetail.EventName,
EventType: report.StatusDetail.EventType,
ErrorType: report.StatusDetail.ErrorType,
})
continue
}
statusDetail := statusDetailInterface
statusDetail.Count += report.StatusDetail.Count
statusDetail.ViolationCount += report.StatusDetail.ViolationCount
statusDetail.SampleResponse = report.StatusDetail.SampleResponse
statusDetail.SampleEvent = report.StatusDetail.SampleEvent
}

var values []*types.Metric
for _, val := range metricsByGroup {
values = append(values, val)
}
return values
}

Expand Down
35 changes: 32 additions & 3 deletions enterprise/reporting/reporting_test.go
Expand Up @@ -135,7 +135,7 @@ var _ = Describe("Reporting", func() {
WorkspaceID: "some-workspace-id",
},
ConnectionDetails: types.ConnectionDetails{
SourceID: "some-source-id",
SourceID: "some-source-id-2",
DestinationID: "some-destination-id",
TransformationID: "some-transformation-id",
TrackingPlanID: "some-tracking-plan-id",
Expand Down Expand Up @@ -189,8 +189,37 @@ var _ = Describe("Reporting", func() {
},
{
Status: "some-status",
Count: 5,
ViolationCount: 20,
Count: 2,
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
ErrorType: "some-error-type",
},
},
},
{
InstanceDetails: types.InstanceDetails{
WorkspaceID: "some-workspace-id",
},
ConnectionDetails: types.ConnectionDetails{
SourceID: "some-source-id-2",
DestinationID: "some-destination-id",
TransformationID: "some-transformation-id",
TrackingPlanID: "some-tracking-plan-id",
},
PUDetails: types.PUDetails{
InPU: "some-in-pu",
PU: "some-pu",
},
ReportMetadata: types.ReportMetadata{
ReportedAt: 28017690 * 60 * 1000,
},
StatusDetails: []*types.StatusDetail{
{
Status: "some-status",
Count: 3,
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
Expand Down

0 comments on commit 0da9f28

Please sign in to comment.