Skip to content

Commit

Permalink
调用receiveMessage不需要返回值,不需要传递版本号参数
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed May 4, 2015
1 parent 256cf94 commit 33bbf87
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 33 deletions.
Expand Up @@ -32,6 +32,8 @@
import net.jpountz.lz4.LZ4FastDecompressor;
import net.jpountz.xxhash.XXHashFactory;

import org.lealone.cluster.concurrent.LealoneExecutorService;
import org.lealone.cluster.concurrent.StageManager;
import org.lealone.cluster.config.DatabaseDescriptor;
import org.lealone.cluster.gms.Gossiper;
import org.slf4j.Logger;
Expand Down Expand Up @@ -80,6 +82,15 @@ public void run() {
}
}

private void close() {
try {
socket.close();
} catch (IOException e) {
if (logger.isDebugEnabled())
logger.debug("Error closing socket", e);
}
}

private void receiveMessages() throws IOException {
// handshake (true) endpoint versions
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
Expand Down Expand Up @@ -115,12 +126,13 @@ private void receiveMessages() throws IOException {
// outbound side will reconnect if necessary to upgrade version

while (true) {
MessagingService.validateMagic(in.readInt());
receiveMessage(in, version);
receiveMessage(in);
}
}

private InetAddress receiveMessage(DataInputStream input, int version) throws IOException {
//对应OutboundTcpConnection.sendMessage
private void receiveMessage(DataInputStream input) throws IOException {
MessagingService.validateMagic(input.readInt());
int id = input.readInt();

long timestamp = System.currentTimeMillis();
Expand All @@ -130,25 +142,13 @@ private InetAddress receiveMessage(DataInputStream input, int version) throws IO
timestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);

MessageIn<?> message = MessageIn.read(input, version, id);
if (message == null) {
// callback expired; nothing to do
return null;
}
if (version <= MessagingService.current_version) {
MessagingService.instance().receive(message, id, timestamp);
} else {
if (logger.isDebugEnabled())
logger.debug("Received connection from newer protocol version {}. Ignoring message", version);
}
return message.from;
}

private void close() {
try {
socket.close();
} catch (IOException e) {
if (logger.isDebugEnabled())
logger.debug("Error closing socket", e);
if (message != null) {
Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
LealoneExecutorService stage = StageManager.getStage(message.getMessageType());
assert stage != null : "No stage for message type " + message.verb;
stage.execute(runnable);
}
// message == null
// callback expired; nothing to do
}
}
Expand Up @@ -46,7 +46,6 @@
import javax.management.ObjectName;

import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.lealone.cluster.concurrent.LealoneExecutorService;
import org.lealone.cluster.concurrent.ScheduledExecutors;
import org.lealone.cluster.concurrent.Stage;
import org.lealone.cluster.concurrent.StageManager;
Expand Down Expand Up @@ -507,14 +506,6 @@ public void shutdown() {
}
}

public void receive(MessageIn message, int id, long timestamp) {
Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
LealoneExecutorService stage = StageManager.getStage(message.getMessageType());
assert stage != null : "No stage for message type " + message.verb;

stage.execute(runnable);
}

public void setCallbackForTests(int messageId, CallbackInfo callback) {
callbacks.put(messageId, callback);
}
Expand Down
Expand Up @@ -171,7 +171,7 @@ private boolean shouldCompressConnection() {

private void writeConnected(QueuedMessage qm, boolean flush) {
try {
writeInternal(qm.message, qm.id, qm.timestamp);
sendMessage(qm.message, qm.id, qm.timestamp);

completed++;
if (flush)
Expand All @@ -198,7 +198,7 @@ private void writeConnected(QueuedMessage qm, boolean flush) {
}
}

private void writeInternal(MessageOut<?> message, int id, long timestamp) throws IOException {
private void sendMessage(MessageOut<?> message, int id, long timestamp) throws IOException {
out.writeInt(MessagingService.PROTOCOL_MAGIC);
out.writeInt(id);

Expand Down

0 comments on commit 33bbf87

Please sign in to comment.