Skip to content

Commit

Permalink
DDL语句通过第一个活跃的seed节点转发
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Jul 15, 2015
1 parent 7464d1e commit 2d68bfb
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 179 deletions.
Expand Up @@ -376,6 +376,10 @@ public static Set<InetAddress> getSeeds() {
return ImmutableSet.<InetAddress> builder().addAll(seedProvider.getSeeds()).build();
}

/**
 * Returns the seed addresses as a list, exactly as supplied by the seed provider.
 * The value of {@code seedProvider.getSeeds()} is handed back directly; no copy is made here.
 *
 * @return the list returned by {@code seedProvider.getSeeds()}
 */
public static List<InetAddress> getSeedList() {
return seedProvider.getSeeds();
}

/**
 * Returns the current value of the static {@code listenAddress} field.
 *
 * @return the stored listen address
 */
public static InetAddress getListenAddress() {
return listenAddress;
}
Expand Down
Expand Up @@ -88,7 +88,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean {
private static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN,
VersionedValue.REMOVED_TOKEN, VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);

//Maximum difference in generation and version values we are willing to accept about a peer
// Maximum difference in generation and version values we are willing to accept about a peer
private static final long MAX_GENERATION_DIFFERENCE = 86400 * 365;
private static final int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
// half of QUARANTINE_DELAY, to ensure justRemovedEndpoints has enough leeway to prevent re-gossip
Expand Down Expand Up @@ -140,7 +140,7 @@ private class GossipTask implements Runnable {
@Override
public void run() {
try {
//wait on messaging service to start listening
// wait on messaging service to start listening
MessagingService.instance().waitUntilListening();

taskLock.lock();
Expand Down Expand Up @@ -222,7 +222,7 @@ public void start(int generationNbr, Map<ApplicationState, VersionedValue> prelo
for (Map.Entry<ApplicationState, VersionedValue> entry : preloadLocalStates.entrySet())
localState.addApplicationState(entry.getKey(), entry.getValue());

//notify snitches that Gossiper is about to start
// notify snitches that Gossiper is about to start
DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
if (logger.isTraceEnabled())
logger.trace("gossip started with generation {}", localState.getHeartBeatState().getGeneration());
Expand Down Expand Up @@ -353,7 +353,7 @@ private void doStatusCheck() {
// if some new messages just arrived, give the executor some time to work on them
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);

// still behind? something's broke
// still behind? something's broke
if (lastProcessedMessageAt < now - 1000) {
logger.warn("Gossip stage has {} pending tasks; skipping status check (no nodes will be marked down)",
pending);
Expand All @@ -370,7 +370,7 @@ private void doStatusCheck() {
EndpointState epState = endpointStateMap.get(endpoint);
if (epState != null) {
// check if this is a fat client. fat clients are removed automatically from
// gossip after FatClientTimeout. Do not remove dead states here.
// gossip after FatClientTimeout. Do not remove dead states here.
if (isGossipOnlyMember(endpoint) && !justRemovedEndpoints.containsKey(endpoint)
&& TimeUnit.NANOSECONDS.toMillis(nowNano - epState.getUpdateTimestamp()) > FAT_CLIENT_TIMEOUT) {
logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint,
Expand Down Expand Up @@ -871,7 +871,8 @@ private void realMarkAlive(final InetAddress addr, final EndpointState localStat
if (logger.isTraceEnabled())
logger.trace("marking as alive {}", addr);
localState.markAlive();
localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down >
// aVeryLongTime
liveEndpoints.add(addr);
unreachableEndpoints.remove(addr);
expireTimeEndpointMap.remove(addr);
Expand Down Expand Up @@ -965,7 +966,7 @@ void applyStateLocally(Map<InetAddress, EndpointState> epStateMap) {
logger.trace("{} local generation {}, remote generation {}", ep, localGeneration, remoteGeneration);

if (localGeneration != 0 && remoteGeneration > localGeneration + MAX_GENERATION_DIFFERENCE) {
// assume some peer has corrupted memory
// assume some peer has corrupted memory
// and is broadcasting an unbelievable generation about another peer (or itself)
logger.warn("received an invalid gossip generation for peer {}; "
+ "local generation = {}, received generation = {}", ep, localGeneration, remoteGeneration);
Expand All @@ -992,7 +993,8 @@ void applyStateLocally(Map<InetAddress, EndpointState> epStateMap) {
logger.trace("Ignoring remote generation {} < {}", remoteGeneration, localGeneration);
}
} else {
// this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not alive
// this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not
// alive
FailureDetector.instance.report(ep);
handleMajorStateChange(ep, remoteState);
}
Expand Down Expand Up @@ -1154,7 +1156,7 @@ public void addSavedEndpoint(InetAddress ep) {
return;
}

//preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on)
// preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on)
EndpointState epState = endpointStateMap.get(ep);
if (epState != null) {
logger.debug("not replacing a previous epState for {}, but reusing it: {}", ep, epState);
Expand Down Expand Up @@ -1259,4 +1261,12 @@ public void addExpireTimeForEndpoint(InetAddress endpoint, long expireTime) {
/**
 * Computes an expiry timestamp relative to the current wall-clock time.
 *
 * @return {@code System.currentTimeMillis()} plus {@code Gossiper.A_VERY_LONG_TIME}
 */
public static long computeExpireTime() {
    final long now = System.currentTimeMillis();
    return now + Gossiper.A_VERY_LONG_TIME;
}

/**
 * Finds the first seed that the failure detector currently reports as alive.
 * Seeds are examined in the order produced by {@code DatabaseDescriptor.getSeedList()}.
 *
 * @return the first live seed address, or {@code null} when no seed is reported alive
 */
public InetAddress getFirstLiveSeedEndpoint() {
    InetAddress firstLive = null;
    for (InetAddress candidate : DatabaseDescriptor.getSeedList()) {
        if (FailureDetector.instance.isAlive(candidate)) {
            // Stop at the first live seed; later seeds are not consulted.
            firstLive = candidate;
            break;
        }
    }
    return firstLive;
}
}
Expand Up @@ -53,6 +53,8 @@
import org.lealone.command.router.SortedResult;
import org.lealone.dbobject.Schema;
import org.lealone.dbobject.table.TableFilter;
import org.lealone.engine.FrontendSession;
import org.lealone.engine.Session;
import org.lealone.expression.Parameter;
import org.lealone.message.DbException;
import org.lealone.result.ResultInterface;
Expand Down Expand Up @@ -80,18 +82,41 @@ public int executeDefineCommand(DefineCommand defineCommand) {
if (defineCommand.isLocal())
return defineCommand.updateLocal();

Set<InetAddress> liveMembers = Gossiper.instance.getLiveMembers();
List<Callable<Integer>> commands = New.arrayList(liveMembers.size());
InetAddress seedEndpoint = Gossiper.instance.getFirstLiveSeedEndpoint();
if (seedEndpoint == null)
throw new RuntimeException("no live seed endpoint");

liveMembers.remove(Utils.getBroadcastAddress());
commands.add(defineCommand);
try {
for (InetAddress endpoint : liveMembers) {
commands.add(createUpdateCallable(endpoint, defineCommand));
if (!seedEndpoint.equals(Utils.getBroadcastAddress())) {
Session session = defineCommand.getSession();
FrontendSession fs = null;
try {
fs = FrontendSessionPool.getSeedEndpointFrontendSession(session, session.getURL(seedEndpoint));
FrontendCommand fc = FrontendSessionPool.getFrontendCommand(fs, defineCommand.getSQL(),
defineCommand.getParameters(), defineCommand.getFetchSize());
return fc.executeUpdate();
} catch (Exception e) {
throw DbException.convert(e);
} finally {
if (fs != null)
fs.close();
}
}

// Execute all defineCommands serially on the seed node
synchronized (this) {
Set<InetAddress> liveMembers = Gossiper.instance.getLiveMembers();
List<Callable<Integer>> commands = New.arrayList(liveMembers.size());

liveMembers.remove(Utils.getBroadcastAddress());
commands.add(defineCommand);
try {
for (InetAddress endpoint : liveMembers) {
commands.add(createUpdateCallable(endpoint, defineCommand));
}
return CommandParallel.executeUpdateCallable(commands);
} catch (Exception e) {
throw DbException.convert(e);
}
return CommandParallel.executeUpdateCallable(commands);
} catch (Exception e) {
throw DbException.convert(e);
}
}

Expand Down

0 comments on commit 2d68bfb

Please sign in to comment.