Skip to content

Commit

Permalink
接入层增加websocket协议支持
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Dec 18, 2016
1 parent 817a26b commit 27df79b
Show file tree
Hide file tree
Showing 48 changed files with 816 additions and 244 deletions.
2 changes: 2 additions & 0 deletions conf/reference.conf
Expand Up @@ -47,6 +47,8 @@ mp {
gateway-server-net=udp //网关服务使用的网络类型tcp/udp
gateway-server-multicast="239.239.239.88" //239.0.0.0~239.255.255.255为本地管理组播地址,仅在特定的本地范围内有效
gateway-client-multicast="239.239.239.99" //239.0.0.0~239.255.255.255为本地管理组播地址,仅在特定的本地范围内有效
ws-server-port=0 //websocket对外端口, 公网端口, 0表示禁用websocket
ws-path="/" //websocket path
public-host-mapping { //本机局域网IP和公网IP的映射关系
//"10.0.10.156":"111.1.32.137"
//"10.0.10.166":"111.1.33.138"
Expand Down
6 changes: 6 additions & 0 deletions mpush-api/pom.xml
Expand Up @@ -22,6 +22,12 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>

Expand Down
4 changes: 4 additions & 0 deletions mpush-api/src/main/java/com/mpush/api/Message.java
Expand Up @@ -32,6 +32,10 @@ public interface Message {

Connection getConnection();

void decodeBody();

void encodeBody();

/**
* 发送当前message, 并根据情况最body进行数据压缩、加密
*
Expand Down
90 changes: 90 additions & 0 deletions mpush-api/src/main/java/com/mpush/api/protocol/JsonPacket.java
@@ -0,0 +1,90 @@
/*
* (C) Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* ohun@live.cn (夜色)
*/

package com.mpush.api.protocol;


import com.mpush.api.Constants;
import com.mpush.api.spi.common.Json;
import com.mpush.api.spi.common.JsonFactory;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.Map;

/**
* Created by ohun on 2016/12/16.
*
* @author ohun@live.cn (夜色)
*/
public final class JsonPacket extends Packet {

public Map<String, Object> body;

public JsonPacket() {
super(Command.UNKNOWN);
this.addFlag(FLAG_JSON_BODY);
}

public JsonPacket(Command cmd, int sessionId) {
super(cmd, sessionId);
this.addFlag(FLAG_JSON_BODY);
}

@Override
@SuppressWarnings("unchecked")
public Map<String, Object> getBody() {
return body;
}

@Override
@SuppressWarnings("unchecked")
public <T> void setBody(T body) {
this.body = (Map<String, Object>) body;
}

@Override
public int getBodyLength() {
return body == null ? 0 : body.size();
}

@Override
public Packet response(Command command) {
return new JsonPacket(command, sessionId);
}

@Override
public Object toFrame(Channel channel) {
byte[] json = Json.JSON.toJson(this).getBytes(Constants.UTF_8);
return new TextWebSocketFrame(Unpooled.wrappedBuffer(json));
}

@Override
public String toString() {
return "JsonPacket{" +
"cmd=" + cmd +
", cc=" + cc +
", flags=" + flags +
", sessionId=" + sessionId +
", lrc=" + lrc +
", body=" + body +
'}';
}
}
46 changes: 42 additions & 4 deletions mpush-api/src/main/java/com/mpush/api/protocol/Packet.java
Expand Up @@ -21,6 +21,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;

import java.net.InetSocketAddress;

Expand All @@ -44,11 +45,11 @@ public class Packet {
public static final Packet HB_PACKET = new Packet(Command.HEARTBEAT);

public byte cmd; //命令
public short cc; //校验码 暂时没有用到
transient public short cc; //校验码 暂时没有用到
public byte flags; //特性,如是否加密,是否压缩等
public int sessionId; // 会话id。客户端生成。
public byte lrc; // 校验,纵向冗余校验。只校验head
public byte[] body;
transient public byte lrc; // 校验,纵向冗余校验。只校验head
transient public byte[] body;

public Packet(byte cmd) {
this.cmd = cmd;
Expand Down Expand Up @@ -84,6 +85,10 @@ public <T> T getBody() {
return (T) body;
}

public <T> void setBody(T body) {
this.body = (byte[]) body;
}

public short calcCheckCode() {
short checkCode = 0;
if (body != null) {
Expand All @@ -109,7 +114,7 @@ public byte calcLrc() {
return lrc;
}

public boolean vaildCheckCode() {
public boolean validCheckCode() {
return calcCheckCode() == cc;
}

Expand All @@ -128,6 +133,39 @@ public Packet response(Command command) {
return new Packet(command, sessionId);
}

public Object toFrame(Channel channel) {
return this;
}

public static Packet decodePacket(Packet packet, ByteBuf in, int bodyLength) {
packet.cc = in.readShort();//read cc
packet.flags = in.readByte();//read flags
packet.sessionId = in.readInt();//read sessionId
packet.lrc = in.readByte();//read lrc

//read body
if (bodyLength > 0) {
in.readBytes(packet.body = new byte[bodyLength]);
}
return packet;
}

public static void encodePacket(Packet packet, ByteBuf out) {
if (packet.cmd == Command.HEARTBEAT.cmd) {
out.writeByte(Packet.HB_PACKET_BYTE);
} else {
out.writeInt(packet.getBodyLength());
out.writeByte(packet.cmd);
out.writeShort(packet.cc);
out.writeByte(packet.flags);
out.writeInt(packet.sessionId);
out.writeByte(packet.lrc);
if (packet.getBodyLength() > 0) {
out.writeBytes(packet.body);
}
}
}

@Override
public String toString() {
return "{" +
Expand Down
14 changes: 14 additions & 0 deletions mpush-api/src/main/java/com/mpush/api/protocol/UDPPacket.java
Expand Up @@ -19,8 +19,14 @@

package com.mpush.api.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.socket.DatagramPacket;

import java.net.InetSocketAddress;

import static com.mpush.api.protocol.Command.HEARTBEAT;

/**
* Created by ohun on 16/10/21.
*
Expand Down Expand Up @@ -65,4 +71,12 @@ public void setRecipient(InetSocketAddress recipient) {
public Packet response(Command command) {
return new UDPPacket(command, sessionId, address);
}

@Override
public Object toFrame(Channel channel) {
int capacity = cmd == HEARTBEAT.cmd ? 1 : HEADER_LEN + getBodyLength();
ByteBuf out = channel.alloc().buffer(capacity, capacity);
encodePacket(this, out);
return new DatagramPacket(out, sender());
}
}
2 changes: 1 addition & 1 deletion mpush-api/src/main/java/com/mpush/api/spi/SpiLoader.java
Expand Up @@ -49,7 +49,7 @@ public static <T> T load(Class<T> clazz, String name) {
return load0(clazz, name);
}

public static <T> T load0(Class<T> clazz, String name) {
public static <T> T load0(Class<T> clazz, String name) {
ServiceLoader<T> factories = ServiceLoader.load(clazz);
T t = filterByName(factories, name);

Expand Down
Expand Up @@ -29,15 +29,11 @@
* @author ohun@live.cn
*/
public interface ExecutorFactory {
String SERVER_BOSS = "sb";
String SERVER_WORK = "sw";
String HTTP_CLIENT_WORK = "hcw";
String PUSH_CLIENT = "pc";
String PUSH_TASK = "pt";
String ACK_TIMER = "at";
String EVENT_BUS = "eb";
String MQ = "r";
String BIZ = "b";

Executor get(String name);

Expand Down
33 changes: 33 additions & 0 deletions mpush-api/src/main/java/com/mpush/api/spi/common/Json.java
@@ -0,0 +1,33 @@
/*
* (C) Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* ohun@live.cn (夜色)
*/

package com.mpush.api.spi.common;

/**
* Created by ohun on 2016/12/17.
*
* @author ohun@live.cn (夜色)
*/
public interface Json {
Json JSON = JsonFactory.create();

<T> T fromJson(String json, Class<T> clazz);

String toJson(Object json);
}
Expand Up @@ -17,27 +17,19 @@
* ohun@live.cn (夜色)
*/

package com.mpush.api.protocol;
package com.mpush.api.spi.common;

import com.mpush.api.spi.Factory;
import com.mpush.api.spi.SpiLoader;

/**
* Created by ohun on 2016/12/16.
* Created by ohun on 2016/12/17.
*
* @author ohun@live.cn (夜色)
*/
public final class TextPacket extends Packet {
public String body;

public TextPacket(byte cmd) {
super(cmd);
}

public TextPacket(byte cmd, int sessionId) {
super(cmd, sessionId);
}
public interface JsonFactory extends Factory<Json> {

@Override
@SuppressWarnings("unchecked")
public String getBody() {
return body;
static Json create() {
return SpiLoader.load(JsonFactory.class).get();
}
}
Expand Up @@ -21,12 +21,11 @@


import com.mpush.bootstrap.job.*;
import com.mpush.core.server.AdminServer;
import com.mpush.core.server.ConnectionServer;
import com.mpush.core.server.GatewayServer;
import com.mpush.core.server.GatewayUDPConnector;
import com.mpush.core.server.*;
import com.mpush.tools.config.CC;

import static com.mpush.tools.config.CC.mp.net.udpGateway;
import static com.mpush.tools.config.CC.mp.net.wsEnabled;
import static com.mpush.zk.node.ZKServerNode.*;

/**
Expand All @@ -43,6 +42,7 @@ public ServerLauncher() {
.setNext(new ZKBoot())//1.启动ZK节点数据变化监听
.setNext(new RedisBoot())//2.注册redis sever 到ZK
.setNext(new ServerBoot(ConnectionServer.I(), CS_NODE))//3.启动长连接服务
.setNext(() -> new ServerBoot(WebSocketServer.I(), WS_NODE), wsEnabled())//4.启动websocket连接服务
.setNext(new ServerBoot(udpGateway() ? GatewayUDPConnector.I() : GatewayServer.I(), GS_NODE))//4.启动网关服务
.setNext(new ServerBoot(AdminServer.I(), null))//5.启动控制台服务
.setNext(new PushCenterBoot())//6.启动http代理服务,解析dns
Expand Down

0 comments on commit 27df79b

Please sign in to comment.