Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSubStore interface should use String parameter type to Publish/Subscribe custom Topic #357

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ Recent Releases
================================
#### Please Note: trunk is current development branch.

#### 4-Mar-2015 - version 1.7.10 released
#### 4-Mar-2016 - version 1.7.10 released
Fixed - netty updated to 4.1.0.CR3 version
Fixed - binary packet parsing (thanks to Winston Li)

#### 6-Feb-2015 - version 1.7.9 released
#### 6-Feb-2016 - version 1.7.9 released
Feature - Compression support
Fixed - DotNET client request handling
Fixed - Packet length format parsing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public BroadcastOperations(Iterable<SocketIOClient> clients, StoreFactory storeF
private void dispatch(Packet packet) {
for (Entry<String, List<String>> entry : namespaceRooms.entrySet()) {
for (String room : entry.getValue()) {
storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey()));
storeFactory.pubSubStore().publish(PubSubType.DISPATCH.toString(), new DispatchMessage(room, packet, entry.getKey()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void onDisconnect(ClientHead client) {
authorizeHandler.onDisconnect(client);
configuration.getStoreFactory().onDisconnect(client);

configuration.getStoreFactory().pubSubStore().publish(PubSubType.DISCONNECT, new DisconnectMessage(client.getSessionId()));
configuration.getStoreFactory().pubSubStore().publish(PubSubType.DISCONNECT.toString(), new DisconnectMessage(client.getSessionId()));

log.debug("Client with sessionId: {} disconnected", client.getSessionId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void connect(ClientHead client) {
packet.setSubType(PacketType.CONNECT);
client.send(packet);

configuration.getStoreFactory().pubSubStore().publish(PubSubType.CONNECT, new ConnectMessage(client.getSessionId()));
configuration.getStoreFactory().pubSubStore().publish(PubSubType.CONNECT.toString(), new ConnectMessage(client.getSessionId()));

SocketIOClient nsClient = client.addNamespaceClient(ns);
ns.onConnect(nsClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void onDisconnect(SocketIOClient client) {
allClients.remove(client.getSessionId());

leave(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
storeFactory.pubSubStore().publish(PubSubType.LEAVE.toString(), new JoinLeaveMessage(client.getSessionId(), getName(), getName()));

try {
for (DisconnectListener listener : disconnectListeners) {
Expand All @@ -191,7 +191,7 @@ public void addConnectListener(ConnectListener listener) {

public void onConnect(SocketIOClient client) {
join(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
storeFactory.pubSubStore().publish(PubSubType.JOIN.toString(), new JoinLeaveMessage(client.getSessionId(), getName(), getName()));

try {
for (ConnectListener listener : connectListeners) {
Expand Down Expand Up @@ -249,7 +249,7 @@ public void addListeners(Object listeners, Class<?> listenersClass) {

public void joinRoom(String room, UUID sessionId) {
join(room, sessionId);
storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(sessionId, room, getName()));
storeFactory.pubSubStore().publish(PubSubType.JOIN.toString(), new JoinLeaveMessage(sessionId, room, getName()));
}

public void dispatch(String room, Packet packet) {
Expand Down Expand Up @@ -284,7 +284,7 @@ public void join(String room, UUID sessionId) {

public void leaveRoom(String room, UUID sessionId) {
leave(room, sessionId);
storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(sessionId, room, getName()));
storeFactory.pubSubStore().publish(PubSubType.LEAVE.toString(), new JoinLeaveMessage(sessionId, room, getName()));
}

private <K, V> void leave(ConcurrentMap<K, Set<V>> map, K room, V sessionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@
*/
package com.corundumstudio.socketio.store;

import io.netty.util.internal.PlatformDependent;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;

import io.netty.util.internal.PlatformDependent;


public class HazelcastPubSubStore implements PubSubStore {

Expand All @@ -46,14 +45,13 @@ public HazelcastPubSubStore(HazelcastInstance hazelcastPub, HazelcastInstance ha
}

@Override
public void publish(PubSubType type, PubSubMessage msg) {
public void publish(String name, PubSubMessage msg) {
msg.setNodeId(nodeId);
hazelcastPub.getTopic(type.toString()).publish(msg);
hazelcastPub.getTopic(name).publish(msg);
}

@Override
public <T extends PubSubMessage> void subscribe(PubSubType type, final PubSubListener<T> listener, Class<T> clazz) {
String name = type.toString();
public <T extends PubSubMessage> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
ITopic<T> topic = hazelcastSub.getTopic(name);
String regId = topic.addMessageListener(new MessageListener<T>() {
@Override
Expand All @@ -77,8 +75,7 @@ public void onMessage(Message<T> message) {
}

@Override
public void unsubscribe(PubSubType type) {
String name = type.toString();
public void unsubscribe(String name) {
Queue<String> regIds = map.remove(name);
ITopic<Object> topic = hazelcastSub.getTopic(name);
for (String id : regIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@
import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;

public class MemoryPubSubStore implements PubSubStore {

@Override
public void publish(PubSubType type, PubSubMessage msg) {
public void publish(String name, PubSubMessage msg) {
}

@Override
public <T extends PubSubMessage> void subscribe(PubSubType type, PubSubListener<T> listener, Class<T> clazz) {
public <T extends PubSubMessage> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz) {
}

@Override
public void unsubscribe(PubSubType type) {
public void unsubscribe(String name) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;

import io.netty.util.internal.PlatformDependent;

Expand All @@ -45,14 +44,13 @@ public RedissonPubSubStore(RedissonClient redissonPub, RedissonClient redissonSu
}

@Override
public void publish(PubSubType type, PubSubMessage msg) {
public void publish(String name, PubSubMessage msg) {
msg.setNodeId(nodeId);
redissonPub.getTopic(type.toString()).publish(msg);
redissonPub.getTopic(name).publish(msg);
}

@Override
public <T extends PubSubMessage> void subscribe(PubSubType type, final PubSubListener<T> listener, Class<T> clazz) {
String name = type.toString();
public <T extends PubSubMessage> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
RTopic<T> topic = redissonSub.getTopic(name);
int regId = topic.addListener(new MessageListener<T>() {
@Override
Expand All @@ -75,8 +73,7 @@ public void onMessage(String channel, T msg) {
}

@Override
public void unsubscribe(PubSubType type) {
String name = type.toString();
public void unsubscribe(String name) {
Queue<Integer> regIds = map.remove(name);
RTopic<Object> topic = redissonSub.getTopic(name);
for (Integer id : regIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ protected Long getNodeId() {

@Override
public void init(final NamespacesHub namespacesHub, final AuthorizeHandler authorizeHandler, JsonSupport jsonSupport) {
pubSubStore().subscribe(PubSubType.DISCONNECT, new PubSubListener<DisconnectMessage>() {
pubSubStore().subscribe(PubSubType.DISCONNECT.toString(), new PubSubListener<DisconnectMessage>() {
@Override
public void onMessage(DisconnectMessage msg) {
log.debug("{} sessionId: {}", PubSubType.DISCONNECT, msg.getSessionId());
}
}, DisconnectMessage.class);

pubSubStore().subscribe(PubSubType.CONNECT, new PubSubListener<ConnectMessage>() {
pubSubStore().subscribe(PubSubType.CONNECT.toString(), new PubSubListener<ConnectMessage>() {
@Override
public void onMessage(ConnectMessage msg) {
authorizeHandler.connect(msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.CONNECT, msg.getSessionId());
}
}, ConnectMessage.class);

pubSubStore().subscribe(PubSubType.DISPATCH, new PubSubListener<DispatchMessage>() {
pubSubStore().subscribe(PubSubType.DISPATCH.toString(), new PubSubListener<DispatchMessage>() {
@Override
public void onMessage(DispatchMessage msg) {
String name = msg.getRoom();
Expand All @@ -61,7 +61,7 @@ public void onMessage(DispatchMessage msg) {
}
}, DispatchMessage.class);

pubSubStore().subscribe(PubSubType.JOIN, new PubSubListener<JoinLeaveMessage>() {
pubSubStore().subscribe(PubSubType.JOIN.toString(), new PubSubListener<JoinLeaveMessage>() {
@Override
public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom();
Expand All @@ -71,7 +71,7 @@ public void onMessage(JoinLeaveMessage msg) {
}
}, JoinLeaveMessage.class);

pubSubStore().subscribe(PubSubType.LEAVE, new PubSubListener<JoinLeaveMessage>() {
pubSubStore().subscribe(PubSubType.LEAVE.toString(), new PubSubListener<JoinLeaveMessage>() {
@Override
public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

public interface PubSubStore {

void publish(PubSubType type, PubSubMessage msg);
void publish(String name, PubSubMessage msg);

<T extends PubSubMessage> void subscribe(PubSubType type, PubSubListener<T> listener, Class<T> clazz);
<T extends PubSubMessage> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz);

void unsubscribe(PubSubType type);
void unsubscribe(String name);

void shutdown();

Expand Down