
http3: streams leaked in outgoingStreamsMap #4513

Closed
GeorgeMac opened this issue May 13, 2024 · 10 comments · Fixed by #4523

Comments

@GeorgeMac
Contributor

I was just stress testing our little reverse proxy built on quic-go and found that memory increased linearly with requests.
I was performing around 50 rps, all of which were responding OK/200.

A bit of profiling has shown that old stream numbers and their associated streams are accumulating here:

(Screenshot: memory usage growing linearly while requests are in flight)

Here I was running around 50 rps against an instance of our project, which proxies requests from a Go stdlib net/http server over an http3.RoundTripper.
The dip and plateau are where I stopped sending requests; the memory is now simply being held in the streams map.

(Screenshot: heap profile showing allocations accumulating in the streams map)

Here is a screenshot of a heap profile where we can see it accumulating in the map.

Looking at the relevant code, I have yet to see anything that clears streams from the StreamMap types.
I tried calling DeleteStream explicitly once isDone (in the code here) was true, but that led to errors on one side of the connection. So I figured I'd best open this up to you for your thoughts.

Cheers!

@marten-seemann
Member

marten-seemann commented May 14, 2024

Thank you for investigating @GeorgeMac! This is likely related to the changes we introduced in v0.43, but the question is whether this is happening in the http3 or the quic package.

I tried reproducing the behavior: https://gist.github.com/marten-seemann/33db22a3f7f7d957803ca1d574bfeae7. Everything seems to work here, I don't see a lot of memory in the outgoing streams map, and regularly logging the size of the map confirms that there are never more than a few streams in this map.

> Looking at the relevant code, I have yet to see anything clear streams from the StreamMap types.

The stream tracks its transitions through the QUIC state machine, and calls the onCompleted callback once it reaches a terminal state, which then removes it from the streams map (similar for the receive side of the stream):

quic-go/send_stream.go

Lines 365 to 388 in 93c4785

func (s *sendStream) isNewlyCompleted() bool {
	if s.completed {
		return false
	}
	// We need to keep the stream around until all frames have been sent and acknowledged.
	if s.numOutstandingFrames > 0 || len(s.retransmissionQueue) > 0 {
		return false
	}
	// The stream is completed if we sent the FIN.
	if s.finSent {
		s.completed = true
		return true
	}
	// The stream is also completed if:
	// 1. the application called CancelWrite, or
	// 2. we received a STOP_SENDING, and
	// * the application consumed the error via Write, or
	// * the application called Close
	if s.cancelWriteErr != nil && (s.cancellationFlagged || s.finishedWriting) {
		s.completed = true
		return true
	}
	return false
}
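To make the mechanism concrete, here is a minimal sketch (hypothetical names, not the actual quic-go types) of the completion-callback pattern described above: each stream is handed an onCompleted callback that deletes its entry from the owning map once it reaches a terminal state.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a stand-in for a QUIC stream that signals completion
// via a callback wired up by the map that owns it.
type stream struct {
	id          int
	onCompleted func()
}

// finish marks the stream as terminally complete.
func (s *stream) finish() { s.onCompleted() }

// streamsMap is a stand-in for outgoingStreamsMap: it owns the
// id -> stream entries and removes them on completion.
type streamsMap struct {
	mu      sync.Mutex
	streams map[int]*stream
}

func (m *streamsMap) open(id int) *stream {
	m.mu.Lock()
	defer m.mu.Unlock()
	s := &stream{id: id}
	// The callback closes over the map, so completion is the
	// only path that deletes the entry.
	s.onCompleted = func() {
		m.mu.Lock()
		delete(m.streams, id)
		m.mu.Unlock()
	}
	m.streams[id] = s
	return s
}

func main() {
	m := &streamsMap{streams: make(map[int]*stream)}
	s := m.open(4)
	fmt.Println(len(m.streams)) // 1: entry held while stream is live
	s.finish()
	fmt.Println(len(m.streams)) // 0: terminal state removed the entry
}
```

If the stream never reaches a terminal state (the leak being investigated here), this callback never fires and the entry stays in the map forever.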

Any idea why the leak doesn't show up in my example?

@GeorgeMac
Contributor Author

GeorgeMac commented May 14, 2024

Hey @marten-seemann, thanks for the speedy reply!

I'm doing a bit more digging this morning. My particular example uses both the http3 Server and Client. I wonder if that is related.

I misinterpreted the pprof profiles a bit there. It seems the inuse_bytes are all accumulating around OpenStreamSync:

(pprof) list openRequestStream
Total: 12.01MB
ROUTINE ======================== github.com/quic-go/quic-go/http3.(*connection).openRequestStream in /Users/georgemac/github/quic-go/quic-go/http3/conn.go
         0     8.50MB (flat, cum) 70.78% of Total
         .          .     96:func (c *connection) openRequestStream(
         .          .     97:	ctx context.Context,
         .          .     98:	requestWriter *requestWriter,
         .          .     99:	reqDone chan<- struct{},
         .          .    100:	disableCompression bool,
         .          .    101:	maxHeaderBytes uint64,
         .          .    102:) (*requestStream, error) {
         .        7MB    103:	str, err := c.Connection.OpenStreamSync(ctx)
         .          .    104:	if err != nil {
         .          .    105:		return nil, err
         .          .    106:	}
         .     1.50MB    107:	datagrams := newDatagrammer(func(b []byte) error { return c.sendDatagram(str.StreamID(), b) })
         .          .    108:	c.streamMx.Lock()
         .          .    109:	c.streams[str.StreamID()] = datagrams
         .          .    110:	c.streamMx.Unlock()
         .          .    111:	qstr := newStateTrackingStream(str, func(s streamState, e error) { c.onStreamStateChange(str.StreamID(), s, e) })
         .          .    112:	hstr := newStream(qstr, c, datagrams)

I am going to see if I can get your reproduction to demonstrate this.

@GeorgeMac
Contributor Author

Here is some more context:

ROUTINE ======================== github.com/quic-go/quic-go.(*outgoingStreamsMap[go.shape.167fca9d82513da8bdac0b9e3effc756f99b9bba143ea1f3a5bdf24154cc753a]).OpenStreamSync in /Users/georgemac/github/quic-go/quic-go/streams_map_outgoing.go
         0        7MB (flat, cum) 58.29% of Total
         .          .     69:func (m *outgoingStreamsMap[T]) OpenStreamSync(ctx context.Context) (T, error) {
         .          .     70:	m.mutex.Lock()
         .          .     71:	defer m.mutex.Unlock()
         .          .     72:
         .          .     73:	if m.closeErr != nil {
         .          .     74:		return *new(T), m.closeErr
         .          .     75:	}
         .          .     76:
         .          .     77:	if err := ctx.Err(); err != nil {
         .          .     78:		return *new(T), err
         .          .     79:	}
         .          .     80:
         .          .     81:	if len(m.openQueue) == 0 && m.nextStream <= m.maxStream {
         .        7MB     82:		return m.openStream(), nil
         .          .     83:	}
         .          .     84:
         .          .     85:	waitChan := make(chan struct{}, 1)
         .          .     86:	queuePos := m.highestInQueue
         .          .     87:	m.highestInQueue++
ROUTINE ======================== github.com/quic-go/quic-go.(*streamsMap).OpenStreamSync in /Users/georgemac/github/quic-go/quic-go/streams_map.go
         0        7MB (flat, cum) 58.29% of Total
         .          .    136:func (m *streamsMap) OpenStreamSync(ctx context.Context) (Stream, error) {
         .          .    137:	m.mutex.Lock()
         .          .    138:	reset := m.reset
         .          .    139:	mm := m.outgoingBidiStreams
         .          .    140:	m.mutex.Unlock()
         .          .    141:	if reset {
         .          .    142:		return nil, Err0RTTRejected
         .          .    143:	}
         .        7MB    144:	str, err := mm.OpenStreamSync(ctx)
         .          .    145:	return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
         .          .    146:}
         .          .    147:
         .          .    148:func (m *streamsMap) OpenUniStream() (SendStream, error) {
         .          .    149:	m.mutex.Lock()
(pprof) list openStream
Total: 12.01MB
ROUTINE ======================== github.com/quic-go/quic-go.(*outgoingStreamsMap[go.shape.167fca9d82513da8bdac0b9e3effc756f99b9bba143ea1f3a5bdf24154cc753a]).openStream in /Users/georgemac/github/quic-go/quic-go/streams_map_outgoing.go
         0        7MB (flat, cum) 58.29% of Total
         .          .    120:func (m *outgoingStreamsMap[T]) openStream() T {
         .        7MB    121:	s := m.newStream(m.nextStream)
         .          .    122:	m.streams[m.nextStream] = s
         .          .    123:	fmt.Println("Streams size", len(m.streams))
         .          .    124:	m.nextStream++
         .          .    125:	return s
         .          .    126:}
(pprof) list newStream
Total: 12.01MB
ROUTINE ======================== github.com/quic-go/quic-go.newStream in /Users/georgemac/github/quic-go/quic-go/stream.go
       3MB        6MB (flat, cum) 49.96% of Total
         .          .     89:func newStream(
         .          .     90:	ctx context.Context,
         .          .     91:	streamID protocol.StreamID,
         .          .     92:	sender streamSender,
         .          .     93:	flowController flowcontrol.StreamFlowController,
         .          .     94:) *stream {
    2.50MB     2.50MB     95:	s := &stream{sender: sender}
         .          .     96:	senderForSendStream := &uniStreamSender{
         .          .     97:		streamSender: sender,
         .          .     98:		onStreamCompletedImpl: func() {
         .          .     99:			s.completedMutex.Lock()
         .          .    100:			s.sendStreamCompleted = true
         .          .    101:			s.checkIfCompleted()
         .          .    102:			s.completedMutex.Unlock()
         .          .    103:		},
         .          .    104:	}
         .        2MB    105:	s.sendStream = *newSendStream(ctx, streamID, senderForSendStream, flowController)
         .          .    106:	senderForReceiveStream := &uniStreamSender{
         .          .    107:		streamSender: sender,
  512.01kB   512.01kB    108:		onStreamCompletedImpl: func() {
         .          .    109:			s.completedMutex.Lock()
         .          .    110:			s.receiveStreamCompleted = true
         .          .    111:			s.checkIfCompleted()
         .          .    112:			s.completedMutex.Unlock()
         .          .    113:		},
         .          .    114:	}
         .        1MB    115:	s.receiveStream = *newReceiveStream(streamID, senderForReceiveStream, flowController)
         .          .    116:	return s
         .          .    117:}
         .          .    118:
         .          .    119:// need to define StreamID() here, since both receiveStream and readStream have a StreamID()
         .          .    120:func (s *stream) StreamID() protocol.StreamID {

@GeorgeMac
Contributor Author

GeorgeMac commented May 14, 2024

Still digging, but my latest theory is that these streams are being held hostage by an uncancelled context via newStateTrackingStream:

(pprof) list newStateTrackingStream
Total: 37.57MB
ROUTINE ======================== github.com/quic-go/quic-go/http3.newStateTrackingStream in /Users/georgemac/github/quic-go/quic-go/http3/state_tracking_stream.go
         0     1.50MB (flat, cum)  3.99% of Total
         .          .     30:func newStateTrackingStream(s quic.Stream, onStateChange func(streamState, error)) *stateTrackingStream {
         .     1.50MB     31:	context.AfterFunc(s.Context(), func() {
         .          .     32:		onStateChange(streamStateSendClosed, context.Cause(s.Context()))
         .          .     33:	})
         .          .     34:	return &stateTrackingStream{
         .          .     35:		Stream:        s,
         .          .     36:		state:         streamStateOpen,

Update: Not convinced of this anymore. I added some atomic counts around creating and canceling these and they match up perfectly.

@GeorgeMac
Contributor Author

GeorgeMac commented May 14, 2024

Been staring at this all day 😂 I've gone through a few ideas. It looks like it might be the datagrams map:

(Screenshot: datagrams map size growing steadily; log excerpt below)
time=2024-05-14T17:21:28.845+01:00 level=DEBUG msg="Finished handling request" method=GET path=/ error=<nil>
time=2024-05-14T17:21:28.855+01:00 level=DEBUG msg="Handling request" method=GET path=/
Datagrams 6180
time=2024-05-14T17:21:28.856+01:00 level=DEBUG msg="Finished handling request" method=GET path=/ error=<nil>
time=2024-05-14T17:21:28.865+01:00 level=DEBUG msg="Handling request" method=GET path=/
Datagrams 6181
time=2024-05-14T17:21:28.866+01:00 level=DEBUG msg="Finished handling request" method=GET path=/ error=<nil>
time=2024-05-14T17:21:28.875+01:00 level=DEBUG msg="Handling request" method=GET path=/
Datagrams 6182
time=2024-05-14T17:21:28.875+01:00 level=DEBUG msg="Finished handling request" method=GET path=/ error=<nil>
time=2024-05-14T17:21:28.885+01:00 level=DEBUG msg="Handling request" method=GET path=/
Datagrams 6183
time=2024-05-14T17:21:28.886+01:00 level=DEBUG msg="Finished handling request" method=GET path=/ error=<nil>

It seems to just grow and grow.

In the client case, it seems I never see an onStreamStateChange(id, streamStateReceiveClosed, ...) call.

Update:

I think I see that the state tracker is not currently handling the streamSendAndReceiveClosed condition.

@tobyxdd
Contributor

tobyxdd commented May 14, 2024

I have a very similar issue since 0.43.0. A large number of streams accumulate in memory without being freed:

Showing nodes accounting for 671.27MB, 88.68% of 756.93MB total
Dropped 65 nodes (cum <= 3.78MB)
Showing top 10 nodes out of 61
      flat  flat%   sum%        cum   cum%
  160.06MB 21.15% 21.15%   357.57MB 47.24%  github.com/apernet/quic-go.newStream
  110.27MB 14.57% 35.71%   121.28MB 16.02%  github.com/apernet/quic-go.(*frameSorter).push
   68.01MB  8.98% 44.70%   100.51MB 13.28%  github.com/apernet/quic-go.newSendStream
   63.01MB  8.32% 53.02%    97.01MB 12.82%  github.com/apernet/quic-go.newReceiveStream (inline)
   58.28MB  7.70% 60.72%    58.78MB  7.77%  io.copyBuffer
   57.01MB  7.53% 68.25%    57.01MB  7.53%  github.com/apernet/quic-go/internal/flowcontrol.NewStreamFlowController
      52MB  6.87% 75.12%       52MB  6.87%  github.com/apernet/quic-go/http3.newDatagrammer (inline)
   43.56MB  5.76% 80.88%    43.56MB  5.76%  github.com/apernet/quic-go/internal/wire.init.0.func1
   34.06MB  4.50% 85.38%    99.57MB 13.15%  github.com/apernet/quic-go/http3.(*connection).acceptStream
      25MB  3.30% 88.68%       25MB  3.30%  context.withCancel (inline)
(pprof) list new.*Stream
Total: 756.93MB
ROUTINE ======================== github.com/apernet/quic-go.newReceiveStream in C:\Users\tobyx\go\pkg\mod\github.com\apernet\quic-go@v0.43.1-0.20240429030958-51a0843014d6\receive_stream.go
   63.01MB    97.01MB (flat, cum) 12.82% of Total
         .          .     61:func newReceiveStream(
         .          .     62:   streamID protocol.StreamID,
         .          .     63:   sender streamSender,
         .          .     64:   flowController flowcontrol.StreamFlowController,
         .          .     65:) *receiveStream {
         .          .     66:   return &receiveStream{
         .          .     67:           streamID:       streamID,
         .          .     68:           sender:         sender,
         .          .     69:           flowController: flowController,
         .       34MB     70:           frameQueue:     newFrameSorter(),
   35.50MB    35.50MB     71:           readChan:       make(chan struct{}, 1),
   27.50MB    27.50MB     72:           readOnce:       make(chan struct{}, 1),
         .          .     73:           finalOffset:    protocol.MaxByteCount,
         .          .     74:   }
         .          .     75:}
         .          .     76:
         .          .     77:func (s *receiveStream) StreamID() protocol.StreamID {
ROUTINE ======================== github.com/apernet/quic-go.newSendStream in C:\Users\tobyx\go\pkg\mod\github.com\apernet\quic-go@v0.43.1-0.20240429030958-51a0843014d6\send_stream.go
   68.01MB   100.51MB (flat, cum) 13.28% of Total
         .          .     66:func newSendStream(
         .          .     67:   ctx context.Context,
         .          .     68:   streamID protocol.StreamID,
         .          .     69:   sender streamSender,
         .          .     70:   flowController flowcontrol.StreamFlowController,
         .          .     71:) *sendStream {
         .          .     72:   s := &sendStream{
         .          .     73:           streamID:       streamID,
         .          .     74:           sender:         sender,
         .          .     75:           flowController: flowController,
   34.50MB    34.50MB     76:           writeChan:      make(chan struct{}, 1),
   33.50MB    33.50MB     77:           writeOnce:      make(chan struct{}, 1), // cap: 1, to protect against concurrent use of Write
         .          .     78:   }
         .    32.50MB     79:   s.ctx, s.ctxCancel = context.WithCancelCause(ctx)
         .          .     80:   return s
         .          .     81:}
         .          .     82:
         .          .     83:func (s *sendStream) StreamID() protocol.StreamID {
         .          .     84:   return s.streamID // same for receiveStream and sendStream
ROUTINE ======================== github.com/apernet/quic-go.newStream in C:\Users\tobyx\go\pkg\mod\github.com\apernet\quic-go@v0.43.1-0.20240429030958-51a0843014d6\stream.go
  160.06MB   357.57MB (flat, cum) 47.24% of Total
         .          .     89:func newStream(
         .          .     90:   ctx context.Context,
         .          .     91:   streamID protocol.StreamID,
         .          .     92:   sender streamSender,
         .          .     93:   flowController flowcontrol.StreamFlowController,
         .          .     94:) *stream {
  135.56MB   135.56MB     95:   s := &stream{sender: sender}
       2MB        2MB     96:   senderForSendStream := &uniStreamSender{
         .          .     97:           streamSender: sender,
    5.50MB     5.50MB     98:           onStreamCompletedImpl: func() {
         .          .     99:                   s.completedMutex.Lock()
         .          .    100:                   s.sendStreamCompleted = true
         .          .    101:                   s.checkIfCompleted()
         .          .    102:                   s.completedMutex.Unlock()
         .          .    103:           },
         .          .    104:   }
         .   100.51MB    105:   s.sendStream = *newSendStream(ctx, streamID, senderForSendStream, flowController)
      12MB       12MB    106:   senderForReceiveStream := &uniStreamSender{
         .          .    107:           streamSender: sender,
       5MB        5MB    108:           onStreamCompletedImpl: func() {
         .          .    109:                   s.completedMutex.Lock()
         .          .    110:                   s.receiveStreamCompleted = true
         .          .    111:                   s.checkIfCompleted()
         .          .    112:                   s.completedMutex.Unlock()
         .          .    113:           },
         .          .    114:   }
         .    97.01MB    115:   s.receiveStream = *newReceiveStream(streamID, senderForReceiveStream, flowController)
         .          .    116:   return s
         .          .    117:}
         .          .    118:
         .          .    119:// need to define StreamID() here, since both receiveStream and readStream have a StreamID()
         .          .    120:func (s *stream) StreamID() protocol.StreamID {
ROUTINE ======================== github.com/apernet/quic-go/http3.newStateTrackingStream in C:\Users\tobyx\go\pkg\mod\github.com\apernet\quic-go@v0.43.1-0.20240429030958-51a0843014d6\http3\state_tracking_stream.go
   13.50MB    13.50MB (flat, cum)  1.78% of Total
         .          .     28:func newStateTrackingStream(s quic.Stream, onStateChange func(streamState, error)) *stateTrackingStream {
   13.50MB    13.50MB     29:   return &stateTrackingStream{
         .          .     30:           Stream:        s,
         .          .     31:           state:         streamStateOpen,
         .          .     32:           onStateChange: onStateChange,
         .          .     33:   }
         .          .     34:}

@tobyxdd
Contributor

tobyxdd commented May 14, 2024

Graph in svg if needed:

(SVG: pprof call graph of the heap profile)

@marten-seemann
Copy link
Member

@tobyxdd Can you try out #4523? Would be good to know if this is the same issue.

@GeorgeMac
Contributor Author

Looks pretty identical to my observations in pprof 👍

@tobyxdd
Contributor

tobyxdd commented May 17, 2024

@marten-seemann @GeorgeMac yup I can confirm #4523 fixes it
