本项目是对网络通信框架netty、akka的封装,方便使用。
- 序列化与反序列化器
- 基础通信模型
- 自动断连和重连
- 心跳
- 自定义通讯协议
- 自定义业务处理器
- 异步业务线程
<dependency>
<groupId>io.github.pleuvoir</groupId>
<artifactId>pikaq</artifactId>
<version>${latest.version}</version>
</dependency>
项目中定义的通信基本单元为RemoteCommand
,业务方可通过继承RemoteBaseCommand
实现自己的远程命令,为了方便起见目前加载全路径下实现类,只需要提供实现类即可。如:
public class RpcRequest extends RemoteBaseCommand{
@Override
public boolean responsible() {
return true;
}
@Override
public int requestCode() {
return 55;
}
@Override
public RemotingCommandType remotingCommandType() {
return RemotingCommandType.REQUEST_COMMAND;
}
}
public class RpcResponse extends RemoteBaseCommand {
@Getter
@Setter
private String payload;
@Override
public boolean responsible() {
return false;
}
@Override
public int requestCode() {
return -55;
}
@Override
public RemotingCommandType remotingCommandType() {
return RemotingCommandType.RESPONSE_COMMAND;
}
}
注意,消息类型决定了接收到消息后的处理方式。当RemotingCommandType
为REQUEST_COMMAND
时,处理器会进行异步处理,如果responsible
为true
,那么处理器会将返回的结果冲刷到远程对端节点;当RemotingCommandType
为RESPONSE_COMMAND
时,一般而言代表之前的一次请求命令现在响应过来了,如果之前的请求命令有回调会异步执行回调并结束此次流程。
框架通过requestCode
匹配到对应的处理器。所有的指令处理都通过实现RemotingRequestProcessor
接口进行处理,RemotingServer
提供了方法进行注册。
async、sync、oneway
消息处理使用Akka作为业务线程池进行异步处理。
public static void main(String[] args) {
SimpleServer simpleServer = new SimpleServer(ServerConfig.create(8888));
// 注册业务处理器 requestCode=55
simpleServer.registerHandler(55, new RemotingRequestProcessor<RpcRequest, RpcResponse>() {
@Override
public RpcResponse handler(ChannelHandlerContext ctx, RpcRequest request) {
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setPayload("hello rpc");
return rpcResponse;
}
});
simpleServer.start();
}
public static void main(String[] args) throws RemotingSendRequestException, RemotingTimeoutException {
//设置为0则关闭了心跳
SimpleClient simpleClient = new SimpleClient(ClientConfig.create().heartbeatIntervalSeconds(0).build());
String addr = "127.0.0.1:8888";
String addr2 = "127.0.0.1:8888";
// 批量连接多个不同的地址,连接过程中会进行重试,一般是为了预先连接而使用;如果有一个连接失败则停止
simpleClient.connectWithRetry(addr, addr2);
RpcRequest rpcRequest = new RpcRequest();
// onway
simpleClient.invokeOneway(addr, rpcRequest);
// 同步调用
RemotingCommand response = simpleClient.invokeSync(addr, rpcRequest, 1000);
System.out.println(response.toJSON());
// 异步回调
simpleClient.invokeAsync(addr, rpcRequest, new InvokeCallback() {
@Override
public void onRequestException(RemotingFuture remotingFuture) {
System.err.println("onRequestException .. " + remotingFuture.getBeginTimestamp());
}
@Override
public void onReceiveResponse(RemotingFuture remotingFuture) {
System.out.println("onReceiveResponse .. " + remotingFuture.getResponseCommand());
}
});
simpleClient.shutdown();
}