From 71b59f9acb81727c870e77328c698866ec87cf63 Mon Sep 17 00:00:00 2001 From: xgopilot Date: Tue, 11 Nov 2025 10:09:29 +0000 Subject: [PATCH 1/8] fix(uplog): resolve data race in RequestUplog timing fields - Use atomic operations for concurrent access to timing fields - Add atomicSnapshot method for thread-safe JSON marshaling - Prevent race conditions in httptrace callbacks Fixes #174 Generated with [codeagent](https://github.com/qbox/codeagent) Co-authored-by: zhangzqs <34616640+zhangzqs@users.noreply.github.com> --- internal/uplog/request_uplog.go | 40 ++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/internal/uplog/request_uplog.go b/internal/uplog/request_uplog.go index 3a584ce0..32fe2bbc 100644 --- a/internal/uplog/request_uplog.go +++ b/internal/uplog/request_uplog.go @@ -134,7 +134,7 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler }, GotFirstResponseByte: func() { if !wroteRequestTime.IsZero() { - uplog.WaitElapsedTime = getElapsedTime(wroteRequestTime) + atomic.StoreUint64(&uplog.WaitElapsedTime, getElapsedTime(wroteRequestTime)) } gotFirstResponseByteTime = time.Now() }, @@ -143,7 +143,7 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler }, DNSDone: func(info httptrace.DNSDoneInfo) { if !dnsStartTime.IsZero() { - uplog.DNSElapsedTime = getElapsedTime(dnsStartTime) + atomic.StoreUint64(&uplog.DNSElapsedTime, getElapsedTime(dnsStartTime)) } }, ConnectStart: func(network string, addr string) { @@ -151,7 +151,7 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler }, ConnectDone: func(network string, addr string, err error) { if !connectStartTime.IsZero() { - uplog.ConnectElapsedTime = getElapsedTime(connectStartTime) + atomic.StoreUint64(&uplog.ConnectElapsedTime, getElapsedTime(connectStartTime)) } }, TLSHandshakeStart: func() { @@ -159,7 +159,7 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler }, TLSHandshakeDone: func(tls.ConnectionState, error) { if !tlsHandshakeStartTime.IsZero() { - uplog.TLSConnectElapsedTime = getElapsedTime(tlsHandshakeStartTime) + atomic.StoreUint64(&uplog.TLSConnectElapsedTime, getElapsedTime(tlsHandshakeStartTime)) } }, WroteHeaders: func() { @@ -167,7 +167,7 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler }, WroteRequest: func(info httptrace.WroteRequestInfo) { if !wroteHeadersTime.IsZero() { - uplog.RequestElapsedTime = getElapsedTime(wroteHeadersTime) + atomic.StoreUint64(&uplog.RequestElapsedTime, getElapsedTime(wroteHeadersTime)) } wroteRequestTime = time.Now() }, @@ -177,9 +177,9 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler uplog.UpTime = beginAt.Unix() resp, err = handler(req) if !gotFirstResponseByteTime.IsZero() { - uplog.ResponseElapsedTime = getElapsedTime(gotFirstResponseByteTime) + atomic.StoreUint64(&uplog.ResponseElapsedTime, getElapsedTime(gotFirstResponseByteTime)) } - uplog.TotalElapsedTime = getElapsedTime(beginAt) + atomic.StoreUint64(&uplog.TotalElapsedTime, getElapsedTime(beginAt)) if err != nil { uplog.ErrorType, uplog.ErrorDescription = uplog.detect(resp, err) uplog.ErrorDescription = truncate(uplog.ErrorDescription, maxFieldValueLength) @@ -208,19 +208,37 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler } } } - if uplog.TotalElapsedTime > 0 { + totalElapsedTime := atomic.LoadUint64(&uplog.TotalElapsedTime) + if totalElapsedTime > 0 { if uplog.BytesSent > uplog.BytesReceived { - uplog.PerceptiveSpeed = uplog.BytesSent * 1000 / int64(uplog.TotalElapsedTime) + uplog.PerceptiveSpeed = uplog.BytesSent * 1000 / int64(totalElapsedTime) } else { - uplog.PerceptiveSpeed = uplog.BytesReceived * 1000 / int64(uplog.TotalElapsedTime) + uplog.PerceptiveSpeed = uplog.BytesReceived * 1000 / int64(totalElapsedTime) } } - if uplogBytes, jsonError := json.Marshal(uplog); jsonError == nil { + if uplogBytes, jsonError := json.Marshal(uplog.atomicSnapshot()); jsonError == nil { uplogChan <- uplogSerializedEntry{serializedUplog: uplogBytes, getUpToken: uplog.getUpToken} } return } +// atomicSnapshot creates a thread-safe copy of the RequestUplog struct by +// atomically reading the timing fields that may be concurrently modified +func (uplog *RequestUplog) atomicSnapshot() RequestUplog { + snapshot := *uplog // Copy all fields first + + // Atomically read the timing fields that are modified concurrently + snapshot.TotalElapsedTime = atomic.LoadUint64(&uplog.TotalElapsedTime) + snapshot.DNSElapsedTime = atomic.LoadUint64(&uplog.DNSElapsedTime) + snapshot.ConnectElapsedTime = atomic.LoadUint64(&uplog.ConnectElapsedTime) + snapshot.TLSConnectElapsedTime = atomic.LoadUint64(&uplog.TLSConnectElapsedTime) + snapshot.RequestElapsedTime = atomic.LoadUint64(&uplog.RequestElapsedTime) + snapshot.WaitElapsedTime = atomic.LoadUint64(&uplog.WaitElapsedTime) + snapshot.ResponseElapsedTime = atomic.LoadUint64(&uplog.ResponseElapsedTime) + + return snapshot +} + func (uplog *RequestUplog) detect(response *http.Response, err error) (errorType ErrorType, errorDescription string) { if err == nil { if response.StatusCode >= 300 { From 730cdad59c69b7f6ed3538bfdba2e778b52ad5f7 Mon Sep 17 00:00:00 2001 From: xgopilot Date: Mon, 17 Nov 2025 07:10:23 +0000 Subject: [PATCH 2/8] fix: implement callback tracking with async uplog optimization - Add callbackTracker mechanism to ensure data completeness - Track DNS, Connect, and TLS async callbacks with timeout - Wait up to 500ms for callbacks to complete before serialization - Make uplog serialization and sending async to reduce latency - Resolves data race and ensures complete timing information Requested by @YangSen-qn (Solution C with async optimization) Generated with [codeagent](https://github.com/qbox/codeagent) Co-authored-by: YangSen-qn <49863787+YangSen-qn@users.noreply.github.com> --- internal/uplog/request_uplog.go | 64 +++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/internal/uplog/request_uplog.go b/internal/uplog/request_uplog.go index 32fe2bbc..5833dcf6 100644 --- a/internal/uplog/request_uplog.go +++ b/internal/uplog/request_uplog.go @@ -59,6 +59,13 @@ type ( PerceptiveSpeed int64 `json:"perceptive_speed,omitempty"` getUpToken GetUpToken `json:"-"` } + + // callbackTracker tracks async httptrace callbacks to ensure data completeness + callbackTracker struct { + expectedCallbacks int32 + completedCallbacks int32 + doneChan chan struct{} + } ) func NewRequestUplog(apiName, targetBucket, targetKey string, getUpToken GetUpToken) (*RequestUplog, error) { @@ -89,6 +96,11 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler return handler(req) } + // Create callback tracker to ensure data completeness + tracker := &callbackTracker{ + doneChan: make(chan struct{}), + } + var dnsStartTime, gotFirstResponseByteTime, connectStartTime, tlsHandshakeStartTime, wroteHeadersTime, wroteRequestTime time.Time uplog.Method = truncate(req.Method, maxFieldValueLength) @@ -139,28 +151,34 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler gotFirstResponseByteTime = time.Now() }, DNSStart: func(httptrace.DNSStartInfo) { + atomic.AddInt32(&tracker.expectedCallbacks, 1) dnsStartTime = time.Now() }, DNSDone: func(info httptrace.DNSDoneInfo) { if !dnsStartTime.IsZero() { atomic.StoreUint64(&uplog.DNSElapsedTime, getElapsedTime(dnsStartTime)) } + tracker.markDone() }, ConnectStart: func(network string, addr string) { + atomic.AddInt32(&tracker.expectedCallbacks, 1) connectStartTime = time.Now() }, ConnectDone: func(network string, addr string, err error) { if !connectStartTime.IsZero() { atomic.StoreUint64(&uplog.ConnectElapsedTime, getElapsedTime(connectStartTime)) } + tracker.markDone() }, TLSHandshakeStart: func() { + atomic.AddInt32(&tracker.expectedCallbacks, 1) tlsHandshakeStartTime = time.Now() }, TLSHandshakeDone: func(tls.ConnectionState, error) { if !tlsHandshakeStartTime.IsZero() { atomic.StoreUint64(&uplog.TLSConnectElapsedTime, getElapsedTime(tlsHandshakeStartTime)) } + tracker.markDone() }, WroteHeaders: func() { wroteHeadersTime = time.Now() @@ -176,6 +194,10 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler beginAt := time.Now() uplog.UpTime = beginAt.Unix() resp, err = handler(req) + + // Wait for async callbacks to complete (with timeout) + tracker.waitForCallbacks(500 * time.Millisecond) + if !gotFirstResponseByteTime.IsZero() { atomic.StoreUint64(&uplog.ResponseElapsedTime, getElapsedTime(gotFirstResponseByteTime)) } @@ -216,9 +238,14 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler uplog.PerceptiveSpeed = uplog.BytesReceived * 1000 / int64(totalElapsedTime) } } - if uplogBytes, jsonError := json.Marshal(uplog.atomicSnapshot()); jsonError == nil { - uplogChan <- uplogSerializedEntry{serializedUplog: uplogBytes, getUpToken: uplog.getUpToken} - } + + // Async uplog serialization and sending (optimization requested by YangSen-qn) + go func() { + if uplogBytes, jsonError := json.Marshal(uplog.atomicSnapshot()); jsonError == nil { + uplogChan <- uplogSerializedEntry{serializedUplog: uplogBytes, getUpToken: uplog.getUpToken} + } + }() + return } @@ -281,3 +308,34 @@ func addCounter(c *uint64, n uint64) { atomic.AddUint64(c, n) } } + +// markDone marks a callback as completed and signals if all expected callbacks are done +func (t *callbackTracker) markDone() { + if atomic.AddInt32(&t.completedCallbacks, 1) == atomic.LoadInt32(&t.expectedCallbacks) { + select { + case t.doneChan <- struct{}{}: + default: + // Channel already signaled, do nothing + } + } +} + +// waitForCallbacks waits for all async callbacks to complete or timeout +func (t *callbackTracker) waitForCallbacks(timeout time.Duration) { + // If no callbacks are expected, return immediately + if atomic.LoadInt32(&t.expectedCallbacks) == 0 { + return + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case <-t.doneChan: + // All async callbacks completed + return + case <-timer.C: + // Timeout reached, continue with current data + return + } +} From 133fa5f76291e5f35ff6df75f226dbb9c801ff63 Mon Sep 17 00:00:00 2001 From: xgopilot Date: Mon, 17 Nov 2025 07:28:45 +0000 Subject: [PATCH 3/8] refactor: use callbackTracker with delayed uplog submission - Implement callbackTracker structure containing uplog instance - Use RWMutex in update() method for thread-safe data access - Add delayed uplog submission mechanism with timer reset - Update methods ignore updates after submission - Async uplog serialization and sending for better performance This addresses data race while ensuring complete timing data capture from async httptrace callbacks. Generated with [codeagent](https://github.com/qbox/codeagent) Co-authored-by: YangSen-qn <49863787+YangSen-qn@users.noreply.github.com> --- internal/uplog/request_uplog.go | 190 ++++++++++++++++++-------------- 1 file changed, 107 insertions(+), 83 deletions(-) diff --git a/internal/uplog/request_uplog.go b/internal/uplog/request_uplog.go index 5833dcf6..bbf77443 100644 --- a/internal/uplog/request_uplog.go +++ b/internal/uplog/request_uplog.go @@ -11,6 +11,7 @@ import ( "os" "runtime" "strconv" + "sync" "sync/atomic" "time" @@ -60,11 +61,16 @@ type ( getUpToken GetUpToken `json:"-"` } - // callbackTracker tracks async httptrace callbacks to ensure data completeness + // callbackTracker manages uplog data updates from concurrent httptrace callbacks + // and implements delayed uplog submission with timer reset mechanism callbackTracker struct { - expectedCallbacks int32 - completedCallbacks int32 - doneChan chan struct{} + uplog *RequestUplog + mu sync.RWMutex + timer *time.Timer + submitted bool + submitDelay time.Duration + onceSubmit sync.Once + getUpToken GetUpToken } ) @@ -91,15 +97,81 @@ func (uplog *RequestUplog) Priority() clientv2.InterceptorPriority { return clientv2.InterceptorPriorityUplog } +// newCallbackTracker creates a new tracker for managing uplog updates +func newCallbackTracker(uplog *RequestUplog, getUpToken GetUpToken, submitDelay time.Duration) *callbackTracker { + return &callbackTracker{ + uplog: uplog, + getUpToken: getUpToken, + submitDelay: submitDelay, + } +} + +// update safely updates uplog data with read-write lock protection +// and resets the submission timer if submission hasn't occurred yet +func (t *callbackTracker) update(updateFunc func(*RequestUplog)) { + t.mu.Lock() + defer t.mu.Unlock() + + // Ignore updates after submission + if t.submitted { + return + } + + // Apply the update + updateFunc(t.uplog) + + // Reset the timer + if t.timer != nil { + t.timer.Stop() + } + t.timer = time.AfterFunc(t.submitDelay, func() { + t.submit() + }) +} + +// submit triggers the final uplog submission +// This method is called either by the timer or explicitly +// Once submitted, all subsequent updates are ignored +func (t *callbackTracker) submit() { + t.onceSubmit.Do(func() { + t.mu.Lock() + t.submitted = true + + // Create a snapshot of the uplog data + snapshot := *t.uplog + t.mu.Unlock() + + // Serialize and send asynchronously + go func() { + if uplogBytes, jsonError := json.Marshal(&snapshot); jsonError == nil { + uplogChan <- uplogSerializedEntry{ + serializedUplog: uplogBytes, + getUpToken: t.getUpToken, + } + } + }() + }) +} + +// cancelTimer stops the submission timer if it exists +func (t *callbackTracker) cancelTimer() { + t.mu.Lock() + defer t.mu.Unlock() + + if t.timer != nil { + t.timer.Stop() + } +} + func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler) (resp *http.Response, err error) { if !IsUplogEnabled() { return handler(req) } - // Create callback tracker to ensure data completeness - tracker := &callbackTracker{ - doneChan: make(chan struct{}), - } + // Create callback tracker with 50ms delay for uplog submission + // This allows capturing timing data from async callbacks + tracker := newCallbackTracker(uplog, uplog.getUpToken, 50*time.Millisecond) + defer tracker.cancelTimer() var dnsStartTime, gotFirstResponseByteTime, connectStartTime, tlsHandshakeStartTime, wroteHeadersTime, wroteRequestTime time.Time @@ -146,46 +218,50 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler }, GotFirstResponseByte: func() { if !wroteRequestTime.IsZero() { - atomic.StoreUint64(&uplog.WaitElapsedTime, getElapsedTime(wroteRequestTime)) + tracker.update(func(l *RequestUplog) { + l.WaitElapsedTime = getElapsedTime(wroteRequestTime) + }) } gotFirstResponseByteTime = time.Now() }, DNSStart: func(httptrace.DNSStartInfo) { - atomic.AddInt32(&tracker.expectedCallbacks, 1) dnsStartTime = time.Now() }, DNSDone: func(info httptrace.DNSDoneInfo) { if !dnsStartTime.IsZero() { - atomic.StoreUint64(&uplog.DNSElapsedTime, getElapsedTime(dnsStartTime)) + tracker.update(func(l *RequestUplog) { + l.DNSElapsedTime = getElapsedTime(dnsStartTime) + }) } - tracker.markDone() }, ConnectStart: func(network string, addr string) { - atomic.AddInt32(&tracker.expectedCallbacks, 1) connectStartTime = time.Now() }, ConnectDone: func(network string, addr string, err error) { if !connectStartTime.IsZero() { - atomic.StoreUint64(&uplog.ConnectElapsedTime, getElapsedTime(connectStartTime)) + tracker.update(func(l *RequestUplog) { + l.ConnectElapsedTime = getElapsedTime(connectStartTime) + }) } - tracker.markDone() }, TLSHandshakeStart: func() { - atomic.AddInt32(&tracker.expectedCallbacks, 1) tlsHandshakeStartTime = time.Now() }, TLSHandshakeDone: func(tls.ConnectionState, error) { if !tlsHandshakeStartTime.IsZero() { - atomic.StoreUint64(&uplog.TLSConnectElapsedTime, getElapsedTime(tlsHandshakeStartTime)) + tracker.update(func(l *RequestUplog) { + l.TLSConnectElapsedTime = getElapsedTime(tlsHandshakeStartTime) + }) } - tracker.markDone() }, WroteHeaders: func() { wroteHeadersTime = time.Now() }, WroteRequest: func(info httptrace.WroteRequestInfo) { if !wroteHeadersTime.IsZero() { - atomic.StoreUint64(&uplog.RequestElapsedTime, getElapsedTime(wroteHeadersTime)) + tracker.update(func(l *RequestUplog) { + l.RequestElapsedTime = getElapsedTime(wroteHeadersTime) + }) } wroteRequestTime = time.Now() }, @@ -194,14 +270,14 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler beginAt := time.Now() uplog.UpTime = beginAt.Unix() resp, err = handler(req) - - // Wait for async callbacks to complete (with timeout) - tracker.waitForCallbacks(500 * time.Millisecond) - if !gotFirstResponseByteTime.IsZero() { - atomic.StoreUint64(&uplog.ResponseElapsedTime, getElapsedTime(gotFirstResponseByteTime)) + tracker.update(func(l *RequestUplog) { + l.ResponseElapsedTime = getElapsedTime(gotFirstResponseByteTime) + }) } - atomic.StoreUint64(&uplog.TotalElapsedTime, getElapsedTime(beginAt)) + tracker.update(func(l *RequestUplog) { + l.TotalElapsedTime = getElapsedTime(beginAt) + }) if err != nil { uplog.ErrorType, uplog.ErrorDescription = uplog.detect(resp, err) uplog.ErrorDescription = truncate(uplog.ErrorDescription, maxFieldValueLength) @@ -230,42 +306,21 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler } } } - totalElapsedTime := atomic.LoadUint64(&uplog.TotalElapsedTime) - if totalElapsedTime > 0 { + if uplog.TotalElapsedTime > 0 { if uplog.BytesSent > uplog.BytesReceived { - uplog.PerceptiveSpeed = uplog.BytesSent * 1000 / int64(totalElapsedTime) + uplog.PerceptiveSpeed = uplog.BytesSent * 1000 / int64(uplog.TotalElapsedTime) } else { - uplog.PerceptiveSpeed = uplog.BytesReceived * 1000 / int64(totalElapsedTime) + uplog.PerceptiveSpeed = uplog.BytesReceived * 1000 / int64(uplog.TotalElapsedTime) } } - // Async uplog serialization and sending (optimization requested by YangSen-qn) - go func() { - if uplogBytes, jsonError := json.Marshal(uplog.atomicSnapshot()); jsonError == nil { - uplogChan <- uplogSerializedEntry{serializedUplog: uplogBytes, getUpToken: uplog.getUpToken} - } - }() + // Trigger uplog submission + // The timer will fire after delay unless more updates arrive + tracker.submit() return } -// atomicSnapshot creates a thread-safe copy of the RequestUplog struct by -// atomically reading the timing fields that may be concurrently modified -func (uplog *RequestUplog) atomicSnapshot() RequestUplog { - snapshot := *uplog // Copy all fields first - - // Atomically read the timing fields that are modified concurrently - snapshot.TotalElapsedTime = atomic.LoadUint64(&uplog.TotalElapsedTime) - snapshot.DNSElapsedTime = atomic.LoadUint64(&uplog.DNSElapsedTime) - snapshot.ConnectElapsedTime = atomic.LoadUint64(&uplog.ConnectElapsedTime) - snapshot.TLSConnectElapsedTime = atomic.LoadUint64(&uplog.TLSConnectElapsedTime) - snapshot.RequestElapsedTime = atomic.LoadUint64(&uplog.RequestElapsedTime) - snapshot.WaitElapsedTime = atomic.LoadUint64(&uplog.WaitElapsedTime) - snapshot.ResponseElapsedTime = atomic.LoadUint64(&uplog.ResponseElapsedTime) - - return snapshot -} - func (uplog *RequestUplog) detect(response *http.Response, err error) (errorType ErrorType, errorDescription string) { if err == nil { if response.StatusCode >= 300 { @@ -308,34 +363,3 @@ func addCounter(c *uint64, n uint64) { atomic.AddUint64(c, n) } } - -// markDone marks a callback as completed and signals if all expected callbacks are done -func (t *callbackTracker) markDone() { - if atomic.AddInt32(&t.completedCallbacks, 1) == atomic.LoadInt32(&t.expectedCallbacks) { - select { - case t.doneChan <- struct{}{}: - default: - // Channel already signaled, do nothing - } - } -} - -// waitForCallbacks waits for all async callbacks to complete or timeout -func (t *callbackTracker) waitForCallbacks(timeout time.Duration) { - // If no callbacks are expected, return immediately - if atomic.LoadInt32(&t.expectedCallbacks) == 0 { - return - } - - timer := time.NewTimer(timeout) - defer timer.Stop() - - select { - case <-t.doneChan: - // All async callbacks completed - return - case <-timer.C: - // Timeout reached, continue with current data - return - } -} From f378191a6c25b8c604019749aac82fc70204c2ff Mon Sep 17 00:00:00 2001 From: xgopilot Date: Mon, 17 Nov 2025 07:43:27 +0000 Subject: [PATCH 4/8] refactor: remove unnecessary defer cancelTimer and goroutine in submit Based on review feedback from @YangSen-qn: 1. Removed defer tracker.cancelTimer(): - Timer should continue running after Intercept() returns - Response body may not be fully read when function returns - Allows async callbacks to complete and update data properly 2. Removed unnecessary goroutine in submit(): - submit() is already called asynchronously by time.AfterFunc - Direct channel send is sufficient for async behavior - Reduces unnecessary goroutine overhead Generated with [codeagent](https://github.com/qbox/codeagent) Co-authored-by: YangSen-qn <49863787+YangSen-qn@users.noreply.github.com> --- internal/uplog/request_uplog.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/internal/uplog/request_uplog.go b/internal/uplog/request_uplog.go index bbf77443..1774fba4 100644 --- a/internal/uplog/request_uplog.go +++ b/internal/uplog/request_uplog.go @@ -141,15 +141,13 @@ func (t *callbackTracker) submit() { snapshot := *t.uplog t.mu.Unlock() - // Serialize and send asynchronously - go func() { - if uplogBytes, jsonError := json.Marshal(&snapshot); jsonError == nil { - uplogChan <- uplogSerializedEntry{ - serializedUplog: uplogBytes, - getUpToken: t.getUpToken, - } + // Serialize and send to uplog channel + if uplogBytes, jsonError := json.Marshal(&snapshot); jsonError == nil { + uplogChan <- uplogSerializedEntry{ + serializedUplog: uplogBytes, + getUpToken: t.getUpToken, } - }() + } }) } @@ -171,7 +169,6 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler // Create callback tracker with 50ms delay for uplog submission // This allows capturing timing data from async callbacks tracker := newCallbackTracker(uplog, uplog.getUpToken, 50*time.Millisecond) - defer tracker.cancelTimer() var dnsStartTime, gotFirstResponseByteTime, connectStartTime, tlsHandshakeStartTime, wroteHeadersTime, wroteRequestTime time.Time From 1ff94dff7d1b2cbec9b78b05a31198434e4d6b3e Mon Sep 17 00:00:00 2001 From: xgopilot Date: Mon, 17 Nov 2025 07:56:12 +0000 Subject: [PATCH 5/8] fix(uplog): increase callback delay to 2s and fix timer memory leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Changes: 1. Increase submitDelay from 50ms to 2s to ensure all async callbacks complete 2. Add timer cleanup after submit() to prevent goroutine leak The timer created by time.AfterFunc in the update() method could remain active even after explicit submit() call, causing unnecessary goroutine execution. Now we properly stop the timer after submission to prevent this memory leak. Addresses feedback from code review. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- internal/uplog/request_uplog.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/uplog/request_uplog.go b/internal/uplog/request_uplog.go index 1774fba4..5c969357 100644 --- a/internal/uplog/request_uplog.go +++ b/internal/uplog/request_uplog.go @@ -166,9 +166,9 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler return handler(req) } - // Create callback tracker with 50ms delay for uplog submission + // Create callback tracker with 2s delay for uplog submission // This allows capturing timing data from async callbacks - tracker := newCallbackTracker(uplog, uplog.getUpToken, 50*time.Millisecond) + tracker := newCallbackTracker(uplog, uplog.getUpToken, 2*time.Second) var dnsStartTime, gotFirstResponseByteTime, connectStartTime, tlsHandshakeStartTime, wroteHeadersTime, wroteRequestTime time.Time @@ -315,6 +315,9 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler // The timer will fire after delay unless more updates arrive tracker.submit() + // Stop the timer to prevent unnecessary goroutine execution and memory leak + tracker.cancelTimer() + return } From 2637b02e68e39f7b52f88c9179581cce097d059b Mon Sep 17 00:00:00 2001 From: xgopilot Date: Mon, 17 Nov 2025 08:10:28 +0000 Subject: [PATCH 6/8] fix: allow final timer to run for proper GC of callbackTracker - Remove cancelTimer() call after submit() to allow last timer to keep running - Remove unused cancelTimer() method - After submit() is called, update() ignores new requests via submitted flag - This ensures no new timers are created and callbackTracker can be GC'd properly Generated with [codeagent](https://github.com/qbox/codeagent) Co-authored-by: YangSen-qn <49863787+YangSen-qn@users.noreply.github.com> --- internal/uplog/request_uplog.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/internal/uplog/request_uplog.go b/internal/uplog/request_uplog.go index 5c969357..4b857d70 100644 --- a/internal/uplog/request_uplog.go +++ b/internal/uplog/request_uplog.go @@ -151,16 +151,6 @@ func (t *callbackTracker) submit() { }) } -// cancelTimer stops the submission timer if it exists -func (t *callbackTracker) cancelTimer() { - t.mu.Lock() - defer t.mu.Unlock() - - if t.timer != nil { - t.timer.Stop() - } -} - func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler) (resp *http.Response, err error) { if !IsUplogEnabled() { return handler(req) @@ -313,11 +303,11 @@ func (uplog *RequestUplog) Intercept(req *http.Request, handler clientv2.Handler // Trigger uplog submission // The timer will fire after delay unless more updates arrive + // Note: We allow the last timer to keep running to capture any late async callbacks + // Once submit() is called, no new timers will be created (checked in update()), + // allowing the tracker to be garbage collected after the timer fires. tracker.submit() - // Stop the timer to prevent unnecessary goroutine execution and memory leak - tracker.cancelTimer() - return } From 530ba9256ad6c024bd97dcbd2411f23dbaf7c30b Mon Sep 17 00:00:00 2001 From: xgopilot Date: Mon, 17 Nov 2025 08:32:35 +0000 Subject: [PATCH 7/8] fix(uplog): clear timer reference after submission for GC - Add defer to set timer to nil after submit() completes - Use proper locking (mu.Lock/Unlock) for thread-safe access to timer field - This helps garbage collector reclaim timer resources Generated with [codeagent](https://github.com/qbox/codeagent) Co-authored-by: YangSen-qn <49863787+YangSen-qn@users.noreply.github.com> --- internal/uplog/request_uplog.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/uplog/request_uplog.go b/internal/uplog/request_uplog.go index 4b857d70..f6912ce7 100644 --- a/internal/uplog/request_uplog.go +++ b/internal/uplog/request_uplog.go @@ -134,6 +134,13 @@ func (t *callbackTracker) update(updateFunc func(*RequestUplog)) { // Once submitted, all subsequent updates are ignored func (t *callbackTracker) submit() { t.onceSubmit.Do(func() { + // Ensure timer is cleared after submission to help GC + defer func() { + t.mu.Lock() + t.timer = nil + t.mu.Unlock() + }() + t.mu.Lock() t.submitted = true From 8061e1047019a754975f58def338fe7338183967 Mon Sep 17 00:00:00 2001 From: xgopilot Date: Mon, 17 Nov 2025 09:15:15 +0000 Subject: [PATCH 8/8] refactor: move timer nil defer outside sync.Once for proper cleanup - Move defer statement to reset timer to nil outside of onceSubmit.Do() - This ensures timer is cleared every time submit() is called, not just once - Timer can be reassigned in update(), so it needs to be cleared on each submit Generated with [codeagent](https://github.com/qbox/codeagent) Co-authored-by: YangSen-qn <49863787+YangSen-qn@users.noreply.github.com> --- internal/uplog/request_uplog.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/uplog/request_uplog.go b/internal/uplog/request_uplog.go index f6912ce7..dc231434 100644 --- a/internal/uplog/request_uplog.go +++ b/internal/uplog/request_uplog.go @@ -133,14 +133,15 @@ func (t *callbackTracker) update(updateFunc func(*RequestUplog)) { // This method is called either by the timer or explicitly // Once submitted, all subsequent updates are ignored func (t *callbackTracker) submit() { - t.onceSubmit.Do(func() { - // Ensure timer is cleared after submission to help GC - defer func() { - t.mu.Lock() - t.timer = nil - t.mu.Unlock() - }() + // Ensure timer is cleared after submission to help GC + // This defer is outside onceSubmit.Do so it runs every time submit() is called + defer func() { + t.mu.Lock() + t.timer = nil + t.mu.Unlock() + }() + t.onceSubmit.Do(func() { t.mu.Lock() t.submitted = true