Skip to content

Commit

Permalink
chore: add mock reporting server in mtu integration tests (#4894)
Browse files Browse the repository at this point in the history
* chore: add mock reporting server in mtu integration tests

* addressed comments
  • Loading branch information
mihir20 committed Jul 19, 2024
1 parent 96558d0 commit 55cdb59
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 58 deletions.
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ jobs:
- integration_test/warehouse
- integration_test/tracing
- integration_test/backendconfigunavailability
- integration_test/trackedusersreporting
- processor
- regulation-worker
- router
Expand Down
247 changes: 189 additions & 58 deletions integration_test/trackedusersreporting/tracked_users_reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/rand"
Expand All @@ -12,9 +13,14 @@ import (
"os"
"path"
"strconv"
"sync"
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/testhelper/httptest"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/testhelper/transformertest"

"github.com/segmentio/go-hll"
Expand Down Expand Up @@ -46,13 +52,161 @@ type testConfig struct {
webhook *webhookutil.Recorder
configBEServer *nethttptest.Server
transformerUrl string
reportingServer *mockReportingServer
}

type userIdentifier struct {
userID string
anonymousID string
}

type mockReportingServer struct {
Server *httptest.Server
hllMap map[string]map[string]struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
}
hllMapMutex sync.RWMutex
}

func newMockReportingServer() *mockReportingServer {
whr := mockReportingServer{
hllMap: make(map[string]map[string]struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
}),
}
whr.Server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var err error
defer func() {
if err != nil {
http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
return
} else {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
return
}
}()
type trackedUsersEntry struct {
ReportedAt time.Time `json:"reportedAt"`
WorkspaceID string `json:"workspaceId"`
SourceID string `json:"sourceId"`
InstanceID string `json:"instanceId"`
UserIDHLLHex string `json:"userIdHLL"`
AnonymousIDHLLHex string `json:"anonymousIdHLL"`
IdentifiedAnonymousIDHLLHex string `json:"identifiedAnonymousIdHLL"`
}
unmarshalledReq := make([]*trackedUsersEntry, 0)
err = json.NewDecoder(r.Body).Decode(&unmarshalledReq)
if err != nil {
return
}
whr.hllMapMutex.Lock()
for _, e := range unmarshalledReq {
if whr.hllMap[e.WorkspaceID] == nil {
whr.hllMap[e.WorkspaceID] = make(map[string]struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
})
}
cardinalityMap := whr.hllMap[e.WorkspaceID][e.SourceID]
var userHllBytes, annIDHllBytes, combineHllBytes []byte
var userHll, annHll, combHll hll.Hll
userHllBytes, err = hex.DecodeString(e.UserIDHLLHex)
if err != nil {
return
}
userHll, err = hll.FromBytes(userHllBytes)
if err != nil {
return
}
if cardinalityMap.userIDHll == nil {
cardinalityMap.userIDHll = &userHll
} else {
cardinalityMap.userIDHll.Union(userHll)
}
annIDHllBytes, err = hex.DecodeString(e.AnonymousIDHLLHex)
if err != nil {
return
}
annHll, err := hll.FromBytes(annIDHllBytes)
if err != nil {
return
}
if cardinalityMap.anonIDHll == nil {
cardinalityMap.anonIDHll = &annHll
} else {
cardinalityMap.anonIDHll.Union(annHll)
}
combineHllBytes, err = hex.DecodeString(e.IdentifiedAnonymousIDHLLHex)
if err != nil {
return
}
combHll, err = hll.FromBytes(combineHllBytes)
if err != nil {
return
}
if cardinalityMap.identifiedUsersHll == nil {
cardinalityMap.identifiedUsersHll = &combHll
} else {
cardinalityMap.identifiedUsersHll.Union(combHll)
}
whr.hllMap[e.WorkspaceID][e.SourceID] = cardinalityMap
}
whr.hllMapMutex.Unlock()
}))

return &whr
}

func (m *mockReportingServer) Close() {
m.Server.Close()
}

func (m *mockReportingServer) getCardinalityFromReportingServer() map[string]map[string]struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
} {
m.hllMapMutex.RLock()
defer m.hllMapMutex.RUnlock()
return lo.MapEntries(m.hllMap, func(workspaceID string, mp map[string]struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
}) (string, map[string]struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
},
) {
return workspaceID, lo.MapEntries(mp, func(sourceID string, value struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
}) (string, struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
},
) {
return sourceID, struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
}{
userIDCount: int(value.userIDHll.Cardinality()),
anonIDCount: int(value.anonIDHll.Cardinality()),
identifiedUsersCount: int(value.identifiedUsersHll.Cardinality()),
}
})
})
}

func TestTrackedUsersReporting(t *testing.T) {
tc := setup(t)

Expand All @@ -61,7 +215,7 @@ func TestTrackedUsersReporting(t *testing.T) {

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := runRudderServer(t, ctx, tc.gwPort, tc.postgresResource, tc.configBEServer.URL, t.TempDir(), tc.transformerUrl)
err := runRudderServer(t, ctx, tc)
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
}
Expand All @@ -85,55 +239,23 @@ func TestTrackedUsersReporting(t *testing.T) {
return tc.webhook.RequestsCount() == eventsCount
}, 1*time.Minute, 5*time.Second, "unexpected number of events received, count of events: %d", tc.webhook.RequestsCount())

// TODO: once flusher is implemented, add a mock reporting server to check is we are getting correct cardinality of users
cardinalityMap := getCardinalityFromDB(t, tc.postgresResource)
require.Equal(t, 2, cardinalityMap[workspaceID][sourceID])
require.Eventuallyf(t, func() bool {
return getCardinalityOfReportingTable(t, tc.postgresResource) == 0
}, 1*time.Minute, 5*time.Second, "data not reported to reporting service")

