Skip to content

Commit

Permalink
Merge branch 'master' into mihir/pipe-484
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Jan 8, 2024
2 parents 7fdf520 + b928bfe commit 497aee8
Show file tree
Hide file tree
Showing 52 changed files with 1,550 additions and 200 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ jobs:
- integration_test/reporting_dropped_events
- integration_test/reporting_error_index
- integration_test/warehouse
- integration_test/tracing
- processor
- regulation-worker
- router
Expand All @@ -129,7 +130,7 @@ jobs:
include:
- package: services
exclude: services/rsources

steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
Expand Down
2 changes: 1 addition & 1 deletion app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)

reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig)
defer reporting.Stop()
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config)})
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config, "reporting")})
g.Go(func() error {
syncer()
return nil
Expand Down
2 changes: 1 addition & 1 deletion app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options

reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig)
defer reporting.Stop()
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config.Default)})
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config.Default, "reporting")})
g.Go(misc.WithBugsnag(func() error {
syncer()
return nil
Expand Down
12 changes: 10 additions & 2 deletions app/apphandlers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,17 @@ func NewRsourcesService(deploymentType deployment.Type, shouldSetupSharedDB bool
var rsourcesConfig rsources.JobServiceConfig
rsourcesConfig.MaxPoolSize = config.GetInt("Rsources.MaxPoolSize", 3)
rsourcesConfig.MinPoolSize = config.GetInt("Rsources.MinPoolSize", 1)
rsourcesConfig.LocalConn = misc.GetConnectionString(config.Default)
rsourcesConfig.LocalConn = misc.GetConnectionString(config.Default, "rsources")
rsourcesConfig.LocalHostname = config.GetString("DB.host", "localhost")
rsourcesConfig.SharedConn = config.GetString("SharedDB.dsn", "")
sharedDBConnUrl := config.GetString("SharedDB.dsn", "")
if len(sharedDBConnUrl) != 0 {
var err error
sharedDBConnUrl, err = misc.SetAppNameInDBConnURL(sharedDBConnUrl, "rsources")
if err != nil {
return nil, fmt.Errorf("failed to set application name in dns: %w", err)
}
}
rsourcesConfig.SharedConn = sharedDBConnUrl
rsourcesConfig.SkipFailedRecordsCollection = !config.GetBool("Router.failedKeysEnabled", true)

if deploymentType == deployment.MultiTenantType {
Expand Down
2 changes: 1 addition & 1 deletion backend-config/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (db *cacheStore) Get(ctx context.Context) ([]byte, error) {

// setupDBConn sets up the database connection, creates the config table if it doesn't exist
func setupDBConn() (*sql.DB, error) {
psqlInfo := misc.GetConnectionString(config.Default)
psqlInfo := misc.GetConnectionString(config.Default, "backend-config")
db, err := sql.Open("postgres", psqlInfo)
if err != nil {
pkgLogger.Errorf("failed to open db: %v", err)
Expand Down
4 changes: 3 additions & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,6 @@ PgNotifier:
retriggerInterval: 2s
retriggerCount: 500
trackBatchInterval: 2s
maxAttempt: 3
maxAttempt: 3
Reporting:
eventNameMaxLength: 0
18 changes: 15 additions & 3 deletions enterprise/reporting/event_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strconv"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/jobsdb"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand All @@ -14,6 +15,8 @@ type EventStatsReporter struct {
configSubscriber *configSubscriber
}

const EventStream = "event-stream"

func NewEventStatsReporter(configSubscriber *configSubscriber, stats stats.Stats) *EventStatsReporter {
return &EventStatsReporter{
stats: stats,
Expand All @@ -25,16 +28,25 @@ const EventsProcessedMetricName = "events_processed_total"

func (es *EventStatsReporter) Record(metrics []*types.PUReportedMetric) {
for index := range metrics {
sourceCategory := metrics[index].ConnectionDetails.SourceCategory
if sourceCategory == "" {
sourceCategory = EventStream
}
terminal := strconv.FormatBool(metrics[index].PUDetails.TerminalPU)
status := metrics[index].StatusDetail.Status
if status == jobsdb.Aborted.State {
terminal = "true"
}
tags := stats.Tags{
"workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID),
"sourceId": metrics[index].ConnectionDetails.SourceID,
"destinationId": metrics[index].ConnectionDetails.DestinationID,
"reportedBy": metrics[index].PUDetails.PU,
"sourceCategory": metrics[index].ConnectionDetails.SourceCategory,
"sourceCategory": sourceCategory,
"statusCode": strconv.Itoa(metrics[index].StatusDetail.StatusCode),
"terminal": strconv.FormatBool(metrics[index].PUDetails.TerminalPU),
"terminal": terminal,
"destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType,
"status": metrics[index].StatusDetail.Status,
"status": status,
}
es.stats.NewTaggedStat(EventsProcessedMetricName, stats.CountType, tags).Count(int(metrics[index].StatusDetail.Count))
}
Expand Down
29 changes: 28 additions & 1 deletion enterprise/reporting/event_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,22 @@ func TestEventStatsReporter(t *testing.T) {
StatusCode: 500,
},
},
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
SourceCategory: "",
},
PUDetails: types.PUDetails{
PU: reportedBy,
TerminalPU: true,
},
StatusDetail: &types.StatusDetail{
Count: 20,
Status: jobsdb.Succeeded.State,
StatusCode: 200,
},
},
}
esr := NewEventStatsReporter(cs, statsStore)
esr.Record(testReports)
Expand Down Expand Up @@ -183,7 +199,18 @@ func TestEventStatsReporter(t *testing.T) {
"terminal": "false",
"status": "non-terminal",
}).LastValue(), float64(100))

require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{
"workspaceId": workspaceID,
"sourceId": sourceID,
"destinationId": destinationID,
"reportedBy": reportedBy,
"sourceCategory": EventStream,
"statusCode": "200",
"destinationType": "test-destination-name",
"terminal": "true",
"status": jobsdb.Succeeded.State,
}).LastValue(), float64(20))
require.Len(t, statsStore.GetAll(), 5)
t.Cleanup(func() {
cancel()
<-subscribeDone
Expand Down
5 changes: 5 additions & 0 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReporte
}
defer func() { _ = stmt.Close() }()

eventNameMaxLength := config.GetInt("Reporting.eventNameMaxLength", 0)
reportedAt := time.Now().UTC().Unix() / 60
for _, metric := range metrics {
workspaceID := r.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID)
Expand All @@ -589,6 +590,10 @@ func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReporte
metric = transformMetricForPII(metric, getPIIColumnsToExclude())
}

if eventNameMaxLength > 0 && len(metric.StatusDetail.EventName) > eventNameMaxLength {
metric.StatusDetail.EventName = types.MaxLengthExceeded
}

_, err = stmt.Exec(
workspaceID, r.namespace, r.instanceID,
metric.ConnectionDetails.SourceDefinitionId,
Expand Down
11 changes: 7 additions & 4 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,13 @@ var _ = Describe("Gateway", func() {

var paramsMap, expectedParamsMap map[string]interface{}
_ = json.Unmarshal(job.Parameters, &paramsMap)
expectedStr := []byte(fmt.Sprintf(`{"source_id": "%v", "source_job_run_id": "", "source_task_run_id": ""}`, SourceIDEnabled))
expectedStr := []byte(fmt.Sprintf(
`{"source_id": "%v", "source_job_run_id": "", "source_task_run_id": "", "traceparent": ""}`,
SourceIDEnabled,
))
_ = json.Unmarshal(expectedStr, &expectedParamsMap)
equals := reflect.DeepEqual(paramsMap, expectedParamsMap)
Expect(equals).To(Equal(true))
Expect(equals).To(BeTrue())

Expect(job.CustomVal).To(Equal(customVal))

Expand Down Expand Up @@ -1569,7 +1572,7 @@ var _ = Describe("Gateway", func() {
jobs []*jobsdb.JobT,
) error {
for idx, job := range jobs {
Expect(misc.IsValidUUID(job.UUID.String())).To(Equal(true))
Expect(misc.IsValidUUID(job.UUID.String())).To(BeTrue())
Expect(job.CustomVal).To(Equal("WEBHOOK"))

var paramsMap, expectedParamsMap map[string]interface{}
Expand All @@ -1587,7 +1590,7 @@ var _ = Describe("Gateway", func() {
_ = json.Unmarshal(job.Parameters, &paramsMap)
_ = json.Unmarshal(expectedStr, &expectedParamsMap)
equals := reflect.DeepEqual(paramsMap, expectedParamsMap)
Expect(equals).To(Equal(true))
Expect(equals).To(BeTrue())
}
return nil
}).
Expand Down
23 changes: 21 additions & 2 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Handle struct {
config *config.Config
logger logger.Logger
stats stats.Stats
tracer stats.Tracer
application app.App
backendConfig backendconfig.BackendConfig
jobsDB jobsdb.JobsDB
Expand Down Expand Up @@ -271,6 +272,9 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,

// values retrieved from first event in batch
sourcesJobRunID, sourcesTaskRunID = req.authContext.SourceJobRunID, req.authContext.SourceTaskRunID

// tracing
traceParent = req.traceParent
)

fillMessageID := func(event map[string]interface{}) {
Expand Down Expand Up @@ -423,10 +427,11 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
return
}

params := map[string]interface{}{
params := map[string]any{
"source_id": sourceID,
"source_job_run_id": sourcesJobRunID,
"source_task_run_id": sourcesTaskRunID,
"traceparent": traceParent,
}
marshalledParams, err = json.Marshal(params)
if err != nil {
Expand Down Expand Up @@ -588,6 +593,20 @@ func (gw *Handle) addToWebRequestQ(_ *http.ResponseWriter, req *http.Request, do
}
userWebRequestWorker := gw.findUserWebRequestWorker(workerKey)
ipAddr := misc.GetIPFromReq(req)
webReq := webRequestT{done: done, reqType: reqType, requestPayload: requestPayload, authContext: arctx, ipAddr: ipAddr, userIDHeader: userIDHeader}

traceParent := stats.GetTraceParentFromContext(req.Context())
if traceParent == "" {
gw.logger.Debugw("traceParent not found in request")
}

webReq := webRequestT{
done: done,
reqType: reqType,
requestPayload: requestPayload,
authContext: arctx,
traceParent: traceParent,
ipAddr: ipAddr,
userIDHeader: userIDHeader,
}
userWebRequestWorker.webRequestQ <- &webReq
}
21 changes: 19 additions & 2 deletions gateway/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package gateway
import (
"context"
"net/http"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"
gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
"github.com/rudderlabs/rudder-server/gateway/response"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand Down Expand Up @@ -74,12 +76,27 @@ func (gw *Handle) webHandler() http.HandlerFunc {
// webRequestHandler - handles web requests containing rudder events as payload.
// It parses the payload and calls the request handler to process the request.
func (gw *Handle) webRequestHandler(rh RequestHandler, w http.ResponseWriter, r *http.Request) {
reqType := r.Context().Value(gwtypes.CtxParamCallType).(string)
arctx := r.Context().Value(gwtypes.CtxParamAuthRequestContext).(*gwtypes.AuthRequestContext)
ctx := r.Context()
reqType := ctx.Value(gwtypes.CtxParamCallType).(string)
arctx := ctx.Value(gwtypes.CtxParamAuthRequestContext).(*gwtypes.AuthRequestContext)

ctx, span := gw.tracer.Start(ctx, "gw.webRequestHandler", stats.SpanKindServer,
stats.SpanWithTimestamp(time.Now()),
stats.SpanWithTags(stats.Tags{
"reqType": reqType,
"path": r.URL.Path,
"workspaceId": arctx.WorkspaceID,
"sourceId": arctx.SourceID,
}),
)
r = r.WithContext(ctx)

gw.logger.LogRequest(r)
var errorMessage string
defer func() {
defer span.End()
if errorMessage != "" {
span.SetStatus(stats.SpanStatusError, errorMessage)
status := response.GetErrorStatusCode(errorMessage)
responseBody := response.GetStatus(errorMessage)
gw.logger.Infow("response",
Expand Down
1 change: 1 addition & 0 deletions gateway/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (gw *Handle) Setup(
gw.config = config
gw.logger = logger
gw.stats = stat
gw.tracer = stat.NewTracer("gateway")
gw.application = application
gw.backendConfig = backendConfig
gw.jobsDB = jobsDB
Expand Down
1 change: 1 addition & 0 deletions gateway/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type webRequestT struct {
reqType string
requestPayload []byte
authContext *gwtypes.AuthRequestContext
traceParent string
ipAddr string
userIDHeader string
errors []string
Expand Down
Loading

0 comments on commit 497aee8

Please sign in to comment.