Skip to content

Commit

Permalink
feat: emit with timeout
Browse files Browse the repository at this point in the history
This feature allows to send a packet and expect an acknowledgement from
the server within the given delay.

Syntax:

```java
socket.emit("hello", "world", new AckWithTimeout(5000) {
    @OverRide
    public void onTimeout() {
        // ...
    }

    @OverRide
    public void onSuccess(Object... args) {
        // ...
    }
});
```

Related:

- #309
- #517
  • Loading branch information
darrachequesne committed Jul 8, 2022
1 parent 7375763 commit fca3b95
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 6 deletions.
35 changes: 35 additions & 0 deletions src/main/java/io/socket/client/AckWithTimeout.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.socket.client;

import java.util.Timer;
import java.util.TimerTask;

public abstract class AckWithTimeout implements Ack {
private final long timeout;
private final Timer timer = new Timer();

/**
*
* @param timeout delay in milliseconds
*/
public AckWithTimeout(long timeout) {
this.timeout = timeout;
}

@Override
public final void call(Object... args) {
this.timer.cancel();
this.onSuccess(args);
}

public final void schedule(TimerTask task) {
this.timer.schedule(task, this.timeout);
}

public final void cancelTimer() {
this.timer.cancel();
}

public abstract void onSuccess(Object... args);
public abstract void onTimeout();

}
34 changes: 32 additions & 2 deletions src/main/java/io/socket/client/Socket.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,32 @@ public void run() {
Packet<JSONArray> packet = new Packet<>(Parser.EVENT, jsonArgs);

if (ack != null) {
logger.fine(String.format("emitting packet with ack id %d", ids));
Socket.this.acks.put(ids, ack);
final int ackId = Socket.this.ids;

logger.fine(String.format("emitting packet with ack id %d", ackId));

if (ack instanceof AckWithTimeout) {
final AckWithTimeout ackWithTimeout = (AckWithTimeout) ack;
ackWithTimeout.schedule(new TimerTask() {
@Override
public void run() {
// remove the ack from the map (to prevent an actual acknowledgement)
acks.remove(ackId);

// remove the packet from the buffer (if applicable)
Iterator<Packet<JSONArray>> iterator = sendBuffer.iterator();
while (iterator.hasNext()) {
if (iterator.next().id == ackId) {
iterator.remove();
}
}

ackWithTimeout.onTimeout();
}
});
}

Socket.this.acks.put(ackId, ack);
packet.id = ids++;
}

Expand Down Expand Up @@ -405,6 +429,12 @@ private void destroy() {
this.subs = null;
}

for (Ack ack : acks.values()) {
if (ack instanceof AckWithTimeout) {
((AckWithTimeout) ack).cancelTimer();
}
}

this.io.destroy();
}

Expand Down
116 changes: 112 additions & 4 deletions src/test/java/io/socket/client/SocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
import io.socket.util.Optional;
import org.json.JSONException;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.*;

import static java.util.Collections.singletonMap;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -287,4 +286,113 @@ public void call(Object... args) {
assertThat(values.take(), is("first"));
assertThat(values.take(), is("second"));
}

@Test(timeout = TIMEOUT)
public void shouldTimeoutAfterTheGivenDelayWhenSocketIsNotConnected() throws InterruptedException {
final BlockingQueue<Boolean> values = new LinkedBlockingQueue<>();

socket = client();

socket.emit("event", new AckWithTimeout(50) {
@Override
public void onSuccess(Object... args) {
fail();
}

@Override
public void onTimeout() {
values.offer(true);
}
});

assertThat(values.take(), is(true));
}

@Test(timeout = TIMEOUT)
public void shouldTimeoutWhenTheServerDoesNotAcknowledgeTheEvent() throws InterruptedException {
final BlockingQueue<Boolean> values = new LinkedBlockingQueue<>();

socket = client();

socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.emit("unknown", new AckWithTimeout(50) {
@Override
public void onTimeout() {
values.offer(true);
}

@Override
public void onSuccess(Object... args) {
fail();
}
});
}
});

socket.connect();

assertThat(values.take(), is(true));
}

@Test(timeout = TIMEOUT)
public void shouldTimeoutWhenTheServerDoesNotAcknowledgeTheEventInTime() throws InterruptedException {
final BlockingQueue<Boolean> values = new LinkedBlockingQueue<>();

socket = client();

socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.emit("ack", new AckWithTimeout(0) {
@Override
public void onTimeout() {
values.offer(true);
}

@Override
public void onSuccess(Object... args) {
fail();
}
});
}
});

socket.connect();

assertThat(values.take(), is(true));
}

@Test(timeout = TIMEOUT)
public void shouldNotTimeoutWhenTheServerDoesAcknowledgeTheEvent() throws InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<>();

socket = client();

socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.emit("ack", 1, "2", new byte[] { 3 }, new AckWithTimeout(200) {
@Override
public void onTimeout() {
fail();
}

@Override
public void onSuccess(Object... args) {
for (Object arg : args) {
values.offer(arg);
}
}
});
}
});

socket.connect();

assertThat((Integer) values.take(), is(1));
assertThat((String) values.take(), is("2"));
assertThat((byte[]) values.take(), is(new byte[] { 3 }));
}
}

0 comments on commit fca3b95

Please sign in to comment.