Skip to content

Commit

Permalink
简化MessagingService协议,不需要handshake
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed May 5, 2015
1 parent c707ddc commit 15fba58
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 102 deletions.
Expand Up @@ -44,14 +44,48 @@ class IncomingTcpConnection extends Thread {

private final int version;
private final boolean compressed;
private final InetAddress from;
private final Socket socket;
private InetAddress from;
private final DataInputStream in;
private final DataOutputStream out;

IncomingTcpConnection(Socket socket) throws IOException {
super("IncomingTcpConnection-" + socket.getInetAddress());

IncomingTcpConnection(int version, boolean compressed, Socket socket) {
assert socket != null;
this.version = version;
this.compressed = compressed;
this.socket = socket;

socket.setKeepAlive(true);
socket.setSoTimeout(2 * OutboundTcpConnection.WAIT_FOR_VERSION_MAX_TIME);
// determine the connection type to decide whether to buffer
DataInputStream in = new DataInputStream(socket.getInputStream());

//read header
MessagingService.validateMagic(in.readInt());
version = in.readInt();
compressed = in.readBoolean();
from = CompactEndpointSerializationHelper.deserialize(in);

if (logger.isDebugEnabled())
logger.debug("Connection version {} from {}", version, socket.getInetAddress());
socket.setSoTimeout(0);

MessagingService.instance().setVersion(from, version);

out = new DataOutputStream(socket.getOutputStream());
out.writeInt(MessagingService.CURRENT_VERSION);
out.flush();

if (compressed) {
if (logger.isDebugEnabled())
logger.debug("Upgrading incoming connection to be compressed");
LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(OutboundTcpConnection.LZ4_HASH_SEED)
.asChecksum();
this.in = new DataInputStream(new LZ4BlockInputStream(socket.getInputStream(), decompressor, checksum));
} else {
this.in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
}

if (DatabaseDescriptor.getInternodeRecvBufferSize() != null) {
try {
this.socket.setReceiveBufferSize(DatabaseDescriptor.getInternodeRecvBufferSize());
Expand Down Expand Up @@ -92,31 +126,6 @@ private void close() {
}

private void receiveMessages() throws IOException {
// handshake (true) endpoint versions
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
out.writeInt(MessagingService.CURRENT_VERSION);
out.flush();
DataInputStream in = new DataInputStream(socket.getInputStream());
int maxVersion = in.readInt();

from = CompactEndpointSerializationHelper.deserialize(in);
// record the (true) version of the endpoint
MessagingService.instance().setVersion(from, maxVersion);
if (logger.isDebugEnabled())
logger.debug("Set version for {} to {} (will use {})", from, maxVersion, MessagingService.instance()
.getVersion(from));

if (compressed) {
if (logger.isDebugEnabled())
logger.debug("Upgrading incoming connection to be compressed");
LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(OutboundTcpConnection.LZ4_HASH_SEED)
.asChecksum();
in = new DataInputStream(new LZ4BlockInputStream(socket.getInputStream(), decompressor, checksum));
} else {
in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
}

if (version > MessagingService.CURRENT_VERSION) {
// save the endpoint so gossip will reconnect to it
Gossiper.instance.addSavedEndpoint(from);
Expand All @@ -125,22 +134,22 @@ private void receiveMessages() throws IOException {
}

while (true) {
receiveMessage(in);
receiveMessage();
}
}

//对应OutboundTcpConnection.sendMessage
private void receiveMessage(DataInputStream input) throws IOException {
MessagingService.validateMagic(input.readInt());
int id = input.readInt();
private void receiveMessage() throws IOException {
MessagingService.validateMagic(in.readInt());
int id = in.readInt();

long timestamp = System.currentTimeMillis();
// make sure to readInt, even if cross_node_to is not enabled
int partial = input.readInt();
int partial = in.readInt();
if (DatabaseDescriptor.hasCrossNodeTimeout())
timestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);

MessageIn<?> message = MessageIn.read(input, version, id);
MessageIn<?> message = MessageIn.read(in, version, id);
if (message != null) {
Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
LealoneExecutorService stage = StageManager.getStage(message.getMessageType());
Expand Down
Expand Up @@ -18,7 +18,6 @@
package org.lealone.cluster.net;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
Expand Down Expand Up @@ -400,21 +399,7 @@ public void run() {
continue;
}

socket.setKeepAlive(true);
socket.setSoTimeout(2 * OutboundTcpConnection.WAIT_FOR_VERSION_MAX_TIME);
// determine the connection type to decide whether to buffer
DataInputStream in = new DataInputStream(socket.getInputStream());

//read header
MessagingService.validateMagic(in.readInt());
int version = in.readInt();
boolean compressionEnabled = in.readBoolean();

if (logger.isDebugEnabled())
logger.debug("Connection version {} from {}", version, socket.getInetAddress());
socket.setSoTimeout(0);

new IncomingTcpConnection(version, compressionEnabled, socket).start();
new IncomingTcpConnection(socket).start();

} catch (AsynchronousCloseException e) {
if (logger.isDebugEnabled())
Expand Down
Expand Up @@ -29,10 +29,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Checksum;

Expand Down Expand Up @@ -64,7 +62,6 @@ class OutboundTcpConnection extends Thread {
static final int WAIT_FOR_VERSION_MAX_TIME = 5000;

private static final int OPEN_RETRY_DELAY = 100; // ms between retries
private static final int NO_VERSION = Integer.MIN_VALUE;

private static boolean isLocalDC(InetAddress targetHost) {
String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
Expand All @@ -90,7 +87,7 @@ private static boolean isLocalDC(InetAddress targetHost) {
private ConnectionMetrics metrics;

OutboundTcpConnection(InetAddress remoteEndpoint) {
super("WRITE-" + remoteEndpoint);
super("OutboundTcpConnection-" + remoteEndpoint);
this.remoteEndpoint = remoteEndpoint;
resetEndpoint = ClusterMetaData.getPreferredIP(remoteEndpoint);
metrics = new ConnectionMetrics(remoteEndpoint);
Expand Down Expand Up @@ -292,22 +289,12 @@ private boolean connect() {
out.writeInt(MessagingService.PROTOCOL_MAGIC);
out.writeInt(targetVersion);
out.writeBoolean(shouldCompressConnection());
CompactEndpointSerializationHelper.serialize(Utils.getBroadcastAddress(), out);
out.flush();

DataInputStream in = new DataInputStream(socket.getInputStream());
int maxTargetVersion = handshakeVersion(in);
if (maxTargetVersion == NO_VERSION) {
// no version is returned, so disconnect an try again: we will either get
// a different target version (targetVersion < MessagingService.VERSION_12)
// or if the same version the handshake will finally succeed
if (logger.isDebugEnabled())
logger.debug("Target max version is {}; no version information yet, will retry",
maxTargetVersion);
disconnect();
continue;
} else {
MessagingService.instance().setVersion(remoteEndpoint, maxTargetVersion);
}
int maxTargetVersion = in.readInt();
MessagingService.instance().setVersion(remoteEndpoint, maxTargetVersion);

if (targetVersion > maxTargetVersion) {
if (logger.isDebugEnabled())
Expand All @@ -324,10 +311,7 @@ private boolean connect() {
softCloseSocket();
}

out.writeInt(MessagingService.CURRENT_VERSION);
CompactEndpointSerializationHelper.serialize(Utils.getBroadcastAddress(), out);
if (shouldCompressConnection()) {
out.flush();
if (logger.isTraceEnabled())
logger.trace("Upgrading OutputStream to be compressed");
// TODO: custom LZ4 OS that supports BB write methods
Expand All @@ -350,36 +334,6 @@ private boolean connect() {
return false;
}

private int handshakeVersion(final DataInputStream inputStream) {
final AtomicInteger version = new AtomicInteger(NO_VERSION);
final CountDownLatch versionLatch = new CountDownLatch(1);
new Thread("HANDSHAKE-" + remoteEndpoint) {
@Override
public void run() {
try {
logger.info("Handshaking version with {}", remoteEndpoint);
version.set(inputStream.readInt());
} catch (IOException ex) {
final String msg = "Cannot handshake version with " + remoteEndpoint;
if (logger.isTraceEnabled())
logger.trace(msg, ex);
else
logger.info(msg);
} finally {
//unblock the waiting thread on either success or fail
versionLatch.countDown();
}
}
}.start();

try {
versionLatch.await(WAIT_FOR_VERSION_MAX_TIME, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
throw new AssertionError(ex);
}
return version.get();
}

private void expireMessages() {
Iterator<QueuedMessage> iter = backlog.iterator();
while (iter.hasNext()) {
Expand Down

0 comments on commit 15fba58

Please sign in to comment.