diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java index 06a4b05a..86ca4282 100644 --- a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java +++ b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import com.corundumstudio.socketio.misc.IterableCollection; import com.corundumstudio.socketio.namespace.Namespace; @@ -71,11 +72,13 @@ public Collection getClients() { } @Override - public void send(Packet packet) { + public NetworkCallback send(Packet packet) { for (SocketIOClient client : clients) { client.send(packet); } dispatch(packet); + + return NetworkCallbacks.success(null); } public void send(Packet packet, BroadcastAckCallback ackCallback) { @@ -106,14 +109,14 @@ public void sendEvent(String name, SocketIOClient excludedClient, Object... data } dispatch(packet); } - + @Override - public void sendEvent(String name, Object... data) { + public NetworkCallback sendEvent(String name, Object... data) { Packet packet = new Packet(PacketType.MESSAGE); packet.setSubType(PacketType.EVENT); packet.setName(name); packet.setData(Arrays.asList(data)); - send(packet); + return send(packet); } public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { diff --git a/src/main/java/com/corundumstudio/socketio/ClientOperations.java b/src/main/java/com/corundumstudio/socketio/ClientOperations.java index bbd48d46..5e5e5da2 100644 --- a/src/main/java/com/corundumstudio/socketio/ClientOperations.java +++ b/src/main/java/com/corundumstudio/socketio/ClientOperations.java @@ -30,7 +30,7 @@ public interface ClientOperations { * * @param packet - packet to send */ - void send(Packet packet); + NetworkCallback send(Packet packet); /** * Disconnect client @@ -44,6 +44,5 @@ public interface ClientOperations { * @param name - event name * @param data - event data */ - void sendEvent(String name, Object ... data); - + NetworkCallback sendEvent(String name, Object ... data); } diff --git a/src/main/java/com/corundumstudio/socketio/NetworkCallback.java b/src/main/java/com/corundumstudio/socketio/NetworkCallback.java new file mode 100644 index 00000000..16fb3dd8 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/NetworkCallback.java @@ -0,0 +1,26 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import java.util.concurrent.Future; + +/** + * Callback for the network operations. + */ +public interface NetworkCallback extends Future { + /* todo: replace with > */ + void addCallback(NetworkCallbackListener> callback); +} diff --git a/src/main/java/com/corundumstudio/socketio/NetworkCallbackListener.java b/src/main/java/com/corundumstudio/socketio/NetworkCallbackListener.java new file mode 100644 index 00000000..f81610c4 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/NetworkCallbackListener.java @@ -0,0 +1,26 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import java.util.concurrent.Future; +import java.util.EventListener; + +/** + * Listener for {@link NetworkCallback}. + */ +public interface NetworkCallbackListener> extends EventListener { + void operationComplete(F var1) throws Exception; +} diff --git a/src/main/java/com/corundumstudio/socketio/NetworkCallbacks.java b/src/main/java/com/corundumstudio/socketio/NetworkCallbacks.java new file mode 100644 index 00000000..700ad2a9 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/NetworkCallbacks.java @@ -0,0 +1,39 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import com.corundumstudio.socketio.concurrent.FailedFuture; +import com.corundumstudio.socketio.concurrent.SucceededFuture; +import io.netty.util.concurrent.ImmediateEventExecutor; + +/** + * Object instances of {@link NetworkCallback} + */ +public class NetworkCallbacks { + private static Throwable CHANNEL_CLOSED_ERROR = new IllegalStateException("channel is closed"); + + public static NetworkCallback channelClosed() { + return new FailedFuture(ImmediateEventExecutor.INSTANCE, CHANNEL_CLOSED_ERROR); + } + + public static NetworkCallback success(T value) { + return new SucceededFuture(ImmediateEventExecutor.INSTANCE, value); + } + + public static NetworkCallback failure(E throwable) { + return new FailedFuture(ImmediateEventExecutor.INSTANCE, throwable); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java index 45453a93..f322bfe7 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java @@ -109,4 +109,9 @@ public interface SocketIOClient extends ClientOperations, Store { */ Set getAllRooms(); + /** + * Check is network buffer is empty and client is ready to read a data. + * @return true in case channel is ready to consume a bit of data. + */ + boolean isWritable(); } diff --git a/src/main/java/com/corundumstudio/socketio/concurrent/CompleteFuture.java b/src/main/java/com/corundumstudio/socketio/concurrent/CompleteFuture.java new file mode 100644 index 00000000..2c3cbff5 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/concurrent/CompleteFuture.java @@ -0,0 +1,43 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.concurrent; + +import com.corundumstudio.socketio.NetworkCallback; +import com.corundumstudio.socketio.NetworkCallbackListener; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.GenericFutureListener; + +import java.util.concurrent.Future; + +/** + * Simple promise which already has a result. + */ +public abstract class CompleteFuture extends io.netty.util.concurrent.CompleteFuture implements NetworkCallback { + + protected CompleteFuture(EventExecutor executor) { + super(executor); + } + + @Override + public void addCallback(final NetworkCallbackListener> callback) { + super.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + callback.operationComplete(future); + } + }); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/concurrent/DefaultNetworkPromise.java b/src/main/java/com/corundumstudio/socketio/concurrent/DefaultNetworkPromise.java new file mode 100644 index 00000000..ac5f9be3 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/concurrent/DefaultNetworkPromise.java @@ -0,0 +1,44 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.concurrent; + +import com.corundumstudio.socketio.NetworkCallback; +import com.corundumstudio.socketio.NetworkCallbackListener; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.GenericFutureListener; + +import java.util.concurrent.Future; + +/** + * Simple network callback. + */ +public class DefaultNetworkPromise extends DefaultPromise implements NetworkCallback { + + public DefaultNetworkPromise(EventExecutor executor) { + super(executor); + } + + @Override + public void addCallback(final NetworkCallbackListener> callback) { + super.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + callback.operationComplete(future); + } + }); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/concurrent/FailedFuture.java b/src/main/java/com/corundumstudio/socketio/concurrent/FailedFuture.java new file mode 100644 index 00000000..840368c5 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/concurrent/FailedFuture.java @@ -0,0 +1,55 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.concurrent; + +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.internal.PlatformDependent; + +public final class FailedFuture extends CompleteFuture { + private final Throwable cause; + + public FailedFuture(EventExecutor executor, Throwable cause) { + super(executor); + if (cause == null) { + throw new NullPointerException("cause"); + } else { + this.cause = cause; + } + } + + public Throwable cause() { + return this.cause; + } + + public boolean isSuccess() { + return false; + } + + public Future sync() { + PlatformDependent.throwException(this.cause); + return this; + } + + public Future syncUninterruptibly() { + PlatformDependent.throwException(this.cause); + return this; + } + + public V getNow() { + return null; + } +} diff --git a/src/main/java/com/corundumstudio/socketio/concurrent/SucceededFuture.java b/src/main/java/com/corundumstudio/socketio/concurrent/SucceededFuture.java new file mode 100644 index 00000000..bce4def3 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/concurrent/SucceededFuture.java @@ -0,0 +1,39 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.concurrent; + +import io.netty.util.concurrent.EventExecutor; + +public final class SucceededFuture extends CompleteFuture { + private final V result; + + public SucceededFuture(EventExecutor executor, V result) { + super(executor); + this.result = result; + } + + public Throwable cause() { + return null; + } + + public boolean isSuccess() { + return true; + } + + public V getNow() { + return this.result; + } +} diff --git a/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java b/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java index 82cd8600..c883d85e 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java +++ b/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java @@ -15,19 +15,6 @@ */ package com.corundumstudio.socketio.handler; -import java.net.SocketAddress; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.HandshakeData; @@ -43,13 +30,21 @@ import com.corundumstudio.socketio.store.Store; import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.transport.NamespaceClient; - import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.EventLoop; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.util.AttributeKey; import io.netty.util.internal.PlatformDependent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.SocketAddress; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class ClientHead { @@ -76,8 +71,8 @@ public class ClientHead { private volatile Transport currentTransport; public ClientHead(UUID sessionId, AckManager ackManager, DisconnectableHub disconnectable, - StoreFactory storeFactory, HandshakeData handshakeData, ClientsBox clientsBox, Transport transport, CancelableScheduler disconnectScheduler, - Configuration configuration) { + StoreFactory storeFactory, HandshakeData handshakeData, ClientsBox clientsBox, Transport transport, CancelableScheduler disconnectScheduler, + Configuration configuration) { this.sessionId = sessionId; this.ackManager = ackManager; this.disconnectableHub = disconnectable; @@ -132,6 +127,22 @@ public void run() { }, configuration.getPingTimeout() + configuration.getPingInterval(), TimeUnit.MILLISECONDS); } + public boolean isWritable() { + TransportState state = channels.get(getCurrentTransport()); + Channel channel = state.getChannel(); + return channel != null && channel.isWritable(); + } + + public EventLoop eventLoop() { + TransportState state = channels.get(getCurrentTransport()); + Channel channel = state.getChannel(); + if (channel != null) { + return channel.eventLoop(); + } else { + return null; + } + } + public ChannelFuture send(Packet packet, Transport transport) { TransportState state = channels.get(transport); state.getPacketsQueue().add(packet); @@ -260,6 +271,7 @@ public Queue getPacketsQueue(Transport transport) { public void setLastBinaryPacket(Packet lastBinaryPacket) { this.lastBinaryPacket = lastBinaryPacket; } + public Packet getLastBinaryPacket() { return lastBinaryPacket; } diff --git a/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java b/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java index 73f82f3e..f9e652b9 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java @@ -21,13 +21,13 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import com.corundumstudio.socketio.*; +import com.corundumstudio.socketio.concurrent.DefaultNetworkPromise; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.corundumstudio.socketio.AckCallback; -import com.corundumstudio.socketio.HandshakeData; -import com.corundumstudio.socketio.SocketIOClient; -import com.corundumstudio.socketio.Transport; import com.corundumstudio.socketio.handler.ClientHead; import com.corundumstudio.socketio.namespace.Namespace; import com.corundumstudio.socketio.protocol.Packet; @@ -67,12 +67,12 @@ public Namespace getNamespace() { } @Override - public void sendEvent(String name, Object ... data) { + public NetworkCallback sendEvent(String name, Object ... data) { Packet packet = new Packet(PacketType.MESSAGE); packet.setSubType(PacketType.EVENT); packet.setName(name); packet.setData(Arrays.asList(data)); - send(packet); + return send(packet); } @Override @@ -100,12 +100,22 @@ public void send(Packet packet, AckCallback ackCallback) { } @Override - public void send(Packet packet) { + public NetworkCallback send(Packet packet) { if (!isConnected()) { - return; + return NetworkCallbacks.channelClosed(); } packet.setNsp(namespace.getName()); - baseClient.send(packet); + + final DefaultNetworkPromise promise = new DefaultNetworkPromise(baseClient.eventLoop()); + baseClient.send(packet).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + if(channelFuture.isSuccess()) promise.setSuccess(null); + else promise.setFailure(channelFuture.cause()); + } + }); + + return promise; } public void onDisconnect() { @@ -207,4 +217,8 @@ public HandshakeData getHandshakeData() { return baseClient.getHandshakeData(); } + @Override + public boolean isWritable() { + return isConnected() && this.baseClient.isWritable(); + } }