Skip to content

Commit

Permalink
refactor: timerStat.RecordDuration, Since instead of timerStat.Start, End (#2870)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Jan 13, 2023
1 parent 56f84df commit 134bb0a
Show file tree
Hide file tree
Showing 29 changed files with 233 additions and 273 deletions.
6 changes: 2 additions & 4 deletions event-schema/event_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,7 @@ func (manager *EventSchemaManagerT) handleEvent(writeKey string, event EventT) {
}

processingTimer := stats.Default.NewTaggedStat("archive_event_model", stats.TimerType, stats.Tags{"module": "event_schemas", "writeKey": writeKey, "eventIdentifier": eventIdentifier})
processingTimer.Start()
defer processingTimer.End()
defer processingTimer.RecordDuration()()

// TODO: Create locks on every event_model to improve scaling this
manager.eventModelLock.Lock()
Expand Down Expand Up @@ -1147,8 +1146,7 @@ func (manager *EventSchemaManagerT) Setup() {
defer setEventSchemasPopulated(true)

populateESTimer := stats.Default.NewTaggedStat("populate_event_schemas", stats.TimerType, stats.Tags{"module": "event_schemas"})
populateESTimer.Start()
defer populateESTimer.End()
defer populateESTimer.RecordDuration()()

manager.populateEventSchemas()
})
Expand Down
4 changes: 2 additions & 2 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWe
// Saving the event data read from req.request.Body to the splice.
// Using this to send event schema to the config backend.
var eventBatchesToRecord []sourceDebugger
userWebRequestWorker.batchTimeStat.Start()
batchStart := time.Now()
for _, req := range breq.batchRequest {
writeKey := req.writeKey
sourceTag := gateway.getSourceTagFromWriteKey(writeKey)
Expand Down Expand Up @@ -496,7 +496,7 @@ func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWe
sourcedebugger.RecordEvent(eventBatch.writeKey, eventBatch.data)
}

userWebRequestWorker.batchTimeStat.End()
userWebRequestWorker.batchTimeStat.Since(batchStart)
gateway.batchSizeStat.Observe(float64(len(breq.batchRequest)))

