diff --git a/backend-config/types.go b/backend-config/types.go index ffd00c2244..1ac9824e19 100644 --- a/backend-config/types.go +++ b/backend-config/types.go @@ -72,6 +72,10 @@ type SourceT struct { EventSchemasEnabled bool } +func (s *SourceT) IsReplaySource() bool { + return s.OriginalID != "" +} + type WorkspaceRegulationT struct { ID string RegulationType string diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index 95f5aac01b..f52a1bfeb4 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -56,6 +56,8 @@ const ( WriteKeyInvalid = "invalid-write-key" WriteKeyEmpty = "" SourceIDEnabled = "enabled-source" + ReplaySourceID = "replay-source" + ReplayWriteKey = "replay-source" SourceIDDisabled = "disabled-source" TestRemoteAddressWithPort = "test.com:80" TestRemoteAddress = "test.com" @@ -112,6 +114,16 @@ var sampleBackendConfig = backendconfig.ConfigT{ }, WorkspaceID: WorkspaceID, }, + { + ID: ReplaySourceID, + WriteKey: ReplayWriteKey, + Enabled: true, + OriginalID: ReplaySourceID, + SourceDefinition: backendconfig.SourceDefinitionT{ + Name: SourceIDEnabled, + }, + WorkspaceID: WorkspaceID, + }, }, } @@ -255,7 +267,7 @@ var _ = Describe("Gateway Enterprise", func() { It("should not accept events from suppress users", func() { suppressedUserEventData := fmt.Sprintf(`{"batch":[{"userId":%q}]}`, SuppressedUserID) // Why GET - expectHandlerResponse((gateway.webBatchHandler()), authorizedRequest(WriteKeyEnabled, bytes.NewBufferString(suppressedUserEventData)), http.StatusOK, "OK", "batch") + expectHandlerResponse(gateway.webBatchHandler(), authorizedRequest(WriteKeyEnabled, bytes.NewBufferString(suppressedUserEventData)), http.StatusOK, "OK", "batch") Eventually( func() bool { stat := statsStore.Get( @@ -450,7 +462,11 @@ var _ = Describe("Gateway", func() { } Expect(err).To(BeNil()) req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(WriteKeyEnabled+":"))) 
+ if ep == "/internal/v1/replay" { + req.Header.Set("X-Rudder-Source-Id", ReplaySourceID) + } else { + req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(WriteKeyEnabled+":"))) + } resp, err := client.Do(req) Expect(err).To(BeNil()) Expect(resp.StatusCode).To(SatisfyAny(Equal(http.StatusOK), Equal(http.StatusNoContent)), "endpoint: "+ep) @@ -1387,6 +1403,8 @@ func endpointsToVerify() ([]string, []string, []string) { // TODO: Remove this endpoint once sources change is released "/v1/warehouse/fetch-tables", "/internal/v1/warehouse/fetch-tables", + "/internal/v1/job-status/123", + "/internal/v1/job-status/123/failed-records", } postEndpoints := []string{ @@ -1399,10 +1417,12 @@ func endpointsToVerify() ([]string, []string, []string) { "/v1/merge", "/v1/group", "/v1/import", - "/v1/audiencelist", + "/v1/audiencelist", // Get rid of this over time and use the /internal endpoint "/v1/webhook", "/beacon/v1/batch", "/internal/v1/extract", + "/internal/v1/replay", + "/internal/v1/audiencelist", "/v1/warehouse/pending-events", "/v1/warehouse/trigger-upload", "/v1/warehouse/jobs", diff --git a/gateway/handle.go b/gateway/handle.go index 3e26887cca..5bf1153ebe 100644 --- a/gateway/handle.go +++ b/gateway/handle.go @@ -110,7 +110,6 @@ type Handle struct { IdleTimeout time.Duration allowReqsWithoutUserIDAndAnonymousID bool gwAllowPartialWriteWithErrors bool - allowBatchSplitting bool } } @@ -434,17 +433,6 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq, `{"error": "rudder-server gateway failed to marshal params"}`, ) } - if !gw.conf.allowBatchSplitting { - // instead of multiple jobs with one event, create one job with all events - out = []jobObject{ - { - userID: out[0].userID, - events: lo.Map(out, func(userEvent jobObject, _ int) map[string]interface{} { - return userEvent.events[0] - }), - }, - } - } jobs := make([]*jobsdb.JobT, 0) for _, userEvent := range out { var ( @@ -458,11 +446,15 @@ func (gw *Handle) 
getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq, WriteKey string `json:"writeKey"` ReceivedAt string `json:"receivedAt"` } + receivedAt, ok := userEvent.events[0]["receivedAt"].(string) + if !ok || !arctx.ReplaySource { + receivedAt = time.Now().Format(misc.RFC3339Milli) + } singularEventBatch := SingularEventBatch{ Batch: userEvent.events, RequestIP: ipAddr, WriteKey: arctx.WriteKey, - ReceivedAt: time.Now().Format(misc.RFC3339Milli), + ReceivedAt: receivedAt, } payload, err = json.Marshal(singularEventBatch) if err != nil { diff --git a/gateway/handle_http_auth.go b/gateway/handle_http_auth.go index c86165ddc5..60a73092e0 100644 --- a/gateway/handle_http_auth.go +++ b/gateway/handle_http_auth.go @@ -129,6 +129,21 @@ func (gw *Handle) sourceIDAuth(delegate http.HandlerFunc) http.HandlerFunc { } } +// replaySourceIDAuth middleware to authenticate sourceID in the X-Rudder-Source-Id header. +// If the sourceID is valid, i.e. it is a replay source and enabled, the source auth info is added to the request context. +// If the sourceID is invalid, the request is rejected. +func (gw *Handle) replaySourceIDAuth(delegate http.HandlerFunc) http.HandlerFunc { + return gw.sourceIDAuth(func(w http.ResponseWriter, r *http.Request) { + arctx := r.Context().Value(gwtypes.CtxParamAuthRequestContext).(*gwtypes.AuthRequestContext) + s, ok := gw.sourceIDSourceMap[arctx.SourceID] + if !ok || !s.IsReplaySource() { + gw.handleHttpError(w, r, response.InvalidReplaySource) + return + } + delegate.ServeHTTP(w, r) + }) +} + // augmentAuthRequestContext adds source job run id and task run id from the request to the authentication context. 
func augmentAuthRequestContext(arctx *gwtypes.AuthRequestContext, r *http.Request) { arctx.SourceJobRunID = r.Header.Get("X-Rudder-Job-Run-Id") @@ -165,6 +180,7 @@ func sourceToRequestContext(s backendconfig.SourceT) *gwtypes.AuthRequestContext SourceName: s.Name, SourceCategory: s.SourceDefinition.Category, SourceDefName: s.SourceDefinition.Name, + ReplaySource: s.IsReplaySource(), } if arctx.SourceCategory == "" { arctx.SourceCategory = eventStreamSourceCategory diff --git a/gateway/handle_http_auth_test.go b/gateway/handle_http_auth_test.go index ef92aba1ce..0e730b5d54 100644 --- a/gateway/handle_http_auth_test.go +++ b/gateway/handle_http_auth_test.go @@ -277,4 +277,64 @@ func TestAuth(t *testing.T) { require.Equal(t, "Source is disabled\n", string(body)) }) }) + + t.Run("replaySourceIDAuth", func(t *testing.T) { + t.Run("replay source", func(t *testing.T) { + sourceID := "123" + gw := newGateway(nil, map[string]backendconfig.SourceT{ + sourceID: { + ID: sourceID, + Enabled: true, + OriginalID: sourceID, + }, + }) + r := newSourceIDRequest(sourceID) + w := httptest.NewRecorder() + gw.replaySourceIDAuth(delegate).ServeHTTP(w, r) + + require.Equal(t, http.StatusOK, w.Code, "authentication should succeed") + body, err := io.ReadAll(w.Body) + require.NoError(t, err, "reading response body should succeed") + require.Equal(t, "OK", string(body)) + }) + + t.Run("invalid source using replay endpoint", func(t *testing.T) { + sourceID := "123" + invalidSource := "345" + gw := newGateway(nil, map[string]backendconfig.SourceT{ + sourceID: { + ID: sourceID, + Enabled: true, + OriginalID: "", + }, + }) + r := newSourceIDRequest(invalidSource) + w := httptest.NewRecorder() + gw.replaySourceIDAuth(delegate).ServeHTTP(w, r) + + require.Equal(t, http.StatusUnauthorized, w.Code, "authentication should not succeed") + body, err := io.ReadAll(w.Body) + require.NoError(t, err, "reading response body should succeed") + require.Equal(t, "Invalid source id\n", string(body)) + }) + + 
t.Run("regular source using replay endpoint", func(t *testing.T) { + sourceID := "123" + gw := newGateway(nil, map[string]backendconfig.SourceT{ + sourceID: { + ID: sourceID, + Enabled: true, + OriginalID: "", + }, + }) + r := newSourceIDRequest(sourceID) + w := httptest.NewRecorder() + gw.replaySourceIDAuth(delegate).ServeHTTP(w, r) + + require.Equal(t, http.StatusUnauthorized, w.Code, "authentication should not succeed") + body, err := io.ReadAll(w.Body) + require.NoError(t, err, "reading response body should succeed") + require.Equal(t, "Invalid replay source\n", string(body)) + }) + }) } diff --git a/gateway/handle_http_replay.go b/gateway/handle_http_replay.go new file mode 100644 index 0000000000..10ce87338c --- /dev/null +++ b/gateway/handle_http_replay.go @@ -0,0 +1,8 @@ +package gateway + +import "net/http" + +// webReplayHandler can handle replay requests +func (gw *Handle) webReplayHandler() http.HandlerFunc { + return gw.callType("replay", gw.replaySourceIDAuth(gw.webHandler())) +} diff --git a/gateway/handle_lifecycle.go b/gateway/handle_lifecycle.go index 2ddd9b70e2..4dd26ea17b 100644 --- a/gateway/handle_lifecycle.go +++ b/gateway/handle_lifecycle.go @@ -9,12 +9,13 @@ import ( "strconv" "time" + "golang.org/x/sync/errgroup" + "github.com/bugsnag/bugsnag-go/v2" "github.com/go-chi/chi/v5" "github.com/google/uuid" "github.com/rs/cors" "github.com/samber/lo" - "golang.org/x/sync/errgroup" "github.com/rudderlabs/rudder-go-kit/chiware" "github.com/rudderlabs/rudder-go-kit/config" @@ -91,7 +92,6 @@ func (gw *Handle) Setup( // Enables accepting requests without user id and anonymous id. This is added to prevent client 4xx retries. 
config.RegisterBoolConfigVariable(false, &gw.conf.allowReqsWithoutUserIDAndAnonymousID, true, "Gateway.allowReqsWithoutUserIDAndAnonymousID") config.RegisterBoolConfigVariable(true, &gw.conf.gwAllowPartialWriteWithErrors, true, "Gateway.allowPartialWriteWithErrors") - config.RegisterBoolConfigVariable(true, &gw.conf.allowBatchSplitting, true, "Gateway.allowBatchSplitting") config.RegisterDurationConfigVariable(0, &gw.conf.ReadTimeout, false, time.Second, []string{"ReadTimeout", "ReadTimeOutInSec"}...) config.RegisterDurationConfigVariable(0, &gw.conf.ReadHeaderTimeout, false, time.Second, []string{"ReadHeaderTimeout", "ReadHeaderTimeoutInSec"}...) config.RegisterDurationConfigVariable(10, &gw.conf.WriteTimeout, false, time.Second, []string{"WriteTimeout", "WriteTimeOutInSec"}...) @@ -359,6 +359,10 @@ func (gw *Handle) StartWebHandler(ctx context.Context) error { gw.logger.Infof("WebHandler Starting on %d", gw.conf.webPort) component := "gateway" srvMux := chi.NewRouter() + // rudder-sources new APIs + rsourcesHandler := rsources_http.NewHandler( + gw.rsourcesService, + gw.logger.Child("rsources")) srvMux.Use( chiware.StatMiddleware(ctx, srvMux, stats.Default, component), middleware.LimitConcurrentRequests(gw.conf.maxConcurrentRequests), @@ -367,7 +371,11 @@ func (gw *Handle) StartWebHandler(ctx context.Context) error { srvMux.Route("/internal", func(r chi.Router) { r.Post("/v1/extract", gw.webExtractHandler()) r.Get("/v1/warehouse/fetch-tables", gw.whProxy.ServeHTTP) + r.Post("/v1/audiencelist", gw.webAudienceListHandler()) + r.Post("/v1/replay", gw.webReplayHandler()) + r.Mount("/v1/job-status", withContentType("application/json; charset=utf-8", rsourcesHandler.ServeHTTP)) }) + srvMux.Mount("/v1/job-status", withContentType("application/json; charset=utf-8", rsourcesHandler.ServeHTTP)) srvMux.Route("/v1", func(r chi.Router) { r.Post("/alias", gw.webAliasHandler()) @@ -419,12 +427,6 @@ func (gw *Handle) StartWebHandler(ctx context.Context) error { }) } - // 
rudder-sources new APIs - rsourcesHandler := rsources_http.NewHandler( - gw.rsourcesService, - gw.logger.Child("rsources")) - srvMux.Mount("/v1/job-status", withContentType("application/json; charset=utf-8", rsourcesHandler.ServeHTTP)) - c := cors.New(cors.Options{ AllowOriginFunc: func(_ string) bool { return true }, AllowCredentials: true, diff --git a/gateway/internal/types/types.go b/gateway/internal/types/types.go index 1dfb46e627..07fd719701 100644 --- a/gateway/internal/types/types.go +++ b/gateway/internal/types/types.go @@ -13,14 +13,14 @@ const ( // AuthRequestContext contains the authenticated source information for a request. type AuthRequestContext struct { - SourceEnabled bool - SourceID string - WriteKey string - WorkspaceID string - SourceName string - SourceDefName string - SourceCategory string - + SourceEnabled bool + SourceID string + WriteKey string + WorkspaceID string + SourceName string + SourceDefName string + SourceCategory string + ReplaySource bool SourceJobRunID string SourceTaskRunID string } diff --git a/gateway/response/response.go b/gateway/response/response.go index 8b57564dcb..0fd3001776 100644 --- a/gateway/response/response.go +++ b/gateway/response/response.go @@ -60,6 +60,8 @@ const ( NoSourceIdInHeader = "Failed to read source id from header" // InvalidSourceID - Invalid source id InvalidSourceID = "Invalid source id" + // InvalidReplaySource - Invalid replay source + InvalidReplaySource = "Invalid replay source" transPixelResponse = "\x47\x49\x46\x38\x39\x61\x01\x00\x01\x00\x80\x00\x00\x00\x00\x00\x00\x00\x00\x21\xF9\x04" + "\x01\x00\x00\x00\x00\x2C\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02\x44\x01\x00\x3B" @@ -79,6 +81,7 @@ var statusMap = map[string]status{ InvalidJSON: {message: InvalidJSON, code: http.StatusBadRequest}, NoSourceIdInHeader: {message: NoSourceIdInHeader, code: http.StatusUnauthorized}, InvalidSourceID: {message: InvalidSourceID, code: http.StatusUnauthorized}, + InvalidReplaySource: {message: 
InvalidReplaySource, code: http.StatusUnauthorized}, // webhook specific status InvalidWebhookSource: {message: InvalidWebhookSource, code: http.StatusNotFound}, diff --git a/processor/processor.go b/processor/processor.go index dbec209218..c7d597bcb6 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -120,8 +120,8 @@ type Handle struct { maxEventsToProcess int transformBatchSize int userTransformBatchSize int - writeKeyDestinationMap map[string][]backendconfig.DestinationT - writeKeySourceMap map[string]backendconfig.SourceT + sourceIdDestinationMap map[string][]backendconfig.DestinationT + sourceIdSourceMap map[string]backendconfig.SourceT workspaceLibrariesMap map[string]backendconfig.LibrariesT destinationIDtoTypeMap map[string]string destConsentCategories map[string][]string @@ -234,7 +234,6 @@ type MetricMetadata struct { } type ( - WriteKeyT string SourceIDT string ) @@ -714,16 +713,16 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) { var ( destConsentCategories = make(map[string][]string) workspaceLibrariesMap = make(map[string]backendconfig.LibrariesT, len(config)) - writeKeyDestinationMap = make(map[string][]backendconfig.DestinationT) - writeKeySourceMap = map[string]backendconfig.SourceT{} + sourceIdDestinationMap = make(map[string][]backendconfig.DestinationT) + sourceIdSourceMap = map[string]backendconfig.SourceT{} destinationIDtoTypeMap = make(map[string]string) ) for workspaceID, wConfig := range config { for i := range wConfig.Sources { source := &wConfig.Sources[i] - writeKeySourceMap[source.WriteKey] = *source + sourceIdSourceMap[source.ID] = *source if source.Enabled { - writeKeyDestinationMap[source.WriteKey] = source.Destinations + sourceIdDestinationMap[source.ID] = source.Destinations for j := range source.Destinations { destination := &source.Destinations[j] destinationIDtoTypeMap[destination.ID] = destination.DestinationDefinition.Name @@ -736,8 +735,8 @@ func (proc *Handle) backendConfigSubscriber(ctx 
context.Context) { proc.config.configSubscriberLock.Lock() proc.config.destConsentCategories = destConsentCategories proc.config.workspaceLibrariesMap = workspaceLibrariesMap - proc.config.writeKeyDestinationMap = writeKeyDestinationMap - proc.config.writeKeySourceMap = writeKeySourceMap + proc.config.sourceIdDestinationMap = sourceIdDestinationMap + proc.config.sourceIdSourceMap = sourceIdSourceMap proc.config.destinationIDtoTypeMap = destinationIDtoTypeMap proc.config.configSubscriberLock.Unlock() if !initDone { @@ -759,24 +758,24 @@ func (proc *Handle) getWorkspaceLibraries(workspaceID string) backendconfig.Libr return proc.config.workspaceLibrariesMap[workspaceID] } -func (proc *Handle) getSourceByWriteKey(writeKey string) (*backendconfig.SourceT, error) { +func (proc *Handle) getSourceBySourceID(sourceId string) (*backendconfig.SourceT, error) { var err error proc.config.configSubscriberLock.RLock() defer proc.config.configSubscriberLock.RUnlock() - source, ok := proc.config.writeKeySourceMap[writeKey] + source, ok := proc.config.sourceIdSourceMap[sourceId] if !ok { - err = errors.New("source not found for writeKey") - proc.logger.Errorf(`Processor : source not found for writeKey: %s`, writeKey) + err = errors.New("source not found for sourceId") + proc.logger.Errorf(`Processor : source not found for sourceId: %s`, sourceId) } return &source, err } -func (proc *Handle) getEnabledDestinations(writeKey, destinationName string) []backendconfig.DestinationT { +func (proc *Handle) getEnabledDestinations(sourceId, destinationName string) []backendconfig.DestinationT { proc.config.configSubscriberLock.RLock() defer proc.config.configSubscriberLock.RUnlock() var enabledDests []backendconfig.DestinationT - for i := range proc.config.writeKeyDestinationMap[writeKey] { - dest := &proc.config.writeKeyDestinationMap[writeKey][i] + for i := range proc.config.sourceIdDestinationMap[sourceId] { + dest := &proc.config.sourceIdDestinationMap[sourceId][i] if destinationName == 
dest.DestinationDefinition.Name && dest.Enabled { enabledDests = append(enabledDests, *dest) } @@ -784,12 +783,12 @@ func (proc *Handle) getEnabledDestinations(writeKey, destinationName string) []b return enabledDests } -func (proc *Handle) getBackendEnabledDestinationTypes(writeKey string) map[string]backendconfig.DestinationDefinitionT { +func (proc *Handle) getBackendEnabledDestinationTypes(sourceId string) map[string]backendconfig.DestinationDefinitionT { proc.config.configSubscriberLock.RLock() defer proc.config.configSubscriberLock.RUnlock() enabledDestinationTypes := make(map[string]backendconfig.DestinationDefinitionT) - for i := range proc.config.writeKeyDestinationMap[writeKey] { - destination := &proc.config.writeKeyDestinationMap[writeKey][i] + for i := range proc.config.sourceIdDestinationMap[sourceId] { + destination := &proc.config.sourceIdDestinationMap[sourceId][i] if destination.Enabled { enabledDestinationTypes[destination.DestinationDefinition.DisplayName] = destination.DestinationDefinition } @@ -827,9 +826,9 @@ func enhanceWithTimeFields(event *transformer.TransformerEvent, singularEventMap event.Message["timestamp"] = timestamp.Format(misc.RFC3339Milli) } -func makeCommonMetadataFromSingularEvent(singularEvent types.SingularEventT, batchEvent *jobsdb.JobT, receivedAt time.Time, source *backendconfig.SourceT) *transformer.Metadata { +func makeCommonMetadataFromSingularEvent(singularEvent types.SingularEventT, batchEvent *jobsdb.JobT, receivedAt time.Time, source *backendconfig.SourceT, eventParams types.EventParams) *transformer.Metadata { commonMetadata := transformer.Metadata{} - commonMetadata.SourceID = gjson.GetBytes(batchEvent.Parameters, "source_id").Str + commonMetadata.SourceID = source.ID commonMetadata.WorkspaceID = source.WorkspaceID commonMetadata.Namespace = config.GetKubeNamespace() commonMetadata.InstanceID = misc.GetInstanceID() @@ -840,9 +839,9 @@ func makeCommonMetadataFromSingularEvent(singularEvent types.SingularEventT, 
bat commonMetadata.SourceType = source.SourceDefinition.Name commonMetadata.SourceCategory = source.SourceDefinition.Category - commonMetadata.SourceJobRunID, _ = misc.MapLookup(singularEvent, "context", "sources", "job_run_id").(string) + commonMetadata.SourceJobRunID = eventParams.SourceJobRunId commonMetadata.SourceJobID, _ = misc.MapLookup(singularEvent, "context", "sources", "job_id").(string) - commonMetadata.SourceTaskRunID, _ = misc.MapLookup(singularEvent, "context", "sources", "task_run_id").(string) + commonMetadata.SourceTaskRunID = eventParams.SourceTaskRunId commonMetadata.RecordID = misc.MapLookup(singularEvent, "recordId") commonMetadata.EventName, _ = misc.MapLookup(singularEvent, "event").(string) @@ -1292,7 +1291,7 @@ func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMeta return failedEventsToStore, failedMetrics, failedCountMap } -func (proc *Handle) updateSourceEventStatsDetailed(event types.SingularEventT, writeKey string) { +func (proc *Handle) updateSourceEventStatsDetailed(event types.SingularEventT, sourceId string) { // Any panics in this function are captured and ignore sending the stat defer func() { if r := recover(); r != nil { @@ -1301,10 +1300,15 @@ func (proc *Handle) updateSourceEventStatsDetailed(event types.SingularEventT, w }() var eventType string var eventName string + source, err := proc.getSourceBySourceID(sourceId) + if err != nil { + proc.logger.Errorf("[Processor] Failed to get source by source id: %s", sourceId) + return + } if val, ok := event["type"]; ok { - eventType = val.(string) + eventType, _ = val.(string) tags := map[string]string{ - "writeKey": writeKey, + "writeKey": source.WriteKey, "event_type": eventType, } statEventType := proc.statsFactory.NewSampledTaggedStat("processor.event_type", stats.CountType, tags) @@ -1314,13 +1318,13 @@ func (proc *Handle) updateSourceEventStatsDetailed(event types.SingularEventT, w eventName = eventType } else { if val, ok := event["event"]; ok { - 
eventName = val.(string) + eventName, _ = val.(string) } else { eventName = eventType } } tagsDetailed := map[string]string{ - "writeKey": writeKey, + "writeKey": source.WriteKey, "event_type": eventType, "event_name": eventName, } @@ -1378,7 +1382,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf var statusList []*jobsdb.JobStatusT groupedEvents := make(map[string][]transformer.TransformerEvent) - groupedEventsByWriteKey := make(map[WriteKeyT][]transformer.TransformerEvent) + groupedEventsBySourceId := make(map[SourceIDT][]transformer.TransformerEvent) eventsByMessageID := make(map[string]types.SingularEventWithReceivedAt) var procErrorJobs []*jobsdb.JobT eventSchemaJobs := make([]*jobsdb.JobT, 0) @@ -1397,7 +1401,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf proc.logger.Debug("[Processor] Total jobs picked up : ", len(jobList)) marshalStart := time.Now() - uniqueMessageIds := make(map[string]struct{}) + dedupKeys := make(map[string]struct{}) uniqueMessageIdsBySrcDestKey := make(map[string]map[string]struct{}) sourceDupStats := make(map[dupStatKey]int) @@ -1418,32 +1422,37 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf proc.logger.Warnf("json parsing of event payload for %s: %v", batchEvent.JobID, err) gatewayBatchEvent.Batch = []types.SingularEventT{} } - - writeKey := gatewayBatchEvent.WriteKey + var eventParams types.EventParams + err = jsonfast.Unmarshal(batchEvent.Parameters, &eventParams) + if err != nil { + panic(err) + } + sourceId := eventParams.SourceId requestIP := gatewayBatchEvent.RequestIP receivedAt := gatewayBatchEvent.ReceivedAt // Iterate through all the events in the batch for _, singularEvent := range gatewayBatchEvent.Batch { messageId := misc.GetStringifiedData(singularEvent["messageId"]) - source, sourceError := proc.getSourceByWriteKey(writeKey) + source, sourceError := proc.getSourceBySourceID(sourceId) if sourceError != nil { - 
proc.logger.Errorf("Dropping Job since Source not found for writeKey %q: %v", writeKey, sourceError) + proc.logger.Errorf("Dropping Job since Source not found for sourceId %q: %v", sourceId, sourceError) continue } if proc.config.enableDedup { payload, _ := jsonfast.Marshal(singularEvent) messageSize := int64(len(payload)) - if ok, previousSize := proc.dedup.Set(dedup.KeyValue{Key: messageId, Value: messageSize}); !ok { - proc.logger.Debugf("Dropping event with duplicate messageId: %s", messageId) + dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId) + if ok, previousSize := proc.dedup.Set(dedup.KeyValue{Key: dedupKey, Value: messageSize}); !ok { + proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey) sourceDupStats[dupStatKey{sourceID: source.ID, equalSize: messageSize == previousSize}] += 1 continue } - uniqueMessageIds[messageId] = struct{}{} + dedupKeys[dedupKey] = struct{}{} } - proc.updateSourceEventStatsDetailed(singularEvent, writeKey) + proc.updateSourceEventStatsDetailed(singularEvent, sourceId) // We count this as one, not destination specific ones totalEvents++ @@ -1457,6 +1466,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf batchEvent, receivedAt, source, + eventParams, ) payloadFunc := ro.Memoize(func() json.RawMessage { @@ -1534,12 +1544,12 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf // Getting all the destinations which are enabled for this // event - if !proc.isDestinationAvailable(singularEvent, writeKey) { + if !proc.isDestinationAvailable(singularEvent, sourceId) { continue } - if _, ok := groupedEventsByWriteKey[WriteKeyT(writeKey)]; !ok { - groupedEventsByWriteKey[WriteKeyT(writeKey)] = make([]transformer.TransformerEvent, 0) + if _, ok := groupedEventsBySourceId[SourceIDT(sourceId)]; !ok { + groupedEventsBySourceId[SourceIDT(sourceId)] = make([]transformer.TransformerEvent, 0) } shallowEventCopy := 
transformer.TransformerEvent{} shallowEventCopy.Message = singularEvent @@ -1557,7 +1567,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf shallowEventCopy.Metadata.SourceTpConfig = source.DgSourceTrackingPlanConfig.Config shallowEventCopy.Metadata.MergedTpConfig = source.DgSourceTrackingPlanConfig.GetMergedConfig(commonMetadataFromSingularEvent.EventType) - groupedEventsByWriteKey[WriteKeyT(writeKey)] = append(groupedEventsByWriteKey[WriteKeyT(writeKey)], shallowEventCopy) + groupedEventsBySourceId[SourceIDT(sourceId)] = append(groupedEventsBySourceId[SourceIDT(sourceId)], shallowEventCopy) if proc.isReportingEnabled() { proc.updateMetricMaps(inCountMetadataMap, outCountMap, connectionDetailsMap, destFilterStatusDetailMap, event, jobsdb.Succeeded.State, types.DESTINATION_FILTER, func() json.RawMessage { return []byte(`{}`) }) @@ -1677,7 +1687,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf // Placing the trackingPlan validation filters here. // Else further down events are duplicated by destId, so multiple validation takes places for same event validateEventsStart := time.Now() - validatedEventsByWriteKey, validatedReportMetrics, validatedErrorJobs, trackingPlanEnabledMap := proc.validateEvents(groupedEventsByWriteKey, eventsByMessageID) + validatedEventsBySourceId, validatedReportMetrics, validatedErrorJobs, trackingPlanEnabledMap := proc.validateEvents(groupedEventsBySourceId, eventsByMessageID) validateEventsTime := time.Since(validateEventsStart) defer proc.stats.validateEventsTime.SendTiming(validateEventsTime) @@ -1689,23 +1699,23 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf // TRACKING PLAN - END // The below part further segregates events by sourceID and DestinationID. 
- for writeKeyT, eventList := range validatedEventsByWriteKey { + for sourceIdT, eventList := range validatedEventsBySourceId { for idx := range eventList { event := &eventList[idx] - writeKey := string(writeKeyT) + sourceId := string(sourceIdT) singularEvent := event.Message - backendEnabledDestTypes := proc.getBackendEnabledDestinationTypes(writeKey) + backendEnabledDestTypes := proc.getBackendEnabledDestinationTypes(sourceId) enabledDestTypes := integrations.FilterClientIntegrations(singularEvent, backendEnabledDestTypes) workspaceID := eventList[idx].Metadata.WorkspaceID workspaceLibraries := proc.getWorkspaceLibraries(workspaceID) - source, _ := proc.getSourceByWriteKey(writeKey) + source, _ := proc.getSourceBySourceID(sourceId) for i := range enabledDestTypes { destType := &enabledDestTypes[i] enabledDestinationsList := proc.filterDestinations( singularEvent, - proc.getEnabledDestinations(writeKey, *destType), + proc.getEnabledDestinations(sourceId, *destType), ) // Adding a singular event multiple times if there are multiple destinations of same type @@ -1764,7 +1774,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf statusList, procErrorJobs, sourceDupStats, - uniqueMessageIds, + dedupKeys, totalEvents, start, @@ -1784,7 +1794,7 @@ type transformationMessage struct { statusList []*jobsdb.JobStatusT procErrorJobs []*jobsdb.JobT sourceDupStats map[dupStatKey]int - uniqueMessageIds map[string]struct{} + dedupKeys map[string]struct{} totalEvents int start time.Time @@ -1862,7 +1872,7 @@ func (proc *Handle) transformations(partition string, in *transformationMessage) in.reportMetrics, in.sourceDupStats, - in.uniqueMessageIds, + in.dedupKeys, in.totalEvents, in.start, in.hasMore, @@ -1879,9 +1889,9 @@ type storeMessage struct { procErrorJobs []*jobsdb.JobT routerDestIDs []string - reportMetrics []*types.PUReportedMetric - sourceDupStats map[dupStatKey]int - uniqueMessageIds map[string]struct{} + reportMetrics 
[]*types.PUReportedMetric + sourceDupStats map[dupStatKey]int + dedupKeys map[string]struct{} totalEvents int start time.Time @@ -1904,8 +1914,8 @@ func (sm *storeMessage) merge(subJob *storeMessage) { for dupStatKey, count := range subJob.sourceDupStats { sm.sourceDupStats[dupStatKey] += count } - for id, v := range subJob.uniqueMessageIds { - sm.uniqueMessageIds[id] = v + for id, v := range subJob.dedupKeys { + sm.dedupKeys[id] = v } sm.totalEvents += subJob.totalEvents } @@ -2060,8 +2070,8 @@ func (proc *Handle) Store(partition string, in *storeMessage) { } if proc.config.enableDedup { proc.updateSourceStats(in.sourceDupStats, "processor.write_key_duplicate_events") - if len(in.uniqueMessageIds) > 0 { - if err := proc.dedup.Commit(lo.Keys(in.uniqueMessageIds)); err != nil { + if len(in.dedupKeys) > 0 { + if err := proc.dedup.Commit(lo.Keys(in.dedupKeys)); err != nil { panic(err) } } @@ -2774,10 +2784,10 @@ func (proc *Handle) filterDestinations( // check if event has eligible destinations to send to // // event will be dropped if no destination is found -func (proc *Handle) isDestinationAvailable(event types.SingularEventT, writeKey string) bool { +func (proc *Handle) isDestinationAvailable(event types.SingularEventT, sourceId string) bool { enabledDestTypes := integrations.FilterClientIntegrations( event, - proc.getBackendEnabledDestinationTypes(writeKey), + proc.getBackendEnabledDestinationTypes(sourceId), ) if len(enabledDestTypes) == 0 { proc.logger.Debug("No enabled destination types") @@ -2790,7 +2800,7 @@ func (proc *Handle) isDestinationAvailable(event types.SingularEventT, writeKey lo.Map( enabledDestTypes, func(destType string, _ int) []backendconfig.DestinationT { - return proc.getEnabledDestinations(writeKey, destType) + return proc.getEnabledDestinations(sourceId, destType) }, ), ), diff --git a/processor/processorBenchmark_test.go b/processor/processorBenchmark_test.go index 842427cd05..6f7899f930 100644 --- a/processor/processorBenchmark_test.go 
+++ b/processor/processorBenchmark_test.go @@ -6,6 +6,8 @@ import ( "github.com/google/uuid" + "github.com/rudderlabs/rudder-server/utils/types" + backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/jobsdb" ) @@ -17,14 +19,19 @@ func Benchmark_makeCommonMetadataFromSingularEvent(b *testing.B) { WorkspaceID: "test", SourceDefinition: backendconfig.SourceDefinitionT{ Name: "test_def", - Category: "eventStream?", + Category: "eventStream", ID: "testDefId", }, + }, + types.EventParams{ + SourceTaskRunId: "source_task_run_id", + SourceJobRunId: "source_job_run_id", + SourceId: "source_id", }) } } -var dummySingularEvent map[string]interface{} = map[string]interface{}{ +var dummySingularEvent = map[string]interface{}{ "type": "track", "channel": "android-srk", "rudderId": "90ca6da0-292e-4e79-9880-f8009e0ae4a3", @@ -55,7 +62,7 @@ var dummySingularEvent map[string]interface{} = map[string]interface{}{ }, } -var dummyBatchEvent jobsdb.JobT = jobsdb.JobT{ +var dummyBatchEvent = jobsdb.JobT{ UUID: uuid.New(), JobID: 1, UserID: "anon_id", @@ -69,9 +76,9 @@ var dummyBatchEvent jobsdb.JobT = jobsdb.JobT{ WorkspaceId: "test", } -var gwParameters []byte = []byte(`{"batch_id": 1, "source_id": "1rNMpysD4lTuzglyfmPzsmihAbK", "source_job_run_id": ""}`) +var gwParameters = []byte(`{"batch_id": 1, "source_id": "1rNMpysD4lTuzglyfmPzsmihAbK", "source_job_run_id": ""}`) -var payload []byte = []byte( +var payload = []byte( `{ "batch": [ { diff --git a/processor/processor_test.go b/processor/processor_test.go index c576fbfdf9..47c27ed24c 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -558,7 +558,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -588,7 +588,7 @@ var _ = Describe("Processor with event schemas v2", 
Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -599,7 +599,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -618,7 +618,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { createMessagePayload, ), EventCount: 3, - Parameters: createBatchParameters(SourceIDEnabledNoUT), + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), }, } mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) @@ -759,7 +759,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -789,7 +789,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -800,7 +800,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -819,7 +819,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { createMessagePayload, // shouldn't be stored to archivedb ), EventCount: 3, - Parameters: createBatchParameters(SourceIDEnabledNoUT), + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), }, } mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) @@ -914,7 +914,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() 
{ EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -944,7 +944,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -955,7 +955,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -1150,6 +1150,7 @@ var _ = Describe("Processor", Ordered, func() { expectedSentAt: "2000-01-02T01:23:00.000Z", expectedReceivedAt: "2001-01-02T02:23:45.000Z", integrations: map[string]bool{"All": false, "enabled-destination-a-definition-display-name": true}, + params: map[string]string{"source_id": "enabled-source-no-ut"}, }, // this message should not be delivered to destination A "message-2": { @@ -1159,6 +1160,7 @@ var _ = Describe("Processor", Ordered, func() { expectedOriginalTimestamp: "2000-02-02T01:23:45.000Z", expectedReceivedAt: "2001-01-02T02:23:45.000Z", integrations: map[string]bool{"All": true, "enabled-destination-a-definition-display-name": false}, + params: map[string]string{"source_id": "enabled-source-no-ut"}, }, // this message should be delivered to all destinations "message-3": { @@ -1169,6 +1171,7 @@ var _ = Describe("Processor", Ordered, func() { expectedSentAt: "2000-03-02T01:23:15.000Z", expectedReceivedAt: "2002-01-02T02:23:45.000Z", integrations: map[string]bool{"All": true}, + params: map[string]string{"source_id": "enabled-source-no-ut", "source_job_run_id": "job_run_id_1", "source_task_run_id": "task_run_id_1"}, }, // this message should be delivered to all destinations (default All value) "message-4": { @@ -1178,6 +1181,7 @@ var _ = Describe("Processor", 
Ordered, func() { expectedOriginalTimestamp: "2000-04-02T02:23:15.000Z", expectedReceivedAt: "2002-01-02T02:23:45.000Z", integrations: map[string]bool{}, + params: map[string]string{"source_id": "enabled-source-no-ut", "source_job_run_id": "job_run_id_1", "source_task_run_id": "task_run_id_1"}, }, // this message should not be delivered to any destination "message-5": { @@ -1185,6 +1189,7 @@ var _ = Describe("Processor", Ordered, func() { jobid: 2010, expectedReceivedAt: "2002-01-02T02:23:45.000Z", integrations: map[string]bool{"All": false}, + params: map[string]string{"source_id": "enabled-source-no-ut", "source_job_run_id": "job_run_id_1", "source_task_run_id": "task_run_id_1"}, }, } @@ -1198,7 +1203,7 @@ var _ = Describe("Processor", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -1212,7 +1217,7 @@ var _ = Describe("Processor", Ordered, func() { []mockEventData{ messages["message-1"], messages["message-2"], - }, createMessagePayload, + }, createMessagePayloadWithoutSources, ), EventCount: 2, LastJobStatus: jobsdb.JobStatusT{}, @@ -1227,7 +1232,7 @@ var _ = Describe("Processor", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -1238,7 +1243,7 @@ var _ = Describe("Processor", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -1254,10 +1259,10 @@ var _ = Describe("Processor", Ordered, func() { messages["message-4"], messages["message-5"], }, - createMessagePayload, + createMessagePayloadWithoutSources, ), EventCount: 3, - Parameters: createBatchParameters(SourceIDEnabledNoUT), + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), }, } 
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) @@ -1316,6 +1321,22 @@ var _ = Describe("Processor", Ordered, func() { } }) + c.MockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(nil) + c.mockArchivalDB.EXPECT(). + WithStoreSafeTx( + gomock.Any(), + gomock.Any(), + ).Times(1). + Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { + _ = f(jobsdb.EmptyStoreSafeTx()) + }).Return(nil) + c.mockArchivalDB.EXPECT(). + StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()). + Times(1). + Do(func(ctx context.Context, tx jobsdb.StoreSafeTx, jobs []*jobsdb.JobT) { + Expect(jobs).To(HaveLen(2)) + }) + c.mockGatewayJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) { _ = f(jobsdb.EmptyUpdateSafeTx()) }).Return(nil).Times(1) @@ -1390,7 +1411,7 @@ var _ = Describe("Processor", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -1405,7 +1426,7 @@ var _ = Describe("Processor", Ordered, func() { messages["message-1"], messages["message-2"], }, - createMessagePayload, + createMessagePayloadWithoutSources, ), EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, @@ -1419,7 +1440,7 @@ var _ = Describe("Processor", Ordered, func() { CustomVal: gatewayCustomVal[0], EventPayload: nil, EventCount: 1, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -1429,7 +1450,7 @@ var _ = Describe("Processor", Ordered, func() { CustomVal: gatewayCustomVal[0], EventPayload: nil, EventCount: 1, - Parameters: nil, + Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), @@ -1445,7 +1466,7 @@ var _ = Describe("Processor", Ordered, func() { messages["message-4"], messages["message-5"], }, - createMessagePayload, + 
createMessagePayloadWithoutSources, ), EventCount: 1, Parameters: createBatchParameters(SourceIDEnabledOnlyUT), @@ -1516,6 +1537,13 @@ var _ = Describe("Processor", Ordered, func() { } }) + c.mockArchivalDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).AnyTimes().Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { + _ = f(jobsdb.EmptyStoreSafeTx()) + }).Return(nil) + + c.mockArchivalDB.EXPECT(). + StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes() c.mockGatewayJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) { _ = f(jobsdb.EmptyUpdateSafeTx()) }).Return(nil).Times(1) @@ -1608,15 +1636,6 @@ var _ = Describe("Processor", Ordered, func() { c.MockDedup.EXPECT().Set(gomock.Any()).Return(true, int64(0)).After(callUnprocessed).Times(3) c.MockDedup.EXPECT().Commit(gomock.Any()).Times(1) - c.mockArchivalDB.EXPECT(). - WithStoreSafeTx( - gomock.Any(), - gomock.Any(), - ).Times(1) - c.mockArchivalDB.EXPECT(). - StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()). - Times(0) - // We expect one transform call to destination A, after callUnprocessed. mockTransformer.EXPECT().Transform(gomock.Any(), gomock.Any(), gomock.Any()).Times(0).After(callUnprocessed) // One Store call is expected for all events @@ -1625,6 +1644,14 @@ var _ = Describe("Processor", Ordered, func() { }).Return(nil).Times(1) callStoreRouter := c.mockRouterJobsDB.EXPECT().StoreInTx(gomock.Any(), gomock.Any(), gomock.Len(3)).Times(1) + c.mockArchivalDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).AnyTimes().Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { + _ = f(jobsdb.EmptyStoreSafeTx()) + }).Return(nil) + + c.mockArchivalDB.EXPECT(). + StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()). 
+ AnyTimes() + c.mockGatewayJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) { _ = f(jobsdb.EmptyUpdateSafeTx()) }).Return(nil).Times(1) @@ -1760,6 +1787,14 @@ var _ = Describe("Processor", Ordered, func() { assertJobStatus(unprocessedJobsList[0], statuses[0], jobsdb.Succeeded.State) }) + c.mockArchivalDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).AnyTimes().Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { + _ = f(jobsdb.EmptyStoreSafeTx()) + }).Return(nil) + + c.mockArchivalDB.EXPECT(). + StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes() + // will be used to save failed events to failed keys table c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) { _ = f(&jobsdb.Tx{}) @@ -1886,6 +1921,14 @@ var _ = Describe("Processor", Ordered, func() { FailedEvents: transformerResponses, }) + c.mockArchivalDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).AnyTimes().Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { + _ = f(jobsdb.EmptyStoreSafeTx()) + }).Return(nil) + + c.mockArchivalDB.EXPECT(). + StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes() + c.mockGatewayJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) { _ = f(jobsdb.EmptyUpdateSafeTx()) }).Return(nil).Times(1) @@ -1974,6 +2017,14 @@ var _ = Describe("Processor", Ordered, func() { assertJobStatus(unprocessedJobsList[0], statuses[0], jobsdb.Succeeded.State) }) + c.mockArchivalDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).AnyTimes().Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { + _ = f(jobsdb.EmptyStoreSafeTx()) + }).Return(nil) + + c.mockArchivalDB.EXPECT(). + StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()). 
+ AnyTimes() + c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) { _ = f(&jobsdb.Tx{}) }).Return(nil).Times(0) @@ -2132,12 +2183,12 @@ var _ = Describe("Processor", Ordered, func() { len(processor.filterDestinations( event, processor.getEnabledDestinations( - WriteKey3, + SourceID3, "destination-definition-name-enabled", ), )), ).To(Equal(3)) // all except dest-1 - Expect(processor.isDestinationAvailable(event, WriteKey3)).To(BeTrue()) + Expect(processor.isDestinationAvailable(event, SourceID3)).To(BeTrue()) }) }) }) @@ -3284,6 +3335,7 @@ type mockEventData struct { expectedSentAt string expectedReceivedAt string integrations map[string]bool + params map[string]string } type transformExpectation struct { @@ -3336,6 +3388,10 @@ func createBatchParameters(sourceId string) []byte { return []byte(fmt.Sprintf(`{"source_id":%q}`, sourceId)) } +func createBatchParametersWithSources(sourceId string) []byte { + return []byte(fmt.Sprintf(`{"source_id":%q,"source_job_run_id":"job_run_id_1","source_task_run_id":"task_run_id_1"}`, sourceId)) +} + func assertJobStatus(job *jobsdb.JobT, status *jobsdb.JobStatusT, expectedState string) { Expect(status.JobID).To(Equal(job.JobID)) Expect(status.JobState).To(Equal(expectedState)) @@ -3386,9 +3442,6 @@ func assertDestinationTransform( ) transformer.Response { defer GinkgoRecover() - fmt.Println("clientEvents:", len(clientEvents)) - fmt.Println("expect:", expectations.events) - Expect(clientEvents).To(HaveLen(expectations.events)) messageIDs := make([]string, 0) @@ -3412,10 +3465,14 @@ func assertDestinationTransform( rawEvent, err := json.Marshal(event) Expect(err).ToNot(HaveOccurred()) recordID := gjson.GetBytes(rawEvent, "message.recordId").Value() - Expect(event.Metadata.RecordID).To(Equal(recordID)) - jobRunID := gjson.GetBytes(rawEvent, "message.context.sources.job_run_id").String() + if recordID == nil { + Expect(event.Metadata.RecordID).To(BeNil()) + } else { + 
Expect(event.Metadata.RecordID).To(Equal(recordID)) + } + jobRunID := messages[messageID].params["source_job_run_id"] Expect(event.Metadata.SourceJobRunID).To(Equal(jobRunID)) - taskRunID := gjson.GetBytes(rawEvent, "message.context.sources.task_run_id").String() + taskRunID := messages[messageID].params["source_task_run_id"] Expect(event.Metadata.SourceTaskRunID).To(Equal(taskRunID)) sourcesJobID := gjson.GetBytes(rawEvent, "message.context.sources.job_id").String() Expect(event.Metadata.SourceJobID).To(Equal(sourcesJobID)) diff --git a/processor/trackingplan.go b/processor/trackingplan.go index 6010c31452..7a6555dcb6 100644 --- a/processor/trackingplan.go +++ b/processor/trackingplan.go @@ -54,16 +54,16 @@ func enhanceWithViolation(response transformer.Response, trackingPlanId string, // validateEvents If the TrackingPlanId exist for a particular write key then we are going to Validate from the transformer. // The Response will contain both the Events and FailedEvents -// 1. eventsToTransform gets added to validatedEventsByWriteKey +// 1. eventsToTransform gets added to validatedEventsBySourceId // 2. 
failedJobs gets added to validatedErrorJobs -func (proc *Handle) validateEvents(groupedEventsByWriteKey map[WriteKeyT][]transformer.TransformerEvent, eventsByMessageID map[string]types.SingularEventWithReceivedAt) (map[WriteKeyT][]transformer.TransformerEvent, []*types.PUReportedMetric, []*jobsdb.JobT, map[SourceIDT]bool) { - validatedEventsByWriteKey := make(map[WriteKeyT][]transformer.TransformerEvent) +func (proc *Handle) validateEvents(groupedEventsBySourceId map[SourceIDT][]transformer.TransformerEvent, eventsByMessageID map[string]types.SingularEventWithReceivedAt) (map[SourceIDT][]transformer.TransformerEvent, []*types.PUReportedMetric, []*jobsdb.JobT, map[SourceIDT]bool) { + validatedEventsBySourceId := make(map[SourceIDT][]transformer.TransformerEvent) validatedReportMetrics := make([]*types.PUReportedMetric, 0) validatedErrorJobs := make([]*jobsdb.JobT, 0) trackingPlanEnabledMap := make(map[SourceIDT]bool) - for writeKey := range groupedEventsByWriteKey { - eventList := groupedEventsByWriteKey[writeKey] + for sourceId := range groupedEventsBySourceId { + eventList := groupedEventsBySourceId[sourceId] validationStat := proc.newValidationStat(&eventList[0].Metadata) validationStat.numEvents.Count(len(eventList)) proc.logger.Debug("Validation input size", len(eventList)) @@ -72,8 +72,8 @@ func (proc *Handle) validateEvents(groupedEventsByWriteKey map[WriteKeyT][]trans isTpExists := eventList[0].Metadata.TrackingPlanId != "" if !isTpExists { // pass on the jobs for transformation(User, Dest) - validatedEventsByWriteKey[writeKey] = make([]transformer.TransformerEvent, 0) - validatedEventsByWriteKey[writeKey] = append(validatedEventsByWriteKey[writeKey], eventList...) + validatedEventsBySourceId[sourceId] = make([]transformer.TransformerEvent, 0) + validatedEventsBySourceId[sourceId] = append(validatedEventsBySourceId[sourceId], eventList...) 
continue } @@ -85,8 +85,8 @@ func (proc *Handle) validateEvents(groupedEventsByWriteKey map[WriteKeyT][]trans // This is a safety check we are adding so that if something unexpected comes from transformer // We are ignoring it. if (len(response.Events) + len(response.FailedEvents)) != len(eventList) { - validatedEventsByWriteKey[writeKey] = make([]transformer.TransformerEvent, 0) - validatedEventsByWriteKey[writeKey] = append(validatedEventsByWriteKey[writeKey], eventList...) + validatedEventsBySourceId[sourceId] = make([]transformer.TransformerEvent, 0) + validatedEventsBySourceId[sourceId] = append(validatedEventsBySourceId[sourceId], eventList...) continue } @@ -122,10 +122,10 @@ func (proc *Handle) validateEvents(groupedEventsByWriteKey map[WriteKeyT][]trans if len(eventsToTransform) == 0 { continue } - validatedEventsByWriteKey[writeKey] = make([]transformer.TransformerEvent, 0) - validatedEventsByWriteKey[writeKey] = append(validatedEventsByWriteKey[writeKey], eventsToTransform...) + validatedEventsBySourceId[sourceId] = make([]transformer.TransformerEvent, 0) + validatedEventsBySourceId[sourceId] = append(validatedEventsBySourceId[sourceId], eventsToTransform...) 
} - return validatedEventsByWriteKey, validatedReportMetrics, validatedErrorJobs, trackingPlanEnabledMap + return validatedEventsBySourceId, validatedReportMetrics, validatedErrorJobs, trackingPlanEnabledMap } // makeCommonMetadataFromTransformerEvent Creates a new Metadata instance diff --git a/processor/worker.go b/processor/worker.go index d637f8a52f..e3009bc646 100644 --- a/processor/worker.go +++ b/processor/worker.go @@ -102,7 +102,7 @@ func (w *worker) start() { if firstSubJob { mergedJob = &storeMessage{ rsourcesStats: subJob.rsourcesStats, - uniqueMessageIds: make(map[string]struct{}), + dedupKeys: make(map[string]struct{}), procErrorJobsByDestID: make(map[string][]*jobsdb.JobT), sourceDupStats: make(map[dupStatKey]int), start: subJob.start, diff --git a/router/utils/utils.go b/router/utils/utils.go index 5918ba57ea..fe8e258427 100644 --- a/router/utils/utils.go +++ b/router/utils/utils.go @@ -4,10 +4,10 @@ import ( "strings" "time" - "github.com/tidwall/gjson" - "github.com/tidwall/sjson" "golang.org/x/exp/slices" + "github.com/tidwall/sjson" + "github.com/rudderlabs/rudder-go-kit/config" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/jobsdb" @@ -64,14 +64,9 @@ func getRetentionTimeForDestination(destID string) time.Duration { func ToBeDrained(job *jobsdb.JobT, destID, toAbortDestinationIDs string, destinationsMap map[string]*DestinationWithSources) (bool, string) { // drain if job is older than the destination's retention time - jobReceivedAt := gjson.GetBytes(job.Parameters, "received_at") - if jobReceivedAt.Exists() { - jobReceivedAtTime, err := time.Parse(misc.RFC3339Milli, jobReceivedAt.String()) - if err == nil { - if time.Since(jobReceivedAtTime) > getRetentionTimeForDestination(destID) { - return true, "job expired" - } - } + createdAt := job.CreatedAt + if time.Since(createdAt) > getRetentionTimeForDestination(destID) { + return true, "job expired" } if d, ok := destinationsMap[destID]; ok 
&& !d.Destination.Enabled { diff --git a/utils/types/types.go b/utils/types/types.go index 4c10f6ce01..3cf59da8ba 100644 --- a/utils/types/types.go +++ b/utils/types/types.go @@ -22,11 +22,16 @@ type SingularEventWithReceivedAt struct { // GatewayBatchRequest batch request structure type GatewayBatchRequest struct { Batch []SingularEventT `json:"batch"` - WriteKey string `json:"writeKey"` RequestIP string `json:"requestIP"` ReceivedAt time.Time `json:"receivedAt"` } +type EventParams struct { + SourceJobRunId string `json:"source_job_run_id"` + SourceId string `json:"source_id"` + SourceTaskRunId string `json:"source_task_run_id"` +} + // UserSuppression is interface to access Suppress user feature type UserSuppression interface { GetSuppressedUser(workspaceID, userID, sourceID string) *model.Metadata