Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync.Pool leaks #146

Closed
WinstonPrivacy opened this issue Oct 26, 2019 · 70 comments
Closed

sync.Pool leaks #146

WinstonPrivacy opened this issue Oct 26, 2019 · 70 comments

Comments

@WinstonPrivacy
Copy link

We've noticed memory leaks with KCP-Go. We have traced this down to sync.Pool leaks by instrumenting with counters for each of the calls to Get() and Put(), ie:

					// recycle the recovers
					xmitBuf.Put(r)
					// Temporary counter
					atomic.AddUint64(&Framesput, 1)

Over time, about 5-8% of byte buffers are not being returned to the sync.Pool and at 1500 bytes each, this adds up.

Are you seeing the same thing? Any thoughts on how to fix this?

@xtaci
Copy link
Owner

xtaci commented Oct 26, 2019

runtime will recycle the rest

@WinstonPrivacy
Copy link
Author

That doesn't appear to be the case. We are seeing blocks allocated by sync.pool and held by kcp-go which are never released.

Here's an example comparison of two heap dumps, taken about 60 minutes apart:

image

The sync pool has grown by 13Mb here. 9Mb of this was allocated by output(). Not shown are the other 4Mb which was allocated by Input().

@WinstonPrivacy
Copy link
Author

As an additional experiment, I shut down all smux/kcp-go sessions. Afterwards, the memory remains allocated by sync pool (28Mb total):

image

@xtaci
Copy link
Owner

xtaci commented Oct 26, 2019

sync.Pool will not return memory to system immediately

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Oct 26, 2019 via email

@xtaci
Copy link
Owner

xtaci commented Oct 27, 2019

https://github.com/xtaci/kcp-go/blob/master/fec.go#L30
FEC will hold some data

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Oct 27, 2019 via email

@xtaci
Copy link
Owner

xtaci commented Oct 28, 2019

it's a fixed buffer, and the logic of retransmission, no thoughts on that

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Oct 28, 2019 via email

@WinstonPrivacy
Copy link
Author

I was able to capture a better heap dump. The severity of the leak is pretty bad... the process can only run for about 4-6 hours before it crashes due to running out of memory.

It looks like the FECEncoder and Decoder are being held on to primarily along with the sync.Pool buffers. I am trying to trace it down further and it appears that updater is calling kcp:flush(). That in turn is opening a new UDP session in some cases. Maybe it is trying to send frames to a closed connection?

image

image

@xtaci
Copy link
Owner

xtaci commented Oct 29, 2019

FEC contains a fixed-size sliding window(shardsize * 3), it will not return nor grow, i'v added a timeout policy to purge this window, don't know it will help or not.

if your process crashed after 4 hrs, it's probably because your code has some problem,
kcptun uses this kcp-go for long running routers, it means they will run over months.

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Oct 29, 2019 via email

@xtaci
Copy link
Owner

xtaci commented Oct 30, 2019

how many concurrent connections on this 1GB ram server?

@xtaci
Copy link
Owner

xtaci commented Oct 30, 2019

and have you invoked sess.Close()

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Oct 30, 2019 via email

@xtaci
Copy link
Owner

xtaci commented Oct 30, 2019

a recent change is to allocate a goroutine for each session.updater, so a possible leak is you didn't invoke sess.Close to close the goroutine

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Oct 30, 2019 via email

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Oct 30, 2019

We're closing sessions properly... the counts are always exactly what they should be (I found your snmp counters, which is very helpful).

Is it possible that there is some kind of underlying Golang thing happening here? We are using sync.pool elsewhere in our application.

Edit: This seems doubtful. We're creating separate sync pools and I've seen other implementations which do the same thing, so I don't think that can be the problem.

@WinstonPrivacy
Copy link
Author

After a preliminary analysis, your latest commits look very good. After 5minutes, we're seeing a 1.5Mb reduction in RAM as a result of cleaning up timed out FEC packets. 20% of FEC packets was typical before, now it's down to 4%.

I will continue to watch and report back!

@WinstonPrivacy
Copy link
Author

This latest commit definitely helped a lot. We're still experiencing a significant memory leak but I've been able to run for 13h under heavy load now, about 3x longer than before.

@xtaci
Copy link
Owner

xtaci commented Nov 1, 2019

glad to hear, how about setting like GOGC=20 to recycle aggresively?

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Nov 1, 2019 via email

