Skip to content

Commit

Permalink
StateMachine in charge of lastAppliedIndex persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
pablosmedina committed Apr 30, 2014
1 parent 8556ae3 commit 8f790d0
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 71 deletions.
18 changes: 8 additions & 10 deletions src/main/scala/ckite/RLog.scala
Expand Up @@ -165,24 +165,22 @@ class RLog(val cluster: Cluster, val stateMachine: StateMachine) extends Logging
}

def stop = {
// db.close()
logAppender.stop
commandApplier.stop
persistentLog.close()
}

def serializeStateMachine = stateMachine.serialize().array()

private def initialize() = {
val nextIndexAfterSnapshot = snapshotManager.reloadSnapshot
logAppender.start
commandApplier.start(nextIndexAfterSnapshot - 1)
// val lastApplied = commandApplier.lastApplied
// if (nextIndexAfterSnapshot <= lastApplied) {
// replay(nextIndexAfterSnapshot, lastApplied)
// } else {
// LOG.debug("No entries to be replayed")
// }
val lastAppliedIndex = commandApplier.lastApplied
if (lastAppliedIndex == 0) {
val nextIndexAfterSnapshot = snapshotManager.reloadSnapshot
commandApplier.start(nextIndexAfterSnapshot - 1)
} else {
commandApplier.start
}
}

