From 6b3992b424f2a6537e8f4802419139e082c58eaa Mon Sep 17 00:00:00 2001 From: Mehmet Dogan Date: Tue, 31 Oct 2017 11:46:08 +0300 Subject: [PATCH] RAFT: leader election start --- hazelcast-raft/pom.xml | 208 ++++++++++++++ .../java/com/hazelcast/raft/RaftConfig.java | 21 ++ .../java/com/hazelcast/raft/RaftContext.java | 84 ++++++ .../main/java/com/hazelcast/raft/RaftLog.java | 9 + .../java/com/hazelcast/raft/RaftNode.java | 258 ++++++++++++++++++ .../java/com/hazelcast/raft/RaftRole.java | 12 + .../java/com/hazelcast/raft/RaftService.java | 94 +++++++ .../java/com/hazelcast/raft/VoteRequest.java | 52 ++++ .../java/com/hazelcast/raft/VoteResponse.java | 47 ++++ .../hazelcast/raft/operation/AsyncRaftOp.java | 52 ++++ .../raft/operation/RaftResponseHandler.java | 21 ++ .../raft/operation/RequestVoteOp.java | 44 +++ .../raft/util/StripeExecutorConveyor.java | 34 +++ .../test/java/com/hazelcast/raft/Test.java | 37 +++ hazelcast-raft/src/test/resources/log4j2.xml | 32 +++ pom.xml | 1 + 16 files changed, 1006 insertions(+) create mode 100644 hazelcast-raft/pom.xml create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/RaftConfig.java create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/RaftContext.java create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/RaftLog.java create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/RaftNode.java create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/RaftRole.java create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/RaftService.java create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/VoteRequest.java create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/VoteResponse.java create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/operation/AsyncRaftOp.java create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/operation/RaftResponseHandler.java create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/operation/RequestVoteOp.java create mode 100644 hazelcast-raft/src/main/java/com/hazelcast/raft/util/StripeExecutorConveyor.java create mode 100644 hazelcast-raft/src/test/java/com/hazelcast/raft/Test.java create mode 100644 hazelcast-raft/src/test/resources/log4j2.xml diff --git a/hazelcast-raft/pom.xml b/hazelcast-raft/pom.xml new file mode 100644 index 000000000000..21c6923bc8af --- /dev/null +++ b/hazelcast-raft/pom.xml @@ -0,0 +1,208 @@ + + + + 4.0.0 + + hazelcast raft + hazelcast-raft + Hazelcast Raft Module + jar + + + com.hazelcast + hazelcast-root + 3.10-SNAPSHOT + ../pom.xml + + + + + ${project.parent.basedir} + + + + + + pl.project13.maven + git-commit-id-plugin + ${maven.git.commit.id.plugin.version} + + + + revision + + + + + false + ${project.basedir}/.git + false + 7 + + true + + + + + + org.codehaus.mojo + animal-sniffer-maven-plugin + ${maven.animal.sniffer.plugin.version} + + + org.codehaus.mojo.signature + java16 + 1.0 + + + sun.misc.Unsafe + + + + + source-java6-check + compile + + check + + + + + + + org.apache.maven.plugins + maven-source-plugin + ${maven.source.plugin.version} + + + attach-sources + + jar + + + + + + + org.apache.felix + maven-bundle-plugin + ${maven.bundle.plugin.version} + + + bundle-manifest + process-classes + + manifest + + + + com.hazelcast.osgi.impl.Activator + + com.hazelcast.* + + + !org.junit, + !com.hazelcast.*, + !com.eclipsesource.json, + sun.misc;resolution:=optional, + javax.cache;resolution:=optional, + javax.cache.*;resolution:=optional, + org.apache.log4j;resolution:=optional, + org.apache.log4j.spi;resolution:=optional, + org.apache.logging.log4j;resolution:=optional, + org.apache.logging.log4j.spi;resolution:=optional, + org.slf4j;resolution:=optional, + org.codehaus.groovy.jsr223;resolution:=optional, + org.jruby.embed.jsr223;resolution:=optional, + * + + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + ${maven.jar.plugin.version} + + + true + true + + com.hazelcast.core.server.StartServer + + true + true + + ${project.build.outputDirectory}/META-INF/MANIFEST.MF + + + **/*.html + **/*.sh + **/*.bat + META-INF/services/javax.annotation.processing.Processor + + + + + + test-jar + + + + + + + + org.apache.maven.plugins + maven-clean-plugin + 3.0.0 + + + clean + + clean + + + + + src/main/java + + **/Generated* + + + + + + + + + + + + + com.hazelcast + hazelcast + ${project.version} + compile + + + + diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftConfig.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftConfig.java new file mode 100644 index 000000000000..c29d129ea429 --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftConfig.java @@ -0,0 +1,21 @@ +package com.hazelcast.raft; + +import java.util.Collection; + +/** + * TODO: Javadoc Pending... + * + * @author mdogan 30.10.2017 + */ +public class RaftConfig { + + private Collection addresses; + + public Collection getAddresses() { + return addresses; + } + + public void setAddresses(Collection addresses) { + this.addresses = addresses; + } +} diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftContext.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftContext.java new file mode 100644 index 000000000000..a0d375d643b3 --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftContext.java @@ -0,0 +1,84 @@ +package com.hazelcast.raft; + +import com.hazelcast.nio.Address; + +import java.util.Collection; + +/** + * TODO: Javadoc Pending... + * + */ +public class RaftContext { + + final String name; + final Collection
members; + + private RaftRole role = RaftRole.FOLLOWER; + private int term; + private Address leader; + + private Address votedFor; + private int lastVoteTerm; + + public RaftContext(String name, Collection
members) { + this.name = name; + this.members = members; + } + + public String name() { + return name; + } + + public void role(RaftRole role) { + this.role = role; + } + + public Collection
members() { + return members; + } + + public RaftRole role() { + return role; + } + + public int term() { + return term; + } + + public int incTerm() { + return ++term; + } + + public Address leader() { + return leader; + } + + public void persistVote(int term, Address address) { + this.lastVoteTerm = term; + this.votedFor = address; + } + + public void term(int term) { + this.term = term; + } + + public int lastVoteTerm() { + return lastVoteTerm; + } + + public Address votedFor() { + return votedFor; + } + + public int lastLogTerm() { + return 0; + } + + public int lastLogIndex() { + return 0; + } + + public void leader(Address address) { + leader = address; + } +} diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftLog.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftLog.java new file mode 100644 index 000000000000..dbe5a9a30495 --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftLog.java @@ -0,0 +1,9 @@ +package com.hazelcast.raft; + +/** + * TODO: Javadoc Pending... + * + * @author mdogan 30.10.2017 + */ +public class RaftLog { +} diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftNode.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftNode.java new file mode 100644 index 000000000000..2d2f4df68429 --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftNode.java @@ -0,0 +1,258 @@ +package com.hazelcast.raft; + +import com.hazelcast.core.ExecutionCallback; +import com.hazelcast.logging.ILogger; +import com.hazelcast.nio.Address; +import com.hazelcast.raft.operation.RaftResponseHandler; +import com.hazelcast.raft.operation.RequestVoteOp; +import com.hazelcast.raft.util.StripeExecutorConveyor; +import com.hazelcast.spi.InternalCompletableFuture; +import com.hazelcast.spi.NodeEngine; +import com.hazelcast.spi.OperationService; +import com.hazelcast.spi.TaskScheduler; +import com.hazelcast.util.RandomPicker; +import com.hazelcast.util.executor.StripedExecutor; +import com.hazelcast.util.executor.StripedRunnable; + +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + * TODO: Javadoc Pending... + * + * @author mdogan 30.10.2017 + */ +public class RaftNode { + + private final RaftContext context; + private final Executor executor; + private final NodeEngine nodeEngine; + private final ILogger logger; + private final TaskScheduler taskScheduler; + + public RaftNode(String name, Collection
addresses, NodeEngine nodeEngine, StripedExecutor executor) { + this.nodeEngine = nodeEngine; + this.executor = executor; + this.context = new RaftContext(name, addresses); + this.taskScheduler = nodeEngine.getExecutionService().getGlobalTaskScheduler(); + this.logger = nodeEngine.getLogger(getClass().getSimpleName() + "[" + name + "]"); + } + + public void start() { + if (nodeEngine.getClusterService().isJoined()) { + logger.warning("Starting raft group..."); + executor.execute(new LeaderElectionTask()); + } else { + scheduleStart(); + } + } + + private void scheduleStart() { + taskScheduler.schedule(new Runnable() { + @Override + public void run() { + start(); + } + }, 500, TimeUnit.MILLISECONDS); + } + + public void handleRequestVote(VoteRequest voteRequest, RaftResponseHandler responseHandler) { + executor.execute(new RequestVoteTask(voteRequest, responseHandler)); + } + + private class LeaderElectionTask implements StripedRunnable { + @Override + public int getKey() { + return getStripeKey(); + } + + @Override + public void run() { + logger.warning("Leader election start"); + context.role(RaftRole.CANDIDATE); + Address thisAddress = nodeEngine.getThisAddress(); + context.persistVote(context.incTerm(), thisAddress); + + if (context.members().size() == 1) { + context.role(RaftRole.LEADER); + logger.warning("We are the one! "); + return; + } + + OperationService operationService = nodeEngine.getOperationService(); + Collection futures = new LinkedList(); + for (Address address : context.members()) { + if (address.equals(thisAddress)) { + continue; + } + + RequestVoteOp op = new RequestVoteOp(context.name(), new VoteRequest(context.term(), + thisAddress, 0, 0)); + InternalCompletableFuture future = + operationService.invokeOnTarget(RaftService.SERVICE_NAME, op, address); + futures.add(future); + } + + int timeout = RandomPicker.getInt(1500, 3000); + + ExecutionCallback callback = new LeaderElectionCallback(); + + for (InternalCompletableFuture future : futures) { + future.andThen(callback, new StripeExecutorConveyor(getStripeKey(), executor)); + } + scheduleTimeout(timeout); + } + + private void scheduleTimeout(int timeout) { + taskScheduler.schedule(new Runnable() { + @Override + public void run() { + executor.execute(new LeaderElectionTimeoutTask()); + } + }, timeout, TimeUnit.MILLISECONDS); + } + + } + + private class LeaderElectionTimeoutTask implements StripedRunnable { + @Override + public int getKey() { + return getStripeKey(); + } + + @Override + public void run() { + if (context.role() != RaftRole.CANDIDATE) { + return; + } + logger.severe("Leader election timed out!"); + new LeaderElectionTask().run(); + } + } + + private class LeaderElectionCallback implements ExecutionCallback { + int majority; + Set
voters = new HashSet
(); + + @Override + public void onResponse(VoteResponse resp) { + if (RaftRole.CANDIDATE != context.role()) { + return; + } + + if (resp.term > context.term()) { + logger.warning("Newer term discovered, fallback to follower"); + context.role(RaftRole.FOLLOWER); + context.term(resp.term); + return; + } + + if (resp.term < context.term()) { + logger.warning("Obsolete vote response received: " + resp); + return; + } + + if (resp.granted && resp.term == context.term()) { + if (voters.add(resp.voter)) { + logger.warning("Vote granted from " + resp.voter + " for term " + context.term() + + ", number of votes: " + voters.size()); + } + } + + if (voters.size() >= majority) { + logger.severe("We are THE leader!"); + context.role(RaftRole.LEADER); + context.leader(nodeEngine.getThisAddress()); + } + } + + @Override + public void onFailure(Throwable t) { + } + } + + private int getStripeKey() { + return context.name().hashCode(); + } + + private class RequestVoteTask implements StripedRunnable { + private final VoteRequest req; + private final RaftResponseHandler responseHandler; + + public RequestVoteTask(VoteRequest req, RaftResponseHandler responseHandler) { + this.req = req; + this.responseHandler = responseHandler; + } + + @Override + public int getKey() { + return getStripeKey(); + } + + @Override + public void run() { + VoteResponse resp = new VoteResponse(); + resp.voter = nodeEngine.getThisAddress(); + try { + if (context.leader() != null && !req.candidate.equals(context.leader())) { + logger.warning("Rejecting vote request from " + req.candidate + " since we have a leader " + context.leader()); + rejectVoteResponse(resp); + return; + } + if (context.term() > req.term) { + logger.warning("Rejecting vote request from " + req.candidate + + " since our term is greater " + context.term() + " > " + req.term); + rejectVoteResponse(resp); + return; + } + + if (context.term() < req.term) { + logger.warning("Demoting to FOLLOWER after vote request from " + req.candidate + + " since our term is lower " + context.term() + " < " + req.term); + context.role(RaftRole.FOLLOWER); + context.term(req.term); + resp.term = req.term; + } + + if (context.lastVoteTerm() == req.term && context.votedFor() != null) { + logger.warning("Duplicate RequestVote for same term " + req.term + ", currently voted-for " + context.votedFor()); + if (req.candidate.equals(context.votedFor())) { + logger.warning("Duplicate RequestVote from candidate " + req.candidate); + resp.granted = true; + } + return; + } + + if (context.lastLogTerm() > req.lastLogTerm) { + logger.warning("Rejecting vote request from " + req.candidate + " since our last term is greater " + + context.lastLogTerm() + " > " + req.lastLogTerm); + return; + } + + if (context.lastLogTerm() == req.lastLogTerm && context.lastLogIndex() > req.lastLogIndex) { + logger.warning("Rejecting vote request from " + req.candidate + " since our last index is greater " + + context.lastLogIndex() + " > " + req.lastLogIndex); + return; + } + + logger.warning("Granted vote for " + req.candidate + ", term: " + req.term); + context.persistVote(req.term, req.candidate); + resp.granted = true; + + } finally { + responseHandler.send(resp); + } + } + } + + private VoteResponse rejectVoteResponse(VoteResponse response) { + response.granted = false; + response.term = context.term(); + response.voter = nodeEngine.getThisAddress(); + return response; + } +} diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftRole.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftRole.java new file mode 100644 index 000000000000..fe7c94a2371f --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftRole.java @@ -0,0 +1,12 @@ +package com.hazelcast.raft; + +/** + * TODO: Javadoc Pending... + * + */ +public enum RaftRole { + + FOLLOWER, + CANDIDATE, + LEADER +} diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftService.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftService.java new file mode 100644 index 000000000000..d6bc35388b51 --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/RaftService.java @@ -0,0 +1,94 @@ +package com.hazelcast.raft; + +import com.hazelcast.core.HazelcastException; +import com.hazelcast.internal.util.RuntimeAvailableProcessors; +import com.hazelcast.logging.ILogger; +import com.hazelcast.logging.Logger; +import com.hazelcast.nio.Address; +import com.hazelcast.raft.operation.RaftResponseHandler; +import com.hazelcast.spi.ConfigurableService; +import com.hazelcast.spi.ManagedService; +import com.hazelcast.spi.NodeEngine; +import com.hazelcast.util.AddressUtil; +import com.hazelcast.util.executor.StripedExecutor; + +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * TODO: Javadoc Pending... + */ +public class RaftService implements ManagedService, ConfigurableService { + + public static final String SERVICE_NAME = "hz:core:raft"; + private static final String METADATA_RAFT = "METADATA"; + + private final Map nodes = new ConcurrentHashMap(); + private final StripedExecutor executor = + new StripedExecutor(Logger.getLogger("RaftServiceExecutor"), "raft", RuntimeAvailableProcessors.get(), Integer.MAX_VALUE); + + private volatile ILogger logger; + private volatile NodeEngine nodeEngine; + private volatile RaftConfig config; + + @Override + public void init(NodeEngine nodeEngine, Properties properties) { + this.nodeEngine = nodeEngine; + this.logger = nodeEngine.getLogger(getClass()); + Collection
addresses; + try { + addresses = getAddresses(); + } catch (UnknownHostException e) { + throw new HazelcastException(e); + } + logger.warning("CP nodes: " + addresses); + if (!addresses.contains(nodeEngine.getThisAddress())) { + logger.warning("We are not in CP nodes group :("); + return; + } + + RaftNode node = new RaftNode(METADATA_RAFT, addresses, nodeEngine, executor); + nodes.put(METADATA_RAFT, node); + node.start(); + } + + private Collection
getAddresses() throws UnknownHostException { + Collection endpoints = config.getAddresses(); + Set
addresses = new HashSet
(endpoints.size()); + for (String endpoint : endpoints) { + AddressUtil.AddressHolder addressHolder = AddressUtil.getAddressHolder(endpoint); + Address address = new Address(addressHolder.getAddress(), addressHolder.getPort()); + address.setScopeId(addressHolder.getScopeId()); + addresses.add(address); + } + return addresses; + } + + @Override + public void reset() { + } + + @Override + public void shutdown(boolean terminate) { + executor.shutdown(); + } + + @Override + public void configure(RaftConfig config) { + this.config = config; + } + + public void handleRequestVote(String name, VoteRequest voteRequest, RaftResponseHandler responseHandler) { + RaftNode node = nodes.get(name); + if (node == null) { + responseHandler.send(new VoteResponse(voteRequest.term, false, nodeEngine.getThisAddress())); + return; + } + node.handleRequestVote(voteRequest, responseHandler); + } +} diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/VoteRequest.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/VoteRequest.java new file mode 100644 index 000000000000..054d4252cce0 --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/VoteRequest.java @@ -0,0 +1,52 @@ +package com.hazelcast.raft; + +import com.hazelcast.nio.Address; +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.DataSerializable; + +import java.io.IOException; + +/** + * TODO: Javadoc Pending... + * + */ +public class VoteRequest implements DataSerializable { + + int term; + Address candidate; + int lastLogTerm; + int lastLogIndex; + + public VoteRequest() { + } + + public VoteRequest(int term, Address candidate, int lastLogTerm, int lastLogIndex) { + this.term = term; + this.candidate = candidate; + this.lastLogTerm = lastLogTerm; + this.lastLogIndex = lastLogIndex; + } + + @Override + public void writeData(ObjectDataOutput out) throws IOException { + out.writeInt(term); + out.writeObject(candidate); + out.writeInt(lastLogTerm); + out.writeInt(lastLogIndex); + } + + @Override + public void readData(ObjectDataInput in) throws IOException { + term = in.readInt(); + candidate = in.readObject(); + lastLogTerm = in.readInt(); + lastLogIndex = in.readInt(); + } + + @Override + public String toString() { + return "VoteRequest{" + "term=" + term + ", candidate=" + candidate + ", lastLogTerm=" + lastLogTerm + + ", lastLogIndex=" + lastLogIndex + '}'; + } +} diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/VoteResponse.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/VoteResponse.java new file mode 100644 index 000000000000..155cc224e4ff --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/VoteResponse.java @@ -0,0 +1,47 @@ +package com.hazelcast.raft; + +import com.hazelcast.nio.Address; +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.DataSerializable; + +import java.io.IOException; + +/** + * TODO: Javadoc Pending... + * + */ +public class VoteResponse implements DataSerializable { + + int term; + boolean granted; + Address voter; + + public VoteResponse() { + } + + public VoteResponse(int term, boolean granted, Address voter) { + this.term = term; + this.granted = granted; + this.voter = voter; + } + + @Override + public void writeData(ObjectDataOutput out) throws IOException { + out.writeInt(term); + out.writeBoolean(granted); + out.writeObject(voter); + } + + @Override + public void readData(ObjectDataInput in) throws IOException { + term = in.readInt(); + granted = in.readBoolean(); + voter = in.readObject(); + } + + @Override + public String toString() { + return "VoteResponse{" + "term=" + term + ", granted=" + granted + ", voter=" + voter + '}'; + } +} diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/operation/AsyncRaftOp.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/operation/AsyncRaftOp.java new file mode 100644 index 000000000000..936b9ee5df91 --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/operation/AsyncRaftOp.java @@ -0,0 +1,52 @@ +package com.hazelcast.raft.operation; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.raft.RaftService; +import com.hazelcast.spi.Operation; + +import java.io.IOException; + +/** + * TODO: Javadoc Pending... + * + */ +public abstract class AsyncRaftOp extends Operation { + + String name; +// int term; + + + public AsyncRaftOp() { + } + + public AsyncRaftOp(String name) { + this.name = name; + } + + RaftResponseHandler newResponseHandler() { + return new RaftResponseHandler(this); + } + + @Override + public boolean returnsResponse() { + return false; + } + + @Override + public String getServiceName() { + return RaftService.SERVICE_NAME; + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + out.writeUTF(name); +// out.writeInt(term); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + name = in.readUTF(); +// term = in.readInt(); + } +} diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/operation/RaftResponseHandler.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/operation/RaftResponseHandler.java new file mode 100644 index 000000000000..1b97675eca05 --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/operation/RaftResponseHandler.java @@ -0,0 +1,21 @@ +package com.hazelcast.raft.operation; + +import com.hazelcast.spi.Operation; + +/** + * TODO: Javadoc Pending... + * + * @author mdogan 30.10.2017 + */ +public class RaftResponseHandler { + + private final Operation operation; + + public RaftResponseHandler(Operation operation) { + this.operation = operation; + } + + public void send(Object r) { + operation.sendResponse(r); + } +} diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/operation/RequestVoteOp.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/operation/RequestVoteOp.java new file mode 100644 index 000000000000..d631939ec7f9 --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/operation/RequestVoteOp.java @@ -0,0 +1,44 @@ +package com.hazelcast.raft.operation; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.raft.RaftService; +import com.hazelcast.raft.VoteRequest; + +import java.io.IOException; + +/** + * TODO: Javadoc Pending... + * + * @author mdogan 30.10.2017 + */ +public class RequestVoteOp extends AsyncRaftOp { + + private VoteRequest voteRequest; + + public RequestVoteOp() { + } + + public RequestVoteOp(String name, VoteRequest voteRequest) { + super(name); + this.voteRequest = voteRequest; + } + + @Override + public void run() throws Exception { + RaftService service = getService(); + service.handleRequestVote(name, voteRequest, newResponseHandler()); + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + super.writeInternal(out); + out.writeObject(voteRequest); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + super.readInternal(in); + voteRequest = in.readObject(); + } +} diff --git a/hazelcast-raft/src/main/java/com/hazelcast/raft/util/StripeExecutorConveyor.java b/hazelcast-raft/src/main/java/com/hazelcast/raft/util/StripeExecutorConveyor.java new file mode 100644 index 000000000000..51d5d69e7492 --- /dev/null +++ b/hazelcast-raft/src/main/java/com/hazelcast/raft/util/StripeExecutorConveyor.java @@ -0,0 +1,34 @@ +package com.hazelcast.raft.util; + +import com.hazelcast.util.executor.StripedRunnable; + +import java.util.concurrent.Executor; + +/** + * TODO: Javadoc Pending... + * + */ +public class StripeExecutorConveyor implements Executor { + private final int key; + private final Executor executor; + + public StripeExecutorConveyor(int key, Executor executor) { + this.key = key; + this.executor = executor; + } + + @Override + public void execute(final Runnable command) { + executor.execute(new StripedRunnable() { + @Override + public int getKey() { + return key; + } + + @Override + public void run() { + command.run(); + } + }); + } +} diff --git a/hazelcast-raft/src/test/java/com/hazelcast/raft/Test.java b/hazelcast-raft/src/test/java/com/hazelcast/raft/Test.java new file mode 100644 index 000000000000..fee33dfbc968 --- /dev/null +++ b/hazelcast-raft/src/test/java/com/hazelcast/raft/Test.java @@ -0,0 +1,37 @@ +package com.hazelcast.raft; + +import com.hazelcast.config.Config; +import com.hazelcast.config.ServiceConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.spi.properties.GroupProperty; + +import java.util.Arrays; + +/** + * TODO: Javadoc Pending... + * + * @author mdogan 30.10.2017 + */ +public class Test { + + static { + System.setProperty(GroupProperty.LOGGING_TYPE.getName(), "log4j2"); + System.setProperty(GroupProperty.WAIT_SECONDS_BEFORE_JOIN.getName(), "0"); + } + + public static void main(String[] args) { + Config config = new Config(); + config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false); + config.getNetworkConfig().getJoin().getTcpIpConfig().setEnabled(true).clear().addMember("127.0.0.1"); + + RaftConfig raftConfig = new RaftConfig(); + raftConfig.setAddresses(Arrays.asList("127.0.0.1:5701", "127.0.0.1:5702")); + + config.getServicesConfig().addServiceConfig( + new ServiceConfig().setEnabled(true).setName(RaftService.SERVICE_NAME) + .setClassName(RaftService.class.getName()) + .setConfigObject(raftConfig)); + + Hazelcast.newHazelcastInstance(config); + } +} diff --git a/hazelcast-raft/src/test/resources/log4j2.xml b/hazelcast-raft/src/test/resources/log4j2.xml new file mode 100644 index 000000000000..7be6e1593fba --- /dev/null +++ b/hazelcast-raft/src/test/resources/log4j2.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 8b2ee6b073c2..562e0953dc07 100755 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,7 @@ hazelcast hazelcast-client + hazelcast-raft hazelcast-spring hazelcast-build-utils hazelcast-all