@xtaci
Copy link
Owner

xtaci commented Nov 1, 2019

theoretically, xmitBuf will be recycled automatically, yes it's true a closed session may left unacknowledge data not being returned to xmitBuf, but it will be recycled by runtime eventually.
setting a lower GOGC may mitgate this, but I don't know why this is not working. Did you find goroutine leaking in tests?

@xtaci
Copy link
Owner

xtaci commented Nov 1, 2019

though I can have the segments to return to xmitBuf when closing, but I want to figure out why
I mean if ,by accident, UDPSession(s) were held by some data structure in you program, leaking is inevitable.

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Nov 1, 2019 via email

@xtaci
Copy link
Owner

xtaci commented Nov 1, 2019

no, normally, UDPSession shouldn't close itself in any condition, like what socket() does, the only way to close is to check the errors returned from Read() and Write(), including timeout errors, so , a program MUST implement keepalive mechanism to guarantee the session will be closed when there's no error returned from Read() and Write(), it's exactly what smux's keepalive does.

btw, purging buffers when calling Close() is ok, since Close() has resource releasing semantic like in libc close(), but I don't know if this change could work for your scenario.

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Nov 1, 2019 via email

@xtaci
Copy link
Owner

xtaci commented Nov 1, 2019

try v5.4.14

@WinstonPrivacy
Copy link
Author

Will do!

@WinstonPrivacy
Copy link
Author

It seems a bit better at this point but I have yet to really stress test it. I have added some debug messages to your patch and can see that more segments are being returned to the sync pool, however after shutting everything down and GC'ing a few times, there still appears to be pinned objects. Here's a heap dump:

image

I will hit this harder later to see how long the process can run without crashing.

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Nov 4, 2019 via email

@WinstonPrivacy
Copy link
Author

Looks like this resolved most of the problem. I ran the process for about 3 hours, which would typically result in about 40Mb of RAM leaked to the sync pool. Now it is down to just 4Mb. Half of this is in the sync.pool, which as a global variable should be expected to remain behind for awhile. The other is in newFECEncoder.

image

Relevant code snippet illustrating removing the function parameter:

