Skip to content

Commit

Permalink
chore: vacuum reports table when size > 5GB
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Mar 11, 2024
1 parent 9f440ef commit 0d35e41
Showing 1 changed file with 28 additions and 2 deletions.
30 changes: 28 additions & 2 deletions enterprise/reporting/reporting.go
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/lib/pq"
"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand All @@ -43,6 +44,7 @@ const (
StatReportingHttpReq = "reporting_client_http_request"
StatReportingGetMinReportedAtQueryTime = "reporting_client_get_min_reported_at_query_time"
StatReportingGetReportsQueryTime = "reporting_client_get_reports_query_time"
StatReportingVacuumDuration = "reporting_vacuum_duration"
)

type DefaultReporter struct {
Expand Down Expand Up @@ -354,6 +356,7 @@ func (r *DefaultReporter) mainLoop(ctx context.Context, c types.SyncerConfig) {
getReportsCount := r.stats.NewTaggedStat(StatReportingGetReportsCount, stats.HistogramType, tags)
getAggregatedReportsTimer := r.stats.NewTaggedStat(StatReportingGetAggregatedReportsTime, stats.TimerType, tags)
getAggregatedReportsCount := r.stats.NewTaggedStat(StatReportingGetAggregatedReportsCount, stats.HistogramType, tags)
vacuumDuration := r.stats.NewTaggedStat(StatReportingVacuumDuration, stats.TimerType, tags)

r.getMinReportedAtQueryTime = r.stats.NewTaggedStat(StatReportingGetMinReportedAtQueryTime, stats.TimerType, tags)
r.getReportsQueryTime = r.stats.NewTaggedStat(StatReportingGetReportsQueryTime, stats.TimerType, tags)
Expand Down Expand Up @@ -389,10 +392,10 @@ func (r *DefaultReporter) mainLoop(ctx context.Context, c types.SyncerConfig) {
}
requestChan := make(chan struct{}, r.maxConcurrentRequests.Load())
loopStart := time.Now()
currentMs := time.Now().UTC().Unix() / 60
currentMin := time.Now().UTC().Unix() / 60

getReportsStart := time.Now()
reports, reportedAt, err := r.getReports(currentMs, c.ConnInfo)
reports, reportedAt, err := r.getReports(currentMin, c.ConnInfo)
getReportsTimer.Since(getReportsStart)
getReportsCount.Observe(float64(len(reports)))
if len(reports) == 0 {
Expand All @@ -417,6 +420,8 @@ func (r *DefaultReporter) mainLoop(ctx context.Context, c types.SyncerConfig) {
getAggregatedReportsCount.Observe(float64(len(metrics)))

errGroup, errCtx := errgroup.WithContext(ctx)
// default to -1 to allow unlimited concurrency
errGroup.SetLimit(config.GetInt("Reporting.maxConcurrentRequests", -1))
for _, metric := range metrics {
if r.whActionsOnly && metric.SourceCategory != "warehouse" {
// if whActionsOnly is true, we only send reports for wh actions sources
Expand Down Expand Up @@ -446,6 +451,27 @@ func (r *DefaultReporter) mainLoop(ctx context.Context, c types.SyncerConfig) {
if err != nil {
r.log.Errorf(`[ Reporting ]: Error deleting local reports from %s: %v`, ReportsTable, err)
}

vacuumStart := time.Now()
var sizeEstimate int64
if err := dbHandle.QueryRowContext(
ctx,
fmt.Sprintf(`SELECT pg_table_size(oid) from pg_class where relname='%s';`, ReportsTable),
).Scan(&sizeEstimate); err != nil {
r.log.Errorn(
`[ Reporting ]: Error getting table size estimate`,
logger.NewErrorField(err),
)
}
if sizeEstimate > config.GetInt64("Reporting.vacuumThresholdBytes", 5*bytesize.GB) {
if _, err := dbHandle.ExecContext(ctx, `vacuum full analyze reports;`); err != nil {
r.log.Errorn(
`[ Reporting ]: Error vacuuming reports table`,
logger.NewErrorField(err),
)
}
vacuumDuration.Since(vacuumStart)
}
}

mainLoopTimer.Since(loopStart)
Expand Down

0 comments on commit 0d35e41

Please sign in to comment.