Skip to content

Commit

Permalink
增加Message接口和MessageType
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Sep 16, 2018
1 parent 0710ac7 commit 3116bab
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 40 deletions.
Expand Up @@ -152,20 +152,22 @@ public void connect(InetSocketAddress inetSocketAddress, String host, int port,
bootstrap.connect(host, port).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
SocketChannel ch = (SocketChannel) future.channel();
NettyWritableChannel channel = new NettyWritableChannel(ch);
AsyncConnection conn;
if (connectionManager != null) {
conn = connectionManager.createConnection(channel, false);
} else {
conn = new TcpConnection(channel, nettyNetClient);
if (future.isSuccess()) {
SocketChannel ch = (SocketChannel) future.channel();
NettyWritableChannel channel = new NettyWritableChannel(ch);
AsyncConnection conn;
if (connectionManager != null) {
conn = connectionManager.createConnection(channel, false);
} else {
conn = new TcpConnection(channel, nettyNetClient);
}
ChannelPipeline p = ch.pipeline();
p.addLast(new NettyClientHandler(connectionManager, conn));
InetSocketAddress remoteAddress = ch.remoteAddress();
conn.setHostAndPort(remoteAddress.getHostName() + ":" + remoteAddress.getPort());
conn.setInetSocketAddress(inetSocketAddress);
asyncConnections.put(inetSocketAddress, conn);
}
ChannelPipeline p = ch.pipeline();
p.addLast(new NettyClientHandler(connectionManager, conn));
InetSocketAddress remoteAddress = ch.remoteAddress();
conn.setHostAndPort(remoteAddress.getHostName() + ":" + remoteAddress.getPort());
conn.setInetSocketAddress(inetSocketAddress);
asyncConnections.put(inetSocketAddress, conn);
latch.countDown();
}
});
Expand Down
14 changes: 13 additions & 1 deletion lealone-p2p/src/main/java/org/lealone/p2p/gms/EchoMessage.java
Expand Up @@ -22,8 +22,10 @@
import java.io.IOException;

import org.lealone.p2p.net.IVersionedSerializer;
import org.lealone.p2p.net.Message;
import org.lealone.p2p.net.MessageType;

