Skip to content

Commit

Permalink
chore: add receivedAt and requestIP in payload [PIPE-1135] (#4764)
Browse files Browse the repository at this point in the history
  • Loading branch information
BonapartePC authored Jun 6, 2024
1 parent d9884fd commit bc595d5
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 0 deletions.
98 changes: 98 additions & 0 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-schemas/go/stream"
"github.com/rudderlabs/rudder-server/utils/httputil"

kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
Expand Down Expand Up @@ -1893,6 +1894,103 @@ var _ = Describe("Gateway", func() {
Expect(http.StatusInternalServerError, resp.StatusCode)
})
})

Context("extractJobsFromInternalBatchPayload", func() {
var gateway *Handle
BeforeEach(func() {
c.initializeAppFeatures()
gateway = &Handle{}
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.NOP, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService(), nil)
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)
})

AfterEach(func() {
err := gateway.Shutdown()
Expect(err).To(BeNil())
})

It("doesn't override if receivedAt or requestIP already exists in payload", func() {
properties := stream.MessageProperties{
MessageID: "messageID",
RoutingKey: "anonymousId_header<<>>anonymousId_1<<>>identified_user_id",
WorkspaceID: "workspaceID",
SourceID: "sourceID",
ReceivedAt: time.Date(2024, 1, 1, 1, 1, 1, 1, time.UTC),
RequestIP: "dummyIP",
DestinationID: "destinationID",
}
msg := stream.Message{
Properties: properties,
Payload: []byte(`{"receivedAt": "dummyReceivedAtFromPayload", "requestIP": "dummyIPFromPayload"}`),
}
messages := []stream.Message{msg}
payload, err := json.Marshal(messages)
Expect(err).To(BeNil())

req := &webRequestT{
reqType: "batch",
authContext: rCtxEnabled,
done: make(chan<- string),
requestPayload: payload,
}
jobForm, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload)
Expect(err).To(BeNil())

var job struct {
Batch []struct {
ReceivedAt string `json:"receivedAt"`
RequestIP string `json:"requestIP"`
} `json:"batch"`
}
Expect(jobForm).To(HaveLen(1))
err = json.Unmarshal(jobForm[0].EventPayload, &job)
Expect(err).To(BeNil())
Expect(job.Batch).To(HaveLen(1))
Expect(job.Batch[0].ReceivedAt).To(ContainSubstring("dummyReceivedAtFromPayload"))
Expect(job.Batch[0].RequestIP).To(ContainSubstring("dummyIPFromPayload"))
})

It("adds receivedAt and requestIP in the request payload if it's not already present", func() {
properties := stream.MessageProperties{
MessageID: "messageID",
RoutingKey: "anonymousId_header<<>>anonymousId_1<<>>identified_user_id",
WorkspaceID: "workspaceID",
SourceID: "sourceID",
ReceivedAt: time.Date(2024, 1, 1, 1, 1, 1, 1, time.UTC),
RequestIP: "dummyIP",
DestinationID: "destinationID",
}
msg := stream.Message{
Properties: properties,
Payload: []byte(`{}`),
}
messages := []stream.Message{msg}
payload, err := json.Marshal(messages)
Expect(err).To(BeNil())
req := &webRequestT{
reqType: "batch",
authContext: rCtxEnabled,
done: make(chan<- string),
requestPayload: payload,
}
jobForm, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload)
Expect(err).To(BeNil())

var job struct {
Batch []struct {
ReceivedAt string `json:"receivedAt"`
RequestIP string `json:"requestIP"`
} `json:"batch"`
}
Expect(jobForm).To(HaveLen(1))
err = json.Unmarshal(jobForm[0].EventPayload, &job)
Expect(err).To(BeNil())
Expect(job.Batch).To(HaveLen(1))
Expect(job.Batch[0].ReceivedAt).To(ContainSubstring("2024-01-01T01:01:01.000Z"))
Expect(job.Batch[0].RequestIP).To(ContainSubstring("dummyIP"))
})
})
})

func unauthorizedRequest(body io.Reader) *http.Request {
Expand Down
23 changes: 23 additions & 0 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,15 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
)
}

msg.Payload, err = fillReceivedAt(msg.Payload, msg.Properties.ReceivedAt)
if err != nil {
return nil, fmt.Errorf("filling receivedAt: %w", err)
}
msg.Payload, err = fillRequestIP(msg.Payload, msg.Properties.RequestIP)
if err != nil {
return nil, fmt.Errorf("filling requestIP: %w", err)
}

eventBatch := singularEventBatch{
Batch: []json.RawMessage{msg.Payload},
ReceivedAt: msg.Properties.ReceivedAt.Format(misc.RFC3339Milli),
Expand Down Expand Up @@ -852,6 +861,20 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
return jobs, nil
}

func fillReceivedAt(event []byte, receivedAt time.Time) ([]byte, error) {
if !gjson.GetBytes(event, "receivedAt").Exists() {
return sjson.SetBytes(event, "receivedAt", receivedAt.Format(misc.RFC3339Milli))
}
return event, nil
}

func fillRequestIP(event []byte, ip string) ([]byte, error) {
if !gjson.GetBytes(event, "requestIP").Exists() {
return sjson.SetBytes(event, "requestIP", ip)
}
return event, nil
}

func (gw *Handle) getSourceConfigFromSourceID(sourceID string) backendconfig.SourceT {
gw.configSubscriberLock.RLock()
defer gw.configSubscriberLock.RUnlock()
Expand Down

0 comments on commit bc595d5

Please sign in to comment.