Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.6 #513

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open

1.6 #513

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 13 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.6.6-SNAPSHOT</version>
<version>1.6.8-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>NettySocketIO</name>
<description>Socket.IO server implemented on Java</description>
Expand Down Expand Up @@ -97,32 +97,32 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.0.19.Final</version>
<version>4.0.28.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.0.19.Final</version>
<version>4.0.28.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>4.0.19.Final</version>
<version>4.0.28.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.0.19.Final</version>
<version>4.0.28.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>4.0.19.Final</version>
<version>4.0.28.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>4.0.19.Final</version>
<version>4.0.28.Final</version>
</dependency>

<dependency>
Expand All @@ -141,18 +141,18 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.6</version>
<version>1.7.10</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.3.3</version>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.3.3</version>
<version>2.5.1</version>
</dependency>


Expand All @@ -179,7 +179,7 @@
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>1.0.2</version>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand All @@ -196,7 +196,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>2.4.2</version>
<version>2.5.1</version>
</plugin>

<plugin>
Expand Down Expand Up @@ -319,7 +319,7 @@
<failIfMissing>true</failIfMissing>
<aggregate>false</aggregate>
<includes>
<include>src/**</include>
<include>src/**/*.java</include>
</includes>
<excludes>
<exclude>target/**</exclude>
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/com/corundumstudio/socketio/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ public class Configuration {
private int heartbeatTimeout = 60;
private int heartbeatInterval = 25;
private int closeTimeout = 60;
private int firstDataTimeout = 5;

private int maxHttpContentLength = 64 * 1024;
private int maxFramePayloadLength = 64 * 1024;
private boolean useStrictOrdering = false;

private String packagePrefix;
private String hostname;
private int port = -1;

private InputStream crossDomainPolicy;

private String keyStoreFormat = "JKS";
private InputStream keyStore;
private String keyStorePassword;
Expand All @@ -70,6 +74,8 @@ public class Configuration {

private AuthorizationListener authorizationListener = new SuccessAuthorizationListener();

private String origin;

private AckMode ackMode = AckMode.AUTO_SUCCESS_ONLY;

public Configuration() {
Expand Down Expand Up @@ -116,6 +122,10 @@ public Configuration() {
setSocketConfig(conf.getSocketConfig());
setAckMode(conf.getAckMode());
setMaxFramePayloadLength(conf.getMaxFramePayloadLength());
setOrigin(conf.getOrigin());
setCrossDomainPolicy(conf.getCrossDomainPolicy());
setUseStrictOrdering(conf.isUseStrictOrdering());
setFirstDataTimeout(conf.getFirstDataTimeout());
}

private String join(Transport[] transports) {
Expand Down Expand Up @@ -474,4 +484,57 @@ public int getMaxFramePayloadLength() {
return maxFramePayloadLength;
}

/**
* Set <b>Access-Control-Allow-Origin</b> header value for http each response.
* Default is {@code null}.
*
* If value is {@code null} then request {@code ORIGIN} header value used.
*
* @param origin
*/
public void setOrigin(String origin) {
this.origin = origin;
}
public String getOrigin() {
return origin;
}

/**
* crossdomain.xml file stream used for flash-socket transport
*
* @param crossDomainPolicy
*/
public void setCrossDomainPolicy(InputStream crossDomainPolicy) {
this.crossDomainPolicy = crossDomainPolicy;
}
public InputStream getCrossDomainPolicy() {
return crossDomainPolicy;
}

/**
* Packet strict ordering in websocket transport
*
* @param useStrictOrdering
*/
public void setUseStrictOrdering(boolean useStrictOrdering) {
this.useStrictOrdering = useStrictOrdering;
}
public boolean isUseStrictOrdering() {
return useStrictOrdering;
}

