Skip to content

Commit

Permalink
issue #29, re-implemented leader selection algorithm based on modifie…
Browse files Browse the repository at this point in the history
…d Chang and Reberts algo (http://en.wikipedia.org/wiki/Chang_and_Roberts_algorithm). Modifications include:

- dead node detection
- acks
- hop counting to prevent infinite loops
- leader reassertion to prevent current leader from being demoted.
  • Loading branch information
bluestreak01 committed Mar 25, 2015
1 parent 4982a06 commit 8b63778
Show file tree
Hide file tree
Showing 46 changed files with 518 additions and 541 deletions.
3 changes: 2 additions & 1 deletion nfsdb-core/src/main/java/com/nfsdb/Journal.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public long decrementRowID(long rowID) throws JournalException {
return -1;
}

@SuppressWarnings("EqualsBetweenInconvertibleTypes")
@Override
public boolean equals(Object o) {
return this == o || !(o == null || getClass() != o.getClass()) && key.equals(((Journal) o).key);
Expand Down Expand Up @@ -662,7 +663,7 @@ private void configureIrregularPartition() throws JournalException {
temp.applyTx(tx.lagSize, tx.lagIndexPointers);
setIrregularPartition(temp);
// exit out of while loop
} else if (lagPartitionName != null && irregularPartition != null && lagPartitionName.equals(irregularPartition.getName())) {
} else if (lagPartitionName != null && lagPartitionName.equals(irregularPartition.getName())) {
irregularPartition.applyTx(tx.lagSize, tx.lagIndexPointers);
} else if (lagPartitionName == null && irregularPartition != null) {
removeIrregularPartitionInternal();
Expand Down
30 changes: 15 additions & 15 deletions nfsdb-core/src/main/java/com/nfsdb/JournalKey.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -100,7 +100,7 @@ private JournalKey(String clazz, String location, PartitionType partitionType, i
this.ordered = ordered;
}

public static JournalKey<Object> fromBuffer(ByteBuffer buffer) {
public static <X> JournalKey<X> fromBuffer(ByteBuffer buffer) {
// id
int clazzLen = buffer.getInt();
byte[] clazz = new byte[clazzLen];
Expand Down Expand Up @@ -128,6 +128,15 @@ public String derivedLocation() {
return location == null ? id : location;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof JournalKey)) return false;
JournalKey that = (JournalKey) o;
return ordered == that.ordered && recordHint == that.recordHint && !(id != null ? !id.equals(that.id) : that.id != null) && !(location != null ? !location.equals(that.location) : that.location != null) && partitionType == that.partitionType;

}

public int getBufferSize() {
return 4 + id.getBytes(Files.UTF_8).length + 4 + 2 * (location == null ? 0 : location.length()) + 1 + 1 + 4;
}
Expand All @@ -152,6 +161,8 @@ public int getRecordHint() {
return recordHint;
}

//////////////////////// REPLICATION CODE //////////////////////

@Override
public int hashCode() {
int result = id != null ? id.hashCode() : 0;
Expand All @@ -161,15 +172,8 @@ public int hashCode() {
return 31 * result + (ordered ? 1 : 0);
}

//////////////////////// REPLICATION CODE //////////////////////

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof JournalKey)) return false;
JournalKey that = (JournalKey) o;
return ordered == that.ordered && recordHint == that.recordHint && !(id != null ? !id.equals(that.id) : that.id != null) && !(location != null ? !location.equals(that.location) : that.location != null) && partitionType == that.partitionType;

public boolean isOrdered() {
return ordered;
}

@Override
Expand All @@ -183,10 +187,6 @@ public String toString() {
'}';
}

public boolean isOrdered() {
return ordered;
}

public void write(ByteBuffer buffer) {
// id
buffer.putInt(id.length());
Expand Down
1 change: 1 addition & 0 deletions nfsdb-core/src/main/java/com/nfsdb/JournalWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ public JournalEntryWriter entryWriter(long timestamp) throws JournalException {
}
}

@SuppressWarnings("EqualsBetweenInconvertibleTypes")
@Override
public boolean equals(Object o) {
return this == o || !(o == null || getClass() != o.getClass()) && getKey().equals(((Journal) o).getKey());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public interface JournalReaderFactory extends Closeable {

<T> Journal<T> reader(Class<T> clazz, String location, int recordHint) throws JournalException;

public static enum JournalExistenceCheck {
enum JournalExistenceCheck {
EXISTS, DOES_NOT_EXIST, EXISTS_FOREIGN
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public interface JournalConfiguration {

public static final String FILE_NAME = "_meta2";
String FILE_NAME = "_meta2";

<T> JournalMetadata<T> augmentMetadata(MetadataBuilder<T> builder) throws JournalException;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,7 @@

public abstract class AbstractMutableObjectConsumer<T> extends AbstractObjectConsumer {

protected T value;
private T value;

public final T getValue() {
return value;
Expand Down
59 changes: 11 additions & 48 deletions nfsdb-core/src/main/java/com/nfsdb/ha/JournalClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -110,18 +110,19 @@ public JournalClient(ClientConfig config, JournalWriterFactory factory, Credenti
this.credentialProvider = credentialProvider;
}

public void halt() throws JournalNetworkException {
public void halt() {
if (running.compareAndSet(true, false)) {
try {
if (handlerFuture != null) {
if (handlerFuture != null) {
try {
handlerFuture.get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Exception while waiting for client to shutdown gracefully", e);
} finally {
handlerFuture = null;
}
close0();
free();
} catch (Exception e) {
throw new JournalNetworkException(e);
}
close0();
free();
} else {
closeChannel();
}
Expand All @@ -131,16 +132,6 @@ public boolean isRunning() {
return running.get();
}

public boolean pingServer(ServerNode node) {
try {
openChannel(node);
sendProtocolVersion();
return true;
} catch (JournalNetworkException e) {
return false;
}
}

public JournalClient setDisconnectCallback(DisconnectCallback callback) {
this.disconnectCallback.next = callback;
return this;
Expand Down Expand Up @@ -210,27 +201,6 @@ public void subscribe(JournalKey remote, JournalKey local, TxListener txListener
listeners.add(txListener);
}

public VoteResult voteInstance(int instance) {
try {
openChannel(null);
commandProducer.write(channel, Command.CLUSTER_VOTE);
intResponseProducer.write(channel, instance);
stringResponseConsumer.read(channel);

switch (stringResponseConsumer.getValue()) {
case "WIN":
return VoteResult.ALPHA;
case "OUT":
return VoteResult.THEM;
default:
return VoteResult.ME;
}
} catch (JournalNetworkException e) {
LOGGER.info("Voting error: %s", e.getMessage());
return VoteResult.ME_BY_DEFAULT;
}
}

private void checkAck() throws JournalNetworkException {
stringResponseConsumer.read(channel);
fail("OK".equals(stringResponseConsumer.getValue()), stringResponseConsumer.getValue());
Expand Down Expand Up @@ -413,11 +383,6 @@ private <T> void set0(int index, JournalWriter<T> writer, TxListener txListener)
}
}

public static enum VoteResult {
ME, THEM, ALPHA, ME_BY_DEFAULT
}


public enum DisconnectReason {
UNKNOWN, CLIENT_HALT, CLIENT_EXCEPTION, BROKEN_CHANNEL, CLIENT_ERROR, INCOMPATIBLE_JOURNAL
}
Expand Down Expand Up @@ -487,10 +452,8 @@ public void run() {
switch (commandConsumer.getValue()) {
case JOURNAL_DELTA_CMD:
statsChannel.setDelegate(channel);
intResponseConsumer.read(statsChannel);
int index = intResponseConsumer.getValue();
JournalDeltaConsumer deltaConsumer = deltaConsumers.get(index);
deltaConsumer.read(statsChannel);
int index = intResponseConsumer.getValue(statsChannel);
deltaConsumers.get(index).read(statsChannel);
statusSentList.set(index, 0);
statsChannel.logStats();
break;
Expand Down

0 comments on commit 8b63778

Please sign in to comment.