cardinalityMap := tc.reportingServer.getCardinalityFromReportingServer()
require.Equal(t, 2, cardinalityMap[workspaceID][sourceID].userIDCount)
require.Equal(t, 2, cardinalityMap[workspaceID][sourceID].anonIDCount)
require.Equal(t, 2, cardinalityMap[workspaceID][sourceID].identifiedUsersCount)
cancel()
require.NoError(t, wg.Wait())
}

func getCardinalityFromDB(t *testing.T, postgresResource *postgres.Resource) map[string]map[string]int {
type trackedUsersEntry struct {
WorkspaceID string
SourceID string
InstanceID string
userIDHll string
annIDHll string
combHll string
}
rows, err := postgresResource.DB.Query("SELECT workspace_id, source_id, instance_id, userid_hll, anonymousid_hll, identified_anonymousid_hll FROM tracked_users_reports")
func getCardinalityOfReportingTable(t *testing.T, db *postgres.Resource) int {
var count int
err := db.DB.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s", "tracked_users_reports")).Scan(&count)
require.NoError(t, err)
require.NoError(t, rows.Err())
defer func() { _ = rows.Close() }()
var entry trackedUsersEntry
entries := make([]trackedUsersEntry, 0)
for rows.Next() {
err = rows.Scan(&entry.WorkspaceID, &entry.SourceID, &entry.InstanceID, &entry.userIDHll, &entry.annIDHll, &entry.combHll)
require.NoError(t, err)
entries = append(entries, entry)
}
result := make(map[string]map[string]int)
for _, e := range entries {
if result[e.WorkspaceID] == nil {
result[e.WorkspaceID] = make(map[string]int)
}
userHllBytes, err := hex.DecodeString(e.userIDHll)
require.NoError(t, err)
userHll, err := hll.FromBytes(userHllBytes)
require.NoError(t, err)
result[e.WorkspaceID][e.SourceID] += int(userHll.Cardinality())
annIDHllBytes, err := hex.DecodeString(e.annIDHll)
require.NoError(t, err)
annHll, err := hll.FromBytes(annIDHllBytes)
require.NoError(t, err)
result[e.WorkspaceID][e.SourceID] += int(annHll.Cardinality())
combineHllBytes, err := hex.DecodeString(e.combHll)
require.NoError(t, err)
combHll, err := hll.FromBytes(combineHllBytes)
require.NoError(t, err)
result[e.WorkspaceID][e.SourceID] -= int(combHll.Cardinality())
}
return result
return count
}

func setup(t testing.TB) testConfig {
Expand All @@ -155,6 +277,9 @@ func setup(t testing.TB) testConfig {
t.Cleanup(webhook.Close)
webhookURL := webhook.Server.URL

reportingServer := newMockReportingServer()
t.Cleanup(reportingServer.Close)

trServer := transformertest.NewBuilder().
WithDestTransformHandler(
"WEBHOOK",
Expand Down Expand Up @@ -189,23 +314,22 @@ func setup(t testing.TB) testConfig {
webhook: webhook,
configBEServer: bcServer,
transformerUrl: trServer.URL,
reportingServer: reportingServer,
}
}

func runRudderServer(
t testing.TB,
ctx context.Context,
port int,
postgresContainer *postgres.Resource,
cbURL, tmpDir, transformerURL string,
tc testConfig,
) (err error) {
t.Setenv("CONFIG_BACKEND_URL", cbURL)
t.Setenv("CONFIG_BACKEND_URL", tc.configBEServer.URL)
t.Setenv("WORKSPACE_TOKEN", "token")
t.Setenv("DEST_TRANSFORM_URL", transformerURL)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.port"), postgresContainer.Port)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.user"), postgresContainer.User)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.name"), postgresContainer.Database)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.password"), postgresContainer.Password)
t.Setenv("DEST_TRANSFORM_URL", tc.transformerUrl)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.port"), tc.postgresResource.Port)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.user"), tc.postgresResource.User)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.name"), tc.postgresResource.Database)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.password"), tc.postgresResource.Password)

t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.mode"), "off")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DestinationDebugger.disableEventDeliveryStatusUploads"), "true")
Expand All @@ -217,15 +341,21 @@ func runRudderServer(
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.syncer.enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.mainLoopFreq"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.uploadFreq"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.webPort"), strconv.Itoa(port))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.webPort"), strconv.Itoa(tc.gwPort))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "RUDDER_TMPDIR"), os.TempDir())
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.storagePath"), path.Join(tmpDir, "/recovery_data.json"))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.storagePath"), path.Join(t.TempDir(), "/recovery_data.json"))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Profiler.Enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.enableSuppressUserFeature"), "false")
// enable tracked users feature
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "TrackedUsers.enabled"), "true")

// setup reporting server
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "REPORTING_URL"), tc.reportingServer.Server.URL)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.flushWindow"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.recentExclusionWindowInSeconds"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.sleepInterval"), "2s")
// so that multiple processor batches are processed
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Processor.maxLoopProcessEvents"), "10")
t.Setenv("Processor.maxRetry", strconv.Itoa(1))

defer func() {
Expand All @@ -249,7 +379,8 @@ func sendEvents(
) (int, error) {
count := 0
for _, identifier := range identifiers {
num := rand.Intn(10)
// generate 1 or more events
num := 1 + rand.Intn(100)
for i := 0; i < num; i++ {
count++
payload := []byte(fmt.Sprintf(`
Expand Down

0 comments on commit 55cdb59

Please sign in to comment.