Skip to content

Commit

Permalink
feat: implement catch-all listeners
Browse files Browse the repository at this point in the history
Syntax:

```java
socket.onAnyIncoming(new Emitter.Listener() {
    @OverRide
    public void call(Object... args) {
        // ...
    }
});

socket.onAnyOutgoing(new Emitter.Listener() {
    @OverRide
    public void call(Object... args) {
        // ...
    }
});
```

Related:

- socketio/engine.io-client-java#99
- #243
- #475
  • Loading branch information
darrachequesne committed Jul 8, 2022
1 parent fca3b95 commit c7d50b8
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 0 deletions.
62 changes: 62 additions & 0 deletions src/main/java/io/socket/client/Socket.java
Expand Up @@ -9,6 +9,7 @@
import org.json.JSONObject;

import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -63,6 +64,9 @@ public class Socket extends Emitter {
private final Queue<List<Object>> receiveBuffer = new LinkedList<>();
private final Queue<Packet<JSONArray>> sendBuffer = new LinkedList<>();

private ConcurrentLinkedQueue<Listener> onAnyIncomingListeners = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<Listener> onAnyOutgoingListeners = new ConcurrentLinkedQueue<>();

public Socket(Manager io, String nsp, Manager.Options opts) {
this.io = io;
this.nsp = nsp;
Expand Down Expand Up @@ -250,6 +254,14 @@ public void run() {
}

private void packet(Packet packet) {
if (packet.type == Parser.EVENT) {
if (!onAnyOutgoingListeners.isEmpty()) {
Object[] argsAsArray = toArray((JSONArray) packet.data);
for (Listener listener : onAnyOutgoingListeners) {
listener.call(argsAsArray);
}
}
}
packet.nsp = this.nsp;
this.io.packet(packet);
}
Expand Down Expand Up @@ -340,6 +352,12 @@ private void onevent(Packet<JSONArray> packet) {

if (this.connected) {
if (args.isEmpty()) return;
if (!this.onAnyIncomingListeners.isEmpty()) {
Object[] argsAsArray = args.toArray();
for (Listener listener : this.onAnyIncomingListeners) {
listener.call(argsAsArray);
}
}
String event = args.remove(0).toString();
super.emit(event, args.toArray());
} else {
Expand Down Expand Up @@ -507,5 +525,49 @@ private static Object[] toArray(JSONArray array) {
}
return data;
}

public Socket onAnyIncoming(Listener fn) {
this.onAnyIncomingListeners.add(fn);
return this;
}

public Socket offAnyIncoming() {
this.onAnyIncomingListeners.clear();
return this;
}

public Socket offAnyIncoming(Listener fn) {
Iterator<Listener> it = this.onAnyIncomingListeners.iterator();
while (it.hasNext()) {
Listener listener = it.next();
if (listener == fn) {
it.remove();
break;
}
}
return this;
}

public Socket onAnyOutgoing(Listener fn) {
this.onAnyOutgoingListeners.add(fn);
return this;
}

public Socket offAnyOutgoing() {
this.onAnyOutgoingListeners.clear();
return this;
}

public Socket offAnyOutgoing(Listener fn) {
Iterator<Listener> it = this.onAnyOutgoingListeners.iterator();
while (it.hasNext()) {
Listener listener = it.next();
if (listener == fn) {
it.remove();
break;
}
}
return this;
}
}

55 changes: 55 additions & 0 deletions src/test/java/io/socket/client/SocketTest.java
Expand Up @@ -395,4 +395,59 @@ public void onSuccess(Object... args) {
assertThat((String) values.take(), is("2"));
assertThat((byte[]) values.take(), is(new byte[] { 3 }));
}

@Test(timeout = TIMEOUT)
public void shouldCallCatchAllListenerForIncomingPackets() throws InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<>();

socket = client();

socket.on("message", new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.emit("echo", 1, "2", new byte[] { 3 });

socket.onAnyIncoming(new Emitter.Listener() {
@Override
public void call(Object... args) {
for (Object arg : args) {
values.offer(arg);
}
}
});
}
});

socket.connect();

assertThat((String) values.take(), is("echoBack"));
assertThat((Integer) values.take(), is(1));
assertThat((String) values.take(), is("2"));
assertThat((byte[]) values.take(), is(new byte[] { 3 }));
}

@Test(timeout = TIMEOUT)
public void shouldCallCatchAllListenerForOutgoingPackets() throws InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<>();

socket = client();

socket.emit("echo", 1, "2", new byte[] { 3 });

socket.onAnyOutgoing(new Emitter.Listener() {
@Override
public void call(Object... args) {
for (Object arg : args) {
values.offer(arg);
}
}
});

socket.connect();

assertThat((String) values.take(), is("echo"));
assertThat((Integer) values.take(), is(1));
assertThat((String) values.take(), is("2"));
assertThat((byte[]) values.take(), is(new byte[] { 3 }));
}
}

0 comments on commit c7d50b8

Please sign in to comment.