From 7854e010797744e7548997236c06ecc64fc61257 Mon Sep 17 00:00:00 2001 From: whg Date: Tue, 24 May 2016 04:14:05 +0800 Subject: [PATCH 1/2] PubSubStore interface should use String parameter type to Publish/Subscribe custom Topic --- .../socketio/BroadcastOperations.java | 2 +- .../socketio/SocketIOChannelInitializer.java | 2 +- .../socketio/handler/AuthorizeHandler.java | 2 +- .../socketio/namespace/Namespace.java | 8 ++++---- .../socketio/store/HazelcastPubSubStore.java | 15 ++++++--------- .../socketio/store/MemoryPubSubStore.java | 7 +++---- .../socketio/store/RedissonPubSubStore.java | 11 ++++------- .../socketio/store/pubsub/BaseStoreFactory.java | 10 +++++----- .../socketio/store/pubsub/PubSubStore.java | 6 +++--- 9 files changed, 28 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java index d1ded5da..c4f2b3c4 100644 --- a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java +++ b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java @@ -62,7 +62,7 @@ public BroadcastOperations(Iterable clients, StoreFactory storeF private void dispatch(Packet packet) { for (Entry> entry : namespaceRooms.entrySet()) { for (String room : entry.getValue()) { - storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey())); + storeFactory.pubSubStore().publish(PubSubType.DISPATCH.toString(), new DispatchMessage(room, packet, entry.getKey())); } } } diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java index 28417b3e..faa2d6d8 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java @@ -222,7 +222,7 @@ public void onDisconnect(ClientHead client) { authorizeHandler.onDisconnect(client); configuration.getStoreFactory().onDisconnect(client); - configuration.getStoreFactory().pubSubStore().publish(PubSubType.DISCONNECT, new DisconnectMessage(client.getSessionId())); + configuration.getStoreFactory().pubSubStore().publish(PubSubType.DISCONNECT.toString(), new DisconnectMessage(client.getSessionId())); log.debug("Client with sessionId: {} disconnected", client.getSessionId()); } diff --git a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java index c8660fa1..6c3a3025 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java @@ -223,7 +223,7 @@ public void connect(ClientHead client) { packet.setSubType(PacketType.CONNECT); client.send(packet); - configuration.getStoreFactory().pubSubStore().publish(PubSubType.CONNECT, new ConnectMessage(client.getSessionId())); + configuration.getStoreFactory().pubSubStore().publish(PubSubType.CONNECT.toString(), new ConnectMessage(client.getSessionId())); SocketIOClient nsClient = client.addNamespaceClient(ns); ns.onConnect(nsClient); diff --git a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java index 5b6d72c3..00559aeb 100644 --- a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java +++ b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java @@ -173,7 +173,7 @@ public void onDisconnect(SocketIOClient client) { allClients.remove(client.getSessionId()); leave(getName(), client.getSessionId()); - storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName())); + storeFactory.pubSubStore().publish(PubSubType.LEAVE.toString(), new JoinLeaveMessage(client.getSessionId(), getName(), getName())); try { for (DisconnectListener listener : disconnectListeners) { @@ -191,7 +191,7 @@ public void addConnectListener(ConnectListener listener) { public void onConnect(SocketIOClient client) { join(getName(), client.getSessionId()); - storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(client.getSessionId(), getName(), getName())); + storeFactory.pubSubStore().publish(PubSubType.JOIN.toString(), new JoinLeaveMessage(client.getSessionId(), getName(), getName())); try { for (ConnectListener listener : connectListeners) { @@ -249,7 +249,7 @@ public void addListeners(Object listeners, Class listenersClass) { public void joinRoom(String room, UUID sessionId) { join(room, sessionId); - storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(sessionId, room, getName())); + storeFactory.pubSubStore().publish(PubSubType.JOIN.toString(), new JoinLeaveMessage(sessionId, room, getName())); } public void dispatch(String room, Packet packet) { @@ -284,7 +284,7 @@ public void join(String room, UUID sessionId) { public void leaveRoom(String room, UUID sessionId) { leave(room, sessionId); - storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(sessionId, room, getName())); + storeFactory.pubSubStore().publish(PubSubType.LEAVE.toString(), new JoinLeaveMessage(sessionId, room, getName())); } private void leave(ConcurrentMap> map, K room, V sessionId) { diff --git a/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java index 815b9c80..78e00463 100644 --- a/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java @@ -15,8 +15,6 @@ */ package com.corundumstudio.socketio.store; -import io.netty.util.internal.PlatformDependent; - import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -24,12 +22,13 @@ import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; -import com.corundumstudio.socketio.store.pubsub.PubSubType; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ITopic; import com.hazelcast.core.Message; import com.hazelcast.core.MessageListener; +import io.netty.util.internal.PlatformDependent; + public class HazelcastPubSubStore implements PubSubStore { @@ -46,14 +45,13 @@ public HazelcastPubSubStore(HazelcastInstance hazelcastPub, HazelcastInstance ha } @Override - public void publish(PubSubType type, PubSubMessage msg) { + public void publish(String name, PubSubMessage msg) { msg.setNodeId(nodeId); - hazelcastPub.getTopic(type.toString()).publish(msg); + hazelcastPub.getTopic(name).publish(msg); } @Override - public void subscribe(PubSubType type, final PubSubListener listener, Class clazz) { - String name = type.toString(); + public void subscribe(String name, final PubSubListener listener, Class clazz) { ITopic topic = hazelcastSub.getTopic(name); String regId = topic.addMessageListener(new MessageListener() { @Override @@ -77,8 +75,7 @@ public void onMessage(Message message) { } @Override - public void unsubscribe(PubSubType type) { - String name = type.toString(); + public void unsubscribe(String name) { Queue regIds = map.remove(name); ITopic topic = hazelcastSub.getTopic(name); for (String id : regIds) { diff --git a/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java index b472682b..c912eb5f 100644 --- a/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java @@ -18,20 +18,19 @@ import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; -import com.corundumstudio.socketio.store.pubsub.PubSubType; public class MemoryPubSubStore implements PubSubStore { @Override - public void publish(PubSubType type, PubSubMessage msg) { + public void publish(String name, PubSubMessage msg) { } @Override - public void subscribe(PubSubType type, PubSubListener listener, Class clazz) { + public void subscribe(String name, PubSubListener listener, Class clazz) { } @Override - public void unsubscribe(PubSubType type) { + public void unsubscribe(String name) { } @Override diff --git a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java index 2facb2d7..7e5c8644 100644 --- a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java @@ -26,7 +26,6 @@ import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; -import com.corundumstudio.socketio.store.pubsub.PubSubType; import io.netty.util.internal.PlatformDependent; @@ -45,14 +44,13 @@ public RedissonPubSubStore(RedissonClient redissonPub, RedissonClient redissonSu } @Override - public void publish(PubSubType type, PubSubMessage msg) { + public void publish(String name, PubSubMessage msg) { msg.setNodeId(nodeId); - redissonPub.getTopic(type.toString()).publish(msg); + redissonPub.getTopic(name).publish(msg); } @Override - public void subscribe(PubSubType type, final PubSubListener listener, Class clazz) { - String name = type.toString(); + public void subscribe(String name, final PubSubListener listener, Class clazz) { RTopic topic = redissonSub.getTopic(name); int regId = topic.addListener(new MessageListener() { @Override @@ -75,8 +73,7 @@ public void onMessage(String channel, T msg) { } @Override - public void unsubscribe(PubSubType type) { - String name = type.toString(); + public void unsubscribe(String name) { Queue regIds = map.remove(name); RTopic topic = redissonSub.getTopic(name); for (Integer id : regIds) { diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java index 78c6007d..fb292df2 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java @@ -36,14 +36,14 @@ protected Long getNodeId() { @Override public void init(final NamespacesHub namespacesHub, final AuthorizeHandler authorizeHandler, JsonSupport jsonSupport) { - pubSubStore().subscribe(PubSubType.DISCONNECT, new PubSubListener() { + pubSubStore().subscribe(PubSubType.DISCONNECT.toString(), new PubSubListener() { @Override public void onMessage(DisconnectMessage msg) { log.debug("{} sessionId: {}", PubSubType.DISCONNECT, msg.getSessionId()); } }, DisconnectMessage.class); - pubSubStore().subscribe(PubSubType.CONNECT, new PubSubListener() { + pubSubStore().subscribe(PubSubType.CONNECT.toString(), new PubSubListener() { @Override public void onMessage(ConnectMessage msg) { authorizeHandler.connect(msg.getSessionId()); @@ -51,7 +51,7 @@ public void onMessage(ConnectMessage msg) { } }, ConnectMessage.class); - pubSubStore().subscribe(PubSubType.DISPATCH, new PubSubListener() { + pubSubStore().subscribe(PubSubType.DISPATCH.toString(), new PubSubListener() { @Override public void onMessage(DispatchMessage msg) { String name = msg.getRoom(); @@ -61,7 +61,7 @@ public void onMessage(DispatchMessage msg) { } }, DispatchMessage.class); - pubSubStore().subscribe(PubSubType.JOIN, new PubSubListener() { + pubSubStore().subscribe(PubSubType.JOIN.toString(), new PubSubListener() { @Override public void onMessage(JoinLeaveMessage msg) { String name = msg.getRoom(); @@ -71,7 +71,7 @@ public void onMessage(JoinLeaveMessage msg) { } }, JoinLeaveMessage.class); - pubSubStore().subscribe(PubSubType.LEAVE, new PubSubListener() { + pubSubStore().subscribe(PubSubType.LEAVE.toString(), new PubSubListener() { @Override public void onMessage(JoinLeaveMessage msg) { String name = msg.getRoom(); diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java index 4bdcf83c..d6ed4d76 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java @@ -18,11 +18,11 @@ public interface PubSubStore { - void publish(PubSubType type, PubSubMessage msg); + void publish(String name, PubSubMessage msg); - void subscribe(PubSubType type, PubSubListener listener, Class clazz); + void subscribe(String name, PubSubListener listener, Class clazz); - void unsubscribe(PubSubType type); + void unsubscribe(String name); void shutdown(); From 8ff8f738786ff2be76aa9dd411997d816f1281ab Mon Sep 17 00:00:00 2001 From: whg Date: Tue, 24 May 2016 04:16:22 +0800 Subject: [PATCH 2/2] wrong year --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 3b390398..b7cb117c 100644 --- a/README.md +++ b/README.md @@ -50,11 +50,11 @@ Recent Releases ================================ #### Please Note: trunk is current development branch. -#### 4-Mar-2015 - version 1.7.10 released +#### 4-Mar-2016 - version 1.7.10 released Fixed - netty updated to 4.1.0.CR3 version Fixed - binary packet parsing (thanks to Winston Li) -#### 6-Feb-2015 - version 1.7.9 released +#### 6-Feb-2016 - version 1.7.9 released Feature - Compression support Fixed - DotNET client request handling Fixed - Packet length format parsing