Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Detect and recycle dangling websockets.
Browse files Browse the repository at this point in the history
Co-authored-by: Dimitris Papavasiliou <dpapavas@gmail.com>
Co-authored-by: Ralf Kohrt <rkohrt@phys.ethz.ch>
  • Loading branch information
Dimitris Papavasiliou and Ralf Kohrt committed Sep 30, 2018
1 parent 3946b22 commit 5f5bab7
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,14 @@ public RealtimeSleepTimer(Context context) {
}

@Override
public void sleep(long millis) {
public void sleep(long millis) throws InterruptedException {
context.registerReceiver(alarmReceiver,
new IntentFilter(AlarmReceiver.WAKE_UP_THREAD_ACTION));

final long startTime = System.currentTimeMillis();
alarmReceiver.setAlarm(millis);

while (System.currentTimeMillis() - startTime < millis) {
try {
synchronized (this) {
wait(millis - System.currentTimeMillis() + startTime);
}
} catch (InterruptedException e) {
Log.w(TAG, e);
}
synchronized (this) {
wait(millis);
}

context.unregisterReceiver(alarmReceiver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class SignalServiceMessagePipe {
* @throws TimeoutException
*/
public SignalServiceEnvelope read(long timeout, TimeUnit unit)
throws InvalidVersionException, IOException, TimeoutException
throws InvalidVersionException, IOException, TimeoutException, InterruptedException
{
return read(timeout, unit, new NullMessagePipeCallback());
}
Expand All @@ -86,7 +86,7 @@ public SignalServiceEnvelope read(long timeout, TimeUnit unit)
* @throws InvalidVersionException
*/
public SignalServiceEnvelope read(long timeout, TimeUnit unit, MessagePipeCallback callback)
throws TimeoutException, IOException, InvalidVersionException
throws TimeoutException, IOException, InvalidVersionException, InterruptedException
{
while (true) {
WebSocketRequestMessage request = websocket.readRequest(unit.toMillis(timeout));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -116,22 +117,20 @@ public synchronized void disconnect() {
}

if (keepAliveSender != null) {
keepAliveSender.shutdown();
keepAliveSender.interrupt();
keepAliveSender = null;
}
}

public synchronized WebSocketRequestMessage readRequest(long timeoutMillis)
throws TimeoutException, IOException
throws TimeoutException, IOException, InterruptedException
{
if (client == null) {
throw new IOException("Connection closed!");
}

long startTime = System.currentTimeMillis();

while (client != null && incomingRequests.isEmpty() && elapsedTime(startTime) < timeoutMillis) {
Util.wait(this, Math.max(1, timeoutMillis - elapsedTime(startTime)));
if (client != null && incomingRequests.isEmpty()) {
wait(timeoutMillis);
}

if (incomingRequests.isEmpty() && client == null) throw new IOException("Connection closed!");
Expand Down Expand Up @@ -172,20 +171,16 @@ public synchronized void sendResponse(WebSocketResponseMessage response) throws
}
}

private synchronized void sendKeepAlive() throws IOException {
private synchronized Future<Pair<Integer, String>> sendKeepAlive() throws IOException {
if (keepAliveSender != null && client != null) {
byte[] message = WebSocketMessage.newBuilder()
.setType(WebSocketMessage.Type.REQUEST)
.setRequest(WebSocketRequestMessage.newBuilder()
.setId(System.currentTimeMillis())
.setPath("/v1/keepalive")
.setVerb("GET")
.build()).build()
.toByteArray();

if (!client.send(ByteString.of(message))) {
throw new IOException("Write failed!");
}
WebSocketRequestMessage request = WebSocketRequestMessage.newBuilder()
.setId(System.currentTimeMillis())
.setPath("/v1/keepalive")
.setVerb("GET")
.build();
return sendRequest(request);
} else {
return null;
}
}

Expand Down Expand Up @@ -238,7 +233,7 @@ public synchronized void onClosed(WebSocket webSocket, int code, String reason)
}

if (keepAliveSender != null) {
keepAliveSender.shutdown();
keepAliveSender.interrupt();
keepAliveSender = null;
}

Expand Down Expand Up @@ -301,23 +296,43 @@ private Pair<SSLSocketFactory, X509TrustManager> createTlsSocketFactory(TrustSto

private class KeepAliveSender extends Thread {

private AtomicBoolean stop = new AtomicBoolean(false);

public void run() {
while (!stop.get()) {
Future future = null;
boolean severed = false;

while (!interrupted()) {
try {
sleepTimer.sleep(TimeUnit.SECONDS.toMillis(KEEPALIVE_TIMEOUT_SECONDS));

Log.w(TAG, "Sending keep alive...");
sendKeepAlive();
} catch (Throwable e) {
Log.w(TAG, e);
if (future != null) {
try {
future.get(0L, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e){
severed = true;
}
}
} catch (InterruptedException e) {
Log.d(TAG, "Keep alive sender interrupted; exiting loop.");
break;
}
}
}

public void shutdown() {
stop.set(true);
if (severed) {
Log.d(TAG, "No response to previous keep-alive; forcing new connection.");

disconnect();
synchronized(WebSocketConnection.this) {
WebSocketConnection.this.notifyAll();
}
} else {
Log.d(TAG, "Sending keep alive...");

try {
future = sendKeepAlive();
} catch (IOException e) {
Log.d(TAG, "Failed to send keep alive: " + e.getMessage());
}
}
}
}
}

Expand Down

0 comments on commit 5f5bab7

Please sign in to comment.