Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(processor): enhance reports to hold transformation and tracking plan metrics #3138

Merged
merged 24 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e3a9277
feat(reports): add tracking plan diff status reports
Jayachand Mar 29, 2023
015dece
nested map
Jayachand Mar 29, 2023
ec41346
stage identifier
Jayachand Mar 29, 2023
6c7ed75
Merge branch 'master' into feat.tpStateReportMetrics
Jayachand Apr 3, 2023
ad1228a
feat: add event payload, aggreagte
Jayachand Apr 6, 2023
40ceab6
Merge branch 'master' into feat.tpStateReportMetrics
Jayachand Apr 10, 2023
4712f97
Merge branch 'master' into feat.tpStateReportMetrics
Jayachand Apr 10, 2023
8b4940a
test cases
Jayachand Apr 10, 2023
ef3f929
Merge branch 'master' into feat.tpStateReportMetrics
Jayachand Apr 12, 2023
64481c8
transformation metric changes
Jayachand Apr 12, 2023
84a9a2c
minor changes
Jayachand Apr 12, 2023
9bd588b
Merge branch 'master' into feat.tpStateReportMetrics
Jayachand Apr 13, 2023
40a7cb7
Merge branch 'master' into feat.tpStateReportMetrics
Jayachand Apr 14, 2023
91e4bfa
Merge branch 'master' into feat.tpStateReportMetrics
Jayachand Apr 14, 2023
c5a026b
Merge branch 'master' into feat.tpStateReportMetrics
Jayachand Apr 15, 2023
54efc10
logger, str conversion
Jayachand Apr 17, 2023
62ce533
convention Id to ID
Jayachand Apr 17, 2023
178c688
statags, metadata
Jayachand Apr 17, 2023
2ba0af4
NOP
Jayachand Apr 17, 2023
7b8ef97
lo,succeeded state
Jayachand Apr 19, 2023
97f45ec
Merge branch 'master' into feat.tpStateReportMetrics
Jayachand Apr 19, 2023
0a7c9cc
fmt
Jayachand Apr 19, 2023
dc7791d
Merge branch 'master' into feat.tpStateReportMetrics
Jayachand Apr 19, 2023
cd9826c
Merge branch 'master' into feat.tpStateReportMetrics
Jayachand Apr 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
106 changes: 98 additions & 8 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (r *HandleT) getReports(currentMs int64, clientName string) (reports []*typ
return nil, 0
}

