Skip to content

Commit

Permalink
网关服务增加UDP及组播支持, PushClient模块代码优化
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Oct 22, 2016
1 parent 010fd79 commit 36cb570
Show file tree
Hide file tree
Showing 21 changed files with 389 additions and 281 deletions.
4 changes: 2 additions & 2 deletions conf/reference.conf
Expand Up @@ -41,8 +41,8 @@ mp {
gateway-server-port=3001 //网关服务端口, 内部端口
gateway-client-port=4000 //UDP 客户端端口
gateway-server-net=tcp //网关服务使用的网络类型tcp/udp
gateway-server-multicast="239.239.239.88"
gateway-client-multicast="239.239.239.99"
gateway-server-multicast="239.239.239.88" //239.0.0.0~239.255.255.255为本地管理组播地址,仅在特定的本地范围内有效
gateway-client-multicast="239.239.239.99" //239.0.0.0~239.255.255.255为本地管理组播地址,仅在特定的本地范围内有效
public-host-mapping { //本机局域网IP和公网IP的映射关系
"10.0.10.156":"111.1.32.137"
"10.0.10.166":"111.1.33.138"
Expand Down
3 changes: 3 additions & 0 deletions mpush-api/src/main/java/com/mpush/api/protocol/Packet.java
Expand Up @@ -116,6 +116,9 @@ public InetSocketAddress sender() {
return null;
}

public void sender(InetSocketAddress sender) {
}

public Packet response(Command command) {
return new Packet(command, sessionId);
}
Expand Down
19 changes: 13 additions & 6 deletions mpush-api/src/main/java/com/mpush/api/protocol/UDPPacket.java
Expand Up @@ -27,33 +27,40 @@
* @author ohun@live.cn (夜色)
*/
public final class UDPPacket extends Packet {
private final InetSocketAddress sender;
private InetSocketAddress sender;

public UDPPacket(byte cmd, InetSocketAddress sender) {
super(cmd);
this.sender = sender;
}

public UDPPacket(byte cmd, int sessionId, InetSocketAddress sender) {
public UDPPacket(Command cmd, int sessionId, InetSocketAddress sender) {
super(cmd, sessionId);
this.sender = sender;
}

public UDPPacket(Command cmd, InetSocketAddress sender) {
public UDPPacket(byte cmd) {
super(cmd);
this.sender = sender;
}

public UDPPacket(Command cmd, int sessionId, InetSocketAddress sender) {
public UDPPacket(Command cmd) {
super(cmd);
}

public UDPPacket(Command cmd, int sessionId) {
super(cmd, sessionId);
this.sender = sender;
}

@Override
public InetSocketAddress sender() {
return sender;
}

@Override
public void sender(InetSocketAddress sender) {
this.sender = sender;
}

@Override
public Packet response(Command command) {
return new UDPPacket(command, sessionId, sender);
Expand Down
25 changes: 17 additions & 8 deletions mpush-api/src/main/java/com/mpush/api/service/BaseService.java
Expand Up @@ -20,6 +20,7 @@
package com.mpush.api.service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -40,13 +41,15 @@ public boolean isRunning() {
return started.get();
}

protected void tryStart(Listener listener, Function function) {
listener = wrap(listener);
protected void tryStart(Listener l, Function function) {
FutureListener listener = wrap(l);
if (started.compareAndSet(false, true)) {
try {
init();
function.apply(listener);
listener.onSuccess(String.format("service %s start success", this.getClass().getSimpleName()));
if (!listener.isDone()) {
listener.onSuccess(String.format("service %s start success", this.getClass().getSimpleName()));
}
} catch (Throwable e) {
listener.onFailure(e);
throw new ServiceException(e);
Expand All @@ -56,12 +59,14 @@ protected void tryStart(Listener listener, Function function) {
}
}

protected void tryStop(Listener listener, Function function) {
listener = wrap(listener);
protected void tryStop(Listener l, Function function) {
FutureListener listener = wrap(l);
if (started.compareAndSet(true, false)) {
try {
function.apply(listener);
listener.onSuccess(String.format("service %s stop success", this.getClass().getSimpleName()));
if (!listener.isDone()) {
listener.onSuccess(String.format("service %s stop success", this.getClass().getSimpleName()));
}
} catch (Throwable e) {
listener.onFailure(e);
throw new ServiceException(e);
Expand Down Expand Up @@ -93,9 +98,13 @@ public void stop(Listener listener) {
tryStop(listener, this::doStop);
}

protected abstract void doStart(Listener listener) throws Throwable;
protected void doStart(Listener listener) throws Throwable {

protected abstract void doStop(Listener listener) throws Throwable;
}

protected void doStop(Listener listener) throws Throwable {

}

protected interface Function {
void apply(Listener l) throws Throwable;
Expand Down
5 changes: 3 additions & 2 deletions mpush-api/src/main/java/com/mpush/api/spi/Factory.java
Expand Up @@ -19,11 +19,12 @@

package com.mpush.api.spi;

import java.util.function.Supplier;

/**
* Created by yxx on 2016/5/18.
*
* @author ohun@live.cn
*/
public interface Factory<T> {
T get();
public interface Factory<T> extends Supplier<T> {
}
Expand Up @@ -20,7 +20,12 @@
package com.mpush.client.gateway;

import com.mpush.api.connection.Connection;
import com.mpush.api.protocol.Command;
import com.mpush.api.service.Listener;
import com.mpush.client.gateway.handler.GatewayClientChannelHandler;
import com.mpush.client.gateway.handler.GatewayErrorHandler;
import com.mpush.client.gateway.handler.GatewayOKHandler;
import com.mpush.common.MessageDispatcher;
import com.mpush.netty.client.NettyTCPClient;
import com.mpush.tools.thread.NamedPoolThreadFactory;
import io.netty.channel.ChannelHandler;
Expand All @@ -39,12 +44,16 @@
* @author ohun@live.cn
*/
public class GatewayClient extends NettyTCPClient {
private final GatewayClientChannelHandler handler = new GatewayClientChannelHandler();
private final GatewayClientChannelHandler handler;
private GlobalChannelTrafficShapingHandler trafficShapingHandler;
private ScheduledExecutorService trafficShapingExecutor;

public GatewayClient(String host, int port) {
super(host, port);
MessageDispatcher dispatcher = new MessageDispatcher();
dispatcher.register(Command.OK, new GatewayOKHandler());
dispatcher.register(Command.ERROR, new GatewayErrorHandler());
this.handler = new GatewayClientChannelHandler(dispatcher);
if (enabled) {
trafficShapingExecutor = Executors.newSingleThreadScheduledExecutor(new NamedPoolThreadFactory(T_TRAFFIC_SHAPING));
trafficShapingHandler = new GlobalChannelTrafficShapingHandler(
Expand Down

This file was deleted.

Expand Up @@ -17,38 +17,36 @@
* ohun@live.cn (夜色)
*/

package com.mpush.client.gateway;
package com.mpush.client.gateway.connection;

import com.mpush.api.connection.Connection;
import com.mpush.api.service.Listener;
import com.mpush.common.message.BaseMessage;
import com.mpush.common.message.gateway.GatewayPushMessage;
import com.mpush.zk.cache.ZKServerNodeCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Created by yxx on 2016/5/17.
*
* @author ohun@live.cn
*/
public abstract class GatewayConnectionFactory<T> extends ZKServerNodeCache {
public abstract class GatewayConnectionFactory extends ZKServerNodeCache {

final Logger logger = LoggerFactory.getLogger(this.getClass());

public void init() {

public void init(Listener listener) {
}

abstract public Connection getConnection(String ip);

abstract public T getNode(String ip);

abstract public Collection<T> getAllNode();

abstract public boolean send(String host, Consumer<GatewayPushMessage> consumer);
abstract public <M extends BaseMessage> Function<String, Void> send(Function<Connection, M> creator, Function<M, Void> sender);

abstract public void broadcast(Consumer<GatewayPushMessage> consumer);
abstract public <M extends BaseMessage> void broadcast(Function<Connection, M> creator, Consumer<M> sender);

}

0 comments on commit 36cb570

Please sign in to comment.