Skip to content

Commit

Permalink
chore: add support for live events in internal batch endpoint (#4627)
Browse files Browse the repository at this point in the history
* chore: add support for live events in internal batch endpoint

* added unit tests for live events handling in internal batch endpoint
  • Loading branch information
mihir20 committed Apr 30, 2024
1 parent 581d5aa commit 26ecd98
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 17 deletions.
32 changes: 25 additions & 7 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
mocksJobsDB "github.com/rudderlabs/rudder-server/mocks/jobsdb"
mocksTypes "github.com/rudderlabs/rudder-server/mocks/utils/types"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
mocksrcdebugger "github.com/rudderlabs/rudder-server/services/debugger/source/mocks"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand Down Expand Up @@ -77,6 +78,8 @@ const (
RETLSourceType = "retl"
sdkLibrary = "sdkLibrary"
sdkVersion = "v1.2.3"

writeKeyNotPresentInSource = "write-key-not-present-for-source"
)

var (
Expand Down Expand Up @@ -1635,9 +1638,10 @@ var _ = Describe("Gateway", func() {
ctx context.Context
cancel func()
wait chan struct{}
srcDebugger *mocksrcdebugger.MockSourceDebugger
)

createInternalBatchPayload := func(userID, sourceID, workspaceID string) []byte {
createInternalBatchPayload := func(userID, sourceID string) []byte {
validData := fmt.Sprintf(
`{"userId":%q,"data":{"string":"valid-json","nested":{"child":1}},%s}`,
userID, sdkContext,
Expand All @@ -1656,7 +1660,7 @@ var _ = Describe("Gateway", func() {
"traceID": "traceID"
},
"payload": %s
}]`, workspaceID, userID, sourceID, validData)
}]`, WorkspaceID, userID, sourceID, validData)
return []byte(internalBatchPayload)
}

Expand Down Expand Up @@ -1695,6 +1699,7 @@ var _ = Describe("Gateway", func() {
c.mockSuppressUser.EXPECT().GetSuppressedUser(WorkspaceID, SuppressedUserID, SourceIDEnabled).Return(&model.Metadata{
CreatedAt: time.Now(),
}).AnyTimes()
c.mockSuppressUser.EXPECT().GetSuppressedUser(WorkspaceID, NormalUserID, writeKeyNotPresentInSource).Return(nil).AnyTimes()

conf = config.New()
conf.Set("Gateway.enableRateLimit", false)
Expand All @@ -1708,7 +1713,8 @@ var _ = Describe("Gateway", func() {
GinkgoT().Setenv("RSERVER_GATEWAY_WEB_PORT", strconv.Itoa(serverPort))

gateway = &Handle{}
err = gateway.Setup(context.Background(), conf, logger.NOP, stats.NOP, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService(), nil)
srcDebugger = mocksrcdebugger.NewMockSourceDebugger(c.mockCtrl)
err = gateway.Setup(context.Background(), conf, logger.NOP, stats.NOP, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), srcDebugger, nil)
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)
c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).AnyTimes()
Expand Down Expand Up @@ -1736,9 +1742,20 @@ var _ = Describe("Gateway", func() {
Expect(err).To(BeNil())
})

It("Successful request", func() {
It("Successful request, with debugger", func() {
srcDebugger.EXPECT().RecordEvent(WriteKeyEnabled, gomock.Any()).Times(1)
c.mockJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Times(1)
payload := createInternalBatchPayload(NormalUserID, SourceIDEnabled)
req, err := http.NewRequest(http.MethodPost, internalBatchEndpoint, bytes.NewBuffer(payload))
Expect(err).To(BeNil())
resp, err := client.Do(req)
Expect(err).To(BeNil())
Expect(http.StatusOK, resp.StatusCode)
})

It("Successful request, without debugger", func() {
c.mockJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Times(1)
payload := createInternalBatchPayload(NormalUserID, SourceIDEnabled, WorkspaceID)
payload := createInternalBatchPayload(NormalUserID, writeKeyNotPresentInSource)
req, err := http.NewRequest(http.MethodPost, internalBatchEndpoint, bytes.NewBuffer(payload))
Expect(err).To(BeNil())
resp, err := client.Do(req)
Expand Down Expand Up @@ -1783,7 +1800,7 @@ var _ = Describe("Gateway", func() {
})

It("request success - suppressed user", func() {
payload := createInternalBatchPayload(SuppressedUserID, SourceIDEnabled, WorkspaceID)
payload := createInternalBatchPayload(SuppressedUserID, SourceIDEnabled)
req, err := http.NewRequest(http.MethodPost, internalBatchEndpoint, bytes.NewBuffer(payload))
Expect(err).To(BeNil())
resp, err := client.Do(req)
Expand All @@ -1792,6 +1809,7 @@ var _ = Describe("Gateway", func() {
})

It("request success - multiple messages", func() {
srcDebugger.EXPECT().RecordEvent(WriteKeyEnabled, gomock.Any()).Times(2)
c.mockJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Times(1)
payload := createInternalBatchPayloadWithMultipleEvents(NormalUserID, SourceIDEnabled, WorkspaceID)
req, err := http.NewRequest(http.MethodPost, internalBatchEndpoint, bytes.NewBuffer(payload))
Expand All @@ -1803,7 +1821,7 @@ var _ = Describe("Gateway", func() {

It("request failed db error", func() {
c.mockJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Times(1).Return(fmt.Errorf("db error"))
payload := createInternalBatchPayload(NormalUserID, SourceIDEnabled, WorkspaceID)
payload := createInternalBatchPayload(NormalUserID, SourceIDEnabled)
req, err := http.NewRequest(http.MethodPost, internalBatchEndpoint, bytes.NewBuffer(payload))
Expect(err).To(BeNil())
resp, err := client.Do(req)
Expand Down
39 changes: 35 additions & 4 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,19 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc {
stats.CountType,
gw.newReqTypeStatsTagsWithReason(reqType, ""),
).Count(len(jobs))

// Sending events to config backend
for _, job := range jobs {
sourceID := gjson.GetBytes(job.Parameters, "source_id").String()
writeKey, ok := gw.getWriteKeyFromSourceID(sourceID)
if !ok {
gw.logger.Warnn("unable to get writeKey for job",
logger.NewStringField("uuid", job.UUID.String()),
obskit.SourceID(sourceID))
continue
}
gw.sourcehandle.RecordEvent(writeKey, job.EventPayload)
}
}

status = http.StatusOK
Expand Down Expand Up @@ -756,16 +769,16 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
return nil, errors.New(response.InvalidStreamMessage)
}
if isUserSuppressed(msg.Properties.WorkspaceID, msg.Properties.UserID, msg.Properties.SourceID) {
sourceConfig := gw.getSourceConfigFromSourceID(msg.Properties.SourceID)
gw.logger.Infon("suppressed event",
obskit.SourceID(msg.Properties.SourceID),
obskit.WorkspaceID(msg.Properties.WorkspaceID),
logger.NewStringField("userIDFromReq", msg.Properties.UserID),
)
arctx := gw.authRequestContextForSourceID(msg.Properties.SourceID)
gw.stats.NewTaggedStat(
"gateway.write_key_suppressed_events",
stats.CountType,
gw.newSourceStatTagsWithReason(arctx, reqType, errEventSuppressed.Error()),
gw.newSourceStatTagsWithReason(&sourceConfig, reqType, errEventSuppressed.Error()),
).Increment()
continue
}
Expand Down Expand Up @@ -801,9 +814,9 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
if err != nil {
return nil, fmt.Errorf("marshalling event batch: %w", err)
}

jobUUID := uuid.New()
jobs = append(jobs, &jobsdb.JobT{
UUID: uuid.New(),
UUID: jobUUID,
UserID: msg.Properties.RoutingKey,
Parameters: marshalledParams,
CustomVal: customVal,
Expand All @@ -819,6 +832,24 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
return jobs, nil
}

func (gw *Handle) getSourceConfigFromSourceID(sourceID string) backendconfig.SourceT {
gw.configSubscriberLock.RLock()
defer gw.configSubscriberLock.RUnlock()
if s, ok := gw.sourceIDSourceMap[sourceID]; ok {
return s
}
return backendconfig.SourceT{}
}

func (gw *Handle) getWriteKeyFromSourceID(sourceID string) (string, bool) {
gw.configSubscriberLock.RLock()
defer gw.configSubscriberLock.RUnlock()
if s, ok := gw.sourceIDSourceMap[sourceID]; ok {
return s.WriteKey, true
}
return "", false
}

func (gw *Handle) storeJobs(ctx context.Context, jobs []*jobsdb.JobT) error {
ctx, cancel := context.WithTimeout(ctx, gw.conf.WriteTimeout)
defer cancel()
Expand Down
14 changes: 8 additions & 6 deletions gateway/handle_observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package gateway

import (
"github.com/rudderlabs/rudder-go-kit/stats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
"github.com/rudderlabs/rudder-server/utils/misc"
)

// NewSourceStat creates a new source stat for a gateway request
Expand All @@ -18,14 +20,14 @@ func (gw *Handle) NewSourceStat(arctx *gwtypes.AuthRequestContext, reqType strin
}
}

func (gw *Handle) newSourceStatTagsWithReason(arctx *gwtypes.AuthRequestContext, reqType, reason string) stats.Tags {
func (gw *Handle) newSourceStatTagsWithReason(s *backendconfig.SourceT, reqType, reason string) stats.Tags {
tags := stats.Tags{
"source": arctx.SourceTag(),
"source_id": arctx.SourceID,
"write_key": arctx.WriteKey,
"source": misc.GetTagName(s.WriteKey, s.Name),
"source_id": s.ID,
"write_key": s.WriteKey,
"req_type": reqType,
"workspace_id": arctx.WorkspaceID,
"source_type": arctx.SourceCategory,
"workspace_id": s.WorkspaceID,
"source_type": s.SourceDefinition.Category,
}
if reason != "" {
tags["reason"] = reason
Expand Down
1 change: 1 addition & 0 deletions services/debugger/source/eventUploader.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package sourcedebugger

//go:generate mockgen -destination=./mocks/mock.go -package=mocks github.com/rudderlabs/rudder-server/services/debugger/source SourceDebugger
import (
"context"
"encoding/json"
Expand Down
60 changes: 60 additions & 0 deletions services/debugger/source/mocks/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 26ecd98

Please sign in to comment.