diff --git a/README.md b/README.md index 55af9b68..944a5e76 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ + Netty-socketio Overview === This project is an open-source Java implementation of [Socket.IO](http://socket.io/) server. Based on [Netty](http://netty.io/) server framework. diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java index 63b34560..65b3c013 100644 --- a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java +++ b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java @@ -15,123 +15,24 @@ */ package com.corundumstudio.socketio; -import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import com.corundumstudio.socketio.misc.IterableCollection; -import com.corundumstudio.socketio.namespace.Namespace; import com.corundumstudio.socketio.protocol.Packet; -import com.corundumstudio.socketio.protocol.PacketType; -import com.corundumstudio.socketio.store.StoreFactory; -import com.corundumstudio.socketio.store.pubsub.DispatchMessage; -import com.corundumstudio.socketio.store.pubsub.PubSubType; /** - * Fully thread-safe. + * broadcast interface * */ -public class BroadcastOperations implements ClientOperations { - - private final Iterable clients; - private final StoreFactory storeFactory; - - public BroadcastOperations(Iterable clients, StoreFactory storeFactory) { - super(); - this.clients = clients; - this.storeFactory = storeFactory; - } - - private void dispatch(Packet packet) { - Map> namespaceRooms = new HashMap>(); - for (SocketIOClient socketIOClient : clients) { - Namespace namespace = (Namespace)socketIOClient.getNamespace(); - Set rooms = namespace.getRooms(socketIOClient); - - Set roomsList = namespaceRooms.get(namespace.getName()); - if (roomsList == null) { - roomsList = new HashSet(); - namespaceRooms.put(namespace.getName(), roomsList); - } - roomsList.addAll(rooms); - } - for (Entry> entry : namespaceRooms.entrySet()) { - for (String room : entry.getValue()) { - storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey())); - } - } - } - - public Collection getClients() { - return new IterableCollection(clients); - } - - @Override - public void send(Packet packet) { - for (SocketIOClient client : clients) { - client.send(packet); - } - dispatch(packet); - } - - public void send(Packet packet, BroadcastAckCallback ackCallback) { - for (SocketIOClient client : clients) { - client.send(packet, ackCallback.createClientCallback(client)); - } - ackCallback.loopFinished(); - } - - @Override - public void disconnect() { - for (SocketIOClient client : clients) { - client.disconnect(); - } - } - - public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { - Packet packet = new Packet(PacketType.MESSAGE); - packet.setSubType(PacketType.EVENT); - packet.setName(name); - packet.setData(Arrays.asList(data)); - - for (SocketIOClient client : clients) { - if (client.getSessionId().equals(excludedClient.getSessionId())) { - continue; - } - client.send(packet); - } - dispatch(packet); - } - - @Override - public void sendEvent(String name, Object... data) { - Packet packet = new Packet(PacketType.MESSAGE); - packet.setSubType(PacketType.EVENT); - packet.setName(name); - packet.setData(Arrays.asList(data)); - send(packet); - } - - public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { - for (SocketIOClient client : clients) { - client.sendEvent(name, ackCallback.createClientCallback(client), data); - } - ackCallback.loopFinished(); - } - - public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback) { - for (SocketIOClient client : clients) { - if (client.getSessionId().equals(excludedClient.getSessionId())) { - continue; - } - client.sendEvent(name, ackCallback.createClientCallback(client), data); - } - ackCallback.loopFinished(); - } - - +public interface BroadcastOperations extends ClientOperations { + + public Collection getClients(); + + public void send(Packet packet, BroadcastAckCallback ackCallback); + + public void sendEvent(String name, SocketIOClient excludedClient, Object... data); + + public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback); + + public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback); + } diff --git a/src/main/java/com/corundumstudio/socketio/MultiRoomBroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/MultiRoomBroadcastOperations.java new file mode 100644 index 00000000..46ed6693 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/MultiRoomBroadcastOperations.java @@ -0,0 +1,118 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import com.corundumstudio.socketio.protocol.Packet; + +/** + * Fully thread-safe. + * + */ +public class MultiRoomBroadcastOperations implements BroadcastOperations { + + private Collection broadcastOperations; + + public MultiRoomBroadcastOperations( Collection broadcastOperations ) { + this.broadcastOperations = broadcastOperations; + } + + public Collection getClients() { + Set clients = new HashSet(); + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return clients; + } + for( BroadcastOperations b : this.broadcastOperations ) { + clients.addAll( b.getClients() ); + } + return clients; + } + + @Override + public void send(Packet packet) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.send( packet ); + } + } + + @Override + public void send(Packet packet, BroadcastAckCallback ackCallback) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.send( packet, ackCallback ); + } + } + + @Override + public void disconnect() { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.disconnect(); + } + } + + @Override + public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, excludedClient, data ); + } + } + + @Override + public void sendEvent(String name, Object... data) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, data ); + } + } + + @Override + public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, data, ackCallback ); + } + } + + @Override + public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, data, excludedClient, ackCallback ); + } + } + + +} diff --git a/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java new file mode 100644 index 00000000..09be9b19 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java @@ -0,0 +1,143 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import java.util.Arrays; +import java.util.Collection; + +import com.corundumstudio.socketio.misc.IterableCollection; +import com.corundumstudio.socketio.protocol.Packet; +import com.corundumstudio.socketio.protocol.PacketType; +import com.corundumstudio.socketio.store.StoreFactory; +import com.corundumstudio.socketio.store.pubsub.DispatchMessage; +import com.corundumstudio.socketio.store.pubsub.PubSubType; + +/** + * Fully thread-safe. + * + */ +public class SingleRoomBroadcastOperations implements BroadcastOperations { + + private final String namespace; + private final String room; + private final Iterable clients; + private final StoreFactory storeFactory; + + public SingleRoomBroadcastOperations(String namespace, String room, Iterable clients, StoreFactory storeFactory) { + super(); + this.namespace = namespace; + this.room = room; + this.clients = clients; + this.storeFactory = storeFactory; + } + + private void dispatch(Packet packet) { + this.storeFactory.pubSubStore().publish( + PubSubType.DISPATCH, + new DispatchMessage(this.room, packet, this.namespace)); +// Map> namespaceRooms = new HashMap>(); +// for (SocketIOClient socketIOClient : clients) { +// Namespace namespace = (Namespace)socketIOClient.getNamespace(); +// Set rooms = namespace.getRooms(socketIOClient); +// +// Set roomsList = namespaceRooms.get(namespace.getName()); +// if (roomsList == null) { +// roomsList = new HashSet(); +// namespaceRooms.put(namespace.getName(), roomsList); +// } +// roomsList.addAll(rooms); +// } +// for (Entry> entry : namespaceRooms.entrySet()) { +// for (String room : entry.getValue()) { +// storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey())); +// } +// } + } + + @Override + public Collection getClients() { + return new IterableCollection(clients); + } + + @Override + public void send(Packet packet) { + for (SocketIOClient client : clients) { + client.send(packet); + } + dispatch(packet); + } + + @Override + public void send(Packet packet, BroadcastAckCallback ackCallback) { + for (SocketIOClient client : clients) { + client.send(packet, ackCallback.createClientCallback(client)); + } + ackCallback.loopFinished(); + } + + @Override + public void disconnect() { + for (SocketIOClient client : clients) { + client.disconnect(); + } + } + + @Override + public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { + Packet packet = new Packet(PacketType.MESSAGE); + packet.setSubType(PacketType.EVENT); + packet.setName(name); + packet.setData(Arrays.asList(data)); + + for (SocketIOClient client : clients) { + if (client.getSessionId().equals(excludedClient.getSessionId())) { + continue; + } + client.send(packet); + } + dispatch(packet); + } + + @Override + public void sendEvent(String name, Object... data) { + Packet packet = new Packet(PacketType.MESSAGE); + packet.setSubType(PacketType.EVENT); + packet.setName(name); + packet.setData(Arrays.asList(data)); + send(packet); + } + + @Override + public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { + for (SocketIOClient client : clients) { + client.sendEvent(name, ackCallback.createClientCallback(client), data); + } + ackCallback.loopFinished(); + } + + @Override + public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback) { + for (SocketIOClient client : clients) { + if (client.getSessionId().equals(excludedClient.getSessionId())) { + continue; + } + client.sendEvent(name, ackCallback.createClientCallback(client), data); + } + ackCallback.loopFinished(); + } + + +} diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java index 5419d2a6..44ba88e9 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java @@ -15,7 +15,24 @@ */ package com.corundumstudio.socketio; -import com.corundumstudio.socketio.listener.*; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.corundumstudio.socketio.listener.ClientListeners; +import com.corundumstudio.socketio.listener.ConnectListener; +import com.corundumstudio.socketio.listener.DataListener; +import com.corundumstudio.socketio.listener.DisconnectListener; +import com.corundumstudio.socketio.listener.MultiTypeEventListener; +import com.corundumstudio.socketio.listener.PingListener; +import com.corundumstudio.socketio.namespace.Namespace; +import com.corundumstudio.socketio.namespace.NamespacesHub; + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -28,16 +45,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.corundumstudio.socketio.namespace.Namespace; -import com.corundumstudio.socketio.namespace.NamespacesHub; - /** * Fully thread-safe. * @@ -97,7 +104,16 @@ public Collection getAllNamespaces() { } public BroadcastOperations getBroadcastOperations() { - return new BroadcastOperations(getAllClients(), configCopy.getStoreFactory()); + Collection namespaces = namespacesHub.getAllNamespaces(); + List list = new ArrayList(); + BroadcastOperations broadcast = null; + if( namespaces != null && namespaces.size() > 0 ) { + for( SocketIONamespace n : namespaces ) { + broadcast = n.getBroadcastOperations(); + list.add( broadcast ); + } + } + return new MultiRoomBroadcastOperations( list ); } /** @@ -108,8 +124,16 @@ public BroadcastOperations getBroadcastOperations() { * @return broadcast operations */ public BroadcastOperations getRoomOperations(String room) { - Iterable clients = namespacesHub.getRoomClients(room); - return new BroadcastOperations(clients, configCopy.getStoreFactory()); + Collection namespaces = namespacesHub.getAllNamespaces(); + List list = new ArrayList(); + BroadcastOperations broadcast = null; + if( namespaces != null && namespaces.size() > 0 ) { + for( SocketIONamespace n : namespaces ) { + broadcast = n.getRoomOperations( room ); + list.add( broadcast ); + } + } + return new MultiRoomBroadcastOperations( list ); } /** diff --git a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java index cbbc227d..b056d1a0 100644 --- a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java +++ b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java @@ -31,10 +31,16 @@ import com.corundumstudio.socketio.BroadcastOperations; import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.MultiTypeArgs; +import com.corundumstudio.socketio.SingleRoomBroadcastOperations; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIONamespace; import com.corundumstudio.socketio.annotation.ScannerEngine; -import com.corundumstudio.socketio.listener.*; +import com.corundumstudio.socketio.listener.ConnectListener; +import com.corundumstudio.socketio.listener.DataListener; +import com.corundumstudio.socketio.listener.DisconnectListener; +import com.corundumstudio.socketio.listener.ExceptionListener; +import com.corundumstudio.socketio.listener.MultiTypeEventListener; +import com.corundumstudio.socketio.listener.PingListener; import com.corundumstudio.socketio.protocol.JsonSupport; import com.corundumstudio.socketio.protocol.Packet; import com.corundumstudio.socketio.store.StoreFactory; @@ -229,12 +235,12 @@ public void onPing(SocketIOClient client) { @Override public BroadcastOperations getBroadcastOperations() { - return new BroadcastOperations(allClients.values(), storeFactory); + return new SingleRoomBroadcastOperations(getName(), getName(), allClients.values(), storeFactory); } @Override public BroadcastOperations getRoomOperations(String room) { - return new BroadcastOperations(getRoomClients(room), storeFactory); + return new SingleRoomBroadcastOperations(getName(), room, getRoomClients(room), storeFactory); } @Override @@ -320,7 +326,7 @@ private void leave(ConcurrentMap> map, K room, V sessionId) { clients.remove(sessionId); if (clients.isEmpty()) { - map.remove(room, Collections.emptySet()); + map.remove(room, clients); } }