private def raiseMissingLogEntryException(entryIndex: Long) = {
Expand Down
27 changes: 19 additions & 8 deletions src/main/scala/ckite/example/KVStore.scala
Expand Up @@ -10,17 +10,28 @@ import java.nio.ByteBuffer
class KVStore extends StateMachine {

val map = new ConcurrentHashMap[String, String]()

def apply = {
@volatile
var lastIndex: Long = 0

def applyWrite = {
case (index, Put(key: String, value: String)) => {
map.put(key, value);
lastIndex = index
value
}
}

def applyRead = {
case Get(key) => map.get(key)
case Put(key:String,value:String) => map.put(key,value); value
}


def lastAppliedIndex: Long = lastIndex

def deserialize(byteBuffer: ByteBuffer) = {
val snapshotBytes = byteBuffer.array()
val deserializedMap:ConcurrentHashMap[String, String] = Serializer.deserialize[ConcurrentHashMap[String, String]](snapshotBytes)
map.clear()
map.putAll(deserializedMap)
val snapshotBytes = byteBuffer.array()
val deserializedMap: ConcurrentHashMap[String, String] = Serializer.deserialize[ConcurrentHashMap[String, String]](snapshotBytes)
map.clear()
map.putAll(deserializedMap)
}

def serialize(): ByteBuffer = ByteBuffer.wrap(Serializer.serialize(map))
Expand Down
58 changes: 30 additions & 28 deletions src/main/scala/ckite/rlog/CommandApplier.scala
Expand Up @@ -23,10 +23,13 @@ import ckite.rpc.LeaveJointConsensus
import ckite.rpc.ReadCommand
import scala.util.Try
import ckite.rpc.LogEntry
import ckite.statemachine.CommandExecutor
import ckite.rpc.WriteCommand
import ckite.rpc.WriteCommand

class CommandApplier(rlog: RLog, stateMachine: StateMachine) extends Logging {

val applyPartialFunction = stateMachine.apply
val commandExecutor = new CommandExecutor(stateMachine)
val commitIndexQueue = new LinkedBlockingQueue[Long]()

val asyncPool = new ThreadPoolExecutor(0, 1,
Expand All @@ -39,37 +42,41 @@ class CommandApplier(rlog: RLog, stateMachine: StateMachine) extends Logging {
@volatile
var commitIndex: Long = 0
@volatile
var lastApplied: Long = 0
var lastApplied: Long = stateMachine.lastAppliedIndex

def start(index: Long) = {
lastApplied = index
def start:Unit = {
workerExecutor.execute(asyncApplier _)
}

def start(index: Long):Unit = {
lastApplied = index
start
}

def stop = {
workerPool.shutdownNow()
asyncPool.shutdown()
workerPool.awaitTermination(10, TimeUnit.SECONDS)
asyncPool.awaitTermination(10, TimeUnit.SECONDS)
}

def commit(index: Long) = {
if (lastApplied < index) commitIndexQueue.offer(index)
}

def applyRead(command: ReadCommand) = applyPartialFunction.apply(command)

private def asyncApplier = {
LOG.info(s"Starting applier from index #{}",lastApplied)
LOG.info(s"Starting applier from index #{}", lastApplied)
try {
while (!Thread.currentThread().isInterrupted()) {
val index = next
if (lastApplied < index) {
val entry = rlog.logEntry(index)
if (isFromCurrentTerm(entry)) {
applyUntil(entry.get)
}
}
}
} catch {
while (true) {
val index = next
if (lastApplied < index) {
val entry = rlog.logEntry(index)
if (isFromCurrentTerm(entry)) {
applyUntil(entry.get)
}
}
}
} catch {
case e: InterruptedException => LOG.info("Shutdown CommandApplier...")
}
}
Expand All @@ -83,10 +90,10 @@ class CommandApplier(rlog: RLog, stateMachine: StateMachine) extends Logging {
val entryToApply = if (index == entry.index) Some(entry) else rlog.logEntry(index)
entryToApply.map { entry =>
commitIndex = index
LOG.debug("New commitIndex #{}", index)
LOG.debug("New commitIndex is #{}", index)
val command = entry.command
LOG.debug("Will apply committed entry {}", entry)
val result = execute(entry.command)
val result = execute(entry.index, entry.command)
lastApplied = index //What do we assume about the StateMachine persistence?
LOG.debug("Last applied index is #{}", lastApplied)
notifyResult(index, result)
Expand All @@ -107,13 +114,13 @@ class CommandApplier(rlog: RLog, stateMachine: StateMachine) extends Logging {

private def isCommitted(index: Long) = index <= commitIndex

private def execute(command: Command)(implicit context: ExecutionContext = asyncExecutionContext) = {
private def execute(index: Long, command: Command)(implicit context: ExecutionContext = asyncExecutionContext) = {
LOG.debug("Executing {}", command)
command match {
case c: EnterJointConsensus => executeEnterJointConsensus(c)
case c: LeaveJointConsensus => true
case c: NoOp => true
case _ => applyCommand(command)
case w: WriteCommand => commandExecutor.applyWrite(index, w)
}
}

Expand All @@ -128,13 +135,8 @@ class CommandApplier(rlog: RLog, stateMachine: StateMachine) extends Logging {
true
}

private def applyCommand(command: Command) = applyPartialFunction.apply(command)

private def apply(command: Command): Any = {
if (applyPartialFunction.isDefinedAt(command)) applyPartialFunction(command)
else throw new UnsupportedOperationException("No command handler in StateMachine")
}

def applyRead(read: ReadCommand) = commandExecutor.applyRead(read)

private def next = {
if (commitIndexQueue.isEmpty()) {
commitIndexQueue.take()
Expand Down
27 changes: 15 additions & 12 deletions src/main/scala/ckite/rlog/LogAppender.scala
Expand Up @@ -28,7 +28,10 @@ class LogAppender(rlog: RLog, log: PersistentLog) extends Logging {

def start = asyncExecutionContext.execute(asyncAppend _)

def stop = asyncPool.shutdownNow()
def stop = {
asyncPool.shutdownNow()
asyncPool.awaitTermination(10, TimeUnit.SECONDS)
}

//leader append
def append(term: Int, write: WriteCommand): Promise[(LogEntry, Promise[Any])] = append(LeaderAppend(term, write))
Expand All @@ -44,17 +47,17 @@ class LogAppender(rlog: RLog, log: PersistentLog) extends Logging {

private def asyncAppend = {
try {
while (!Thread.currentThread().isInterrupted()) {
val append = next
val logEntry = append.logEntry
rlog.shared {
log.append(logEntry)
}
pendingFlushes = pendingFlushes :+ (logEntry, append)
}
while (true) {
val append = next

val logEntry = append.logEntry

rlog.shared {
log.append(logEntry)
}

pendingFlushes = pendingFlushes :+ (logEntry, append)
}
} catch {
case e: InterruptedException => LOG.info("Shutdown LogAppender...")
}
Expand Down
19 changes: 13 additions & 6 deletions src/main/scala/ckite/statemachine/CommandExecutor.scala
@@ -1,14 +1,21 @@
package ckite.statemachine

import ckite.rpc.Command
import ckite.rpc.WriteCommand
import ckite.rpc.ReadCommand

class CommandExecutor(stateMachine: StateMachine) {

val applyPartialFunction = stateMachine.apply

def apply(command: Command): Any = {
if (applyPartialFunction.isDefinedAt(command)) applyPartialFunction(command)
else throw new UnsupportedOperationException("No command handler in StateMachine")

val writeFunction = stateMachine.applyWrite
val readFunction = stateMachine.applyRead

def applyWrite(index: Long, write: WriteCommand): Any = {
val params = (index, write)
if (writeFunction.isDefinedAt(params)) writeFunction(params)
}

def applyRead(read: ReadCommand): Any = {
if (readFunction.isDefinedAt(read)) readFunction(read)
}

}
33 changes: 29 additions & 4 deletions src/main/scala/ckite/statemachine/StateMachine.scala
Expand Up @@ -2,13 +2,38 @@ package ckite.statemachine

import java.nio.ByteBuffer
import ckite.rpc.Command
import ckite.rpc.ReadCommand
import ckite.rpc.WriteCommand

trait StateMachine {

/**
* Called when consensus has been reached on a WriteCommand.
* Along with the WriteCommand an index is provided to allow
* persistent StateMachines to save atomically both the WriteCommand's
* updates and the index. CKite will ask the lastAppliedIndex
* when deciding which WriteCommands can be replayed during startups.
*/
def applyWrite: PartialFunction[(Long, WriteCommand), Any]

/**
* The last applied index in the StateMachine. It is called
*/
def lastAppliedIndex: Long

/**
* Called when readonly commands are requested.
*/
def applyRead: PartialFunction[ReadCommand, Any]

/**
* Restore the StateMachine state from a Snapshot
*/
def deserialize(byteBuffer: ByteBuffer)


/**
* Captures the StateMachine state as a Snapshot
*/
def serialize(): ByteBuffer

def apply: PartialFunction[Command,Any]


}
8 changes: 7 additions & 1 deletion src/main/scala/ckite/statemachine/j/StateMachine.scala
Expand Up @@ -2,13 +2,19 @@ package ckite.statemachine.j

import java.nio.ByteBuffer
import ckite.rpc.Command
import ckite.rpc.WriteCommand
import ckite.rpc.ReadCommand

trait StateMachine {

def deserialize(byteBuffer: ByteBuffer)

def serialize(): ByteBuffer

def apply(command: Command):Any
def applyWrite(index:Long, write: WriteCommand):Any

def applyRead(read: ReadCommand):Any

def lastAppliedIndex: Long

}
11 changes: 9 additions & 2 deletions src/main/scala/ckite/statemachine/j/StateMachineWrapper.scala
Expand Up @@ -2,6 +2,8 @@ package ckite.statemachine.j

import java.nio.ByteBuffer
import ckite.rpc.Command
import ckite.rpc.WriteCommand
import ckite.rpc.ReadCommand


class StateMachineWrapper(jstateMachine: StateMachine) extends ckite.statemachine.StateMachine {
Expand All @@ -10,8 +12,13 @@ class StateMachineWrapper(jstateMachine: StateMachine) extends ckite.statemachin

def serialize(): ByteBuffer = jstateMachine.serialize

def apply: PartialFunction[Command,Any] = {
case c:Command => jstateMachine.apply(c)
def applyWrite: PartialFunction[(Long, WriteCommand),Any] = {
case (index, write) => jstateMachine.applyWrite(index, write)
}

def applyRead: PartialFunction[ReadCommand,Any] = {
case read => jstateMachine.applyRead(read)
}

def lastAppliedIndex: Long = jstateMachine.lastAppliedIndex
}

0 comments on commit 8f790d0

Please sign in to comment.