Skip to content
118 changes: 108 additions & 10 deletions internal/uplog/request_uplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -59,6 +60,18 @@ type (
PerceptiveSpeed int64 `json:"perceptive_speed,omitempty"`
getUpToken GetUpToken `json:"-"`
}

// callbackTracker manages uplog data updates from concurrent httptrace callbacks
// and implements delayed uplog submission with timer reset mechanism
callbackTracker struct {
uplog *RequestUplog
mu sync.RWMutex
timer *time.Timer
submitted bool
submitDelay time.Duration
onceSubmit sync.Once
getUpToken GetUpToken
}
)

func NewRequestUplog(apiName, targetBucket, targetKey string, getUpToken GetUpToken) (*RequestUplog, error) {
Expand All @@ -84,11 +97,77 @@ func (uplog *RequestUplog) Priority() clientv2.InterceptorPriority {
return clientv2.InterceptorPriorityUplog
}

// newCallbackTracker creates a new tracker for managing uplog updates
func newCallbackTracker(uplog *RequestUplog, getUpToken GetUpToken, submitDelay time.Duration) *callbackTracker {
return &callbackTracker{
uplog: uplog,
getUpToken: getUpToken,
submitDelay: submitDelay,
}
}

// update safely updates uplog data with read-write lock protection
// and resets the submission timer if submission hasn't occurred yet
func (t *callbackTracker) update(updateFunc func(*RequestUplog)) {
t.mu.Lock()
defer t.mu.Unlock()

// Ignore updates after submission
if t.submitted {
return
}

// Apply the update
updateFunc(t.uplog)

// Reset the timer
if t.timer != nil {
t.timer.Stop()
}
t.timer = time.AfterFunc(t.submitDelay, func() {
t.submit()
})
}

// submit triggers the final uplog submission
// This method is called either by the timer or explicitly
// Once submitted, all subsequent updates are ignored
func (t *callbackTracker) submit() {
// Ensure timer is cleared after submission to help GC
// This defer is outside onceSubmit.Do so it runs every time submit() is called
defer func() {
t.mu.Lock()
t.timer = nil
t.mu.Unlock()
}()

t.onceSubmit.Do(func() {
t.mu.Lock()
t.submitted = true

// Create a snapshot of the uplog data
snapshot := *t.uplog
t.mu.Unlock()

// Serialize and send to uplog channel
if uplogBytes, jsonError := json.Marshal(&snapshot); jsonError == nil {
uplogChan <- uplogSerializedEntry{
serializedUplog: uplogBytes,
getUpToken: t.getUpToken,
}
}
})
}

func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler) (resp *http.Response, err error) {
if !IsUplogEnabled() {
return handler(req)
}

// Create callback tracker with 2s delay for uplog submission
// This allows capturing timing data from async callbacks
tracker := newCallbackTracker(uplog, uplog.getUpToken, 2*time.Second)

var dnsStartTime, gotFirstResponseByteTime, connectStartTime, tlsHandshakeStartTime, wroteHeadersTime, wroteRequestTime time.Time

uplog.Method = truncate(req.Method, maxFieldValueLength)
Expand Down Expand Up @@ -134,7 +213,9 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler
},
GotFirstResponseByte: func() {
if !wroteRequestTime.IsZero() {
uplog.WaitElapsedTime = getElapsedTime(wroteRequestTime)
tracker.update(func(l *RequestUplog) {
l.WaitElapsedTime = getElapsedTime(wroteRequestTime)
})
}
gotFirstResponseByteTime = time.Now()
},
Expand All @@ -143,31 +224,39 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler
},
DNSDone: func(info httptrace.DNSDoneInfo) {
if !dnsStartTime.IsZero() {
uplog.DNSElapsedTime = getElapsedTime(dnsStartTime)
tracker.update(func(l *RequestUplog) {
l.DNSElapsedTime = getElapsedTime(dnsStartTime)
})
}
},
ConnectStart: func(network string, addr string) {
connectStartTime = time.Now()
},
ConnectDone: func(network string, addr string, err error) {
if !connectStartTime.IsZero() {
uplog.ConnectElapsedTime = getElapsedTime(connectStartTime)
tracker.update(func(l *RequestUplog) {
l.ConnectElapsedTime = getElapsedTime(connectStartTime)
})
}
},
TLSHandshakeStart: func() {
tlsHandshakeStartTime = time.Now()
},
TLSHandshakeDone: func(tls.ConnectionState, error) {
if !tlsHandshakeStartTime.IsZero() {
uplog.TLSConnectElapsedTime = getElapsedTime(tlsHandshakeStartTime)
tracker.update(func(l *RequestUplog) {
l.TLSConnectElapsedTime = getElapsedTime(tlsHandshakeStartTime)
})
}
},
WroteHeaders: func() {
wroteHeadersTime = time.Now()
},
WroteRequest: func(info httptrace.WroteRequestInfo) {
if !wroteHeadersTime.IsZero() {
uplog.RequestElapsedTime = getElapsedTime(wroteHeadersTime)
tracker.update(func(l *RequestUplog) {
l.RequestElapsedTime = getElapsedTime(wroteHeadersTime)
})
}
wroteRequestTime = time.Now()
},
Expand All @@ -177,9 +266,13 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler
uplog.UpTime = beginAt.Unix()
resp, err = handler(req)
if !gotFirstResponseByteTime.IsZero() {
uplog.ResponseElapsedTime = getElapsedTime(gotFirstResponseByteTime)
tracker.update(func(l *RequestUplog) {
l.ResponseElapsedTime = getElapsedTime(gotFirstResponseByteTime)
})
}
uplog.TotalElapsedTime = getElapsedTime(beginAt)
tracker.update(func(l *RequestUplog) {
l.TotalElapsedTime = getElapsedTime(beginAt)
})
if err != nil {
uplog.ErrorType, uplog.ErrorDescription = uplog.detect(resp, err)
uplog.ErrorDescription = truncate(uplog.ErrorDescription, maxFieldValueLength)
Expand Down Expand Up @@ -215,9 +308,14 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler
uplog.PerceptiveSpeed = uplog.BytesReceived * 1000 / int64(uplog.TotalElapsedTime)
}
}
if uplogBytes, jsonError := json.Marshal(uplog); jsonError == nil {
uplogChan <- uplogSerializedEntry{serializedUplog: uplogBytes, getUpToken: uplog.getUpToken}
}

// Trigger uplog submission
// The timer will fire after delay unless more updates arrive
// Note: We allow the last timer to keep running to capture any late async callbacks
// Once submit() is called, no new timers will be created (checked in update()),
// allowing the tracker to be garbage collected after the timer fires.
tracker.submit()

return
}

Expand Down
Loading