This repository has been archived by the owner on May 22, 2019. It is now read-only.

DS 133: Serverside rollback log support #94

Merged
merged 16 commits on Mar 26, 2012
2 changes: 1 addition & 1 deletion project/build.properties
@@ -3,6 +3,6 @@
project.organization=com.twitter
project.name=gizzard
sbt.version=0.7.4
project.version=3.0.8-SNAPSHOT
project.version=3.0.8-SNAPSHOT-3-stuhood
build.scala.versions=2.8.1
project.initialize=false
1 change: 1 addition & 0 deletions project/build/GizzardProject.scala
@@ -20,6 +20,7 @@ with SubversionPublisher {
val finagleThrift = "com.twitter" % "finagle-thrift" % finagleVersion
val finagleOstrich4 = "com.twitter" % "finagle-ostrich4" % finagleVersion

val utilThrift = "com.twitter" % "util-thrift" % "2.0.0"
val jackson = "org.codehaus.jackson" % "jackson-core-asl" % "1.9.2"
val jacksonMap = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.2"

5 changes: 3 additions & 2 deletions src/main/scala/com/twitter/gizzard/GizzardServer.scala
@@ -3,7 +3,7 @@ package com.twitter.gizzard
import com.twitter.util.Duration
import com.twitter.conversions.time._
import com.twitter.logging.Logger
import com.twitter.gizzard.nameserver.{NameServer, RemoteClusterManager}
import com.twitter.gizzard.nameserver.{NameServer, RemoteClusterManager, RollbackLogManager}
import com.twitter.gizzard.scheduler._
import com.twitter.gizzard.config.{GizzardServer => ServerConfig}
import com.twitter.gizzard.proxy.LoggingProxy
@@ -27,6 +27,7 @@ abstract class GizzardServer(config: ServerConfig) {
val nameServer = config.buildNameServer()
val remoteClusterManager = config.buildRemoteClusterManager()
val shardManager = nameServer.shardManager
val rollbackLogManager = new RollbackLogManager(nameServer.shard)
lazy val adminJobManager = new AdminJobManager(nameServer.shardRepository, shardManager, jobScheduler(copyPriority))

// job wiring
@@ -49,7 +50,7 @@ abstract class GizzardServer(config: ServerConfig) {

// service wiring

lazy val managerServer = new thrift.ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, jobScheduler)
lazy val managerServer = new thrift.ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, rollbackLogManager, jobScheduler)
lazy val managerThriftServer = config.manager(new thrift.Manager.Processor(managerServer))

lazy val jobInjectorServer = new thrift.JobInjectorService(jobCodec, jobScheduler)
70 changes: 70 additions & 0 deletions src/main/scala/com/twitter/gizzard/nameserver/MemoryShard.scala
@@ -1,18 +1,33 @@
package com.twitter.gizzard.nameserver

import java.nio.ByteBuffer
import scala.collection.mutable
import com.twitter.gizzard.shards._
import com.twitter.gizzard.thrift


/**
* NameServer implementation that doesn't actually store anything anywhere.
* Useful for tests or stubbing out the partitioning scheme.
*/
object MemoryShardManagerSource {
class LogEntry(val logId: ByteBuffer, val id: Int, val command: thrift.TransformOperation, var deleted: Boolean) {
override def equals(o: Any) = o match {
case that: LogEntry if that.logId == this.logId && that.id == this.id => true
case _ => false
}

override def hashCode() = logId.hashCode() + (31 * id)
}
}
class MemoryShardManagerSource extends ShardManagerSource {
import MemoryShardManagerSource._

val shardTable = new mutable.ListBuffer[ShardInfo]()
val parentTable = new mutable.ListBuffer[LinkInfo]()
val forwardingTable = new mutable.ListBuffer[Forwarding]()
val logs = new mutable.ListBuffer[(ByteBuffer,String)]()
val logEntries = new mutable.ListBuffer[LogEntry]()

private def find(info: ShardInfo): Option[ShardInfo] = {
shardTable.find { x =>
@@ -148,6 +163,19 @@ class MemoryShardManagerSource extends ShardManagerSource {
parentTable.toList
}

def batchExecute(commands : Seq[TransformOperation]) {
for (cmd <- commands) {
cmd match {
case CreateShard(shardInfo) => createShard(shardInfo)
case DeleteShard(shardId) => deleteShard(shardId)
case AddLink(upId, downId, weight) => addLink(upId, downId, weight)
case RemoveLink(upId, downId) => removeLink(upId, downId)
case SetForwarding(forwarding) => setForwarding(forwarding)
case RemoveForwarding(forwarding) => removeForwarding(forwarding)
}
}
}

def getBusyShards(): Seq[ShardInfo] = {
shardTable.filter { _.busy.id > 0 }.toList
}
@@ -156,6 +184,48 @@ class MemoryShardManagerSource extends ShardManagerSource {
forwardingTable.map(_.tableId).toSet.toSeq.sortWith((a,b) => a < b)
}

def logCreate(id: Array[Byte], logName: String): Unit = {
val pair = (ByteBuffer.wrap(id), logName)
if (!logs.contains(pair))
logs += pair
else
throw new RuntimeException("Log already exists: " + pair)
}

def logGet(logName: String): Option[Array[Byte]] =
logs.collect {
case (id, `logName`) => id.array
}.headOption

def logEntryPush(logId: Array[Byte], entry: thrift.LogEntry): Unit = {
val le = new LogEntry(ByteBuffer.wrap(logId), entry.id, entry.command, false)
if (!logEntries.contains(le))
logEntries += le
else
throw new RuntimeException("Log entry already exists: " + le)
}

def logEntryPeek(rawLogId: Array[Byte], count: Int): Seq[thrift.LogEntry] = {
val logId = ByteBuffer.wrap(rawLogId)
val peeked =
logEntries.reverseIterator.collect {
case e if e.logId == logId && !e.deleted => e
}.take(count)
peeked.map { e =>
new thrift.LogEntry().setId(e.id).setCommand(e.command)
}.toSeq
}

def logEntryPop(rawLogId: Array[Byte], entryId: Int): Unit = {
val logId = ByteBuffer.wrap(rawLogId)
val entry = logEntries.collect {
case e if e.logId == logId && e.id == entryId => e
}.headOption.getOrElse { throw new RuntimeException(entryId + " not found for " + logId) }

// side effect: mark deleted
entry.deleted = true
}

def prepareReload() { }
}

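For reference, a minimal sketch (not part of this diff) of how the new in-memory log methods could be exercised, using only the calls shown above. The log name and 16-byte id are arbitrary, and the entry's TransformOperation command is left unset since only id ordering matters for peek/pop here.

```scala
import com.twitter.gizzard.nameserver.MemoryShardManagerSource
import com.twitter.gizzard.thrift

object MemoryRollbackLogSketch {
  def main(args: Array[String]) {
    val source = new MemoryShardManagerSource
    val logId  = new Array[Byte](16)  // normally a wrapped UUID (see RollbackLogManager.create)

    source.logCreate(logId, "migration-1")
    assert(source.logGet("migration-1").isDefined)

    // entries are peeked in descending id order, most recent first
    source.logEntryPush(logId, new thrift.LogEntry().setId(1))
    source.logEntryPush(logId, new thrift.LogEntry().setId(2))
    assert(source.logEntryPeek(logId, 1).head.id == 2)

    // popping marks the entry deleted, so later peeks skip it
    source.logEntryPop(logId, 2)
    assert(source.logEntryPeek(logId, 1).head.id == 1)
  }
}
```
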
103 changes: 103 additions & 0 deletions src/main/scala/com/twitter/gizzard/nameserver/RollbackLogManager.scala
@@ -0,0 +1,103 @@
package com.twitter.gizzard.nameserver

import java.util.UUID
import java.nio.ByteBuffer
import scala.collection.mutable
import scala.math.Ordering

import com.twitter.logging.Logger
import com.twitter.gizzard.shards.RoutingNode
import com.twitter.gizzard.thrift


class RollbackLogManager(shard: RoutingNode[ShardManagerSource]) {
import RollbackLogManager._

def create(logName: String): ByteBuffer = {
val idArray = new Array[Byte](UUID_BYTES)
val id = ByteBuffer.wrap(idArray)
val uuid = UUID.randomUUID
id.asLongBuffer
.put(uuid.getMostSignificantBits)
.put(uuid.getLeastSignificantBits)
shard.write.foreach(_.logCreate(idArray, logName))
log.info("Created new rollback log for name '%s' with id %s", logName, uuid)
id
}

def get(logName: String): Option[ByteBuffer] =
shard.read.any(_.logGet(logName).map(ByteBuffer.wrap))

def entryPush(logId: ByteBuffer, entry: thrift.LogEntry): Unit =
shard.write.foreach(_.logEntryPush(unwrap(logId), entry))

/**
* @return the entries with the highest ids on all replicas. Note that this means
* logs cannot be rolled back while any nameserver replica is unavailable.
*/
def entryPeek(logId: ByteBuffer, count: Int): Seq[thrift.LogEntry] = {
// collect k from each shard
val descendingEntryLists = shard.read.map(_.logEntryPeek(unwrap(logId), count))
// and take the top k
RollbackLogManager.topK(descendingEntryLists, count)
}

def entryPop(logId: ByteBuffer, entryId: Int): Unit =
shard.write.foreach(_.logEntryPop(unwrap(logId), entryId))

private def unwrap(bb: ByteBuffer): Array[Byte] = {
if (bb.hasArray && bb.remaining == bb.capacity) {
bb.array
} else {
val arr = new Array[Byte](bb.remaining)
bb.duplicate.get(arr)
arr
}
}
}

object RollbackLogManager {
private val log = Logger.get(getClass.getName)

val UUID_BYTES = 16

/**
* Calculates the top-k entries by id.
* TODO: this is not the hot path, but we should eventually do a real linear merge.
*/
def topK(
descendingEntryLists: Iterable[Seq[thrift.LogEntry]],
count: Int
): Seq[thrift.LogEntry] = count match {
case 1 =>
val heads = descendingEntryLists.flatMap(_.lastOption)
// no optional form of max
if (!heads.isEmpty)
Seq(heads.max(EntryOrdering))
else
Nil
case _ =>
val q = new mutable.PriorityQueue[thrift.LogEntry]()(EntryOrdering)
val result = mutable.Buffer[thrift.LogEntry]()
descendingEntryLists.foreach { q ++= _ }
val iterator = q.iterator
// take the first k non-equal entries
var gathered = 0
var lastId = Int.MinValue
while (gathered < count && iterator.hasNext) {
val entry = iterator.next
if (entry.id != lastId) {
result += entry
lastId = entry.id
gathered += 1
}
}
result
}

// TODO: Scala 2.8.1 doesn't have maxBy
object EntryOrdering extends Ordering[thrift.LogEntry] {
override def compare(a: thrift.LogEntry, b: thrift.LogEntry) =
Ordering.Int.compare(a.id, b.id)
}
}
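A small sketch (not part of this diff) of the count = 1 behaviour of topK, matching the entryPeek doc comment above: the single entry returned is the one with the highest id reported by any replica. The entry helper just builds a thrift.LogEntry carrying an id, as MemoryShardManagerSource.logEntryPeek does above.

```scala
import com.twitter.gizzard.nameserver.RollbackLogManager
import com.twitter.gizzard.thrift

object TopKSketch {
  // build a log entry carrying only an id
  def entry(id: Int) = new thrift.LogEntry().setId(id)

  def main(args: Array[String]) {
    val replicaA = Seq(entry(5))   // replica A has entry 5 at its head
    val replicaB = Seq(entry(7))   // replica B is ahead, at entry 7
    val top = RollbackLogManager.topK(Seq(replicaA, replicaB), 1)
    assert(top.map(_.id) == Seq(7))
  }
}
```
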
15 changes: 14 additions & 1 deletion src/main/scala/com/twitter/gizzard/nameserver/ShardManager.scala
@@ -3,6 +3,7 @@ package com.twitter.gizzard.nameserver
import scala.collection.mutable
import com.twitter.gizzard.shards._
import com.twitter.gizzard.util.TreeUtils
import com.twitter.gizzard.thrift


class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository) {
@@ -11,6 +12,8 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
def diffState(lastUpdatedSeq: Long) = shard.read.any(_.diffState(lastUpdatedSeq))
def dumpStructure(tableIds: Seq[Int]) = shard.read.any(_.dumpStructure(tableIds))

def batchExecute(commands : Seq[TransformOperation]) { shard.write.foreach(_.batchExecute(commands)) }

@throws(classOf[ShardException])
def createAndMaterializeShard(shardInfo: ShardInfo) {
shard.write.foreach(_.createShard(shardInfo))
@@ -27,7 +30,6 @@
def listShards() = shard.read.any(_.listShards())
def getBusyShards() = shard.read.any(_.getBusyShards())


def addLink(upId: ShardId, downId: ShardId, weight: Int) { shard.write.foreach(_.addLink(upId, downId, weight)) }
def removeLink(upId: ShardId, downId: ShardId) { shard.write.foreach(_.removeLink(upId, downId)) }

@@ -65,6 +67,8 @@ trait ShardManagerSource {
tableIds.map(extractor)
}

@throws(classOf[ShardException]) def batchExecute(commands : Seq[TransformOperation])

@throws(classOf[ShardException]) def createShard(shardInfo: ShardInfo)
@throws(classOf[ShardException]) def deleteShard(id: ShardId)
@throws(classOf[ShardException]) def markShardBusy(id: ShardId, busy: Busy.Value)
@@ -93,4 +97,13 @@ trait ShardManagerSource {

@throws(classOf[ShardException]) def listHostnames(): Seq[String]
@throws(classOf[ShardException]) def listTables(): Seq[Int]

@throws(classOf[ShardException]) def logCreate(id: Array[Byte], logName: String): Unit
@throws(classOf[ShardException]) def logGet(logName: String): Option[Array[Byte]]
@throws(classOf[ShardException]) def logEntryPush(logId: Array[Byte], entry: thrift.LogEntry): Unit
@throws(classOf[ShardException]) def logEntryPeek(logId: Array[Byte], count: Int): Seq[thrift.LogEntry]
@throws(classOf[ShardException]) def logEntryPop(logId: Array[Byte], entryId: Int): Unit

/** For JMocker. TODO: switch to a better mocking framework */
override def toString() = "<%s>".format(this.getClass)
}
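Finally, a hedged sketch (not part of this diff) of what a batchExecute call looks like from the caller's side, run against the in-memory source so it stays self-contained. It assumes AddLink and RemoveLink are the case classes matched in MemoryShardManagerSource.batchExecute above, and that ShardId takes (hostname, tablePrefix); each operation simply dispatches to the corresponding single-operation method.

```scala
import com.twitter.gizzard.nameserver._
import com.twitter.gizzard.shards._

object BatchExecuteSketch {
  def main(args: Array[String]) {
    val source = new MemoryShardManagerSource
    val up     = ShardId("localhost", "status_replicating")  // hypothetical shard ids
    val down   = ShardId("localhost", "status_backend")

    // the whole sequence is handed to the nameserver in one call;
    // batchExecute unpacks it into addLink / removeLink as shown above
    source.batchExecute(Seq(
      AddLink(up, down, 1),
      RemoveLink(up, down)
    ))
  }
}
```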