diff --git a/internal/uplog/request_uplog.go b/internal/uplog/request_uplog.go index 3a584ce0..dc231434 100644 --- a/internal/uplog/request_uplog.go +++ b/internal/uplog/request_uplog.go @@ -11,6 +11,7 @@ import ( "os" "runtime" "strconv" + "sync" "sync/atomic" "time" @@ -59,6 +60,18 @@ type ( PerceptiveSpeed int64 `json:"perceptive_speed,omitempty"` getUpToken GetUpToken `json:"-"` } + + // callbackTracker manages uplog data updates from concurrent httptrace callbacks + // and implements delayed uplog submission with timer reset mechanism + callbackTracker struct { + uplog *RequestUplog + mu sync.RWMutex + timer *time.Timer + submitted bool + submitDelay time.Duration + onceSubmit sync.Once + getUpToken GetUpToken + } ) func NewRequestUplog(apiName, targetBucket, targetKey string, getUpToken GetUpToken) (*RequestUplog, error) { @@ -84,11 +97,77 @@ func (uplog *RequestUplog) Priority() clientv2.InterceptorPriority { return clientv2.InterceptorPriorityUplog } +// newCallbackTracker creates a new tracker for managing uplog updates +func newCallbackTracker(uplog *RequestUplog, getUpToken GetUpToken, submitDelay time.Duration) *callbackTracker { + return &callbackTracker{ + uplog: uplog, + getUpToken: getUpToken, + submitDelay: submitDelay, + } +} + +// update safely updates uplog data with read-write lock protection +// and resets the submission timer if submission hasn't occurred yet +func (t *callbackTracker) update(updateFunc func(*RequestUplog)) { + t.mu.Lock() + defer t.mu.Unlock() + + // Ignore updates after submission + if t.submitted { + return + } + + // Apply the update + updateFunc(t.uplog) + + // Reset the timer + if t.timer != nil { + t.timer.Stop() + } + t.timer = time.AfterFunc(t.submitDelay, func() { + t.submit() + }) +} + +// submit triggers the final uplog submission +// This method is called either by the timer or explicitly +// Once submitted, all subsequent updates are ignored +func (t *callbackTracker) submit() { + // Ensure timer is cleared after submission to help GC + // This defer is outside onceSubmit.Do so it runs every time submit() is called + defer func() { + t.mu.Lock() + t.timer = nil + t.mu.Unlock() + }() + + t.onceSubmit.Do(func() { + t.mu.Lock() + t.submitted = true + + // Create a snapshot of the uplog data + snapshot := *t.uplog + t.mu.Unlock() + + // Serialize and send to uplog channel + if uplogBytes, jsonError := json.Marshal(&snapshot); jsonError == nil { + uplogChan <- uplogSerializedEntry{ + serializedUplog: uplogBytes, + getUpToken: t.getUpToken, + } + } + }) +} + func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler) (resp *http.Response, err error) { if !IsUplogEnabled() { return handler(req) } + // Create callback tracker with 2s delay for uplog submission + // This allows capturing timing data from async callbacks + tracker := newCallbackTracker(uplog, uplog.getUpToken, 2*time.Second) + var dnsStartTime, gotFirstResponseByteTime, connectStartTime, tlsHandshakeStartTime, wroteHeadersTime, wroteRequestTime time.Time uplog.Method = truncate(req.Method, maxFieldValueLength) @@ -134,7 +213,9 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler }, GotFirstResponseByte: func() { if !wroteRequestTime.IsZero() { - uplog.WaitElapsedTime = getElapsedTime(wroteRequestTime) + tracker.update(func(l *RequestUplog) { + l.WaitElapsedTime = getElapsedTime(wroteRequestTime) + }) } gotFirstResponseByteTime = time.Now() }, @@ -143,7 +224,9 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler }, DNSDone: func(info httptrace.DNSDoneInfo) { if !dnsStartTime.IsZero() { - uplog.DNSElapsedTime = getElapsedTime(dnsStartTime) + tracker.update(func(l *RequestUplog) { + l.DNSElapsedTime = getElapsedTime(dnsStartTime) + }) } }, ConnectStart: func(network string, addr string) { @@ -151,7 +234,9 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler }, ConnectDone: func(network string, addr string, err error) { if !connectStartTime.IsZero() { - uplog.ConnectElapsedTime = getElapsedTime(connectStartTime) + tracker.update(func(l *RequestUplog) { + l.ConnectElapsedTime = getElapsedTime(connectStartTime) + }) } }, TLSHandshakeStart: func() { @@ -159,7 +244,9 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler }, TLSHandshakeDone: func(tls.ConnectionState, error) { if !tlsHandshakeStartTime.IsZero() { - uplog.TLSConnectElapsedTime = getElapsedTime(tlsHandshakeStartTime) + tracker.update(func(l *RequestUplog) { + l.TLSConnectElapsedTime = getElapsedTime(tlsHandshakeStartTime) + }) } }, WroteHeaders: func() { @@ -167,7 +254,9 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler }, WroteRequest: func(info httptrace.WroteRequestInfo) { if !wroteHeadersTime.IsZero() { - uplog.RequestElapsedTime = getElapsedTime(wroteHeadersTime) + tracker.update(func(l *RequestUplog) { + l.RequestElapsedTime = getElapsedTime(wroteHeadersTime) + }) } wroteRequestTime = time.Now() }, @@ -177,9 +266,13 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler uplog.UpTime = beginAt.Unix() resp, err = handler(req) if !gotFirstResponseByteTime.IsZero() { - uplog.ResponseElapsedTime = getElapsedTime(gotFirstResponseByteTime) + tracker.update(func(l *RequestUplog) { + l.ResponseElapsedTime = getElapsedTime(gotFirstResponseByteTime) + }) } - uplog.TotalElapsedTime = getElapsedTime(beginAt) + tracker.update(func(l *RequestUplog) { + l.TotalElapsedTime = getElapsedTime(beginAt) + }) if err != nil { uplog.ErrorType, uplog.ErrorDescription = uplog.detect(resp, err) uplog.ErrorDescription = truncate(uplog.ErrorDescription, maxFieldValueLength) @@ -215,9 +308,14 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler uplog.PerceptiveSpeed = uplog.BytesReceived * 1000 / int64(uplog.TotalElapsedTime) } } - if uplogBytes, jsonError := json.Marshal(uplog); jsonError == nil { - uplogChan <- uplogSerializedEntry{serializedUplog: uplogBytes, getUpToken: uplog.getUpToken} - } + + // Trigger uplog submission + // The timer will fire after delay unless more updates arrive + // Note: We allow the last timer to keep running to capture any late async callbacks + // Once submit() is called, no new timers will be created (checked in update()), + // allowing the tracker to be garbage collected after the timer fires. + tracker.submit() + return }