Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[amazonechocontrol] fix memory/thread leak and failing connection #7919

Merged
merged 4 commits into from Jun 16, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,7 +35,11 @@
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.io.UpgradeListener;
Expand All @@ -60,7 +64,7 @@ public class WebSocketConnection {
private final Gson gson = new Gson();
private final WebSocketClient webSocketClient;
private final IWebSocketCommandHandler webSocketCommandHandler;
private final Listener listener;
private final AmazonEchoControlWebSocket amazonEchoControlWebSocket;

private @Nullable Session session;
private @Nullable Timer pingTimer;
Expand All @@ -72,11 +76,10 @@ public class WebSocketConnection {
public WebSocketConnection(String amazonSite, List<HttpCookie> sessionCookies,
IWebSocketCommandHandler webSocketCommandHandler) throws IOException {
Comment on lines 73 to 74
Copy link
Contributor

@cpmeister cpmeister Jun 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It always bothered me that we are not overriding the default executor used by the WebSocketClient. The default one has a min thread count of 8 and all the threads are non-daemon. I'd prefer if we provided our own thread pool so we can use an existing thread pool for all these connections instead of creating new ones each time.
It would also give us an opportunity to give the threads names that would tie them to this binding.

this.webSocketCommandHandler = webSocketCommandHandler;
listener = new Listener();
amazonEchoControlWebSocket = new AmazonEchoControlWebSocket();

SslContextFactory sslContextFactory = new SslContextFactory();
webSocketClient = new WebSocketClient(sslContextFactory);

try {
String host;
if (StringUtils.equalsIgnoreCase(amazonSite, "amazon.com")) {
Expand Down Expand Up @@ -110,16 +113,14 @@ public WebSocketConnection(String amazonSite, List<HttpCookie> sessionCookies,
}

ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setHeader("host", host);
request.setHeader("Cache-Control", "no-cache");
request.setHeader("Pragma", "no-cache");
request.setHeader("Host", host);
request.setHeader("Origin", "alexa." + amazonSite);

request.setCookies(cookiesForWs);

initPongTimeoutTimer();
sessionFuture = webSocketClient.connect(listener, uri, request);

sessionFuture = webSocketClient.connect(amazonEchoControlWebSocket, uri, request,
amazonEchoControlWebSocket);
} catch (URISyntaxException e) {
logger.debug("Initialize web socket failed", e);
}
Expand All @@ -134,7 +135,7 @@ private void setSession(Session session) {

@Override
public void run() {
listener.sendPing();
amazonEchoControlWebSocket.sendPing();
}
}, 180000, 180000);
}
Expand All @@ -156,7 +157,7 @@ public void close() {
try {
session.close();
} catch (Exception e) {
logger.debug("Closing sessing failed", e);
logger.debug("Closing session failed", e);
}
}
logger.trace("Connect future = {}", sessionFuture);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how useful logging the future would be since I don't think it implements a toString().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. It print the status of the future (cancelled, completed, etc):

Connect future = java.util.concurrent.CompletableFuture@485b17bb[Completed normally]

Expand Down Expand Up @@ -200,13 +201,13 @@ public void run() {
}, 60000);
}

class Listener implements WebSocketListener, UpgradeListener {
private final Logger logger = LoggerFactory.getLogger(Listener.class);
@WebSocket(maxTextMessageSize = 64 * 1024, maxBinaryMessageSize = 64 * 1024)
@SuppressWarnings("unused")
public class AmazonEchoControlWebSocket {
int msgCounter = -1;
int messageId;

Listener() {
logger.trace("Initialized listener");
AmazonEchoControlWebSocket() {
this.messageId = ThreadLocalRandom.current().nextInt(0, Short.MAX_VALUE);
}

Expand Down Expand Up @@ -377,7 +378,7 @@ Message parseIncomingMessage(byte[] data) {
return message;
}

@Override
@OnWebSocketConnect
public void onWebSocketConnect(@Nullable Session session) {
if (session != null) {
this.msgCounter = -1;
Expand All @@ -388,7 +389,7 @@ public void onWebSocketConnect(@Nullable Session session) {
}
}

@Override
@OnWebSocketMessage
public void onWebSocketBinary(byte @Nullable [] data, int offset, int len) {
if (data == null) {
return;
Expand Down Expand Up @@ -427,37 +428,25 @@ public void onWebSocketBinary(byte @Nullable [] data, int offset, int len) {
}
}

@Override
@OnWebSocketMessage
public void onWebSocketText(@Nullable String message) {
logger.trace("Received text message: '{}'", message);
}

@Override
@OnWebSocketClose
public void onWebSocketClose(int code, @Nullable String reason) {
logger.info("Web Socket close {}. Reason: {}", code, reason);
WebSocketConnection.this.close();
}

@Override
@OnWebSocketError
public void onWebSocketError(@Nullable Throwable error) {
logger.info("Web Socket error", error);
J-N-K marked this conversation as resolved.
Show resolved Hide resolved
if (!closed) {
WebSocketConnection.this.close();
}
}

@Override
public void onHandshakeResponse(@Nullable UpgradeResponse response) {
if (response == null) {
logger.trace("Received null response in onHandshakeResponse");
return;
}
logger.trace("UpgradeResponse: {}", response);
}

@Override
public void onHandshakeRequest(@Nullable UpgradeRequest request) {
}

public void sendPing() {
logger.debug("Send Ping");
WebSocketConnection.this.initPongTimeoutTimer();
Expand Down