Skip to content

Commit

Permalink
feat: adding sourceId and destinationId in pipeline info metrics (#4332)
Browse files Browse the repository at this point in the history
* feat: add sourceid and destination id to pipeline_processed_events metric
  • Loading branch information
gane5hvarma committed Feb 5, 2024
1 parent cf531a4 commit 6c1f1c4
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 64 deletions.
5 changes: 5 additions & 0 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@ type JobStatusT struct {
WorkspaceId string `json:"WorkspaceId"`
}

// ConnectionDetails identifies a connection by the IDs of its two ends:
// the source and the destination.
type ConnectionDetails struct {
	// SourceID and DestinationID are the identifiers of the connection's
	// source and destination, respectively.
	SourceID, DestinationID string
}

func (r *JobStatusT) sanitizeJson() {
r.ErrorResponse = sanitizeJson(r.ErrorResponse)
r.Parameters = sanitizeJson(r.Parameters)
Expand Down
9 changes: 8 additions & 1 deletion router/batchrouter/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// module is the module name the batch router passes to
// routerutils.UpdateProcessedEventsMetrics when reporting processed events.
const module = "batch_router"

type Handle struct {
destType string
// dependencies
Expand Down Expand Up @@ -576,6 +578,7 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
transformedAtMap := make(map[string]string)
statusDetailsMap := make(map[string]*types.StatusDetail)
jobStateCounts := make(map[string]int)
jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails)
for _, job := range batchJobs.Jobs {
jobState := batchJobState

Expand Down Expand Up @@ -622,6 +625,10 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
abortedEvents = append(abortedEvents, job)
}
attemptNum := job.LastJobStatus.AttemptNum + 1
jobIDConnectionDetailsMap[job.JobID] = jobsdb.ConnectionDetails{
SourceID: parameters.SourceID,
DestinationID: parameters.DestinationID,
}
status := jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: attemptNum,
Expand Down Expand Up @@ -763,7 +770,7 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
if err != nil {
panic(err)
}
brt.updateProcessedEventsMetrics(statusList)
routerutils.UpdateProcessedEventsMetrics(stats.Default, module, brt.destType, statusList, jobIDConnectionDetailsMap)
sendDestStatusStats(batchJobs.Connection, jobStateCounts, brt.destType, isWarehouse)
}

Expand Down
60 changes: 43 additions & 17 deletions router/batchrouter/handle_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/google/uuid"
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
Expand Down Expand Up @@ -88,13 +89,13 @@ func getFirstAttemptAtFromErrorResponse(msg stdjson.RawMessage) time.Time {
return res
}

func (brt *Handle) prepareJobStatusList(importingList []*jobsdb.JobT, defaultStatus jobsdb.JobStatusT) ([]*jobsdb.JobStatusT, []*jobsdb.JobT) {
func (brt *Handle) prepareJobStatusList(importingList []*jobsdb.JobT, defaultStatus jobsdb.JobStatusT, sourceID, destinationID string) ([]*jobsdb.JobStatusT, []*jobsdb.JobT, map[int64]jobsdb.ConnectionDetails) {
var abortedJobsList []*jobsdb.JobT
var statusList []*jobsdb.JobStatusT
if defaultStatus.ErrorResponse == nil {
defaultStatus.ErrorResponse = routerutils.EmptyPayload
}

jobIdConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails)
for _, job := range importingList {
resp := enhanceResponseWithFirstAttemptedAt(job.LastJobStatus.ErrorResponse, defaultStatus.ErrorResponse)
status := jobsdb.JobStatusT{
Expand All @@ -109,6 +110,10 @@ func (brt *Handle) prepareJobStatusList(importingList []*jobsdb.JobT, defaultSta
JobParameters: job.Parameters,
WorkspaceId: job.WorkspaceId,
}
jobIdConnectionDetailsMap[job.JobID] = jobsdb.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
}

if defaultStatus.JobState == jobsdb.Failed.State {
if brt.retryLimitReached(&status) {
Expand All @@ -118,7 +123,7 @@ func (brt *Handle) prepareJobStatusList(importingList []*jobsdb.JobT, defaultSta
}
statusList = append(statusList, &status)
}
return statusList, abortedJobsList
return statusList, abortedJobsList, jobIdConnectionDetailsMap
}

func (brt *Handle) getParamertsFromJobs(jobs []*jobsdb.JobT) map[int64]stdjson.RawMessage {
Expand All @@ -129,25 +134,28 @@ func (brt *Handle) getParamertsFromJobs(jobs []*jobsdb.JobT) map[int64]stdjson.R
return parametersMap
}

func (brt *Handle) updatePollStatusToDB(ctx context.Context, destinationID string,
importingJob *jobsdb.JobT, pollResp common.PollStatusResponse,
func (brt *Handle) updatePollStatusToDB(
ctx context.Context,
destinationID string,
sourceID string,
importingJob *jobsdb.JobT,
pollResp common.PollStatusResponse,
) ([]*jobsdb.JobStatusT, error) {
var statusList []*jobsdb.JobStatusT
jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails)
list, err := brt.getImportingJobs(ctx, destinationID, brt.maxEventsInABatch)
if err != nil {
return statusList, err
}
importingList := list.Jobs
if pollResp.StatusCode == http.StatusOK && pollResp.Complete {
if !pollResp.HasFailed && !pollResp.HasWarning {
statusList, _ = brt.prepareJobStatusList(importingList, jobsdb.JobStatusT{JobState: jobsdb.Succeeded.State})
statusList, _, jobIDConnectionDetailsMap = brt.prepareJobStatusList(importingList, jobsdb.JobStatusT{JobState: jobsdb.Succeeded.State}, sourceID, destinationID)
if err := brt.updateJobStatuses(ctx, destinationID, importingList, importingList, statusList); err != nil {
brt.logger.Errorf("[Batch Router] Failed to update job status for Dest Type %v with error %v", brt.destType, err)
return statusList, err
}
brt.asyncSuccessfulJobCount.Count(len(statusList))
brt.updateProcessedEventsMetrics(statusList)
return statusList, nil
} else {
getUploadStatsInput := common.GetUploadStatsInput{
FailedJobURLs: pollResp.FailedJobURLs,
Expand All @@ -171,6 +179,10 @@ func (brt *Handle) updatePollStatusToDB(ctx context.Context, destinationID strin
successfulJobIDs := append(uploadStatsResp.Metadata.SucceededKeys, uploadStatsResp.Metadata.WarningKeys...)
for _, job := range importingList {
jobID := job.JobID
jobIDConnectionDetailsMap[jobID] = jobsdb.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
}
var status *jobsdb.JobStatusT
if slices.Contains(successfulJobIDs, jobID) {
warningRespString := uploadStatsResp.Metadata.WarningReasons[jobID]
Expand Down Expand Up @@ -226,26 +238,24 @@ func (brt *Handle) updatePollStatusToDB(ctx context.Context, destinationID strin
brt.logger.Errorf("[Batch Router] Failed to update job status for Dest Type %v with error %v", brt.destType, err)
return statusList, err
}
brt.updateProcessedEventsMetrics(statusList)
}
} else if pollResp.StatusCode == http.StatusBadRequest {
statusList, _ := brt.prepareJobStatusList(importingList, jobsdb.JobStatusT{JobState: jobsdb.Aborted.State, ErrorResponse: misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", "poll failed with status code 400")})
statusList, _, jobIDConnectionDetailsMap = brt.prepareJobStatusList(importingList, jobsdb.JobStatusT{JobState: jobsdb.Aborted.State, ErrorResponse: misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", "poll failed with status code 400")}, sourceID, destinationID)
if err := brt.updateJobStatuses(ctx, destinationID, importingList, importingList, statusList); err != nil {
brt.logger.Errorf("[Batch Router] Failed to update job status for Dest Type %v with error %v", brt.destType, err)
return statusList, err
}
brt.asyncAbortedJobCount.Count(len(statusList))
brt.updateProcessedEventsMetrics(statusList)
} else {
statusList, abortedJobsList := brt.prepareJobStatusList(importingList, jobsdb.JobStatusT{JobState: jobsdb.Failed.State, ErrorResponse: misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", pollResp.Error)})
var abortedJobsList []*jobsdb.JobT
statusList, abortedJobsList, jobIDConnectionDetailsMap = brt.prepareJobStatusList(importingList, jobsdb.JobStatusT{JobState: jobsdb.Failed.State, ErrorResponse: misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", pollResp.Error)}, sourceID, destinationID)
if err := brt.updateJobStatuses(ctx, destinationID, importingList, abortedJobsList, statusList); err != nil {
brt.logger.Errorf("[Batch Router] Failed to update job status for Dest Type %v with error %v", brt.destType, err)
return statusList, err
}
brt.asyncFailedJobCount.Count(len(statusList))
brt.updateProcessedEventsMetrics(statusList)
}

routerutils.UpdateProcessedEventsMetrics(stats.Default, module, brt.destType, statusList, jobIDConnectionDetailsMap)
return statusList, nil
}

Expand Down Expand Up @@ -280,7 +290,7 @@ func (brt *Handle) pollAsyncStatus(ctx context.Context) {
if pollResp.InProgress {
continue
}
statusList, err := brt.updatePollStatusToDB(ctx, destinationID, importingJob, pollResp)
statusList, err := brt.updatePollStatusToDB(ctx, destinationID, sourceID, importingJob, pollResp)
if err == nil {
brt.asyncDestinationStruct[destinationID].UploadInProgress = false
brt.recordAsyncDestinationDeliveryStatus(sourceID, destinationID, statusList)
Expand Down Expand Up @@ -576,8 +586,13 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
workspaceID := brt.GetWorkspaceIDForDestID(asyncOutput.DestinationID)
var completedJobsList []*jobsdb.JobT
var statusList []*jobsdb.JobStatusT
jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails)
if len(asyncOutput.ImportingJobIDs) > 0 {
for _, jobId := range asyncOutput.ImportingJobIDs {
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
DestinationID: asyncOutput.DestinationID,
SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(),
}
status := jobsdb.JobStatusT{
JobID: jobId,
JobState: jobsdb.Importing.State,
Expand All @@ -595,6 +610,10 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
}
if len(asyncOutput.SucceededJobIDs) > 0 {
for _, jobId := range asyncOutput.SucceededJobIDs {
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
DestinationID: asyncOutput.DestinationID,
SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(),
}
status := jobsdb.JobStatusT{
JobID: jobId,
JobState: jobsdb.Succeeded.State,
Expand All @@ -613,6 +632,10 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
}
if len(asyncOutput.FailedJobIDs) > 0 {
for _, jobId := range asyncOutput.FailedJobIDs {
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
DestinationID: asyncOutput.DestinationID,
SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(),
}
resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", asyncOutput.FailedReason)
status := jobsdb.JobStatusT{
JobID: jobId,
Expand Down Expand Up @@ -641,6 +664,10 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
}
if len(asyncOutput.AbortJobIDs) > 0 {
for _, jobId := range asyncOutput.AbortJobIDs {
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
DestinationID: asyncOutput.DestinationID,
SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(),
}
status := jobsdb.JobStatusT{
JobID: jobId,
JobState: jobsdb.Aborted.State,
Expand Down Expand Up @@ -696,14 +723,13 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
if err != nil {
panic(err)
}
brt.updateProcessedEventsMetrics(statusList)
routerutils.UpdateProcessedEventsMetrics(stats.Default, module, brt.destType, statusList, jobIDConnectionDetailsMap)
rmetrics.DecreasePendingEvents(
"batch_rt",
workspaceID,
brt.destType,
float64(len(completedJobsList)),
)

if attempted {
var sourceID string
if len(statusList) > 0 {
Expand Down
22 changes: 0 additions & 22 deletions router/batchrouter/handle_observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,28 +229,6 @@ func (brt *Handle) updateRudderSourcesStats(
return nil
}

// updateProcessedEventsMetrics counts the given job statuses per
// (JobState, ErrorCode) pair and reports each count on the
// pipeline_processed_events counter, tagged with the batch_router module,
// the handle's destination type, the state and the error code.
func (brt *Handle) updateProcessedEventsMetrics(statusList []*jobsdb.JobStatusT) {
	// A flat composite key replaces a two-level map; the set of
	// (state, code) pairs and their counts is the same either way.
	type stateAndCode struct {
		state string
		code  string
	}

	counts := make(map[stateAndCode]int)
	for _, status := range statusList {
		counts[stateAndCode{state: status.JobState, code: status.ErrorCode}]++
	}

	for key, total := range counts {
		tags := stats.Tags{
			"module":   "batch_router",
			"destType": brt.destType,
			"state":    key.state,
			"code":     key.code,
		}
		stats.Default.NewTaggedStat(`pipeline_processed_events`, stats.CountType, tags).Count(total)
	}
}

// pipelineDelayStats reports the delay of the pipeline as a range:
//
// - max - time elapsed since the first job was created
Expand Down
7 changes: 6 additions & 1 deletion router/batchrouter/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,14 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin
var drainList []*jobsdb.JobStatusT
var drainJobList []*jobsdb.JobT
drainStatsbyDest := make(map[string]*routerutils.DrainStats)
jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails)

jobsBySource := make(map[string][]*jobsdb.JobT)
for _, job := range destinationJobs.jobs {
jobIDConnectionDetailsMap[job.JobID] = jobsdb.ConnectionDetails{
SourceID: gjson.GetBytes(job.Parameters, "source_id").String(),
DestinationID: destWithSources.Destination.ID,
}
if drain, reason := brt.drainer.Drain(
job,
); drain {
Expand Down Expand Up @@ -153,7 +158,7 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin
if err != nil {
panic(err)
}
brt.updateProcessedEventsMetrics(statusList)
routerutils.UpdateProcessedEventsMetrics(stats.Default, module, brt.destType, statusList, jobIDConnectionDetailsMap)
for destID, destDrainStat := range drainStatsbyDest {
stats.Default.NewTaggedStat("drained_events", stats.CountType, stats.Tags{
"destType": brt.destType,
Expand Down
9 changes: 8 additions & 1 deletion router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
utilTypes "github.com/rudderlabs/rudder-server/utils/types"
)

// module is the module name the router passes to
// routerutils.UpdateProcessedEventsMetrics when reporting processed events.
const module = "router"

// Handle is the handle to this module.
type Handle struct {
// external dependencies
Expand Down Expand Up @@ -324,6 +326,7 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
var completedJobsList []*jobsdb.JobT
var statusList []*jobsdb.JobStatusT
var routerAbortedJobs []*jobsdb.JobT
jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails)
for _, workerJobStatus := range *workerJobStatuses {
var parameters routerutils.JobParameters
err := json.Unmarshal(workerJobStatus.job.Parameters, &parameters)
Expand All @@ -337,6 +340,10 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
workspaceID := workerJobStatus.status.WorkspaceId
eventName := gjson.GetBytes(workerJobStatus.job.Parameters, "event_name").String()
eventType := gjson.GetBytes(workerJobStatus.job.Parameters, "event_type").String()
jobIDConnectionDetailsMap[workerJobStatus.job.JobID] = jobsdb.ConnectionDetails{
SourceID: parameters.SourceID,
DestinationID: parameters.DestinationID,
}
key := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s", parameters.SourceID, parameters.DestinationID, parameters.SourceJobRunID, workerJobStatus.status.JobState, workerJobStatus.status.ErrorCode, eventName, eventType)
_, ok := connectionDetailsMap[key]
if !ok {
Expand Down Expand Up @@ -454,7 +461,7 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
if err != nil {
panic(err)
}
rt.updateProcessedEventsMetrics(statusList)
routerutils.UpdateProcessedEventsMetrics(stats.Default, module, rt.destType, statusList, jobIDConnectionDetailsMap)
for workspace, jobCount := range routerWorkspaceJobStatusCount {
rmetrics.DecreasePendingEvents(
"rt",
Expand Down
22 changes: 0 additions & 22 deletions router/handle_observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,28 +102,6 @@ func (rt *Handle) updateRudderSourcesStats(
return err
}

// updateProcessedEventsMetrics counts the given job statuses per
// (JobState, ErrorCode) pair and reports each count on the
// pipeline_processed_events counter, tagged with the router module,
// the handle's destination type, the state and the error code.
func (rt *Handle) updateProcessedEventsMetrics(statusList []*jobsdb.JobStatusT) {
	// A flat composite key replaces a two-level map; the set of
	// (state, code) pairs and their counts is the same either way.
	type stateAndCode struct {
		state string
		code  string
	}

	counts := make(map[stateAndCode]int)
	for _, status := range statusList {
		counts[stateAndCode{state: status.JobState, code: status.ErrorCode}]++
	}

	for key, total := range counts {
		tags := stats.Tags{
			"module":   "router",
			"destType": rt.destType,
			"state":    key.state,
			"code":     key.code,
		}
		stats.Default.NewTaggedStat(`pipeline_processed_events`, stats.CountType, tags).Count(total)
	}
}

func (rt *Handle) sendRetryStoreStats(attempt int) {
rt.logger.Warnf("Timeout during store jobs in router module, attempt %d", attempt)
stats.Default.NewTaggedStat("jobsdb_store_timeout", stats.CountType, stats.Tags{"attempt": fmt.Sprint(attempt), "module": "router"}).Count(1)
Expand Down

0 comments on commit 6c1f1c4

Please sign in to comment.