Skip to content

Commit

Permalink
feat(processor): enhance reports to hold transformation and tracking plan metrics (#3138)
Browse files Browse the repository at this point in the history

* feat(reports): add reports for transformation and tracking plan
  • Loading branch information
Jayachand committed Apr 20, 2023
1 parent 460890d commit 865ad6c
Show file tree
Hide file tree
Showing 10 changed files with 501 additions and 71 deletions.
110 changes: 100 additions & 10 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
"github.com/thoas/go-funk"
"github.com/samber/lo"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -217,7 +217,7 @@ func (r *HandleT) getReports(currentMs int64, clientName string) (reports []*typ
return nil, 0
}

sqlStatement = fmt.Sprintf(`SELECT workspace_id, namespace, instance_id, source_definition_id, source_category, source_id, destination_definition_id, destination_id, source_task_run_id, source_job_id, source_job_run_id, in_pu, pu, reported_at, status, count, terminal_state, initial_state, status_code, sample_response, sample_event, event_name, event_type FROM %s WHERE reported_at = %d`, ReportsTable, queryMin.Int64)
sqlStatement = fmt.Sprintf(`SELECT workspace_id, namespace, instance_id, source_definition_id, source_category, source_id, destination_definition_id, destination_id, source_task_run_id, source_job_id, source_job_run_id, transformation_id, transformation_version_id, tracking_plan_id, tracking_plan_version, in_pu, pu, reported_at, status, count, violation_count, terminal_state, initial_state, status_code, sample_response, sample_event, event_name, event_type, error_type FROM %s WHERE reported_at = %d`, ReportsTable, queryMin.Int64)
var rows *sql.Rows
queryStart = time.Now()
rows, err = dbHandle.Query(sqlStatement)
Expand All @@ -230,7 +230,30 @@ func (r *HandleT) getReports(currentMs int64, clientName string) (reports []*typ
var metricReports []*types.ReportByStatus
for rows.Next() {
metricReport := types.ReportByStatus{StatusDetail: &types.StatusDetail{}}
err = rows.Scan(&metricReport.InstanceDetails.WorkspaceID, &metricReport.InstanceDetails.Namespace, &metricReport.InstanceDetails.InstanceID, &metricReport.ConnectionDetails.SourceDefinitionId, &metricReport.ConnectionDetails.SourceCategory, &metricReport.ConnectionDetails.SourceID, &metricReport.ConnectionDetails.DestinationDefinitionId, &metricReport.ConnectionDetails.DestinationID, &metricReport.ConnectionDetails.SourceTaskRunID, &metricReport.ConnectionDetails.SourceJobID, &metricReport.ConnectionDetails.SourceJobRunID, &metricReport.PUDetails.InPU, &metricReport.PUDetails.PU, &metricReport.ReportedAt, &metricReport.StatusDetail.Status, &metricReport.StatusDetail.Count, &metricReport.PUDetails.TerminalPU, &metricReport.PUDetails.InitialPU, &metricReport.StatusDetail.StatusCode, &metricReport.StatusDetail.SampleResponse, &metricReport.StatusDetail.SampleEvent, &metricReport.StatusDetail.EventName, &metricReport.StatusDetail.EventType)
err = rows.Scan(
&metricReport.InstanceDetails.WorkspaceID, &metricReport.InstanceDetails.Namespace, &metricReport.InstanceDetails.InstanceID,
&metricReport.ConnectionDetails.SourceDefinitionId,
&metricReport.ConnectionDetails.SourceCategory,
&metricReport.ConnectionDetails.SourceID,
&metricReport.ConnectionDetails.DestinationDefinitionId,
&metricReport.ConnectionDetails.DestinationID,
&metricReport.ConnectionDetails.SourceTaskRunID,
&metricReport.ConnectionDetails.SourceJobID,
&metricReport.ConnectionDetails.SourceJobRunID,
&metricReport.ConnectionDetails.TransformationID,
&metricReport.ConnectionDetails.TransformationVersionID,
&metricReport.ConnectionDetails.TrackingPlanID,
&metricReport.ConnectionDetails.TrackingPlanVersion,
&metricReport.PUDetails.InPU, &metricReport.PUDetails.PU,
&metricReport.ReportedAt,
&metricReport.StatusDetail.Status,
&metricReport.StatusDetail.Count, &metricReport.StatusDetail.ViolationCount,
&metricReport.PUDetails.TerminalPU, &metricReport.PUDetails.InitialPU,
&metricReport.StatusDetail.StatusCode,
&metricReport.StatusDetail.SampleResponse, &metricReport.StatusDetail.SampleEvent,
&metricReport.StatusDetail.EventName, &metricReport.StatusDetail.EventType,
&metricReport.StatusDetail.ErrorType,
)
if err != nil {
panic(err)
}
Expand All @@ -244,7 +267,22 @@ func (*HandleT) getAggregatedReports(reports []*types.ReportByStatus) []*types.M
metricsByGroup := map[string]*types.Metric{}

reportIdentifier := func(report *types.ReportByStatus) string {
groupingIdentifiers := []string{report.InstanceDetails.WorkspaceID, report.InstanceDetails.Namespace, report.InstanceDetails.InstanceID, report.ConnectionDetails.SourceID, report.ConnectionDetails.DestinationID, report.ConnectionDetails.SourceTaskRunID, report.ConnectionDetails.SourceJobID, report.ConnectionDetails.SourceJobRunID, report.PUDetails.InPU, report.PUDetails.PU, report.StatusDetail.Status, fmt.Sprint(report.StatusDetail.StatusCode), report.StatusDetail.EventName, report.StatusDetail.EventType}
groupingIdentifiers := []string{
report.InstanceDetails.WorkspaceID, report.InstanceDetails.Namespace, report.InstanceDetails.InstanceID,
report.ConnectionDetails.SourceID,
report.ConnectionDetails.DestinationID,
report.ConnectionDetails.SourceTaskRunID,
report.ConnectionDetails.SourceJobID,
report.ConnectionDetails.SourceJobRunID,
report.ConnectionDetails.TransformationID,
report.ConnectionDetails.TransformationVersionID,
report.ConnectionDetails.TrackingPlanID,
strconv.Itoa(report.ConnectionDetails.TrackingPlanVersion),
report.PUDetails.InPU, report.PUDetails.PU,
report.StatusDetail.Status,
strconv.Itoa(report.StatusDetail.StatusCode),
report.StatusDetail.EventName, report.StatusDetail.EventType,
}
return strings.Join(groupingIdentifiers, `::`)
}

Expand All @@ -266,6 +304,10 @@ func (*HandleT) getAggregatedReports(reports []*types.ReportByStatus) []*types.M
SourceTaskRunID: report.SourceTaskRunID,
SourceJobID: report.SourceJobID,
SourceJobRunID: report.SourceJobRunID,
TransformationID: report.TransformationID,
TransformationVersionID: report.TransformationVersionID,
TrackingPlanID: report.TrackingPlanID,
TrackingPlanVersion: report.TrackingPlanVersion,
},
PUDetails: types.PUDetails{
InPU: report.InPU,
Expand All @@ -278,23 +320,26 @@ func (*HandleT) getAggregatedReports(reports []*types.ReportByStatus) []*types.M
},
}
}
statusDetailInterface := funk.Find(metricsByGroup[identifier].StatusDetails, func(i *types.StatusDetail) bool {
return i.Status == report.StatusDetail.Status && i.StatusCode == report.StatusDetail.StatusCode
statusDetailInterface, found := lo.Find(metricsByGroup[identifier].StatusDetails, func(i *types.StatusDetail) bool {
return i.Status == report.StatusDetail.Status && i.StatusCode == report.StatusDetail.StatusCode && i.ErrorType == report.StatusDetail.ErrorType
})
if statusDetailInterface == nil {
if !found {
metricsByGroup[identifier].StatusDetails = append(metricsByGroup[identifier].StatusDetails, &types.StatusDetail{
Status: report.StatusDetail.Status,
StatusCode: report.StatusDetail.StatusCode,
Count: report.StatusDetail.Count,
ViolationCount: report.StatusDetail.ViolationCount,
SampleResponse: report.StatusDetail.SampleResponse,
SampleEvent: report.StatusDetail.SampleEvent,
EventName: report.StatusDetail.EventName,
EventType: report.StatusDetail.EventType,
ErrorType: report.StatusDetail.ErrorType,
})
continue
}
statusDetail := statusDetailInterface.(*types.StatusDetail)
statusDetail := statusDetailInterface
statusDetail.Count += report.StatusDetail.Count
statusDetail.ViolationCount += report.StatusDetail.ViolationCount
statusDetail.SampleResponse = report.StatusDetail.SampleResponse
statusDetail.SampleEvent = report.StatusDetail.SampleEvent
}
Expand Down Expand Up @@ -484,7 +529,30 @@ func (r *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) {
return
}

stmt, err := txn.Prepare(pq.CopyIn(ReportsTable, "workspace_id", "namespace", "instance_id", "source_definition_id", "source_category", "source_id", "destination_definition_id", "destination_id", "source_task_run_id", "source_job_id", "source_job_run_id", "in_pu", "pu", "reported_at", "status", "count", "terminal_state", "initial_state", "status_code", "sample_response", "sample_event", "event_name", "event_type"))
stmt, err := txn.Prepare(pq.CopyIn(ReportsTable,
"workspace_id", "namespace", "instance_id",
"source_definition_id",
"source_category",
"source_id",
"destination_definition_id",
"destination_id",
"source_task_run_id",
"source_job_id",
"source_job_run_id",
"transformation_id",
"transformation_version_id",
"tracking_plan_id",
"tracking_plan_version",
"in_pu", "pu",
"reported_at",
"status",
"count", "violation_count",
"terminal_state", "initial_state",
"status_code",
"sample_response", "sample_event",
"event_name", "event_type",
"error_type",
))
if err != nil {
_ = txn.Rollback()
panic(err)
Expand All @@ -499,7 +567,29 @@ func (r *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) {
metric = transformMetricForPII(metric, getPIIColumnsToExclude())
}

_, err = stmt.Exec(workspaceID, r.namespace, r.instanceID, metric.ConnectionDetails.SourceDefinitionId, metric.ConnectionDetails.SourceCategory, metric.ConnectionDetails.SourceID, metric.ConnectionDetails.DestinationDefinitionId, metric.ConnectionDetails.DestinationID, metric.ConnectionDetails.SourceTaskRunID, metric.ConnectionDetails.SourceJobID, metric.ConnectionDetails.SourceJobRunID, metric.PUDetails.InPU, metric.PUDetails.PU, reportedAt, metric.StatusDetail.Status, metric.StatusDetail.Count, metric.PUDetails.TerminalPU, metric.PUDetails.InitialPU, metric.StatusDetail.StatusCode, metric.StatusDetail.SampleResponse, string(metric.StatusDetail.SampleEvent), metric.StatusDetail.EventName, metric.StatusDetail.EventType)
_, err = stmt.Exec(
workspaceID, r.namespace, r.instanceID,
metric.ConnectionDetails.SourceDefinitionId,
metric.ConnectionDetails.SourceCategory,
metric.ConnectionDetails.SourceID,
metric.ConnectionDetails.DestinationDefinitionId,
metric.ConnectionDetails.DestinationID,
metric.ConnectionDetails.SourceTaskRunID,
metric.ConnectionDetails.SourceJobID,
metric.ConnectionDetails.SourceJobRunID,
metric.ConnectionDetails.TransformationID,
metric.ConnectionDetails.TransformationVersionID,
metric.ConnectionDetails.TrackingPlanID,
metric.ConnectionDetails.TrackingPlanVersion,
metric.PUDetails.InPU, metric.PUDetails.PU,
reportedAt,
metric.StatusDetail.Status,
metric.StatusDetail.Count, metric.StatusDetail.ViolationCount,
metric.PUDetails.TerminalPU, metric.PUDetails.InitialPU,
metric.StatusDetail.StatusCode,
metric.StatusDetail.SampleResponse, string(metric.StatusDetail.SampleEvent),
metric.StatusDetail.EventName, metric.StatusDetail.EventType,
metric.StatusDetail.ErrorType)
if err != nil {
panic(err)
}
Expand Down
133 changes: 133 additions & 0 deletions enterprise/reporting/reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/rudderlabs/rudder-go-kit/logger"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
mock_backendconfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
"github.com/rudderlabs/rudder-server/utils/pubsub"
Expand Down Expand Up @@ -72,6 +73,138 @@ var _ = Describe("Reporting", func() {
assertReportMetric(expectedResponse, transformedMetric)
})
})

Context("getAggregatedReports Tests", func() {
	// The fixtures, the call under test and the assertion all live inside an It
	// block. Ginkgo evaluates container bodies (Describe/Context) while it builds
	// the spec tree, so an Expect placed directly in the container is not
	// registered as a spec and does not run as part of one.
	It("groups reports into one metric and merges status details with the same status, status code and error type", func() {
		// Three reports with identical instance, connection and PU details.
		// They differ only in their StatusDetail: the first has an empty
		// ErrorType, the other two share ErrorType "some-error-type".
		inputReports := []*types.ReportByStatus{
			{
				InstanceDetails: types.InstanceDetails{
					WorkspaceID: "some-workspace-id",
				},
				ConnectionDetails: types.ConnectionDetails{
					SourceID:         "some-source-id",
					DestinationID:    "some-destination-id",
					TransformationID: "some-transformation-id",
					TrackingPlanID:   "some-tracking-plan-id",
				},
				PUDetails: types.PUDetails{
					InPU: "some-in-pu",
					PU:   "some-pu",
				},
				ReportMetadata: types.ReportMetadata{
					ReportedAt: 28017690,
				},
				StatusDetail: &types.StatusDetail{
					Status:         "some-status",
					Count:          3,
					ViolationCount: 5,
					StatusCode:     200,
					SampleResponse: "",
					SampleEvent:    []byte(`{}`),
					ErrorType:      "",
				},
			},
			{
				InstanceDetails: types.InstanceDetails{
					WorkspaceID: "some-workspace-id",
				},
				ConnectionDetails: types.ConnectionDetails{
					SourceID:         "some-source-id",
					DestinationID:    "some-destination-id",
					TransformationID: "some-transformation-id",
					TrackingPlanID:   "some-tracking-plan-id",
				},
				PUDetails: types.PUDetails{
					InPU: "some-in-pu",
					PU:   "some-pu",
				},
				ReportMetadata: types.ReportMetadata{
					ReportedAt: 28017690,
				},
				StatusDetail: &types.StatusDetail{
					Status:         "some-status",
					Count:          2,
					ViolationCount: 10,
					StatusCode:     200,
					SampleResponse: "",
					SampleEvent:    []byte(`{}`),
					ErrorType:      "some-error-type",
				},
			},
			{
				InstanceDetails: types.InstanceDetails{
					WorkspaceID: "some-workspace-id",
				},
				ConnectionDetails: types.ConnectionDetails{
					SourceID:         "some-source-id",
					DestinationID:    "some-destination-id",
					TransformationID: "some-transformation-id",
					TrackingPlanID:   "some-tracking-plan-id",
				},
				PUDetails: types.PUDetails{
					InPU: "some-in-pu",
					PU:   "some-pu",
				},
				ReportMetadata: types.ReportMetadata{
					ReportedAt: 28017690,
				},
				StatusDetail: &types.StatusDetail{
					Status:         "some-status",
					Count:          3,
					ViolationCount: 10,
					StatusCode:     200,
					SampleResponse: "",
					SampleEvent:    []byte(`{}`),
					ErrorType:      "some-error-type",
				},
			},
		}

		// A single metric is expected, carrying two status details: the
		// empty-ErrorType report on its own (Count 3, ViolationCount 5) and the
		// two "some-error-type" reports summed (Count 2+3, ViolationCount 10+10).
		expectedResponse := []*types.Metric{
			{
				InstanceDetails: types.InstanceDetails{
					WorkspaceID: "some-workspace-id",
				},
				ConnectionDetails: types.ConnectionDetails{
					SourceID:         "some-source-id",
					DestinationID:    "some-destination-id",
					TransformationID: "some-transformation-id",
					TrackingPlanID:   "some-tracking-plan-id",
				},
				PUDetails: types.PUDetails{
					InPU: "some-in-pu",
					PU:   "some-pu",
				},
				ReportMetadata: types.ReportMetadata{
					// Input ReportedAt scaled by 60*1000; presumably minutes to
					// milliseconds -- confirm against getAggregatedReports.
					ReportedAt: 28017690 * 60 * 1000,
				},
				StatusDetails: []*types.StatusDetail{
					{
						Status:         "some-status",
						Count:          3,
						ViolationCount: 5,
						StatusCode:     200,
						SampleResponse: "",
						SampleEvent:    []byte(`{}`),
						ErrorType:      "",
					},
					{
						Status:         "some-status",
						Count:          5,
						ViolationCount: 20,
						StatusCode:     200,
						SampleResponse: "",
						SampleEvent:    []byte(`{}`),
						ErrorType:      "some-error-type",
					},
				},
			},
		}

		reportHandle := NewFromEnvConfig(logger.NOP)

		aggregatedMetrics := reportHandle.getAggregatedReports(inputReports)
		Expect(aggregatedMetrics).To(Equal(expectedResponse))
	})
})
})

func assertReportMetric(expectedMetric, actualMetric types.PUReportedMetric) {
Expand Down
Loading

0 comments on commit 865ad6c

Please sign in to comment.