func (s *UDPSession) tx() {
	if s.xconn == nil || s.xconnWriteError != nil {
		s.defaultTx()
		return
	}

	// x/net version
	nbytes := 0
	npkts := 0
	for len(s.txqueue) > 0 {
		if n, err := s.xconn.WriteBatch(s.txqueue, 0); err == nil {

@xtaci
Copy link
Owner

xtaci commented Nov 4, 2019

changing tx() parameters is another story, but s.txqueue should be just set to nil to recycle immediately

@xtaci
Copy link
Owner

xtaci commented Nov 4, 2019

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Nov 4, 2019 via email

@WinstonPrivacy
Copy link
Author

Just saw the latest commit. Wouldn't it be easier and probably better design to just eliminate the parameter? There is no code path in which a tx buffer is sent in that is not the same one as *UDPSession.txqueue. Here's what I'm using and it clears out almost everything when the UDPSessions are closed:

func (s *UDPSession) tx() {
	if s.xconn == nil || s.xconnWriteError != nil {
		s.defaultTx()
		return
	}

	// x/net version
	nbytes := 0
	npkts := 0
	for len(s.txqueue) > 0 {
		if n, err := s.xconn.WriteBatch(s.txqueue, 0); err == nil {
			for k := range s.txqueue[:n] {

				nbytes += len(s.txqueue[k].Buffers[0])
				xmitBuf.Put(s.txqueue[k].Buffers[0])
				// TODO: Record Put
				atomic.AddUint64(&Framesput, 1)
			}
			npkts += n
			s.txqueue = s.txqueue[n:]
		} else {			
			// compatibility issue:
			// for linux kernel<=2.6.32, support for sendmmsg is not available
			// an error of type os.SyscallError will be returned
			if operr, ok := err.(*net.OpError); ok {
				if se, ok := operr.Err.(*os.SyscallError); ok {
					if se.Syscall == "sendmmsg" {
						s.xconnWriteError = se
						s.defaultTx()
						return
					}
				}
			}
			s.notifyWriteError(errors.WithStack(err))
			break
		}
	}

	atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
	atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
}

@WinstonPrivacy
Copy link
Author

Looks like we're still leaking FEC Decoder packets. Here's a chart showing how RAM usage directly correlates to packets which weren't Put() back in the sync.Pool:

image

The leak is still pretty severe - between 10-30% of total Get()'s are never returned, based on traffic conditions. Some are still present when the UDPSession is closed but cleaning those up amounts for only a small percentage of the total.

Any chance there is a race condition which could be overwriting the decoder.rx queue?

@WinstonPrivacy
Copy link
Author

Cleaning up fecDecoder.rx packets after update() closes reduces the total leak by half or more, so I think that calling Close() was contributing to the leak.

Unfortunately, this still doesn't explain how so many packets are still being lost in the decode() logic. I am beginning to suspect a race condition which results in a conflict when updating the decoder.rx entries.

@xtaci
Copy link
Owner

xtaci commented Nov 5, 2019

I don't want to discuss parameters changing in tx() now, it's a long story.
And let's focus on xmitBuf now.

@xtaci
Copy link
Owner

xtaci commented Nov 5, 2019

FEC will occupy 3x(datashard+parityshard) * mtuLimit at ANY time, this is how FEC works-- recovery data from previous data and parity.

@xtaci
Copy link
Owner

xtaci commented Nov 5, 2019

and, Yes, we can recycle FEC shards after close()

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Nov 5, 2019 via email

@WinstonPrivacy
Copy link
Author

Great news, I found the root cause(s).

In decode(), there are two cases in which []byte slices are not being returned to the pool:

  1. After the call to dec.codec.ReconstructData(), the recovered shards are added to an array and returned to the caller, where they are later Put(). However, the remaining parity shards (30 in our case), are not returned. This is the fix:
			if err := dec.codec.ReconstructData(shards); err == nil {
				for k := range shards[:dec.dataShards] {
					if !shardsflag[k] {
						// recovered data should be recycled
						recovered = append(recovered, shards[k])
					}
				}
				// Recover the extra shards
				for k := range shards[dec.dataShards:] {
					if shards[k] != nil {
						xmitBuf.Put(shards[k])
					}
				}
  1. The above fix resolves most of them, but there is still the case where shard recovery fails. In this case, all of the shards which were taken from the sync.Pool should be returned. This is pretty substantial, as there is always at least one missing data shard and 30 additional parity shards. That amounts to around 45k RAM leak per unrecoverable transmission error.

Here's the code for that, which immediately follows the code above:

			} else {
				// Couldn't reconstruct. Recover all shards.
				for k := range shards {
					if shards[k] != nil && !shardsflag[k] {
						xmitBuf.Put(shards[k])
					}
				}
			}
  1. I have added some code to clean up after updater() and monitor() close. I prefer to have it here as it catches situations where a UDPSession enters an error state and closes, even if the client doesn't explicitly call Close().
func (s *UDPSession) updater() {
	defer func() {
		if s.txqueue != nil && len(s.txqueue) > 0 {
			for i, _ := range s.txqueue {
				if len(s.txqueue[i].Buffers) > 0 {
					xmitBuf.Put([]byte(s.txqueue[i].Buffers[0]))
				}
			}
		}

		// Clean FECDecoder queue
		if s.fecDecoder != nil && len(s.fecDecoder.rx) > 0 {
			s.fecDecoder.rx = s.fecDecoder.freeRange(0, len(s.fecDecoder.rx), s.fecDecoder.rx)
		}

		s.fecDecoder = nil
		s.fecEncoder = nil
	}()
func (l *Listener) monitor() {
	defer func() {
		for _, s := range l.sessions {
			// Free FECDecoder frames
			if s.fecDecoder != nil && len(s.fecDecoder.rx) > 0 {
				s.fecDecoder.freeRange(0, len(s.fecDecoder.rx), s.fecDecoder.rx)
			}

			s.fecDecoder = nil
			s.fecEncoder = nil
		}

	}()
  1. Finally, the above cleanup code introduces a possible race condition, where a UDPSession is aborted while decode() is still in progress. This can cause a panic in freeRange. This is resolved by simply checking for a zero-length slice at the beginning:
func (dec *fecDecoder) freeRange(first, n int, q []fecElement) []fecElement {
	if len(q) == 0 {
		// Prevents panic on race condition when a session is unexpectedly closed and
		// updater() or monitor() cleans up the rx queue while decode() is in progress.
		return q
	}

I'm still testing the above changes, but thought I would paste them here for your review.

@xtaci
Copy link
Owner

xtaci commented Nov 6, 2019

EDIT: looks like
dec.rx = dec.freeRange(first, numshard, dec.rx)
[first, numshards] contains datashard + parityshard

@xtaci
Copy link
Owner

xtaci commented Nov 6, 2019

and even if parityshards were not put back while reconstructing, the rxlimit rule and expire rule can still recycle them.

@WinstonPrivacy
Copy link
Author

Sorry, it looks like a message was deleted and I'm not following. Can you clarify?

@WinstonPrivacy
Copy link
Author

Also, we ran almost 12 hours without a major memory leak with the changes above. I don't think they are perfect though because I'm seeing more dropped smux connections.

@xtaci
Copy link
Owner

xtaci commented Nov 6, 2019

		if numDataShard == dec.dataShards {
			// case 1: no loss on data shards
			dec.rx = dec.freeRange(first, numshard, dec.rx)
		} else if numshard >= dec.dataShards {
			// case 2: loss on data shards, but it's recoverable from parity shards
			for k := range shards {
				if shards[k] != nil {
					dlen := len(shards[k])
					shards[k] = shards[k][:maxlen]
					copy(shards[k][dlen:], dec.zeros)
				} else {
					shards[k] = xmitBuf.Get().([]byte)[:0]
				}
			}
			if err := dec.codec.ReconstructData(shards); err == nil {
				for k := range shards[:dec.dataShards] {
					if !shardsflag[k] {
						// recovered data should be recycled
						recovered = append(recovered, shards[k])
					}
				}
			}
			dec.rx = dec.freeRange(first, numshard, dec.rx)  
		}

check:
dec.rx = dec.freeRange(first, numshard, dec.rx)
[first, numshards] contains datashard + parityshard

and even if parityshards were not put back while reconstructing, the rxlimit rule and expire rule can still recycle them.

@WinstonPrivacy
Copy link
Author

Yes, I put counters on all of those. freeRange won't return the 30 shards at the end, because they were taken from the sync.pool up above.

@xtaci
Copy link
Owner

xtaci commented Nov 7, 2019

I know your point now, there are some problem with reedsolomon codes library, the parity shards recovery is unstable, sometimes it will allocate a new buffer with incorrect capacity, sometimes not. so the only way is not to preallocate buffer to parityshards.

@xtaci
Copy link
Owner

xtaci commented Nov 7, 2019

@WinstonPrivacy
Copy link
Author

Interesting. I will try this out in preparation for our next release and let you know how it goes!

@WinstonPrivacy
Copy link
Author

Just tried this version and it doesn't resolve the memory leak... if anything, it might be worse than before.

@xtaci
Copy link
Owner

xtaci commented Nov 11, 2019

another way is to use Reconstruct instead of ReconstructData to make parity recovery fully controllable.
but I've tested , the length might be smaller than 1500. we should contact klauspost to fix that.

@WinstonPrivacy
Copy link
Author

I'm trying to narrow down the problem now. The issue is that RAM is under control but kcp sessions seem to be breaking at a much higher rate... possibly ReconstructData is re-using the allocated []byte buffers instead of copying them? If that's the case, then the new code would potentially result in sync.Pool allocating them for some other session to use, resulting in a conflict.

@WinstonPrivacy
Copy link
Author

WinstonPrivacy commented Nov 11, 2019

Looks like that is the problem. When I comment out the code which Puts() the parity shards back in the sync.Pool, KCP sessions are stable again. So perhaps three possible solutions:

  1. Don't Get() the parity shards from the sync.Pool.
  2. Implement Reconstruct to be sync.Pool friendly
  3. Modify reedsolomon.Encoder to copy bytes in a manner friendlier to sync.Pool

@WinstonPrivacy
Copy link
Author

Shame on me, I think your fix may have actually worked after all. After reading the reedsolomon docs, I saw that it allocates []byte slices when the missing shards are nil. I went back to my code and saw that I was still putting those back in the sync.Pool (a change made from an earlier version). Removing that has restored stability to the connections and the leak rate has gone down quite a bit.

Still not zero, but getting closer.

@WinstonPrivacy
Copy link
Author

So far so good! Sync pool leaks are down to < 1%, which allows us to run for a few days before restarting.

@WinstonPrivacy
Copy link
Author

Can't rule out all memory leaks but things are a lot better now. Am closing out this issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants