Skip to content

Commit

Permalink
初始化一些代码
Browse files Browse the repository at this point in the history
  • Loading branch information
黄志磊 committed Dec 23, 2015
1 parent 45a31c7 commit 3ba747c
Show file tree
Hide file tree
Showing 16 changed files with 387 additions and 79 deletions.
5 changes: 5 additions & 0 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Connection.java
Expand Up @@ -14,4 +14,9 @@ public interface Connection {
boolean isClosed();

boolean isOpen();

void refreshLastReadTime(long lastReadTime);

void close();

}
15 changes: 13 additions & 2 deletions mpush-api/src/main/java/com/shinemo/mpush/api/protocol/Packet.java
@@ -1,11 +1,15 @@
package com.shinemo.mpush.api.protocol;

import java.io.Serializable;
import java.util.Arrays;

/**
* Created by ohun on 2015/12/19.
* magic(2)+cmd(1)+version(1)+flags(1)+msgId(4)+length(4)+body(n)
*/
public class Packet {
public byte command;
public class Packet implements Serializable{
private static final long serialVersionUID = -2725825199998223372L;
public byte command;
public byte version;
public byte flags;
public int msgId;
Expand All @@ -14,4 +18,11 @@ public class Packet {
public int getBodyLength() {
return body == null ? 0 : body.length;
}

@Override
public String toString() {
return "Packet [command=" + command + ", version=" + version + ", flags=" + flags + ", msgId=" + msgId + ", body=" + Arrays.toString(body) + "]";
}


}
6 changes: 5 additions & 1 deletion mpush-connection/pom.xml
Expand Up @@ -22,6 +22,10 @@
<groupId>com.shinemo.mpush</groupId>
<artifactId>mpush-core</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>

</project>
</project>
@@ -0,0 +1,21 @@
package com.shinemo.mpush.connection.netty;

import com.shinemo.mpush.core.thread.NamedThreadFactory;
import com.shinemo.mpush.core.thread.ThreadNameSpace;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;

public class NettySharedHolder {

public static final Timer timer = new HashedWheelTimer(new NamedThreadFactory(ThreadNameSpace.NETTY_TIMER));

public static final ByteBufAllocator byteBufAllocator;

static {
byteBufAllocator = UnpooledByteBufAllocator.DEFAULT;
}

}
@@ -1,4 +1,4 @@
package com.shinemo.mpush.api.protocol;
package com.shinemo.mpush.connection.netty.encoder;

import com.shinemo.mpush.api.Constants;
import com.shinemo.mpush.api.exception.DecodeException;
Expand Down
@@ -1,6 +1,8 @@
package com.shinemo.mpush.api.protocol;
package com.shinemo.mpush.connection.netty.encoder;

import com.shinemo.mpush.api.Constants;
import com.shinemo.mpush.api.protocol.Packet;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand Down
@@ -1,4 +1,4 @@
package com.shinemo.mpush.connection;
package com.shinemo.mpush.connection.netty.handler;


import com.shinemo.mpush.api.protocol.Packet;
Expand Down
@@ -1,7 +1,15 @@
package com.shinemo.mpush.connection;
package com.shinemo.mpush.connection.netty.server;

import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.shinemo.mpush.connection.netty.NettySharedHolder;
import com.shinemo.mpush.connection.netty.encoder.PacketDecoder;
import com.shinemo.mpush.connection.netty.encoder.PacketEncoder;
import com.shinemo.mpush.connection.netty.handler.ConnectionHandler;

import com.shinemo.mpush.api.protocol.PacketDecoder;
import com.shinemo.mpush.api.protocol.PacketEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
Expand All @@ -15,6 +23,11 @@
* Created by ohun on 2015/12/22.
*/
public class ConnectionServer {

private static final Logger log = LoggerFactory.getLogger(ConnectionServer.class);

private final AtomicBoolean startFlag = new AtomicBoolean(false);

private final int port;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
Expand All @@ -24,11 +37,18 @@ public ConnectionServer(int port) {
}

public void stop() {
log.info("netty server stop now");
this.startFlag.set(false);
if (workerGroup != null) workerGroup.shutdownGracefully();
if (bossGroup != null) bossGroup.shutdownGracefully();
}

public void start() {

if (!startFlag.compareAndSet(false, true)) {
return;
}

/***
* NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器,
* Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。
Expand Down Expand Up @@ -88,13 +108,17 @@ public void initChannel(SocketChannel ch) throws Exception {
* 请参考ChannelOption和详细的ChannelConfig实现的接口文档以此可以对ChannelOptions的有一个大概的认识。
*/
b.option(ChannelOption.SO_BACKLOG, 1024);

/***
* option()是提供给NioServerSocketChannel用来接收进来的连接。
* childOption()是提供给由父管道ServerChannel接收到的连接,
* 在这个例子中也是NioServerSocketChannel。
*/
b.childOption(ChannelOption.SO_KEEPALIVE, true);

b.option(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator);
b.childOption(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator);


/***
* 绑定端口并启动去接收进来的连接
Expand All @@ -105,9 +129,11 @@ public void initChannel(SocketChannel ch) throws Exception {
* 这里会一直等待,直到socket被关闭
*/
f.channel().closeFuture().sync();

log.info("server start ok on:"+port);

} catch (Exception e) {
e.printStackTrace();
} finally {
log.error("server start exception",e);
/***
* 优雅关闭
*/
Expand Down
37 changes: 36 additions & 1 deletion mpush-core/pom.xml
Expand Up @@ -33,6 +33,41 @@
<groupId>com.shinemo.mpush</groupId>
<artifactId>mpush-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.logback-extensions</groupId>
<artifactId>logback-ext-spring</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

</dependencies>

</project>
</project>

This file was deleted.

@@ -1,27 +1,63 @@
package com.shinemo.mpush.core;


import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.shinemo.mpush.api.Connection;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.util.internal.chmv8.ConcurrentHashMapV8;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* Created by ohun on 2015/12/22.
*/
public class ConnectionManager {
public static final ConnectionManager INSTANCE = new ConnectionManager();
private Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
// private final ConcurrentMap<String, NettyConnection> connections = new ConcurrentHashMapV8<String, NettyConnection>();

public Connection get(String channelId) {
return connections.get(channelId);
private final Cache<String,NettyConnection> cacherClients = CacheBuilder.newBuilder()
.maximumSize(2<<17)
.expireAfterAccess(27, TimeUnit.MINUTES)
.removalListener(new RemovalListener<String, NettyConnection>() {
public void onRemoval(RemovalNotification<String,NettyConnection> notification) {
if(notification.getValue().isClosed()){
// notification.getValue().close("[Remoting] removed from cache");
}
};
}).build();

public Connection get(final String channelId) throws ExecutionException {

NettyConnection client = cacherClients.get(channelId, new Callable<NettyConnection>() {
@Override
public NettyConnection call() throws Exception {
NettyConnection client = getFromRedis(channelId);
return client;
}
});
if (client == null || !client.isClosed()) {
cacherClients.invalidate(channelId);
return null;
}
return client;

}

public void add(Connection connection) {
connections.put(connection.getId(), connection);
public void add(NettyConnection connection) {
cacherClients.put(connection.getId(), connection);
}

public void remove(Connection connection) {
connections.remove(connection.getId());
public void remove(String channelId) {
cacherClients.invalidate(channelId);
}

private NettyConnection getFromRedis(String channelId){
return null;
}
}

0 comments on commit 3ba747c

Please sign in to comment.