for _, v := range sourceStats {
Expand Down
4 changes: 2 additions & 2 deletions gateway/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,12 @@ func (bt *batchWebhookTransformerT) batchTransformLoop() {

// stats
bt.stats.sourceStats[breq.sourceType].numEvents.Count(len(payloadArr))
bt.stats.sourceStats[breq.sourceType].sourceTransform.Start()

transformStart := time.Now()
batchResponse := bt.transform(payloadArr, breq.sourceType)

// stats
bt.stats.sourceStats[breq.sourceType].sourceTransform.End()
bt.stats.sourceStats[breq.sourceType].sourceTransform.Since(transformStart)

if batchResponse.batchError == nil && len(batchResponse.responses) != len(payloadArr) {
batchResponse.batchError = errors.New("webhook batch transform response events size does not equal sent events size")
Expand Down
5 changes: 3 additions & 2 deletions gateway/webhook/webhookTransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"strings"
"time"

"github.com/rudderlabs/rudder-server/gateway/response"
"github.com/rudderlabs/rudder-server/utils/httputil"
Expand Down Expand Up @@ -46,13 +47,13 @@ func (bt *batchWebhookTransformerT) markResponseFail(reason string) transformerR

func (bt *batchWebhookTransformerT) transform(events [][]byte, sourceType string) transformerBatchResponseT {
bt.stats.sentStat.Count(len(events))
bt.stats.transformTimerStat.Start()
transformStart := time.Now()

payload := misc.MakeJSONArray(events)
url := fmt.Sprintf(`%s/%s`, bt.sourceTransformerURL, strings.ToLower(sourceType))
resp, err := bt.webhook.netClient.Post(url, "application/json; charset=utf-8", bytes.NewBuffer(payload))

bt.stats.transformTimerStat.End()
bt.stats.transformTimerStat.Since(transformStart)
if err != nil {
err := fmt.Errorf("JS HTTP connection error to source transformer: URL: %v Error: %+v", url, err)
return transformerBatchResponseT{batchError: err, statusCode: http.StatusServiceUnavailable}
Expand Down
4 changes: 2 additions & 2 deletions internal/throttling/throttling.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ func (l *Limiter) getTimer(key, algo string, rate, window int64) func() {
"rate": strconv.FormatInt(rate, 10),
"window": strconv.FormatInt(window, 10),
})
m.Start()
start := time.Now()
return func() {
m.End()
m.Since(start)
}
}
30 changes: 14 additions & 16 deletions jobsdb/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ So, we don't have to worry about dsEmptyResultCache
*/
func (jd *HandleT) deleteJobStatus() {
err := jd.WithUpdateSafeTx(context.TODO(), func(tx UpdateSafeTx) error {
tags := statTags{CustomValFilters: []string{jd.tablePrefix}}
queryStat := jd.getTimerStat("jobsdb_delete_job_status_time", &tags)
queryStat.Start()
defer queryStat.End()
defer jd.getTimerStat(
"jobsdb_delete_job_status_time",
&statTags{
CustomValFilters: []string{jd.tablePrefix},
}).RecordDuration()()

dsList := jd.getDSList()

Expand All @@ -100,10 +101,11 @@ func (jd *HandleT) deleteJobStatus() {
}

func (jd *HandleT) deleteJobStatusDSInTx(txHandler transactionHandler, ds dataSetT) error {
tags := statTags{CustomValFilters: []string{jd.tablePrefix}}
queryStat := jd.getTimerStat("jobsdb_delete_job_status_ds_time", &tags)
queryStat.Start()
defer queryStat.End()
defer jd.getTimerStat(
"jobsdb_delete_job_status_ds_time",
&statTags{
CustomValFilters: []string{jd.tablePrefix},
}).RecordDuration()()

_, err := txHandler.Exec(
fmt.Sprintf(
Expand Down Expand Up @@ -140,12 +142,10 @@ So, we don't have to worry about dsEmptyResultCache
*/
func (jd *HandleT) failExecuting() {
err := jd.WithUpdateSafeTx(context.TODO(), func(tx UpdateSafeTx) error {
queryStat := jd.getTimerStat(
defer jd.getTimerStat(
"jobsdb_fail_executing_time",
&statTags{CustomValFilters: []string{jd.tablePrefix}},
)
queryStat.Start()
defer queryStat.End()
).RecordDuration()()

dsList := jd.getDSList()

Expand All @@ -165,12 +165,10 @@ func (jd *HandleT) failExecuting() {
}

func (jd *HandleT) failExecutingDSInTx(txHandler transactionHandler, ds dataSetT) error {
queryStat := jd.getTimerStat(
defer jd.getTimerStat(
"jobsdb_fail_executing_ds_time",
&statTags{CustomValFilters: []string{jd.tablePrefix}},
)
queryStat.Start()
defer queryStat.End()
).RecordDuration()()

_, err := txHandler.Exec(
fmt.Sprintf(
Expand Down
14 changes: 8 additions & 6 deletions jobsdb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,9 +495,11 @@ func getStatusBackupQueryFn(backupDSRange *dataSetRangeT) func(int64) string {
}

func (jd *HandleT) createTableDumps(queryFunc func(int64) string, pathFunc func(string) (string, error), totalCount int64) (map[string]string, error) {
defer jd.getTimerStat(
"table_FileDump_TimeStat",
&statTags{CustomValFilters: []string{jd.tablePrefix}},
).RecordDuration()()
filesWriter := fileuploader.NewGzMultiFileWriter()
tableFileDumpTimeStat := stats.Default.NewTaggedStat("table_FileDump_TimeStat", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix})
tableFileDumpTimeStat.Start()

var offset int64
dumps := make(map[string]string)
Expand Down Expand Up @@ -558,13 +560,14 @@ func (jd *HandleT) createTableDumps(queryFunc func(int64) string, pathFunc func(
if err != nil {
return dumps, err
}
tableFileDumpTimeStat.End()
return dumps, nil
}

func (jd *HandleT) uploadTableDump(ctx context.Context, workspaceID, path string) error {
fileUploadTimeStat := stats.Default.NewTaggedStat("fileUpload_TimeStat", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix})
fileUploadTimeStat.Start()
defer jd.getTimerStat(
"fileUpload_TimeStat",
&statTags{CustomValFilters: []string{jd.tablePrefix}},
).RecordDuration()()

file, err := os.Open(path)
if err != nil {
Expand All @@ -587,7 +590,6 @@ func (jd *HandleT) uploadTableDump(ctx context.Context, workspaceID, path string
return err
}
jd.logger.Infof("[JobsDB] :: Backed up table at %s for workspaceId %s", output.Location, workspaceID)
fileUploadTimeStat.End()
return nil
}

Expand Down
95 changes: 58 additions & 37 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,11 @@ type HandleT struct {
statPreDropTableCount stats.Measurement
statDSCount stats.Measurement
statNewDSPeriod stats.Measurement
newDSCreationTime time.Time
invalidCacheKeyStat stats.Measurement
isStatNewDSPeriodInitialized bool
statDropDSPeriod stats.Measurement
dsDropTime time.Time
unionQueryTime stats.Measurement
isStatDropDSPeriodInitialized bool
logger logger.Logger
Expand Down Expand Up @@ -1211,13 +1213,14 @@ func (jd *HandleT) addNewDS(l lock.LockToken, ds dataSetT) {

// NOTE: If addNewDSInTx is directly called, make sure to explicitly call refreshDSRangeList(l) to update the DS list in cache, once transaction has completed.
func (jd *HandleT) addNewDSInTx(tx *Tx, l lock.LockToken, dsList []dataSetT, ds dataSetT) error {
defer jd.getTimerStat(
"add_new_ds",
&statTags{CustomValFilters: []string{jd.tablePrefix}},
).RecordDuration()()
if l == nil {
return errors.New("nil ds list lock token provided")
}
jd.logger.Infof("Creating new DS %+v", ds)
queryStat := stats.Default.NewTaggedStat("add_new_ds", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix})
queryStat.Start()
defer queryStat.End()
err := jd.createDSInTx(tx, ds)
if err != nil {
return err
Expand All @@ -1228,19 +1231,20 @@ func (jd *HandleT) addNewDSInTx(tx *Tx, l lock.LockToken, dsList []dataSetT, ds
}
// Tracking time interval between new ds creations. Hence calling end before start
if jd.isStatNewDSPeriodInitialized {
jd.statNewDSPeriod.End()
jd.statNewDSPeriod.Since(jd.newDSCreationTime)
}
jd.statNewDSPeriod.Start()
jd.newDSCreationTime = time.Now()
jd.isStatNewDSPeriodInitialized = true

return nil
}

func (jd *HandleT) addDSInTx(tx *Tx, ds dataSetT) error {
defer jd.getTimerStat(
"add_new_ds",
&statTags{CustomValFilters: []string{jd.tablePrefix}},
).RecordDuration()()
jd.logger.Infof("Creating DS %+v", ds)
queryStat := stats.Default.NewTaggedStat("add_new_ds", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix})
queryStat.Start()
defer queryStat.End()
return jd.createDSInTx(tx, ds)
}

Expand Down Expand Up @@ -1478,9 +1482,9 @@ func (jd *HandleT) postDropDs(ds dataSetT) {

// Tracking time interval between drop ds operations. Hence calling end before start
if jd.isStatDropDSPeriodInitialized {
jd.statDropDSPeriod.End()
jd.statDropDSPeriod.Since(jd.dsDropTime)
}
jd.statDropDSPeriod.Start()
jd.dsDropTime = time.Now()
jd.isStatDropDSPeriodInitialized = true
}

Expand Down Expand Up @@ -1630,9 +1634,10 @@ func (jd *HandleT) dropAllDS(l lock.LockToken) error {
}

func (jd *HandleT) internalStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobList []*JobT) error {
queryStat := jd.getTimerStat("store_jobs", nil)
queryStat.Start()
defer queryStat.End()
defer jd.getTimerStat(
"store_jobs",
&statTags{CustomValFilters: []string{jd.tablePrefix}},
).RecordDuration()()

tx.AddSuccessListener(func() {
jd.clearCache(ds, jobList)
Expand All @@ -1646,9 +1651,10 @@ Next set of functions are for reading/writing jobs and job_status for
a given dataset. The names should be self explainatory
*/
func (jd *HandleT) copyJobsDS(tx *Tx, ds dataSetT, jobList []*JobT) error { // When fixing callers make sure error is handled with assertError
queryStat := jd.getTimerStat("copy_jobs", nil)
queryStat.Start()
defer queryStat.End()
defer jd.getTimerStat(
"copy_jobs",
&statTags{CustomValFilters: []string{jd.tablePrefix}},
).RecordDuration()()

tx.AddSuccessListener(func() {
jd.clearCache(ds, jobList)
Expand Down Expand Up @@ -1748,9 +1754,10 @@ func (jd *HandleT) internalStoreWithRetryEachInTx(ctx context.Context, tx *Tx, d
}
return errorMessagesMap
}
queryStat := jd.getTimerStat("store_jobs_retry_each", nil)
queryStat.Start()
defer queryStat.End()
defer jd.getTimerStat(
"store_jobs_retry_each",
nil,
).RecordDuration()()

_, err := tx.ExecContext(ctx, savepointSql)
if err != nil {
Expand Down Expand Up @@ -2164,9 +2171,10 @@ func (jd *HandleT) markClearEmptyResult(ds dataSetT, workspace string, stateFilt
// * The entry is noJobs
// * The entry is not expired (entry time + cache expiration > now)
func (jd *HandleT) isEmptyResult(ds dataSetT, workspace string, stateFilters, customValFilters []string, parameterFilters []ParameterFilterT) bool {
queryStat := stats.Default.NewTaggedStat("isEmptyCheck", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix})
queryStat.Start()
defer queryStat.End()
defer jd.getTimerStat(
"isEmptyCheck",
&statTags{CustomValFilters: []string{jd.tablePrefix}},
).RecordDuration()()
jd.dsCacheLock.Lock()
defer jd.dsCacheLock.Unlock()

Expand Down Expand Up @@ -2586,9 +2594,10 @@ func (jd *HandleT) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSet
return
}

queryStat := jd.getTimerStat("update_job_status_ds_time", &tags)
queryStat.Start()
defer queryStat.End()
defer jd.getTimerStat(
"update_job_status_ds_time",
&tags,
).RecordDuration()()
updatedStatesMap := map[string]map[string]bool{}
store := func() error {
stmt, err := tx.PrepareContext(ctx, pq.CopyIn(ds.JobStatusTable, "job_id", "job_state", "attempt", "exec_time",
Expand Down Expand Up @@ -3014,10 +3023,14 @@ Later we can move this to query
*/
func (jd *HandleT) internalUpdateJobStatusInTx(ctx context.Context, tx *Tx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error {
// capture stats
tags := statTags{CustomValFilters: customValFilters, ParameterFilters: parameterFilters}
queryStat := jd.getTimerStat("update_job_status_time", &tags)
queryStat.Start()
defer queryStat.End()
tags := statTags{
CustomValFilters: customValFilters,
ParameterFilters: parameterFilters,
}
defer jd.getTimerStat(
"update_job_status_time",
&tags,
).RecordDuration()()

// do update
updatedStatesByDS, err := jd.doUpdateJobStatusInTx(ctx, tx, statusList, tags)
Expand Down Expand Up @@ -3225,10 +3238,14 @@ func (jd *HandleT) getUnprocessed(ctx context.Context, params GetQueryParamsT) (
return JobsResult{}, nil
}

tags := statTags{CustomValFilters: params.CustomValFilters, ParameterFilters: params.ParameterFilters}
queryStat := jd.getTimerStat("unprocessed_jobs_time", &tags)
queryStat.Start()
defer queryStat.End()
tags := statTags{
CustomValFilters: params.CustomValFilters,
ParameterFilters: params.ParameterFilters,
}
defer jd.getTimerStat(
"unprocessed_jobs_time",
&tags,
).RecordDuration()()

// The order of lock is very important. The migrateDSLoop
// takes lock in this order so reversing this will cause
Expand Down Expand Up @@ -3332,10 +3349,14 @@ func (jd *HandleT) GetProcessed(ctx context.Context, params GetQueryParamsT) (Jo
return JobsResult{}, nil
}

tags := statTags{CustomValFilters: params.CustomValFilters, StateFilters: params.StateFilters, ParameterFilters: params.ParameterFilters}
queryStat := jd.getTimerStat("processed_jobs_time", &tags)
queryStat.Start()
defer queryStat.End()
defer jd.getTimerStat(
"processed_jobs_time",
&statTags{
CustomValFilters: params.CustomValFilters,
StateFilters: params.StateFilters,
ParameterFilters: params.ParameterFilters,
},
).RecordDuration()()

// The order of lock is very important. The migrateDSLoop
// takes lock in this order so reversing this will cause
Expand Down
Loading

0 comments on commit 134bb0a

Please sign in to comment.