Fix #925 Memory Leak while WebSocketServerHandshakeException or Channel failed #926

Open
wants to merge 9 commits into base: master
33 changes: 29 additions & 4 deletions src/main/java/com/corundumstudio/socketio/handler/ClientsBox.java
@@ -15,19 +15,44 @@
*/
package com.corundumstudio.socketio.handler;

import com.corundumstudio.socketio.HandshakeData;
import io.netty.channel.Channel;
import io.netty.util.internal.PlatformDependent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import com.corundumstudio.socketio.HandshakeData;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

public class ClientsBox {
    private static final Logger log = LoggerFactory.getLogger(ClientsBox.class);

    private final Map<UUID, ClientHead> uuid2clients = PlatformDependent.newConcurrentHashMap();
    private final Map<Channel, ClientHead> channel2clients = PlatformDependent.newConcurrentHashMap();

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    public ClientsBox() {
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
Owner

Why do we need a periodic task for this, and why schedule this check after the client connects?

Author

I am afraid that bugs in future commits could leave the client referenced after it has disconnected.

Author

Like the one this PR fixes: we cannot guarantee that similar bugs will not appear in the future, but we should not let this kind of bug cause a memory leak. We should actively detect the stale entry, remove it, and log a warning.
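
The periodic check described here can be sketched in isolation. This is a minimal, self-contained model rather than the PR's code: `SweepableClient` and `SweepingRegistry` are hypothetical stand-ins for `ClientHead` and `ClientsBox`, and the daemon thread and `close()` method are assumptions added so the sweeper can be stopped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ClientHead; only the connected flag matters here.
interface SweepableClient {
    boolean isConnected();
}

// Hypothetical registry (not the PR's ClientsBox) that sweeps stale entries.
class SweepingRegistry implements AutoCloseable {
    private final Map<UUID, SweepableClient> clients = new ConcurrentHashMap<>();
    private final ScheduledExecutorService sweeper =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "stale-client-sweeper");
                t.setDaemon(true); // assumption: the sweep should not keep the JVM alive
                return t;
            });

    SweepingRegistry(long periodSeconds) {
        sweeper.scheduleWithFixedDelay(this::sweep, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    void add(UUID sessionId, SweepableClient client) {
        clients.put(sessionId, client);
    }

    int size() {
        return clients.size();
    }

    // Removes every entry whose client reports it is no longer connected
    // and returns the session ids that were removed.
    List<UUID> sweep() {
        List<UUID> removed = new ArrayList<>();
        for (Map.Entry<UUID, SweepableClient> e : clients.entrySet()) {
            if (!e.getValue().isConnected() && clients.remove(e.getKey(), e.getValue())) {
                removed.add(e.getKey());
            }
        }
        return removed;
    }

    @Override
    public void close() {
        sweeper.shutdownNow();
    }
}
```

The sweep only bounds how long a stale entry survives; it does not address whichever code path failed to remove the entry in the first place.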

Owner

This logic should be implemented in the com.corundumstudio.socketio.handler.ClientHead#onChannelDisconnect() method, without using a ScheduledExecutorService.

Author

> this logic should be implemented in com.corundumstudio.socketio.handler.ClientHead#onChannelDisconnect() method without ScheduledExecutorService usage.

But in the bug fixed by this PR, onChannelDisconnect() is indeed called (due to the scheduled ping timeout), yet namespaceClients does not contain the client that threw the WebSocketException, so that client is never removed from uuid2clients.
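
The failure mode described here can be modeled in a few lines. This is a simplified sketch under a stated assumption: the registry entry is dropped only as a side effect of disconnecting the last namespace client. `LeakModel`, `Head`, and `register` are hypothetical names and do not reproduce netty-socketio's actual classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of the leak; not the library's real code.
class LeakModel {
    final Map<UUID, Head> uuid2clients = new ConcurrentHashMap<>();

    class Head {
        final UUID sessionId = UUID.randomUUID();
        final List<String> namespaceClients = new ArrayList<>();

        // Assumed coupling: the uuid2clients entry is removed only when the
        // last namespace client is disconnected. With no namespace clients,
        // the loop body never runs and the entry stays in the map.
        void onChannelDisconnect() {
            for (String ns : new ArrayList<>(namespaceClients)) {
                namespaceClients.remove(ns);
                if (namespaceClients.isEmpty()) {
                    uuid2clients.remove(sessionId);
                }
            }
        }
    }

    // Registers a client that has joined the given namespaces (possibly none).
    Head register(String... namespaces) {
        Head head = new Head();
        Collections.addAll(head.namespaceClients, namespaces);
        uuid2clients.put(head.sessionId, head);
        return head;
    }
}
```

In this model, a client registered with at least one namespace is removed on disconnect, while a client whose handshake failed before it joined any namespace remains in `uuid2clients`.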

Owner

You're checking entry.getValue().isConnected(), but that flag can be set only in the onChannelDisconnect() handler. I suggest calling clientsBox.removeClient() in that method instead.
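
One way to read this suggestion is to flip the flag and remove the registry entry in the same place. The sketch below is illustrative only: `TrackedClient` and its `connect` factory are hypothetical stand-ins for `ClientHead`, and a plain map stands in for `ClientsBox`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ClientHead that deregisters itself on disconnect.
class TrackedClient {
    private final UUID sessionId;
    private final Map<UUID, TrackedClient> registry;
    private final AtomicBoolean disconnected = new AtomicBoolean(false);

    private TrackedClient(UUID sessionId, Map<UUID, TrackedClient> registry) {
        this.sessionId = sessionId;
        this.registry = registry;
    }

    // Creates the client and registers it under its session id.
    static TrackedClient connect(UUID sessionId, Map<UUID, TrackedClient> registry) {
        TrackedClient client = new TrackedClient(sessionId, registry);
        registry.put(sessionId, client);
        return client;
    }

    boolean isConnected() {
        return !disconnected.get();
    }

    // The flag and the registry entry change together, so there is no window
    // in which a disconnected client is still registered. compareAndSet makes
    // repeated calls harmless.
    void onChannelDisconnect() {
        if (disconnected.compareAndSet(false, true)) {
            registry.remove(sessionId, this);
        }
    }
}
```

With removal tied to the handler, a separate scan for `!isConnected()` entries would find nothing to clean up as long as the handler itself runs.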

Owner

@HashZhang

            List<UUID> disconnected = uuid2clients.entrySet()
                    .stream().filter(entry -> !entry.getValue().isConnected())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            disconnected.forEach(uuid -> {
                ClientHead clientHead = uuid2clients.remove(uuid);
                if (clientHead != null) {
                    log.warn("Client with sessionId {}-{} was disconnected but still exists in uuid2clients",
                            clientHead.getSessionId(), clientHead.getEngineIOVersion());
                }
            });
        }, 5, 5, java.util.concurrent.TimeUnit.SECONDS);
    }

    // TODO use storeFactory
    public HandshakeData getHandshakeData(UUID sessionId) {
        ClientHead client = uuid2clients.get(sessionId);
@@ -42,8 +67,8 @@ public void addClient(ClientHead clientHead) {
         uuid2clients.put(clientHead.getSessionId(), clientHead);
     }

-    public void removeClient(UUID sessionId) {
-        uuid2clients.remove(sessionId);
+    public ClientHead removeClient(UUID sessionId) {
+        return uuid2clients.remove(sessionId);
     }

     public ClientHead get(UUID sessionId) {
@@ -165,31 +165,48 @@ private void handshake(ChannelHandlerContext ctx, final UUID sessionId, String p
                 new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, true, configuration.getMaxFramePayloadLength());
         WebSocketServerHandshaker handshaker = factory.newHandshaker(req);
         if (handshaker != null) {
-            ChannelFuture f = handshaker.handshake(channel, req);
-            f.addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (!future.isSuccess()) {
-                        log.error("Can't handshake " + sessionId, future.cause());
-                        return;
-                    }
+            try {
+                ChannelFuture f = handshaker.handshake(channel, req);
+                f.addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws Exception {
+                        if (!future.isSuccess()) {
+                            log.warn("Can't handshake {}", sessionId, future.cause());
+                            closeClient(sessionId, channel);
+                            return;
+                        }

-                    channel.pipeline().addBefore(SocketIOChannelInitializer.WEB_SOCKET_TRANSPORT, SocketIOChannelInitializer.WEB_SOCKET_AGGREGATOR,
-                            new WebSocketFrameAggregator(configuration.getMaxFramePayloadLength()));
-                    connectClient(channel, sessionId);
-                }
-            });
+                        channel.pipeline().addBefore(SocketIOChannelInitializer.WEB_SOCKET_TRANSPORT, SocketIOChannelInitializer.WEB_SOCKET_AGGREGATOR,
+                                new WebSocketFrameAggregator(configuration.getMaxFramePayloadLength()));
+                        connectClient(channel, sessionId);
+                    }
+                });
+            } catch (Throwable e) {
+                log.warn("Can't handshake {}, {}", sessionId, e.getMessage(), e);
+                closeClient(sessionId, channel);
+            }
         } else {
             WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
         }
     }

+    private void closeClient(UUID sessionId, Channel channel) {
+        try {
+            channel.close();
+        } catch (Throwable t) {
+            log.warn("Can't close channel for sessionId: {}", sessionId, t);
+        }
+        ClientHead clientHead = clientsBox.removeClient(sessionId);
+        clientHead.disconnect();
+        log.info("Client with sessionId: {} was disconnected", sessionId);
+    }
+
     private void connectClient(final Channel channel, final UUID sessionId) {
         ClientHead client = clientsBox.get(sessionId);
         if (client == null) {
             log.warn("Unauthorized client with sessionId: {} with ip: {}. Channel closed!",
                     sessionId, channel.remoteAddress());
-            channel.close();
+            closeClient(sessionId, channel);
             return;
         }
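
One detail in the new closeClient(): connectClient() calls it when clientsBox.get(sessionId) returned null. Assuming get() reads the same uuid2clients map that removeClient() removes from (its body is not shown in this diff), removeClient(sessionId) would return null on that path as well, and clientHead.disconnect() would throw a NullPointerException. A null-safe variant might look like the sketch below; `Disconnectable` and `SafeCloser` are hypothetical stand-ins, and a `Runnable` stands in for closing the channel.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ClientHead.
interface Disconnectable {
    void disconnect();
}

// Hypothetical helper illustrating a null-safe close; not the PR's code.
class SafeCloser {
    private final Map<UUID, Disconnectable> clients = new ConcurrentHashMap<>();

    void add(UUID sessionId, Disconnectable client) {
        clients.put(sessionId, client);
    }

    // Closes the channel, then disconnects the client only if one was
    // registered. Returns true when a registered client was disconnected.
    boolean closeClient(UUID sessionId, Runnable closeChannel) {
        try {
            closeChannel.run();
        } catch (RuntimeException e) {
            // A real handler would log here; a failed close must not skip cleanup.
        }
        Disconnectable client = clients.remove(sessionId);
        if (client == null) {
            return false; // unknown session: nothing to disconnect
        }
        client.disconnect();
        return true;
    }
}
```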
