Skip to content

Commit

Permalink
(issue #29) updated cluster voting algo to be more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
bluestreak01 committed Jan 28, 2015
1 parent 1bde960 commit c42e0f9
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 68 deletions.
8 changes: 3 additions & 5 deletions nfsdb-core/src/main/java/com/nfsdb/net/JournalClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -198,7 +198,6 @@ public <T> void subscribe(JournalKey<T> remoteKey, JournalWriter<T> writer, TxLi

public VoteResult voteInstance(int instance, ServerNode remote) {
try {
LOGGER.info("Instance %s is sending vote to %s", instance, remote);
openChannel(null);
commandProducer.write(channel, Command.CLUSTER_VOTE);
intResponseProducer.write(channel, instance);
Expand All @@ -218,8 +217,7 @@ public VoteResult voteInstance(int instance, ServerNode remote) {
return VoteResult.ME;
}
} catch (JournalNetworkException e) {
LOGGER.info("Remote side bailed out. WIN by default.");
return VoteResult.ME;
return VoteResult.ME_BY_DEFAULT;
}
}

Expand Down Expand Up @@ -394,7 +392,7 @@ private void sendState() throws JournalNetworkException {
}

public static enum VoteResult {
ME, THEM, ALPHA
ME, THEM, ALPHA, ME_BY_DEFAULT
}


Expand Down
49 changes: 25 additions & 24 deletions nfsdb-core/src/main/java/com/nfsdb/net/JournalServer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,7 +54,7 @@ public class JournalServer {
private final AuthorizationHandler authorizationHandler;
private final JournalServerLogger serverLogger = new JournalServerLogger();
private final int serverInstance;
private final AtomicBoolean ignoreVoting = new AtomicBoolean(false);
private final AtomicBoolean alpha = new AtomicBoolean(false);
private ServerSocketChannel serverSocketChannel;

public JournalServer(JournalReaderFactory factory) {
Expand Down Expand Up @@ -118,13 +118,13 @@ public void halt(long timeout, TimeUnit unit) {
if (!running.compareAndSet(true, false)) {
return;
}
LOGGER.trace("Stopping agent services");
LOGGER.info("Stopping agent services %d", serverInstance);
service.shutdown();
for (ObjIntHashMap.Entry<JournalWriter> e : writers) {
e.key.setTxAsyncListener(null);
}

LOGGER.trace("Stopping acceptor");
LOGGER.info("Stopping acceptor");
try {
serverSocketChannel.close();
} catch (IOException e) {
Expand All @@ -134,24 +134,25 @@ public void halt(long timeout, TimeUnit unit) {

if (timeout > 0) {
try {
LOGGER.info("Waiting for %s agent services to conplete data exchange on %s", service.getActiveCount(), serverInstance);
service.awaitTermination(timeout, unit);
} catch (InterruptedException e) {
LOGGER.debug(e);
}
}

LOGGER.trace("Stopping bridge");
LOGGER.info("Stopping bridge on %d", serverInstance);
bridge.halt();

if (addressSender != null) {
LOGGER.trace("Stopping mcast sender");
LOGGER.info("Stopping mcast sender on %d", serverInstance);
addressSender.halt();
}

LOGGER.trace("Closing channels");
LOGGER.info("Closing channels on %d", serverInstance);
closeChannels();

LOGGER.trace("Stopping logger");
LOGGER.info("Stopping logger on %d", serverInstance);
serverLogger.halt();

try {
Expand All @@ -170,12 +171,12 @@ public void halt() {
halt(30, TimeUnit.SECONDS);
}

public boolean isIgnoreVoting() {
return ignoreVoting.get();
public boolean isAlpha() {
return alpha.get();
}

public void setIgnoreVoting(boolean ignore) {
ignoreVoting.set(ignore);
public void setAlpha(boolean ignore) {
alpha.set(ignore);
}

public boolean isRunning() {
Expand Down Expand Up @@ -203,6 +204,18 @@ public void start() throws JournalNetworkException {
service.execute(new Acceptor());
}

/**
 * Finds the index value stored for the writer whose journal key matches
 * the supplied key. Two keys match when their ids are equal and their
 * locations are either both null or equal.
 *
 * @param key journal key to look up
 * @return the value mapped to the matching writer, or
 *         JOURNAL_KEY_NOT_FOUND when no writer matches
 */
int getWriterIndex(JournalKey key) {
    for (ObjIntHashMap.Entry<JournalWriter> entry : writers.immutableIterator()) {
        JournalKey candidate = entry.key.getKey();

        // ids must agree before locations are even considered
        if (!candidate.getId().equals(key.getId())) {
            continue;
        }

        if (candidate.getLocation() == null) {
            // a writer without a location only matches a key without one
            if (key.getLocation() == null) {
                return entry.value;
            }
            continue;
        }

        if (candidate.getLocation().equals(key.getLocation())) {
            return entry.value;
        }
    }
    return JOURNAL_KEY_NOT_FOUND;
}

// Adds the given channel holder to the channels collection.
private void addChannel(SocketChannelHolder holder) {
channels.add(holder);
}
Expand Down Expand Up @@ -231,18 +244,6 @@ private void closeChannels() {
}
}

/**
 * Finds the index value stored for the writer whose journal key matches
 * the supplied key. Two keys match when their ids are equal and their
 * locations are either both null or equal.
 *
 * @param key journal key to look up
 * @return the value mapped to the matching writer, or
 *         JOURNAL_KEY_NOT_FOUND when no writer matches
 */
int getWriterIndex(JournalKey key) {
    for (ObjIntHashMap.Entry<JournalWriter> entry : writers.immutableIterator()) {
        JournalKey candidate = entry.key.getKey();

        // ids must agree before locations are even considered
        if (!candidate.getId().equals(key.getId())) {
            continue;
        }

        if (candidate.getLocation() == null) {
            // a writer without a location only matches a key without one
            if (key.getLocation() == null) {
                return entry.value;
            }
            continue;
        }

        if (candidate.getLocation().equals(key.getLocation())) {
            return entry.value;
        }
    }
    return JOURNAL_KEY_NOT_FOUND;
}

private void removeChannel(SocketChannelHolder holder) {
if (channels.remove(holder)) {
closeChannel(holder, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -94,15 +94,15 @@ public void process(ByteChannel channel) throws JournalNetworkException {
intResponseConsumer.read(channel);
if (intResponseConsumer.isComplete()) {
int inst = intResponseConsumer.getValue();
boolean loss = !server.isIgnoreVoting() && inst > server.getServerInstance();
boolean loss = !server.isAlpha() && inst > server.getServerInstance();
intResponseConsumer.reset();
commandConsumer.reset();

if (loss) {
ok(channel);
throw new ClusterLossException(inst);
} else {
error(channel, server.isIgnoreVoting() ? "WIN" : "OUT");
error(channel, server.isAlpha() ? "WIN" : "OUT");
}
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class ClusterController {

Expand All @@ -40,6 +41,7 @@ public class ClusterController {
private final Runnable up = new Runnable() {

private boolean startup = true;

@Override
public void run() {
try {
Expand Down Expand Up @@ -107,6 +109,10 @@ public void halt() throws JournalNetworkException {
}
}

/**
 * Reports the alpha flag of this controller's server.
 *
 * @return {@code false} when no server is set; otherwise whatever
 *         {@code server.isAlpha()} returns
 */
public boolean isAlpha() {
    if (server == null) {
        return false;
    }
    return server.isAlpha();
}

public void start() {
if (running.compareAndSet(false, true)) {
service.submit(up);
Expand All @@ -121,7 +127,6 @@ private ServerNode getActiveNodeAndSetupClient() throws JournalNetworkException
continue;
}


client = new JournalClient(clientConfig, factory);
if (client.pingServer(node)) {
return node;
Expand Down Expand Up @@ -172,18 +177,21 @@ public void onDisconnect(JournalClient.DisconnectReason reason) {
}

private ServerNode thisNode() {
return serverConfig.getNode(instance);
ServerNode node = serverConfig.getNode(instance);
if (node == null) {
System.out.println("NULL node for " + instance);
}
return node;
}

private void vote(boolean startup) throws JournalNetworkException {
System.out.println("VOTE " + startup);

// this method can be called during both standalone start and cluster re-vote.
// during re-vote all members scramble to become ALPHA, so checking for server presence
// is only appropriate during standalone start, where a new server node will always assume
// the slave role if there is an existing server.
ServerNode activeNode;
if (startup) {
ServerNode activeNode;
try {
if ((activeNode = getActiveNodeAndSetupClient()) != null) {
LOGGER.info(thisNode() + " There is active node already %s. Yielding", activeNode);
Expand Down Expand Up @@ -213,32 +221,54 @@ private void vote(boolean startup) throws JournalNetworkException {
//
// on other hand, if this node loses, it is enough to become a slave, so give up voting in this case.
boolean isClient = false;
while (!isClient && server.isRunning() && (activeNode = getActiveNodeAndSetupClient()) != null && client != null) {
LOGGER.info("%s thinks that %s is present", thisNode(), activeNode);
switch (client.voteInstance(instance, activeNode)) {
case ALPHA:
LOGGER.info(thisNode() + " Lost tie-break vote, becoming a client");
server.halt();
// don't stop server explicitly, it will shut down after being voted out
setupClient(activeNode);
return;
case THEM:
LOGGER.info("%s lost tie-break against %s, wait for ALPHA node", thisNode(), activeNode);
boolean nodesLeft = true;
while (!isClient && nodesLeft) {

nodesLeft = false;
for (ServerNode node : clientConfig.nodes()) {

if (!server.isRunning()) {
isClient = true;
server.halt();
break;
default:
LOGGER.info("%s WON tie-break against %s", thisNode(), activeNode);
}
}

// always stop client because we will create new one when looking for ALPHA
haltClient();
Thread.yield();
if (node.getId() == instance) {
continue;
}

client = new JournalClient(clientConfig, factory);
LOGGER.info("%s is probing %s", thisNode(), node);
JournalClient.VoteResult vote = client.voteInstance(instance, node);
LOGGER.info("%s got %s from %s", thisNode(), vote, node);
switch (vote) {
case ALPHA:
LOGGER.info(thisNode() + " Lost tie-break vote, becoming a client");
server.halt();
// don't stop server explicitly, it will shut down after being voted out
setupClient(node);
return;
case THEM:
LOGGER.info("%s lost tie-break against %s, wait for ALPHA node", thisNode(), node);
isClient = true;
server.halt();
break;
case ME_BY_DEFAULT:
LOGGER.info("%s WON by default against %s", thisNode(), node);
break;
default:
LOGGER.info("%s WON tie-break against %s", thisNode(), node);
nodesLeft = true;
}

// always stop client because we will create new one when looking for ALPHA
haltClient();
Thread.yield();
}
}

if (!isClient) {
// after this point server cannot be voted out and it becomes the ALPHA
server.setIgnoreVoting(true);
server.setAlpha(true);
LOGGER.info(thisNode() + " Activating callback");
listener.onNodeActive();
return;
Expand All @@ -247,16 +277,18 @@ private void vote(boolean startup) throws JournalNetworkException {
// look for ALPHA in a loop
// this loop cannot exit unless it finds ALPHA or runs out of nodes to check and errors out
while (true) {
activeNode = getActiveNodeAndSetupClient();
ServerNode activeNode = getActiveNodeAndSetupClient();
if (activeNode == null || client == null) {
throw new JournalNetworkException("Expected ALPHA node but got none");
}

LOGGER.info("%s is checking if %s has become ALPHA", thisNode(), activeNode);
if (client.voteInstance(instance, activeNode) == JournalClient.VoteResult.ALPHA) {
setupClient(activeNode);
return;
}
haltClient();
LockSupport.parkNanos(500000000L);
}

}
Expand Down

0 comments on commit c42e0f9

Please sign in to comment.