/**
* Timeout between channel opening and first data transfer
* Helps to avoid 'silent channel' attack and prevents
* 'Too many open files' problem in this case
*
* @param firstDataTimeout
*/
public void setFirstDataTimeout(int firstDataTimeout) {
this.firstDataTimeout = firstDataTimeout;
}
public int getFirstDataTimeout() {
return firstDataTimeout;
}

}
19 changes: 14 additions & 5 deletions src/main/java/com/corundumstudio/socketio/HandshakeData.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Date;
import java.util.List;
import java.util.Map;
Expand All @@ -26,7 +27,7 @@ public class HandshakeData implements Serializable {
private static final long serialVersionUID = 1196350300161819978L;

private Map<String, List<String>> headers;
private InetSocketAddress address;
private SocketAddress address;
private Date time = new Date();
private String url;
private Map<String, List<String>> urlParams;
Expand All @@ -35,7 +36,7 @@ public class HandshakeData implements Serializable {
public HandshakeData() {
}

public HandshakeData(Map<String, List<String>> headers, Map<String, List<String>> urlParams, InetSocketAddress address, String url, boolean xdomain) {
public HandshakeData(Map<String, List<String>> headers, Map<String, List<String>> urlParams, SocketAddress address, String url, boolean xdomain) {
super();
this.headers = headers;
this.urlParams = urlParams;
Expand All @@ -44,9 +45,17 @@ public HandshakeData(Map<String, List<String>> headers, Map<String, List<String>
this.xdomain = xdomain;
}

public InetSocketAddress getAddress() {
return address;
}
/**
* @deprecated use {@link #getSocketAddress()} instead
*/
@Deprecated
public InetSocketAddress getAddress() {
return (InetSocketAddress) address;
}

public SocketAddress getSocketAddress() {
return address;
}

public Map<String, List<String>> getHeaders() {
return headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer<Channel> impl
private XHRPollingTransport xhrPollingTransport;
private WebSocketTransport webSocketTransport;
private FlashSocketTransport flashSocketTransport;
private final FlashPolicyHandler flashPolicyHandler = new FlashPolicyHandler();
private FlashPolicyHandler flashPolicyHandler;
private ResourceHandler resourceHandler;
private EncoderHandler encoderHandler;
private WrongUrlHandler wrongUrlHandler;
Expand Down Expand Up @@ -124,18 +124,19 @@ public void start(Configuration configuration, NamespacesHub namespacesHub) {
}
}

flashPolicyHandler = new FlashPolicyHandler(configuration);
packetHandler = new PacketHandler(packetListener, decoder, namespacesHub, configuration.getExceptionListener());
authorizeHandler = new AuthorizeHandler(connectPath, scheduler, configuration, namespacesHub);

StoreFactory factory = configuration.getStoreFactory();
factory.init(namespacesHub, authorizeHandler, jsonSupport);

xhrPollingTransport = new XHRPollingTransport(connectPath, ackManager, this, scheduler, authorizeHandler, configuration);
webSocketTransport = new WebSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration.getMaxFramePayloadLength());
flashSocketTransport = new FlashSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration.getMaxFramePayloadLength());
webSocketTransport = new WebSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration);
flashSocketTransport = new FlashSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration);

resourceHandler = new ResourceHandler(configuration.getContext());
encoderHandler = new EncoderHandler(encoder);
encoderHandler = new EncoderHandler(encoder, configuration);
wrongUrlHandler = new WrongUrlHandler();
}

Expand Down
24 changes: 20 additions & 4 deletions src/main/java/com/corundumstudio/socketio/SocketIOServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
package com.corundumstudio.socketio;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

import java.net.InetSocketAddress;
import java.util.Collection;
Expand Down Expand Up @@ -115,6 +118,11 @@ public BroadcastOperations getRoomOperations(String room) {
* Start server
*/
public void start() {
startAsync().awaitUninterruptibly();
}

public Future<Void> startAsync() {
log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory());
initGroups();

pipelineFactory.start(configCopy, namespacesHub);
Expand All @@ -129,10 +137,18 @@ public void start() {
addr = new InetSocketAddress(configCopy.getHostname(), configCopy.getPort());
}

b.bind(addr).syncUninterruptibly();

log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory());
log.info("SocketIO server started at port: {}", configCopy.getPort());
ChannelFuture future = b.bind(addr);
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) {
log.info("SocketIO server started at port: {}", configCopy.getPort());
} else {
log.error("SocketIO server start at port: {} failed!", configCopy.getPort());
}
}
});
return future;
}

protected void applyConnectionOptions(ServerBootstrap bootstrap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.netty.handler.codec.http.QueryStringDecoder;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -77,9 +76,26 @@ public AuthorizeHandler(String connectPath, CancelableScheduler scheduler, Confi

this.authorizedSessionIds = configuration.getStoreFactory().createMap("authorizedSessionIds");
}

@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
SchedulerKey key = new SchedulerKey(Type.CLOSE_TIMEOUT, ctx.channel());
disconnectScheduler.schedule(key, new Runnable() {
@Override
public void run() {
ctx.channel().close();
log.debug("Client with ip {} opens channel but not sended any data! Channel closed!", ctx.channel().remoteAddress());
}
}, configuration.getFirstDataTimeout(), TimeUnit.SECONDS);

super.channelActive(ctx);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
SchedulerKey key = new SchedulerKey(Type.CLOSE_TIMEOUT, ctx.channel());
disconnectScheduler.cancel(key);

if (msg instanceof FullHttpRequest) {
FullHttpRequest req = (FullHttpRequest) msg;
Channel channel = ctx.channel();
Expand Down Expand Up @@ -112,7 +128,7 @@ private void authorize(Channel channel, String origin, Map<String, List<String>>
}

HandshakeData data = new HandshakeData(headers, params,
(InetSocketAddress)channel.remoteAddress(),
channel.remoteAddress(),
req.getUri(), origin != null && !origin.equalsIgnoreCase("null"));

boolean result = false;
Expand All @@ -137,13 +153,13 @@ private void authorize(Channel channel, String origin, Map<String, List<String>>
channel.writeAndFlush(new AuthorizeMessage(msg, jsonpParam, origin, sessionId));

authorizedSessionIds.put(sessionId, data);
log.debug("Handshake authorized for sessionId: {}", sessionId);
log.debug("Handshake authorized for sessionId: {} query params: {} headers: {}", sessionId, params, headers);
} else {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.UNAUTHORIZED);
ChannelFuture f = channel.writeAndFlush(res);
f.addListener(ChannelFutureListener.CLOSE);

log.debug("Handshake unauthorized");
log.debug("Handshake unauthorized, query params: {} headers: {}", params, headers);
}
}

Expand Down