Skip to content

Commit

Permalink
延后注册P2pConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Sep 17, 2018
1 parent f980460 commit 9826f24
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 26 deletions.
Expand Up @@ -95,6 +95,7 @@ public TcpConnection(WritableChannel writableChannel, boolean isServer) {
public TcpConnection(WritableChannel writableChannel, NetClient netClient) {
super(writableChannel, false);
this.netClient = netClient;
setHostAndPort(writableChannel.getHost() + ":" + writableChannel.getPort());
}

public int getNextId() {
Expand Down
Expand Up @@ -163,8 +163,6 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
ChannelPipeline p = ch.pipeline();
p.addLast(new NettyClientHandler(connectionManager, conn));
InetSocketAddress remoteAddress = ch.remoteAddress();
conn.setHostAndPort(remoteAddress.getHostName() + ":" + remoteAddress.getPort());
conn.setInetSocketAddress(inetSocketAddress);
asyncConnections.put(inetSocketAddress, conn);
}
Expand Down
42 changes: 21 additions & 21 deletions lealone-p2p/src/main/java/org/lealone/p2p/net/MessagingService.java
Expand Up @@ -303,27 +303,27 @@ public void sendOneWay(MessageOut message, int id, NetEndpoint to) {

public P2pConnection getConnection(NetEndpoint remoteEndpoint) {
remoteEndpoint = ClusterMetaData.getPreferredIP(remoteEndpoint);
final String remoteHostAndPort = remoteEndpoint.getHostAndPort();
String remoteHostAndPort = remoteEndpoint.getHostAndPort();
P2pConnection conn = connections.get(remoteHostAndPort);
if (conn == null) {
synchronized (connections) {
conn = connections.get(remoteHostAndPort);
if (conn == null) {
Properties prop = new Properties();
prop.putAll(P2pServer.instance.getConfig());
NetFactory factory = NetFactoryManager.getFactory(P2pServer.instance.getConfig());
try {
conn = (P2pConnection) factory.getNetClient().createConnection(prop, remoteEndpoint, this);
String localHost = ConfigDescriptor.getLocalEndpoint().getHostAddress();
String localHostAndPort = localHost + ":" + remoteEndpoint.getPort();
conn.initTransfer(remoteEndpoint, remoteHostAndPort, localHostAndPort);
connections.put(remoteHostAndPort, conn);
} catch (Exception e) {
// TODO 是否不应该立刻移除节点
Gossiper.instance.removeEndpoint(remoteEndpoint);
logger.error("Failed to connect " + remoteEndpoint, e);
throw DbException.convert(e);
}
if (conn != null)
return conn;

Properties prop = new Properties();
prop.putAll(P2pServer.instance.getConfig());
NetFactory factory = NetFactoryManager.getFactory(P2pServer.instance.getConfig());
try {
conn = (P2pConnection) factory.getNetClient().createConnection(prop, remoteEndpoint, this);
String localHostAndPort = ConfigDescriptor.getLocalEndpoint().getHostAndPort();
conn.initTransfer(remoteEndpoint, remoteHostAndPort, localHostAndPort);
connections.put(remoteHostAndPort, conn);
} catch (Exception e) {
// TODO 是否不应该立刻移除节点
Gossiper.instance.removeEndpoint(remoteEndpoint);
logger.error("Failed to connect " + remoteEndpoint, e);
throw DbException.convert(e);
}
}
}
Expand Down Expand Up @@ -423,10 +423,10 @@ public void removeConnection(AsyncConnection conn) {

@Override
public P2pConnection createConnection(WritableChannel writableChannel, boolean isServer) {
P2pConnection conn = new P2pConnection(writableChannel, isServer);
conn.setHostAndPort(writableChannel.getHost() + ":" + writableChannel.getPort());
addConnection(conn);
return conn;
// 此时还不能把创建的连接放到connections中,
// 如果是服务器端的连接,需要等到执行完P2pConnection.readInitPacket后才加入connections
// 如果是客户端(也就是对等端)的连接,需要等到执行完P2pConnection.writeInitPacket后才加入connections
return new P2pConnection(writableChannel, isServer);
}

// --------------以下是MessagingServiceMBean的API实现-------------
Expand Down
Expand Up @@ -110,16 +110,17 @@ synchronized void initTransfer(NetEndpoint remoteEndpoint, String remoteHostAndP
transfer = new Transfer(this, writableChannel, (Session) null);
targetVersion = MessagingService.instance().getVersion(remoteEndpoint);
writeInitPacket(transfer, 0, targetVersion, localHostAndPort);
MessagingService.instance().addConnection(this);
}
}

private void writeInitPacket(Transfer transfer, int sessionId, int version, String hostAndPort) throws Exception {
transfer.writeRequestHeaderWithoutSessionId(sessionId, Session.SESSION_INIT);
private void writeInitPacket(Transfer transfer, int packetId, int version, String hostAndPort) throws Exception {
transfer.writeRequestHeaderWithoutSessionId(packetId, Session.SESSION_INIT);
transfer.writeInt(MessagingService.PROTOCOL_MAGIC);
transfer.writeInt(version);
transfer.writeString(hostAndPort);
AsyncCallback<Void> ac = new AsyncCallback<>();
transfer.addAsyncCallback(sessionId, ac);
transfer.addAsyncCallback(packetId, ac);
transfer.flush();
ac.await();
}
Expand Down

0 comments on commit 9826f24

Please sign in to comment.