sqlStatement = fmt.Sprintf(`SELECT workspace_id, namespace, instance_id, source_definition_id, source_category, source_id, destination_definition_id, destination_id, source_task_run_id, source_job_id, source_job_run_id, in_pu, pu, reported_at, status, count, terminal_state, initial_state, status_code, sample_response, sample_event, event_name, event_type FROM %s WHERE reported_at = %d`, ReportsTable, queryMin.Int64)
sqlStatement = fmt.Sprintf(`SELECT workspace_id, namespace, instance_id, source_definition_id, source_category, source_id, destination_definition_id, destination_id, source_task_run_id, source_job_id, source_job_run_id, transformation_id, transformation_version_id, tracking_plan_id, tracking_plan_version, in_pu, pu, reported_at, status, count, violation_count, terminal_state, initial_state, status_code, sample_response, sample_event, event_name, event_type, error_type FROM %s WHERE reported_at = %d`, ReportsTable, queryMin.Int64)
var rows *sql.Rows
queryStart = time.Now()
rows, err = dbHandle.Query(sqlStatement)
Expand All @@ -230,7 +230,30 @@ func (r *HandleT) getReports(currentMs int64, clientName string) (reports []*typ
var metricReports []*types.ReportByStatus
for rows.Next() {
metricReport := types.ReportByStatus{StatusDetail: &types.StatusDetail{}}
err = rows.Scan(&metricReport.InstanceDetails.WorkspaceID, &metricReport.InstanceDetails.Namespace, &metricReport.InstanceDetails.InstanceID, &metricReport.ConnectionDetails.SourceDefinitionId, &metricReport.ConnectionDetails.SourceCategory, &metricReport.ConnectionDetails.SourceID, &metricReport.ConnectionDetails.DestinationDefinitionId, &metricReport.ConnectionDetails.DestinationID, &metricReport.ConnectionDetails.SourceTaskRunID, &metricReport.ConnectionDetails.SourceJobID, &metricReport.ConnectionDetails.SourceJobRunID, &metricReport.PUDetails.InPU, &metricReport.PUDetails.PU, &metricReport.ReportedAt, &metricReport.StatusDetail.Status, &metricReport.StatusDetail.Count, &metricReport.PUDetails.TerminalPU, &metricReport.PUDetails.InitialPU, &metricReport.StatusDetail.StatusCode, &metricReport.StatusDetail.SampleResponse, &metricReport.StatusDetail.SampleEvent, &metricReport.StatusDetail.EventName, &metricReport.StatusDetail.EventType)
err = rows.Scan(
&metricReport.InstanceDetails.WorkspaceID, &metricReport.InstanceDetails.Namespace, &metricReport.InstanceDetails.InstanceID,
&metricReport.ConnectionDetails.SourceDefinitionId,
&metricReport.ConnectionDetails.SourceCategory,
&metricReport.ConnectionDetails.SourceID,
&metricReport.ConnectionDetails.DestinationDefinitionId,
&metricReport.ConnectionDetails.DestinationID,
&metricReport.ConnectionDetails.SourceTaskRunID,
&metricReport.ConnectionDetails.SourceJobID,
&metricReport.ConnectionDetails.SourceJobRunID,
&metricReport.ConnectionDetails.TransformationId,
&metricReport.ConnectionDetails.TransformationVersionId,
&metricReport.ConnectionDetails.TrackingPlanId,
&metricReport.ConnectionDetails.TrackingPlanVersion,
&metricReport.PUDetails.InPU, &metricReport.PUDetails.PU,
&metricReport.ReportedAt,
&metricReport.StatusDetail.Status,
&metricReport.StatusDetail.Count, &metricReport.StatusDetail.ViolationCount,
&metricReport.PUDetails.TerminalPU, &metricReport.PUDetails.InitialPU,
&metricReport.StatusDetail.StatusCode,
&metricReport.StatusDetail.SampleResponse, &metricReport.StatusDetail.SampleEvent,
&metricReport.StatusDetail.EventName, &metricReport.StatusDetail.EventType,
&metricReport.StatusDetail.ErrorType,
)
if err != nil {
panic(err)
}
Expand All @@ -244,7 +267,22 @@ func (*HandleT) getAggregatedReports(reports []*types.ReportByStatus) []*types.M
metricsByGroup := map[string]*types.Metric{}

reportIdentifier := func(report *types.ReportByStatus) string {
groupingIdentifiers := []string{report.InstanceDetails.WorkspaceID, report.InstanceDetails.Namespace, report.InstanceDetails.InstanceID, report.ConnectionDetails.SourceID, report.ConnectionDetails.DestinationID, report.ConnectionDetails.SourceTaskRunID, report.ConnectionDetails.SourceJobID, report.ConnectionDetails.SourceJobRunID, report.PUDetails.InPU, report.PUDetails.PU, report.StatusDetail.Status, fmt.Sprint(report.StatusDetail.StatusCode), report.StatusDetail.EventName, report.StatusDetail.EventType}
groupingIdentifiers := []string{
report.InstanceDetails.WorkspaceID, report.InstanceDetails.Namespace, report.InstanceDetails.InstanceID,
report.ConnectionDetails.SourceID,
report.ConnectionDetails.DestinationID,
report.ConnectionDetails.SourceTaskRunID,
report.ConnectionDetails.SourceJobID,
report.ConnectionDetails.SourceJobRunID,
report.ConnectionDetails.TransformationId,
report.ConnectionDetails.TransformationVersionId,
report.ConnectionDetails.TrackingPlanId,
fmt.Sprint(report.ConnectionDetails.TrackingPlanVersion),
Jayachand marked this conversation as resolved.
Show resolved Hide resolved
report.PUDetails.InPU, report.PUDetails.PU,
report.StatusDetail.Status,
fmt.Sprint(report.StatusDetail.StatusCode),
Jayachand marked this conversation as resolved.
Show resolved Hide resolved
report.StatusDetail.EventName, report.StatusDetail.EventType,
}
return strings.Join(groupingIdentifiers, `::`)
}

Expand All @@ -266,6 +304,10 @@ func (*HandleT) getAggregatedReports(reports []*types.ReportByStatus) []*types.M
SourceTaskRunID: report.SourceTaskRunID,
SourceJobID: report.SourceJobID,
SourceJobRunID: report.SourceJobRunID,
TransformationId: report.TransformationId,
TransformationVersionId: report.TransformationVersionId,
TrackingPlanId: report.TrackingPlanId,
TrackingPlanVersion: report.TrackingPlanVersion,
},
PUDetails: types.PUDetails{
InPU: report.InPU,
Expand All @@ -279,22 +321,25 @@ func (*HandleT) getAggregatedReports(reports []*types.ReportByStatus) []*types.M
}
}
statusDetailInterface := funk.Find(metricsByGroup[identifier].StatusDetails, func(i *types.StatusDetail) bool {
return i.Status == report.StatusDetail.Status && i.StatusCode == report.StatusDetail.StatusCode
return i.Status == report.StatusDetail.Status && i.StatusCode == report.StatusDetail.StatusCode && i.ErrorType == report.StatusDetail.ErrorType
})
if statusDetailInterface == nil {
metricsByGroup[identifier].StatusDetails = append(metricsByGroup[identifier].StatusDetails, &types.StatusDetail{
Status: report.StatusDetail.Status,
StatusCode: report.StatusDetail.StatusCode,
Count: report.StatusDetail.Count,
ViolationCount: report.StatusDetail.ViolationCount,
SampleResponse: report.StatusDetail.SampleResponse,
SampleEvent: report.StatusDetail.SampleEvent,
EventName: report.StatusDetail.EventName,
EventType: report.StatusDetail.EventType,
ErrorType: report.StatusDetail.ErrorType,
})
continue
}
statusDetail := statusDetailInterface.(*types.StatusDetail)
atzoum marked this conversation as resolved.
Show resolved Hide resolved
statusDetail.Count += report.StatusDetail.Count
statusDetail.ViolationCount += report.StatusDetail.ViolationCount
statusDetail.SampleResponse = report.StatusDetail.SampleResponse
statusDetail.SampleEvent = report.StatusDetail.SampleEvent
}
Expand Down Expand Up @@ -409,7 +454,7 @@ func (r *HandleT) sendMetric(ctx context.Context, netClient *http.Client, client
httpRequestStart := time.Now()
resp, err := netClient.Do(req)
if err != nil {
r.log.Error(err.Error())
// r.log.Error(err.Error())
Jayachand marked this conversation as resolved.
Show resolved Hide resolved
return err
}

Expand All @@ -433,7 +478,7 @@ func (r *HandleT) sendMetric(ctx context.Context, netClient *http.Client, client

b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
err = backoff.RetryNotify(operation, b, func(err error, t time.Duration) {
r.log.Errorf(`[ Reporting ]: Error reporting to service: %v`, err)
// r.log.Errorf(`[ Reporting ]: Error reporting to service: %v`, err)
Jayachand marked this conversation as resolved.
Show resolved Hide resolved
})
if err != nil {
r.log.Errorf(`[ Reporting ]: Error making request to reporting service: %v`, err)
Expand Down Expand Up @@ -484,7 +529,30 @@ func (r *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) {
return
}

stmt, err := txn.Prepare(pq.CopyIn(ReportsTable, "workspace_id", "namespace", "instance_id", "source_definition_id", "source_category", "source_id", "destination_definition_id", "destination_id", "source_task_run_id", "source_job_id", "source_job_run_id", "in_pu", "pu", "reported_at", "status", "count", "terminal_state", "initial_state", "status_code", "sample_response", "sample_event", "event_name", "event_type"))
stmt, err := txn.Prepare(pq.CopyIn(ReportsTable,
"workspace_id", "namespace", "instance_id",
"source_definition_id",
"source_category",
"source_id",
"destination_definition_id",
"destination_id",
"source_task_run_id",
"source_job_id",
"source_job_run_id",
"transformation_id",
"transformation_version_id",
"tracking_plan_id",
"tracking_plan_version",
"in_pu", "pu",
"reported_at",
"status",
"count", "violation_count",
"terminal_state", "initial_state",
"status_code",
"sample_response", "sample_event",
"event_name", "event_type",
"error_type",
))
if err != nil {
_ = txn.Rollback()
panic(err)
Expand All @@ -499,7 +567,29 @@ func (r *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) {
metric = transformMetricForPII(metric, getPIIColumnsToExclude())
}

_, err = stmt.Exec(workspaceID, r.namespace, r.instanceID, metric.ConnectionDetails.SourceDefinitionId, metric.ConnectionDetails.SourceCategory, metric.ConnectionDetails.SourceID, metric.ConnectionDetails.DestinationDefinitionId, metric.ConnectionDetails.DestinationID, metric.ConnectionDetails.SourceTaskRunID, metric.ConnectionDetails.SourceJobID, metric.ConnectionDetails.SourceJobRunID, metric.PUDetails.InPU, metric.PUDetails.PU, reportedAt, metric.StatusDetail.Status, metric.StatusDetail.Count, metric.PUDetails.TerminalPU, metric.PUDetails.InitialPU, metric.StatusDetail.StatusCode, metric.StatusDetail.SampleResponse, string(metric.StatusDetail.SampleEvent), metric.StatusDetail.EventName, metric.StatusDetail.EventType)
_, err = stmt.Exec(
workspaceID, r.namespace, r.instanceID,
metric.ConnectionDetails.SourceDefinitionId,
metric.ConnectionDetails.SourceCategory,
metric.ConnectionDetails.SourceID,
metric.ConnectionDetails.DestinationDefinitionId,
metric.ConnectionDetails.DestinationID,
metric.ConnectionDetails.SourceTaskRunID,
metric.ConnectionDetails.SourceJobID,
metric.ConnectionDetails.SourceJobRunID,
metric.ConnectionDetails.TransformationId,
metric.ConnectionDetails.TransformationVersionId,
metric.ConnectionDetails.TrackingPlanId,
metric.ConnectionDetails.TrackingPlanVersion,
metric.PUDetails.InPU, metric.PUDetails.PU,
reportedAt,
metric.StatusDetail.Status,
metric.StatusDetail.Count, metric.StatusDetail.ViolationCount,
metric.PUDetails.TerminalPU, metric.PUDetails.InitialPU,
metric.StatusDetail.StatusCode,
metric.StatusDetail.SampleResponse, string(metric.StatusDetail.SampleEvent),
metric.StatusDetail.EventName, metric.StatusDetail.EventType,
metric.StatusDetail.ErrorType)
if err != nil {
panic(err)
}
Expand Down
134 changes: 134 additions & 0 deletions enterprise/reporting/reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/rudderlabs/rudder-go-kit/logger"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
mock_backendconfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
"github.com/rudderlabs/rudder-server/utils/pubsub"
Expand Down Expand Up @@ -72,6 +73,139 @@ var _ = Describe("Reporting", func() {
assertReportMetric(expectedResponse, transformedMetric)
})
})

Context("getAggregatedReports Tests", func() {
inputReports := []*types.ReportByStatus{
{
InstanceDetails: types.InstanceDetails{
WorkspaceID: "some-workspace-id",
},
ConnectionDetails: types.ConnectionDetails{
SourceID: "some-source-id",
DestinationID: "some-destination-id",
TransformationId: "some-transformation-id",
TrackingPlanId: "some-tracking-plan-id",
},
PUDetails: types.PUDetails{
InPU: "some-in-pu",
PU: "some-pu",
},
ReportMetadata: types.ReportMetadata{
ReportedAt: 28017690,
},
StatusDetail: &types.StatusDetail{
Status: "some-status",
Count: 3,
ViolationCount: 5,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
ErrorType: "",
},
},
{
InstanceDetails: types.InstanceDetails{
WorkspaceID: "some-workspace-id",
},
ConnectionDetails: types.ConnectionDetails{
SourceID: "some-source-id",
DestinationID: "some-destination-id",
TransformationId: "some-transformation-id",
TrackingPlanId: "some-tracking-plan-id",
},
PUDetails: types.PUDetails{
InPU: "some-in-pu",
PU: "some-pu",
},
ReportMetadata: types.ReportMetadata{
ReportedAt: 28017690,
},
StatusDetail: &types.StatusDetail{
Status: "some-status",
Count: 2,
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
ErrorType: "some-error-type",
},
},
{
InstanceDetails: types.InstanceDetails{
WorkspaceID: "some-workspace-id",
},
ConnectionDetails: types.ConnectionDetails{
SourceID: "some-source-id",
DestinationID: "some-destination-id",
TransformationId: "some-transformation-id",
TrackingPlanId: "some-tracking-plan-id",
},
PUDetails: types.PUDetails{
InPU: "some-in-pu",
PU: "some-pu",
},
ReportMetadata: types.ReportMetadata{
ReportedAt: 28017690,
},
StatusDetail: &types.StatusDetail{
Status: "some-status",
Count: 3,
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
ErrorType: "some-error-type",
},
},
}

expectedResponse := []*types.Metric{
{
InstanceDetails: types.InstanceDetails{
WorkspaceID: "some-workspace-id",
},
ConnectionDetails: types.ConnectionDetails{
SourceID: "some-source-id",
DestinationID: "some-destination-id",
TransformationId: "some-transformation-id",
TrackingPlanId: "some-tracking-plan-id",
},
PUDetails: types.PUDetails{
InPU: "some-in-pu",
PU: "some-pu",
},
ReportMetadata: types.ReportMetadata{
ReportedAt: 28017690 * 60 * 1000,
},
StatusDetails: []*types.StatusDetail{
{
Status: "some-status",
Count: 3,
ViolationCount: 5,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
ErrorType: "",
},
{
Status: "some-status",
Count: 5,
ViolationCount: 20,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
ErrorType: "some-error-type",
},
},
},
}

var log logger.Logger
reportHandle := NewFromEnvConfig(log)

aggregatedMetrics := reportHandle.getAggregatedReports(inputReports)
Expect(aggregatedMetrics).To(Equal(expectedResponse))
})
})

func assertReportMetric(expectedMetric, actualMetric types.PUReportedMetric) {
Expand Down