Skip to content

Commit

Permalink
Support for slow client #483
Browse files Browse the repository at this point in the history
* add new method isWritable to check netty channel state
* add future based callback
  • Loading branch information
sergeygrigorev authored and sergeygrigorev committed Oct 18, 2017
1 parent 567ad95 commit f844299
Show file tree
Hide file tree
Showing 12 changed files with 337 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import com.corundumstudio.socketio.misc.IterableCollection;
import com.corundumstudio.socketio.namespace.Namespace;
Expand Down Expand Up @@ -71,11 +72,13 @@ public Collection<SocketIOClient> getClients() {
}

@Override
public void send(Packet packet) {
public NetworkCallback<? extends Void> send(Packet packet) {
for (SocketIOClient client : clients) {
client.send(packet);
}
dispatch(packet);

return NetworkCallbacks.success(null);
}

public <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback) {
Expand Down Expand Up @@ -106,14 +109,14 @@ public void sendEvent(String name, SocketIOClient excludedClient, Object... data
}
dispatch(packet);
}

@Override
public void sendEvent(String name, Object... data) {
public NetworkCallback<? extends Void> sendEvent(String name, Object... data) {
Packet packet = new Packet(PacketType.MESSAGE);
packet.setSubType(PacketType.EVENT);
packet.setName(name);
packet.setData(Arrays.asList(data));
send(packet);
return send(packet);
}

public <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface ClientOperations {
*
* @param packet - packet to send
*/
void send(Packet packet);
NetworkCallback<? extends Void> send(Packet packet);

/**
* Disconnect client
Expand All @@ -44,6 +44,5 @@ public interface ClientOperations {
* @param name - event name
* @param data - event data
*/
void sendEvent(String name, Object ... data);

NetworkCallback<? extends Void> sendEvent(String name, Object ... data);
}
26 changes: 26 additions & 0 deletions src/main/java/com/corundumstudio/socketio/NetworkCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;

import java.util.concurrent.Future;

/**
* Callback for the network operations.
*/
public interface NetworkCallback<V> extends Future<V> {
/* todo: replace with <? extends Future<? super V>> */
void addCallback(NetworkCallbackListener<Future<? super V>> callback);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;

import java.util.concurrent.Future;
import java.util.EventListener;

/**
* Listener for {@link NetworkCallback}.
*/
public interface NetworkCallbackListener<F extends Future<?>> extends EventListener {
void operationComplete(F var1) throws Exception;
}
39 changes: 39 additions & 0 deletions src/main/java/com/corundumstudio/socketio/NetworkCallbacks.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;

import com.corundumstudio.socketio.concurrent.FailedFuture;
import com.corundumstudio.socketio.concurrent.SucceededFuture;
import io.netty.util.concurrent.ImmediateEventExecutor;

/**
* Object instances of {@link NetworkCallback}
*/
public class NetworkCallbacks {
private static Throwable CHANNEL_CLOSED_ERROR = new IllegalStateException("channel is closed");

public static <T> NetworkCallback<T> channelClosed() {
return new FailedFuture<T>(ImmediateEventExecutor.INSTANCE, CHANNEL_CLOSED_ERROR);
}

public static <T> NetworkCallback<T> success(T value) {
return new SucceededFuture<T>(ImmediateEventExecutor.INSTANCE, value);
}

public static <E extends Throwable> NetworkCallback<E> failure(E throwable) {
return new FailedFuture<E>(ImmediateEventExecutor.INSTANCE, throwable);
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/corundumstudio/socketio/SocketIOClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,9 @@ public interface SocketIOClient extends ClientOperations, Store {
*/
Set<String> getAllRooms();

/**
* Check is network buffer is empty and client is ready to read a data.
* @return true in case channel is ready to consume a bit of data.
*/
boolean isWritable();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.concurrent;

import com.corundumstudio.socketio.NetworkCallback;
import com.corundumstudio.socketio.NetworkCallbackListener;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;

import java.util.concurrent.Future;

/**
* Simple promise which already has a result.
*/
public abstract class CompleteFuture<V> extends io.netty.util.concurrent.CompleteFuture <V> implements NetworkCallback<V> {

protected CompleteFuture(EventExecutor executor) {
super(executor);
}

@Override
public void addCallback(final NetworkCallbackListener<Future<? super V>> callback) {
super.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super V>>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<? super V> future) throws Exception {
callback.operationComplete(future);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.concurrent;

import com.corundumstudio.socketio.NetworkCallback;
import com.corundumstudio.socketio.NetworkCallbackListener;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;

import java.util.concurrent.Future;

/**
* Simple network callback.
*/
public class DefaultNetworkPromise<V> extends DefaultPromise<V> implements NetworkCallback<V> {

public DefaultNetworkPromise(EventExecutor executor) {
super(executor);
}

@Override
public void addCallback(final NetworkCallbackListener<Future<? super V>> callback) {
super.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super V>>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<? super V> future) throws Exception {
callback.operationComplete(future);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.concurrent;

import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.PlatformDependent;

public final class FailedFuture<V> extends CompleteFuture<V> {
private final Throwable cause;

public FailedFuture(EventExecutor executor, Throwable cause) {
super(executor);
if (cause == null) {
throw new NullPointerException("cause");
} else {
this.cause = cause;
}
}

public Throwable cause() {
return this.cause;
}

public boolean isSuccess() {
return false;
}

public Future<V> sync() {
PlatformDependent.throwException(this.cause);
return this;
}

public Future<V> syncUninterruptibly() {
PlatformDependent.throwException(this.cause);
return this;
}

public V getNow() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.concurrent;

import io.netty.util.concurrent.EventExecutor;

public final class SucceededFuture<V> extends CompleteFuture<V> {
private final V result;

public SucceededFuture(EventExecutor executor, V result) {
super(executor);
this.result = result;
}

public Throwable cause() {
return null;
}

public boolean isSuccess() {
return true;
}

public V getNow() {
return this.result;
}
}
Loading

0 comments on commit f844299

Please sign in to comment.