Skip to content

Commit

Permalink
feat(gateway): support new event type extract (#2999)
Browse files Browse the repository at this point in the history
As part of supporting a new event type, extract, we are adding
a new endpoint /internal/extract for ingesting the event.
We are not enforcing the event payload to have either userId or anonymousId.
The same is reflected in the test case modification.
  • Loading branch information
debanjan97 committed Mar 9, 2023
1 parent 9a80f45 commit 63dc940
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 26 deletions.
28 changes: 22 additions & 6 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ var BatchEvent = []byte(`
const (
DELIMITER = string("<<>>")
eventStreamSourceCategory = "eventStream"
extractEvent = "extract"
)

func Init() {
Expand Down Expand Up @@ -565,7 +566,12 @@ func (gateway *HandleT) getJobDataFromRequest(req *webRequestT) (jobData *jobFro

anonIDFromReq := strings.TrimSpace(misc.GetStringifiedData(toSet["anonymousId"]))
userIDFromReq := strings.TrimSpace(misc.GetStringifiedData(toSet["userId"]))
if isNonIdentifiable(anonIDFromReq, userIDFromReq) {
eventTypeFromReq, _ := misc.MapLookup(
toSet,
"type",
).(string)

if isNonIdentifiable(anonIDFromReq, userIDFromReq, eventTypeFromReq) {
err = errors.New(response.NonIdentifiableRequest)
return
}
Expand Down Expand Up @@ -620,10 +626,7 @@ func (gateway *HandleT) getJobDataFromRequest(req *webRequestT) (jobData *jobFro
}
toSet["rudderId"] = rudderId
setRandomMessageIDWhenEmpty(toSet)
if eventTypeFromReq, _ := misc.MapLookup(
toSet,
"type",
).(string); eventTypeFromReq == "audiencelist" {
if eventTypeFromReq == "audiencelist" {
containsAudienceList = true
}

Expand Down Expand Up @@ -676,7 +679,11 @@ func (gateway *HandleT) getJobDataFromRequest(req *webRequestT) (jobData *jobFro
return
}

func isNonIdentifiable(anonIDFromReq, userIDFromReq string) bool {
func isNonIdentifiable(anonIDFromReq, userIDFromReq, eventType string) bool {
if eventType == extractEvent {
// extract event is allowed without user id and anonymous id
return false
}
if anonIDFromReq == "" && userIDFromReq == "" {
return !allowReqsWithoutUserIDAndAnonymousID
}
Expand Down Expand Up @@ -839,6 +846,10 @@ func (gateway *HandleT) webAudienceListHandler(w http.ResponseWriter, r *http.Re
gateway.webHandler(w, r, "audiencelist")
}

func (gateway *HandleT) webExtractHandler(w http.ResponseWriter, r *http.Request) {
gateway.webHandler(w, r, "extract")
}

func (gateway *HandleT) webBatchHandler(w http.ResponseWriter, r *http.Request) {
gateway.webHandler(w, r, "batch")
}
Expand Down Expand Up @@ -1236,6 +1247,8 @@ func (gateway *HandleT) StartWebHandler(ctx context.Context) error {
middleware.LimitConcurrentRequests(maxConcurrentRequests),
middleware.UncompressMiddleware,
)
internalMux := srvMux.PathPrefix("/internal").Subrouter() // mux for internal endpoints

srvMux.HandleFunc("/v1/batch", gateway.webBatchHandler).Methods("POST")
srvMux.HandleFunc("/v1/identify", gateway.webIdentifyHandler).Methods("POST")
srvMux.HandleFunc("/v1/track", gateway.webTrackHandler).Methods("POST")
Expand All @@ -1256,6 +1269,9 @@ func (gateway *HandleT) StartWebHandler(ctx context.Context) error {
srvMux.HandleFunc("/version", WithContentType("application/json; charset=utf-8", gateway.versionHandler)).Methods("GET")
srvMux.HandleFunc("/robots.txt", gateway.robots).Methods("GET")

// internal endpoints
internalMux.HandleFunc("/v1/extract", gateway.webExtractHandler).Methods("POST")

if enableEventSchemasFeature {
srvMux.HandleFunc("/schemas/event-models", WithContentType("application/json; charset=utf-8", gateway.eventSchemaWebHandler(gateway.eventSchemaHandler.GetEventModels))).Methods("GET")
srvMux.HandleFunc("/schemas/event-versions", WithContentType("application/json; charset=utf-8", gateway.eventSchemaWebHandler(gateway.eventSchemaHandler.GetEventVersions))).Methods("GET")
Expand Down
83 changes: 63 additions & 20 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,33 +713,63 @@ var _ = Describe("Gateway", func() {
validBody = `{"batch": [{"data": "valid-json"}]}`
reqType = "batch"
}
if handlerType != "extract" {
expectHandlerResponse(
handler,
authorizedRequest(
WriteKeyEnabled,
bytes.NewBufferString(validBody),
),
400,
response.NonIdentifiableRequest+"\n",
)
Eventually(
func() bool {
stat := statsStore.Get(
"gateway.write_key_failed_requests",
map[string]string{
"source": gateway.getSourceTagFromWriteKey(WriteKeyEnabled),
"sourceID": gateway.getSourceIDForWriteKey(WriteKeyEnabled),
"workspaceId": getWorkspaceID(WriteKeyEnabled),
"writeKey": WriteKeyEnabled,
"reqType": reqType,
"reason": response.NonIdentifiableRequest,
"sourceType": sourceType2,
"sdkVersion": "",
},
)
return stat != nil && stat.LastValue() == float64(1)
},
).Should(BeTrue())
}
}
})

It("should allow requests with both userId and anonymousId absent in case of extract events", func() {
extractHandlers := map[string]http.HandlerFunc{
"batch": gateway.webBatchHandler,
"import": gateway.webImportHandler,
"extract": gateway.webExtractHandler,
}
c.mockJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).AnyTimes().Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) {
_ = f(jobsdb.EmptyStoreSafeTx())
}).Return(nil)
c.mockJobsDB.EXPECT().StoreWithRetryEachInTx(gomock.Any(), gomock.Any(), gomock.Any()).Times(3)

validBody := `{"batch": [{"data": "valid-json", "type": "extract"}]}`
for handlerType, handler := range extractHandlers {
if handlerType == "extract" {
validBody = `{"data": "valid-json", "type": "extract"}`
}
expectHandlerResponse(
handler,
authorizedRequest(
WriteKeyEnabled,
bytes.NewBufferString(validBody),
),
400,
response.NonIdentifiableRequest+"\n",
200,
"OK",
)
Eventually(
func() bool {
stat := statsStore.Get(
"gateway.write_key_failed_requests",
map[string]string{
"source": gateway.getSourceTagFromWriteKey(WriteKeyEnabled),
"sourceID": gateway.getSourceIDForWriteKey(WriteKeyEnabled),
"workspaceId": getWorkspaceID(WriteKeyEnabled),
"writeKey": WriteKeyEnabled,
"reqType": reqType,
"reason": response.NonIdentifiableRequest,
"sourceType": sourceType2,
"sdkVersion": "",
},
)
return stat != nil && stat.LastValue() == float64(1)
},
).Should(BeTrue())
}
})

Expand Down Expand Up @@ -985,6 +1015,18 @@ var _ = Describe("Gateway", func() {
_, err = gateway.getJobDataFromRequest(req)
Expect(err).To(BeNil())
})

It("allows extract events even if userID and anonID are not present in the request payload", func() {
req := &webRequestT{
reqType: "batch",
writeKey: WriteKeyEnabled,
done: make(chan<- string),
userIDHeader: userIDHeader,
requestPayload: []byte(`{"batch": [{"type": "extract"}]}`),
}
_, err := gateway.getJobDataFromRequest(req)
Expect(err).To(BeNil())
})
})
})

Expand Down Expand Up @@ -1033,6 +1075,7 @@ func allHandlers(gateway *HandleT) map[string]http.HandlerFunc {
"track": gateway.webTrackHandler,
"import": gateway.webImportHandler,
"audiencelist": gateway.webAudienceListHandler,
"extract": gateway.webExtractHandler,
}
}

Expand Down

0 comments on commit 63dc940

Please sign in to comment.