Skip to content

Commit

Permalink
Fix #78
Browse files Browse the repository at this point in the history
Provide point to point communication

Signed-off-by: Clement Escoffier <clement.escoffier@gmail.com>
  • Loading branch information
cescoffier committed Jan 30, 2014
1 parent 3ac93c8 commit 2768515
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 23 deletions.
Expand Up @@ -103,12 +103,16 @@ public Map<String, String> getPathParametersEncoded(String uri) {
return map;
}

public void invoke(String uri) throws InvocationTargetException, IllegalAccessException {
public void invoke(String uri, String client) throws InvocationTargetException, IllegalAccessException {
Map<String, String> values = getPathParametersEncoded(uri);
Object[] parameters = new Object[arguments.size()];
for (int i = 0; i < arguments.size(); i++) {
RouteUtils.Argument argument = arguments.get(i);
parameters[i] = RouteUtils.getParameter(argument, values);
if (argument.getName().equals("client") && argument.getType().equals(String.class)) {
parameters[i] = client;
} else {
parameters[i] = RouteUtils.getParameter(argument, values);
}
}
method.invoke(controller, parameters);
}
Expand Down
Expand Up @@ -57,14 +57,19 @@ public List<RouteUtils.Argument> buildArguments(Method method) {
return arguments;
}

public void invoke(String uri, byte[] content, ContentEngine engine) throws InvocationTargetException,
public void invoke(String uri, String client, byte[] content, ContentEngine engine) throws
InvocationTargetException,
IllegalAccessException {
Map<String, String> values = getPathParametersEncoded(uri);
Object[] parameters = new Object[arguments.size()];
for (int i = 0; i < arguments.size(); i++) {
RouteUtils.Argument argument = arguments.get(i);
if (argument.getSource() == RouteUtils.Source.PARAMETER) {
parameters[i] = RouteUtils.getParameter(argument, values);
if (argument.getName().equals("client") && argument.getType().equals(String.class)) {
parameters[i] = client;
} else {
parameters[i] = RouteUtils.getParameter(argument, values);
}
} else {
// Body
parameters[i] = transform(argument.getType(), content, engine);
Expand Down
28 changes: 19 additions & 9 deletions router/src/main/java/org/wisdom/router/WebSocketRouter.java
@@ -1,6 +1,8 @@
package org.wisdom.router;

import org.apache.felix.ipojo.annotations.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wisdom.api.Controller;
import org.wisdom.api.annotations.Closed;
import org.wisdom.api.annotations.OnMessage;
Expand All @@ -10,8 +12,6 @@
import org.wisdom.api.http.websockets.WebSocketDispatcher;
import org.wisdom.api.http.websockets.WebSocketListener;
import org.wisdom.api.router.RouteUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
Expand All @@ -34,7 +34,7 @@ public class WebSocketRouter implements WebSocketListener, Publisher {
private List<DefaultWebSocketCallback> closes = new ArrayList<>();
private List<OnMessageWebSocketCallback> listeners = new ArrayList<>();

@Requires(optional=true)
@Requires(optional = true)
private ContentEngine engine;


Expand Down Expand Up @@ -123,11 +123,11 @@ public synchronized void unbindController(Controller controller) {
}

@Override
public void received(String uri, byte[] content) {
public void received(String uri, String from, byte[] content) {
for (OnMessageWebSocketCallback listener : listeners) {
if (listener.matches(uri)) {
try {
listener.invoke(uri, content, engine);
listener.invoke(uri, from, content, engine);
} catch (InvocationTargetException e) { //NOSONAR
LOGGER.error("An error occurred in the @OnMessage callback {}#{} : {}",
listener.getController().getClass().getName(), listener.getMethod().getName
Expand All @@ -141,11 +141,11 @@ public void received(String uri, byte[] content) {
}

@Override
public void opened(String uri) {
public void opened(String uri, String client) {
for (DefaultWebSocketCallback open : opens) {
if (open.matches(uri)) {
try {
open.invoke(uri);
open.invoke(uri, client);
} catch (InvocationTargetException e) { //NOSONAR
LOGGER.error("An error occurred in the @Open callback {}#{} : {}",
open.getController().getClass().getName(), open.getMethod().getName
Expand All @@ -159,11 +159,11 @@ public void opened(String uri) {
}

@Override
public void closed(String uri) {
public void closed(String uri, String client) {
for (DefaultWebSocketCallback close : closes) {
if (close.matches(uri)) {
try {
close.invoke(uri);
close.invoke(uri, client);
} catch (InvocationTargetException e) { //NOSONAR
LOGGER.error("An error occurred in the @Close callback {}#{} : {}",
close.getController().getClass().getName(), close.getMethod().getName
Expand All @@ -185,4 +185,14 @@ public void publish(String uri, String message) {
public void publish(String uri, byte[] message) {
dispatcher.publish(uri, message);
}

@Override
public void send(String uri, String client, String message) {
dispatcher.send(uri, client, message);
}

@Override
public void send(String uri, String client, byte[] message) {
dispatcher.send(uri, client, message);
}
}
Expand Up @@ -8,6 +8,8 @@
/**
* Specifies a callback notified when a web socket is closed. The web socket is already closed when the callback is
* called.
* Like other web socket callback a special parameter is provided to retrieve the client's id. To retrieve it, use the
* parameter named 'client': <code>@Parameter("client") String client</code>.
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Expand Up @@ -7,6 +7,8 @@

/**
* Mark a method receiving a web socket event.
* Like other web socket callback a special parameter is provided to retrieve the client's id. To retrieve it, use the
* parameter named 'client': <code>@Parameter("client") String client</code>.
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Expand Up @@ -7,6 +7,8 @@

/**
* Specifies a callback notified when a web socket is opened.
* Like other web socket callback a special parameter is provided to retrieve the client's id. To retrieve it, use the
* parameter named 'client': <code>@Parameter("client") String client</code>.
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Expand Up @@ -5,10 +5,42 @@
*/
public interface Publisher {

/**
* Publishes a message to all clients connected to a websocket.
*
* @param uri the websocket's url
* @param message the message
*/
public void publish(String uri, String message);

/**
* Publishes data to all clients connected to a websocket.
*
* @param uri the websocket's url
* @param message the data
*/
public void publish(String uri, byte[] message);

/**
* Sends a message to a specific client connected to a websocket.
* The method does nothing if their is no client matching the id connected on the websocket.
*
* @param uri the websocket's url
* @param client the client id, received in the 'opened' callback
* @param message the message
*/
public void send(String uri, String client, String message);

/**
* Sends data to a specific client connected to a websocket.
* The method does nothing if their is no client matching the id connected on the websocket.
*
* @param uri the websocket's url
* @param client the client id, received in the 'opened' callback
* @param message the data
*/
public void send(String uri, String client, byte[] message);

//TODO Extend with other types such as json....
// publish(message(o).as(Json).on(uri))
}
Expand Up @@ -11,4 +11,6 @@ public interface WebSocketDispatcher {
public void register(WebSocketListener listener);
public void unregister(WebSocketListener listener);

public void send(String uri, String client, String message);
public void send(String uri, String client, byte[] message);
}
Expand Up @@ -6,10 +6,29 @@
*/
public interface WebSocketListener {

public void received(String uri, byte[] content);
/**
* Callback invoked when data is received on the web socket identified by its url.
*
* @param uri the url of the web socket
* @param client the client id
* @param content the received content
*/
public void received(String uri, String client, byte[] content);

public void opened(String uri);
/**
* Callback invoked when a new client connects on a web socket identified by its url.
*
* @param uri the url of the web socket
* @param client the client id
*/
public void opened(String uri, String client);

public void closed(String uri);
/**
* Callback invoked when a new client closes the connection to a web socket identified by its url.
*
* @param uri the url of the web socket
* @param client the client id
*/
public void closed(String uri, String client);

}
Expand Up @@ -138,7 +138,7 @@ public synchronized void publish(String url, byte[] data) {
}

public void addWebSocket(String url, ChannelHandlerContext ctx) {
LOGGER.info("Adding web socket on {} bound to {}", url, ctx);
LOGGER.info("Adding web socket on {} bound to {}, {}", url, ctx, ctx.channel());
List<WebSocketListener> webSocketListeners;
synchronized (this) {
List<ChannelHandlerContext> channels = sockets.get(url);
Expand All @@ -151,7 +151,7 @@ public void addWebSocket(String url, ChannelHandlerContext ctx) {
}

for (WebSocketListener listener : webSocketListeners) {
listener.opened(url);
listener.opened(url, Integer.toOctalString(ctx.channel().hashCode()));
}

}
Expand All @@ -171,7 +171,7 @@ public void removeWebSocket(String url, ChannelHandlerContext ctx) {
}

for (WebSocketListener listener : webSocketListeners) {
listener.closed(url);
listener.closed(url, id(ctx));
}
}

Expand All @@ -189,14 +189,56 @@ public void unregister(WebSocketListener listener) {
}
}

public void received(String uri, byte[] content) {
@Override
public void send(String uri, String client, String message) {
List<ChannelHandlerContext> channels;
synchronized (this) {
List<ChannelHandlerContext> ch = sockets.get(uri);
if (ch != null) {
channels = new ArrayList<>(ch);
} else {
channels = Collections.emptyList();
}
}
for (ChannelHandlerContext channel : channels) {
if (client.equals(id(channel))) {
channel.writeAndFlush(new TextWebSocketFrame(message));
} else {
System.out.println("Mismatch " + client + " - " + id(channel));
}
}
}

private String id(ChannelHandlerContext ctx) {
return Integer.toOctalString(ctx.channel().hashCode());
}

@Override
public void send(String uri, String client, byte[] message) {
List<ChannelHandlerContext> channels;
synchronized (this) {
List<ChannelHandlerContext> ch = sockets.get(uri);
if (ch != null) {
channels = new ArrayList<>(ch);
} else {
channels = Collections.emptyList();
}
}
for (ChannelHandlerContext channel : channels) {
if (client.equals(id(channel))) {
channel.writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(message)));
}
}
}

public void received(String uri, byte[] content, ChannelHandlerContext ctx) {
List<WebSocketListener> localListeners;
synchronized (this) {
localListeners = new ArrayList<>(this.listeners);
}

for (WebSocketListener listener : localListeners) {
listener.received(uri, content);
listener.received(uri, id(ctx), content);
}
}

Expand Down
Expand Up @@ -89,9 +89,10 @@ private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame fram
}

if (frame instanceof TextWebSocketFrame) {
accessor.getDispatcher().received(strip(handshaker.uri()), ((TextWebSocketFrame) frame).text().getBytes());
accessor.getDispatcher().received(strip(handshaker.uri()), ((TextWebSocketFrame) frame).text()
.getBytes(), ctx);
} else if (frame instanceof BinaryWebSocketFrame) {
accessor.getDispatcher().received(strip(handshaker.uri()), frame.content().array());
accessor.getDispatcher().received(strip(handshaker.uri()), frame.content().array(), ctx);
}
}

Expand Down

0 comments on commit 2768515

Please sign in to comment.