Skip to content

Commit

Permalink
chore: add provision to disable tracking event names from a source fo…
Browse files Browse the repository at this point in the history
…r reporting (#3632)
  • Loading branch information
satishrudderstack committed Jul 18, 2023
1 parent 26c1791 commit d80fee2
Showing 1 changed file with 37 additions and 27 deletions.
64 changes: 37 additions & 27 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/lib/pq"
"github.com/samber/lo"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
Expand Down Expand Up @@ -45,21 +46,22 @@ const (
)

type HandleT struct {
init chan struct{}
onceInit sync.Once
clients map[string]*types.Client
clientsMapLock sync.RWMutex
log logger.Logger
reportingServiceURL string
namespace string
workspaceID string
instanceID string
workspaceIDForSourceIDMap map[string]string
piiReportingSettings map[string]bool
whActionsOnly bool
region string
sleepInterval time.Duration
mainLoopSleepInterval time.Duration
init chan struct{}
onceInit sync.Once
clients map[string]*types.Client
clientsMapLock sync.RWMutex
log logger.Logger
reportingServiceURL string
namespace string
workspaceID string
instanceID string
workspaceIDForSourceIDMap map[string]string
piiReportingSettings map[string]bool
whActionsOnly bool
region string
sleepInterval time.Duration
mainLoopSleepInterval time.Duration
sourcesWithEventNameTrackingDisabled []string

getMinReportedAtQueryTime stats.Measurement
getReportsQueryTime stats.Measurement
Expand All @@ -70,6 +72,8 @@ func NewFromEnvConfig(log logger.Logger) *HandleT {
var sleepInterval, mainLoopSleepInterval time.Duration
reportingServiceURL := config.GetString("REPORTING_URL", "https://reporting.rudderstack.com/")
reportingServiceURL = strings.TrimSuffix(reportingServiceURL, "/")
sourcesWithEventNameTrackingDisabled := config.GetStringSlice("Reporting.sourcesWithEventNameTrackingDisabled", []string{})

config.RegisterDurationConfigVariable(5, &mainLoopSleepInterval, true, time.Second, "Reporting.mainLoopSleepInterval")
config.RegisterDurationConfigVariable(30, &sleepInterval, true, time.Second, "Reporting.sleepInterval")
config.RegisterIntConfigVariable(32, &maxConcurrentRequests, true, 1, "Reporting.maxConcurrentRequests")
Expand All @@ -80,18 +84,19 @@ func NewFromEnvConfig(log logger.Logger) *HandleT {
}

return &HandleT{
init: make(chan struct{}),
log: log,
clients: make(map[string]*types.Client),
reportingServiceURL: reportingServiceURL,
namespace: config.GetKubeNamespace(),
instanceID: config.GetString("INSTANCE_ID", "1"),
workspaceIDForSourceIDMap: make(map[string]string),
piiReportingSettings: make(map[string]bool),
whActionsOnly: whActionsOnly,
sleepInterval: sleepInterval,
mainLoopSleepInterval: mainLoopSleepInterval,
region: config.GetString("region", ""),
init: make(chan struct{}),
log: log,
clients: make(map[string]*types.Client),
reportingServiceURL: reportingServiceURL,
namespace: config.GetKubeNamespace(),
instanceID: config.GetString("INSTANCE_ID", "1"),
workspaceIDForSourceIDMap: make(map[string]string),
piiReportingSettings: make(map[string]bool),
whActionsOnly: whActionsOnly,
sleepInterval: sleepInterval,
mainLoopSleepInterval: mainLoopSleepInterval,
region: config.GetString("region", ""),
sourcesWithEventNameTrackingDisabled: sourcesWithEventNameTrackingDisabled,
}
}

Expand Down Expand Up @@ -564,6 +569,11 @@ func (r *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) {
for _, metric := range metrics {
workspaceID := r.getWorkspaceID(metric.ConnectionDetails.SourceID)
metric := *metric

if slices.Contains(r.sourcesWithEventNameTrackingDisabled, metric.ConnectionDetails.SourceID) {
metric.StatusDetail.EventName = metric.StatusDetail.EventType
}

if r.IsPIIReportingDisabled(workspaceID) {
metric = transformMetricForPII(metric, getPIIColumnsToExclude())
}
Expand Down

0 comments on commit d80fee2

Please sign in to comment.