public class EchoMessage {
public class EchoMessage implements Message<EchoMessage> {
public static final IVersionedSerializer<EchoMessage> serializer = new EchoMessageSerializer();

private static class EchoMessageSerializer implements IVersionedSerializer<EchoMessage> {
Expand All @@ -36,4 +38,14 @@ public EchoMessage deserialize(DataInput in, int version) throws IOException {
return new EchoMessage();
}
}

@Override
public MessageType getType() {
return MessageType.ECHO;
}

@Override
public IVersionedSerializer<EchoMessage> getSerializer() {
return serializer;
}
}
Expand Up @@ -30,8 +30,7 @@ public class EchoVerbHandler implements IVerbHandler<EchoMessage> {

@Override
public void doVerb(MessageIn<EchoMessage> message, int id) {
MessageOut<EchoMessage> echoMessage = new MessageOut<>(Verb.REQUEST_RESPONSE,
new EchoMessage(), EchoMessage.serializer);
MessageOut<EchoMessage> echoMessage = new MessageOut<>(Verb.REQUEST_RESPONSE, new EchoMessage());
if (logger.isTraceEnabled())
logger.trace("Sending a EchoMessage reply {}", message.from);
MessagingService.instance().sendReply(echoMessage, id, message.from);
Expand Down
Expand Up @@ -27,12 +27,14 @@
import org.lealone.net.NetEndpoint;
import org.lealone.p2p.net.CompactEndpointSerializationHelper;
import org.lealone.p2p.net.IVersionedSerializer;
import org.lealone.p2p.net.Message;
import org.lealone.p2p.net.MessageType;

/**
* This ack gets sent out as a result of the receipt of a GossipDigestSynMessage by an
* endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol.
*/
public class GossipDigestAck {
public class GossipDigestAck implements Message<GossipDigestAck> {
public static final IVersionedSerializer<GossipDigestAck> serializer = new GossipDigestAckSerializer();

final List<GossipDigest> gDigestList;
Expand All @@ -51,6 +53,16 @@ Map<NetEndpoint, EndpointState> getEndpointStateMap() {
return epStateMap;
}

@Override
public MessageType getType() {
return MessageType.GOSSIP_DIGEST_ACK;
}

@Override
public IVersionedSerializer<GossipDigestAck> getSerializer() {
return serializer;
}

private static class GossipDigestAckSerializer implements IVersionedSerializer<GossipDigestAck> {
@Override
public void serialize(GossipDigestAck gDigestAckMessage, DataOutput out, int version) throws IOException {
Expand Down
Expand Up @@ -26,12 +26,14 @@
import org.lealone.net.NetEndpoint;
import org.lealone.p2p.net.CompactEndpointSerializationHelper;
import org.lealone.p2p.net.IVersionedSerializer;
import org.lealone.p2p.net.Message;
import org.lealone.p2p.net.MessageType;

/**
* This ack gets sent out as a result of the receipt of a GossipDigestAckMessage. This the
* last stage of the 3 way messaging of the Gossip protocol.
*/
public class GossipDigestAck2 {
public class GossipDigestAck2 implements Message<GossipDigestAck2> {
public static final IVersionedSerializer<GossipDigestAck2> serializer = new GossipDigestAck2Serializer();

final Map<NetEndpoint, EndpointState> epStateMap;
Expand All @@ -44,6 +46,16 @@ Map<NetEndpoint, EndpointState> getEndpointStateMap() {
return epStateMap;
}

@Override
public MessageType getType() {
return MessageType.GOSSIP_DIGEST_ACK2;
}

@Override
public IVersionedSerializer<GossipDigestAck2> getSerializer() {
return serializer;
}

private static class GossipDigestAck2Serializer implements IVersionedSerializer<GossipDigestAck2> {
@Override
public void serialize(GossipDigestAck2 ack2, DataOutput out, int version) throws IOException {
Expand Down
Expand Up @@ -75,7 +75,7 @@ public void doVerb(MessageIn<GossipDigestAck> message, int id) {
}

MessageOut<GossipDigestAck2> gDigestAck2Message = new MessageOut<>(Verb.GOSSIP_DIGEST_ACK2,
new GossipDigestAck2(deltaEpStateMap), GossipDigestAck2.serializer);
new GossipDigestAck2(deltaEpStateMap));
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestAck2Message to {}", from);
MessagingService.instance().sendOneWay(gDigestAck2Message, from);
Expand Down
Expand Up @@ -23,12 +23,14 @@
import java.util.List;

import org.lealone.p2p.net.IVersionedSerializer;
import org.lealone.p2p.net.Message;
import org.lealone.p2p.net.MessageType;

/**
* This is the first message that gets sent out as a start of the Gossip protocol in a
* round.
*/
public class GossipDigestSyn {
public class GossipDigestSyn implements Message<GossipDigestSyn> {
public static final IVersionedSerializer<GossipDigestSyn> serializer = new GossipDigestSynSerializer();

final String clusterId;
Expand All @@ -43,6 +45,16 @@ List<GossipDigest> getGossipDigests() {
return gDigests;
}

@Override
public MessageType getType() {
return MessageType.GOSSIP_DIGEST_SYN;
}

@Override
public IVersionedSerializer<GossipDigestSyn> getSerializer() {
return serializer;
}

private static class GossipDigestSynSerializer implements IVersionedSerializer<GossipDigestSyn> {
@Override
public void serialize(GossipDigestSyn gDigestSynMessage, DataOutput out, int version) throws IOException {
Expand Down
Expand Up @@ -73,7 +73,7 @@ public void doVerb(MessageIn<GossipDigestSyn> message, int id) {
if (logger.isTraceEnabled())
logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size());
MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<>(Verb.GOSSIP_DIGEST_ACK,
new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap), GossipDigestAck.serializer);
new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap));
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestAckMessage to {}", from);
MessagingService.instance().sendOneWay(gDigestAckMessage, from);
Expand Down
11 changes: 4 additions & 7 deletions lealone-p2p/src/main/java/org/lealone/p2p/gms/Gossiper.java
Expand Up @@ -159,8 +159,7 @@ public void run() {

if (gDigests.size() > 0) {
GossipDigestSyn digestSynMessage = new GossipDigestSyn(ConfigDescriptor.getClusterName(), gDigests);
MessageOut<GossipDigestSyn> message = new MessageOut<>(Verb.GOSSIP_DIGEST_SYN,
digestSynMessage, GossipDigestSyn.serializer);
MessageOut<GossipDigestSyn> message = new MessageOut<>(Verb.GOSSIP_DIGEST_SYN, digestSynMessage);
/* Gossip to some random live member */
boolean gossipedToSeed = doGossipToLiveMember(message);

Expand Down Expand Up @@ -803,8 +802,7 @@ void notifyFailureDetector(NetEndpoint endpoint, EndpointState remoteEndpointSta
private void markAlive(final NetEndpoint addr, final EndpointState localState) {
localState.markDead();

MessageOut<EchoMessage> echoMessage = new MessageOut<>(Verb.ECHO, new EchoMessage(),
EchoMessage.serializer);
MessageOut<EchoMessage> echoMessage = new MessageOut<>(Verb.ECHO, new EchoMessage());
logger.trace("Sending a EchoMessage to {}", addr);
IAsyncCallback<Void> echoHandler = new IAsyncCallback<Void>() {
@Override
Expand Down Expand Up @@ -1081,8 +1079,7 @@ public void doShadowRound() {
// send a completely empty syn
List<GossipDigest> gDigests = new ArrayList<>();
GossipDigestSyn digestSynMessage = new GossipDigestSyn(ConfigDescriptor.getClusterName(), gDigests);
MessageOut<GossipDigestSyn> message = new MessageOut<>(Verb.GOSSIP_DIGEST_SYN,
digestSynMessage, GossipDigestSyn.serializer);
MessageOut<GossipDigestSyn> message = new MessageOut<>(Verb.GOSSIP_DIGEST_SYN, digestSynMessage);
inShadowRound = true;
for (NetEndpoint seed : seeds)
MessagingService.instance().sendOneWay(message, seed);
Expand Down Expand Up @@ -1166,7 +1163,7 @@ public void stop() {
scheduledGossipTask.cancel(false);
logger.info("Announcing shutdown");
Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 2, TimeUnit.MILLISECONDS);
MessageOut<Void> message = new MessageOut<>(Verb.GOSSIP_SHUTDOWN);
MessageOut<?> message = new MessageOut<>(Verb.GOSSIP_SHUTDOWN);
for (NetEndpoint ep : liveEndpoints)
MessagingService.instance().sendOneWay(message, ep);
}
Expand Down
26 changes: 26 additions & 0 deletions lealone-p2p/src/main/java/org/lealone/p2p/net/Message.java
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.p2p.net;

public interface Message<T> {

MessageType getType();

IVersionedSerializer<T> getSerializer();

}
21 changes: 9 additions & 12 deletions lealone-p2p/src/main/java/org/lealone/p2p/net/MessageOut.java
Expand Up @@ -28,39 +28,36 @@
import org.lealone.p2p.concurrent.Stage;
import org.lealone.p2p.config.ConfigDescriptor;

public class MessageOut<T> {
public class MessageOut<T extends Message<T>> {
public final NetEndpoint from;
public final Verb verb;
public final T payload;
public final IVersionedSerializer<T> serializer;
public final Map<String, byte[]> parameters;

// we do support messages that just consist of a verb
public MessageOut(Verb verb) {
this(verb, null, null);
this(verb, null);
}

public MessageOut(Verb verb, T payload, IVersionedSerializer<T> serializer) {
this(verb, payload, serializer, Collections.<String, byte[]> emptyMap());
public MessageOut(Verb verb, T payload) {
this(verb, payload, Collections.<String, byte[]> emptyMap());
}

private MessageOut(Verb verb, T payload, IVersionedSerializer<T> serializer, Map<String, byte[]> parameters) {
this(ConfigDescriptor.getLocalEndpoint(), verb, payload, serializer, parameters);
private MessageOut(Verb verb, T payload, Map<String, byte[]> parameters) {
this(ConfigDescriptor.getLocalEndpoint(), verb, payload, parameters);
}

public MessageOut(NetEndpoint from, Verb verb, T payload, IVersionedSerializer<T> serializer,
Map<String, byte[]> parameters) {
public MessageOut(NetEndpoint from, Verb verb, T payload, Map<String, byte[]> parameters) {
this.from = from;
this.verb = verb;
this.payload = payload;
this.serializer = serializer;
this.parameters = parameters;
}

public MessageOut<T> withParameter(String key, byte[] value) {
HashMap<String, byte[]> map = new HashMap<>(parameters);
map.put(key, value);
return new MessageOut<T>(verb, payload, serializer, map);
return new MessageOut<T>(verb, payload, map);
}

public Stage getStage() {
Expand Down Expand Up @@ -95,7 +92,7 @@ public int serialize(Transfer transfer, DataOutput out, int version) throws IOEx
// out.writeInt((int) longSize);
out.writeInt(0); // 写设为0,会回填
if (payload != null)
serializer.serialize(payload, out, version);
payload.getSerializer().serialize(payload, out, version);
return payloadStartPos;
}
}
36 changes: 36 additions & 0 deletions lealone-p2p/src/main/java/org/lealone/p2p/net/MessageType.java
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.p2p.net;

import org.lealone.p2p.gms.EchoMessage;
import org.lealone.p2p.gms.GossipDigestAck;
import org.lealone.p2p.gms.GossipDigestAck2;
import org.lealone.p2p.gms.GossipDigestSyn;

public enum MessageType {
GOSSIP_DIGEST_SYN(GossipDigestSyn.serializer),
GOSSIP_DIGEST_ACK(GossipDigestAck.serializer),
GOSSIP_DIGEST_ACK2(GossipDigestAck2.serializer),
ECHO(EchoMessage.serializer);

public final IVersionedSerializer<?> serializer;

private MessageType(IVersionedSerializer<?> serializer) {
this.serializer = serializer;
}
}

0 comments on commit 3116bab

Please sign in to comment.