From 276851580c15ce0b0d8969b6b7edd7f5cf03b1b6 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Mon, 27 Jan 2014 17:34:17 +0100 Subject: [PATCH] Fix #78 Provide point to point communication Signed-off-by: Clement Escoffier --- .../router/DefaultWebSocketCallback.java | 8 ++- .../router/OnMessageWebSocketCallback.java | 9 +++- .../org/wisdom/router/WebSocketRouter.java | 28 ++++++---- .../org/wisdom/api/annotations/Closed.java | 2 + .../org/wisdom/api/annotations/OnMessage.java | 2 + .../org/wisdom/api/annotations/Opened.java | 2 + .../wisdom/api/http/websockets/Publisher.java | 32 ++++++++++++ .../http/websockets/WebSocketDispatcher.java | 2 + .../http/websockets/WebSocketListener.java | 25 +++++++-- .../org/wisdom/engine/server/Dispatcher.java | 52 +++++++++++++++++-- .../wisdom/engine/server/WisdomHandler.java | 5 +- 11 files changed, 144 insertions(+), 23 deletions(-) diff --git a/router/src/main/java/org/wisdom/router/DefaultWebSocketCallback.java b/router/src/main/java/org/wisdom/router/DefaultWebSocketCallback.java index eb3499b57..0f2df6028 100644 --- a/router/src/main/java/org/wisdom/router/DefaultWebSocketCallback.java +++ b/router/src/main/java/org/wisdom/router/DefaultWebSocketCallback.java @@ -103,12 +103,16 @@ public Map getPathParametersEncoded(String uri) { return map; } - public void invoke(String uri) throws InvocationTargetException, IllegalAccessException { + public void invoke(String uri, String client) throws InvocationTargetException, IllegalAccessException { Map values = getPathParametersEncoded(uri); Object[] parameters = new Object[arguments.size()]; for (int i = 0; i < arguments.size(); i++) { RouteUtils.Argument argument = arguments.get(i); - parameters[i] = RouteUtils.getParameter(argument, values); + if (argument.getName().equals("client") && argument.getType().equals(String.class)) { + parameters[i] = client; + } else { + parameters[i] = RouteUtils.getParameter(argument, values); + } } method.invoke(controller, parameters); } diff --git a/router/src/main/java/org/wisdom/router/OnMessageWebSocketCallback.java b/router/src/main/java/org/wisdom/router/OnMessageWebSocketCallback.java index c7a38fa36..666c783e3 100644 --- a/router/src/main/java/org/wisdom/router/OnMessageWebSocketCallback.java +++ b/router/src/main/java/org/wisdom/router/OnMessageWebSocketCallback.java @@ -57,14 +57,19 @@ public List buildArguments(Method method) { return arguments; } - public void invoke(String uri, byte[] content, ContentEngine engine) throws InvocationTargetException, + public void invoke(String uri, String client, byte[] content, ContentEngine engine) throws + InvocationTargetException, IllegalAccessException { Map values = getPathParametersEncoded(uri); Object[] parameters = new Object[arguments.size()]; for (int i = 0; i < arguments.size(); i++) { RouteUtils.Argument argument = arguments.get(i); if (argument.getSource() == RouteUtils.Source.PARAMETER) { - parameters[i] = RouteUtils.getParameter(argument, values); + if (argument.getName().equals("client") && argument.getType().equals(String.class)) { + parameters[i] = client; + } else { + parameters[i] = RouteUtils.getParameter(argument, values); + } } else { // Body parameters[i] = transform(argument.getType(), content, engine); diff --git a/router/src/main/java/org/wisdom/router/WebSocketRouter.java b/router/src/main/java/org/wisdom/router/WebSocketRouter.java index 8c760063d..c324692f8 100644 --- a/router/src/main/java/org/wisdom/router/WebSocketRouter.java +++ b/router/src/main/java/org/wisdom/router/WebSocketRouter.java @@ -1,6 +1,8 @@ package org.wisdom.router; import org.apache.felix.ipojo.annotations.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.wisdom.api.Controller; import org.wisdom.api.annotations.Closed; import org.wisdom.api.annotations.OnMessage; @@ -10,8 +12,6 @@ import org.wisdom.api.http.websockets.WebSocketDispatcher; import org.wisdom.api.http.websockets.WebSocketListener; import org.wisdom.api.router.RouteUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -34,7 +34,7 @@ public class WebSocketRouter implements WebSocketListener, Publisher { private List closes = new ArrayList<>(); private List listeners = new ArrayList<>(); - @Requires(optional=true) + @Requires(optional = true) private ContentEngine engine; @@ -123,11 +123,11 @@ public synchronized void unbindController(Controller controller) { } @Override - public void received(String uri, byte[] content) { + public void received(String uri, String from, byte[] content) { for (OnMessageWebSocketCallback listener : listeners) { if (listener.matches(uri)) { try { - listener.invoke(uri, content, engine); + listener.invoke(uri, from, content, engine); } catch (InvocationTargetException e) { //NOSONAR LOGGER.error("An error occurred in the @OnMessage callback {}#{} : {}", listener.getController().getClass().getName(), listener.getMethod().getName @@ -141,11 +141,11 @@ public void received(String uri, byte[] content) { } @Override - public void opened(String uri) { + public void opened(String uri, String client) { for (DefaultWebSocketCallback open : opens) { if (open.matches(uri)) { try { - open.invoke(uri); + open.invoke(uri, client); } catch (InvocationTargetException e) { //NOSONAR LOGGER.error("An error occurred in the @Open callback {}#{} : {}", open.getController().getClass().getName(), open.getMethod().getName @@ -159,11 +159,11 @@ public void opened(String uri) { } @Override - public void closed(String uri) { + public void closed(String uri, String client) { for (DefaultWebSocketCallback close : closes) { if (close.matches(uri)) { try { - close.invoke(uri); + close.invoke(uri, client); } catch (InvocationTargetException e) { //NOSONAR LOGGER.error("An error occurred in the @Close callback {}#{} : {}", close.getController().getClass().getName(), close.getMethod().getName @@ -185,4 +185,14 @@ public void publish(String uri, String message) { public void publish(String uri, byte[] message) { dispatcher.publish(uri, message); } + + @Override + public void send(String uri, String client, String message) { + dispatcher.send(uri, client, message); + } + + @Override + public void send(String uri, String client, byte[] message) { + dispatcher.send(uri, client, message); + } } diff --git a/wisdom-api/src/main/java/org/wisdom/api/annotations/Closed.java b/wisdom-api/src/main/java/org/wisdom/api/annotations/Closed.java index b43c6b9ed..408020ccc 100644 --- a/wisdom-api/src/main/java/org/wisdom/api/annotations/Closed.java +++ b/wisdom-api/src/main/java/org/wisdom/api/annotations/Closed.java @@ -8,6 +8,8 @@ /** * Specifies a callback notified when a web socket is closed. The web socket is already closed when the callback is * called. + * Like other web socket callback a special parameter is provided to retrieve the client's id. To retrieve it, use the + * parameter named 'client': @Parameter("client") String client. */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) diff --git a/wisdom-api/src/main/java/org/wisdom/api/annotations/OnMessage.java b/wisdom-api/src/main/java/org/wisdom/api/annotations/OnMessage.java index b6b76993a..4392f880e 100644 --- a/wisdom-api/src/main/java/org/wisdom/api/annotations/OnMessage.java +++ b/wisdom-api/src/main/java/org/wisdom/api/annotations/OnMessage.java @@ -7,6 +7,8 @@ /** * Mark a method receiving a web socket event. + * Like other web socket callback a special parameter is provided to retrieve the client's id. To retrieve it, use the + * parameter named 'client': @Parameter("client") String client. */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) diff --git a/wisdom-api/src/main/java/org/wisdom/api/annotations/Opened.java b/wisdom-api/src/main/java/org/wisdom/api/annotations/Opened.java index 9e7e240d3..5899ecc17 100644 --- a/wisdom-api/src/main/java/org/wisdom/api/annotations/Opened.java +++ b/wisdom-api/src/main/java/org/wisdom/api/annotations/Opened.java @@ -7,6 +7,8 @@ /** * Specifies a callback notified when a web socket is opened. + * Like other web socket callback a special parameter is provided to retrieve the client's id. To retrieve it, use the + * parameter named 'client': @Parameter("client") String client. */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) diff --git a/wisdom-api/src/main/java/org/wisdom/api/http/websockets/Publisher.java b/wisdom-api/src/main/java/org/wisdom/api/http/websockets/Publisher.java index f76cdb3c4..07d105238 100644 --- a/wisdom-api/src/main/java/org/wisdom/api/http/websockets/Publisher.java +++ b/wisdom-api/src/main/java/org/wisdom/api/http/websockets/Publisher.java @@ -5,10 +5,42 @@ */ public interface Publisher { + /** + * Publishes a message to all clients connected to a websocket. + * + * @param uri the websocket's url + * @param message the message + */ public void publish(String uri, String message); + /** + * Publishes data to all clients connected to a websocket. + * + * @param uri the websocket's url + * @param message the data + */ public void publish(String uri, byte[] message); + /** + * Sends a message to a specific client connected to a websocket. + * The method does nothing if their is no client matching the id connected on the websocket. + * + * @param uri the websocket's url + * @param client the client id, received in the 'opened' callback + * @param message the message + */ + public void send(String uri, String client, String message); + + /** + * Sends data to a specific client connected to a websocket. + * The method does nothing if their is no client matching the id connected on the websocket. + * + * @param uri the websocket's url + * @param client the client id, received in the 'opened' callback + * @param message the data + */ + public void send(String uri, String client, byte[] message); + //TODO Extend with other types such as json.... // publish(message(o).as(Json).on(uri)) } diff --git a/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketDispatcher.java b/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketDispatcher.java index 0327fe5f1..784260a38 100644 --- a/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketDispatcher.java +++ b/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketDispatcher.java @@ -11,4 +11,6 @@ public interface WebSocketDispatcher { public void register(WebSocketListener listener); public void unregister(WebSocketListener listener); + public void send(String uri, String client, String message); + public void send(String uri, String client, byte[] message); } diff --git a/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketListener.java b/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketListener.java index 5df873014..a35bbde26 100644 --- a/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketListener.java +++ b/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketListener.java @@ -6,10 +6,29 @@ */ public interface WebSocketListener { - public void received(String uri, byte[] content); + /** + * Callback invoked when data is received on the web socket identified by its url. + * + * @param uri the url of the web socket + * @param client the client id + * @param content the received content + */ + public void received(String uri, String client, byte[] content); - public void opened(String uri); + /** + * Callback invoked when a new client connects on a web socket identified by its url. + * + * @param uri the url of the web socket + * @param client the client id + */ + public void opened(String uri, String client); - public void closed(String uri); + /** + * Callback invoked when a new client closes the connection to a web socket identified by its url. + * + * @param uri the url of the web socket + * @param client the client id + */ + public void closed(String uri, String client); } diff --git a/wisdom-engine/src/main/java/org/wisdom/engine/server/Dispatcher.java b/wisdom-engine/src/main/java/org/wisdom/engine/server/Dispatcher.java index f1fb2fd8c..536408f02 100644 --- a/wisdom-engine/src/main/java/org/wisdom/engine/server/Dispatcher.java +++ b/wisdom-engine/src/main/java/org/wisdom/engine/server/Dispatcher.java @@ -138,7 +138,7 @@ public synchronized void publish(String url, byte[] data) { } public void addWebSocket(String url, ChannelHandlerContext ctx) { - LOGGER.info("Adding web socket on {} bound to {}", url, ctx); + LOGGER.info("Adding web socket on {} bound to {}, {}", url, ctx, ctx.channel()); List webSocketListeners; synchronized (this) { List channels = sockets.get(url); @@ -151,7 +151,7 @@ public void addWebSocket(String url, ChannelHandlerContext ctx) { } for (WebSocketListener listener : webSocketListeners) { - listener.opened(url); + listener.opened(url, Integer.toOctalString(ctx.channel().hashCode())); } } @@ -171,7 +171,7 @@ public void removeWebSocket(String url, ChannelHandlerContext ctx) { } for (WebSocketListener listener : webSocketListeners) { - listener.closed(url); + listener.closed(url, id(ctx)); } } @@ -189,14 +189,56 @@ public void unregister(WebSocketListener listener) { } } - public void received(String uri, byte[] content) { + @Override + public void send(String uri, String client, String message) { + List channels; + synchronized (this) { + List ch = sockets.get(uri); + if (ch != null) { + channels = new ArrayList<>(ch); + } else { + channels = Collections.emptyList(); + } + } + for (ChannelHandlerContext channel : channels) { + if (client.equals(id(channel))) { + channel.writeAndFlush(new TextWebSocketFrame(message)); + } else { + System.out.println("Mismatch " + client + " - " + id(channel)); + } + } + } + + private String id(ChannelHandlerContext ctx) { + return Integer.toOctalString(ctx.channel().hashCode()); + } + + @Override + public void send(String uri, String client, byte[] message) { + List channels; + synchronized (this) { + List ch = sockets.get(uri); + if (ch != null) { + channels = new ArrayList<>(ch); + } else { + channels = Collections.emptyList(); + } + } + for (ChannelHandlerContext channel : channels) { + if (client.equals(id(channel))) { + channel.writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(message))); + } + } + } + + public void received(String uri, byte[] content, ChannelHandlerContext ctx) { List localListeners; synchronized (this) { localListeners = new ArrayList<>(this.listeners); } for (WebSocketListener listener : localListeners) { - listener.received(uri, content); + listener.received(uri, id(ctx), content); } } diff --git a/wisdom-engine/src/main/java/org/wisdom/engine/server/WisdomHandler.java b/wisdom-engine/src/main/java/org/wisdom/engine/server/WisdomHandler.java index 14528f68c..91a597968 100644 --- a/wisdom-engine/src/main/java/org/wisdom/engine/server/WisdomHandler.java +++ b/wisdom-engine/src/main/java/org/wisdom/engine/server/WisdomHandler.java @@ -89,9 +89,10 @@ private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame fram } if (frame instanceof TextWebSocketFrame) { - accessor.getDispatcher().received(strip(handshaker.uri()), ((TextWebSocketFrame) frame).text().getBytes()); + accessor.getDispatcher().received(strip(handshaker.uri()), ((TextWebSocketFrame) frame).text() + .getBytes(), ctx); } else if (frame instanceof BinaryWebSocketFrame) { - accessor.getDispatcher().received(strip(handshaker.uri()), frame.content().array()); + accessor.getDispatcher().received(strip(handshaker.uri()), frame.content().array(), ctx); } }