Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
using thew new parsing technology
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Kallen authored and Nick Kallen committed Apr 9, 2010
1 parent a3a82ac commit 5f7ec3f
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 32 deletions.
Binary file modified libs/gizzard-1.0.jar
Binary file not shown.
12 changes: 8 additions & 4 deletions src/main/scala/com/twitter/rowz/Rowz.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -73,10 +73,14 @@ object Rowz {
} }
val prioritizingScheduler = new PrioritizingJobScheduler(schedulerMap) val prioritizingScheduler = new PrioritizingJobScheduler(schedulerMap)


val copyJobParser = new BoundJobParser((nameServer, prioritizingScheduler(Priority.Medium.id))) val copyJobParser = new BoundJobParser(jobs.CopyParser, (nameServer, prioritizingScheduler(Priority.Medium.id)))
val rowzJobParser = new BoundJobParser(forwardingManager) val migrateJobParser = new BoundJobParser(new gizzard.jobs.MigrateParser(jobs.CopyParser), (nameServer, prioritizingScheduler(Priority.Medium.id)))
polymorphicJobParser += ("Copy|Migrate".r, copyJobParser) val createJobParser = new BoundJobParser(jobs.CreateParser, forwardingManager)
polymorphicJobParser += ("Create|Destroy".r, rowzJobParser) val destroyJobParser = new BoundJobParser(jobs.DestroyParser, forwardingManager)
polymorphicJobParser += ("Copy".r, copyJobParser)
polymorphicJobParser += ("Migrate".r, migrateJobParser)
polymorphicJobParser += ("Create".r, createJobParser)
polymorphicJobParser += ("Destroy".r, destroyJobParser)


val rowzService = new RowzService(forwardingManager, prioritizingScheduler, new IdGenerator(config("host.id").toInt)) val rowzService = new RowzService(forwardingManager, prioritizingScheduler, new IdGenerator(config("host.id").toInt))


Expand Down
28 changes: 15 additions & 13 deletions src/main/scala/com/twitter/rowz/jobs/Copy.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -2,32 +2,34 @@ package com.twitter.rowz.jobs


import com.twitter.gizzard import com.twitter.gizzard
import Shard.Cursor import Shard.Cursor
import com.twitter.gizzard.nameserver.NameServer
import com.twitter.gizzard.scheduler.JobScheduler
import com.twitter.xrayspecs.TimeConversions._ import com.twitter.xrayspecs.TimeConversions._




object Copy { object CopyFactory extends gizzard.jobs.CopyFactory[Shard] {
val COUNT = 500 val COUNT = 500
}


object CopyFactory extends gizzard.jobs.CopyFactory[Shard] { def apply(sourceShardId: Int, destinationShardId: Int) = new Copy(sourceShardId, destinationShardId, Shard.CursorStart, COUNT)
def apply(sourceShardId: Int, destinationShardId: Int) = new Copy(sourceShardId, destinationShardId, Shard.CursorStart)
} }


class Copy(sourceShardId: Int, destinationShardId: Int, cursor: Cursor, count: Int) extends gizzard.jobs.Copy[Shard](sourceShardId, destinationShardId, count) { object CopyParser extends gizzard.jobs.CopyParser[Shard] {
def this(sourceShardId: Int, destinationShardId: Int, cursor: Cursor) = this(sourceShardId, destinationShardId, cursor, Copy.COUNT) def apply(attributes: Map[String, Any]) = {
def this(attributes: Map[String, AnyVal]) = { val casted = attributes.asInstanceOf[Map[String, AnyVal]]
this( new Copy(
attributes("source_shard_id").toInt, casted("source_shard_id").toInt,
attributes("destination_shard_id").toInt, casted("destination_shard_id").toInt,
attributes("cursor").toInt, casted("cursor").toLong,
attributes("count").toInt) casted("count").toInt)
} }
}


class Copy(sourceShardId: Int, destinationShardId: Int, cursor: Cursor, count: Int) extends gizzard.jobs.Copy[Shard](sourceShardId, destinationShardId, count) {
def serialize = Map("cursor" -> cursor) def serialize = Map("cursor" -> cursor)


def copyPage(sourceShard: Shard, destinationShard: Shard, count: Int) = { def copyPage(sourceShard: Shard, destinationShard: Shard, count: Int) = {
val (items, nextCursor) = sourceShard.selectAll(cursor, count) val (items, nextCursor) = sourceShard.selectAll(cursor, count)
destinationShard.write(items) destinationShard.write(items)
nextCursor.map(new Copy(sourceShardId, destinationShardId, _)) nextCursor.map(new Copy(sourceShardId, destinationShardId, _, count))
} }
} }
14 changes: 8 additions & 6 deletions src/main/scala/com/twitter/rowz/jobs/Create.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import com.twitter.xrayspecs.Time
import com.twitter.xrayspecs.TimeConversions._ import com.twitter.xrayspecs.TimeConversions._




case class Create(id: Long, name: String, at: Time) extends UnboundJob[ForwardingManager] { object CreateParser extends gizzard.jobs.UnboundJobParser[ForwardingManager] {
def this(attributes: Map[String, AnyVal]) = { def apply(attributes: Map[String, Any]) = {
this( new Create(
attributes("id").toLong, attributes("id").asInstanceOf[Long],
attributes("name").toString, attributes("name").asInstanceOf[String],
Time(attributes("at").toInt.seconds)) Time(attributes("at").asInstanceOf[Int].seconds))
} }
}


case class Create(id: Long, name: String, at: Time) extends UnboundJob[ForwardingManager] {
def toMap = { def toMap = {
Map("id" -> id, "name" -> name, "at" -> at.inSeconds) Map("id" -> id, "name" -> name, "at" -> at.inSeconds)
} }
Expand Down
20 changes: 11 additions & 9 deletions src/main/scala/com/twitter/rowz/jobs/Destroy.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ import com.twitter.xrayspecs.TimeConversions._
import com.twitter.gizzard.jobs.UnboundJob import com.twitter.gizzard.jobs.UnboundJob




case class Destroy(row: Row, at: Time) extends UnboundJob[ForwardingManager] { object DestroyParser extends gizzard.jobs.UnboundJobParser[ForwardingManager] {
def this(attributes: Map[String, AnyVal]) = { def apply(attributes: Map[String, Any]) = {
this( new Destroy(
new Row( new Row(
attributes("id").toLong, attributes("id").asInstanceOf[Long],
attributes("name").toString, attributes("name").asInstanceOf[String],
Time(attributes("createdAt").toInt.seconds), Time(attributes("createdAt").asInstanceOf[Int].seconds),
Time(attributes("updatedAt").toInt.seconds), Time(attributes("updatedAt").asInstanceOf[Int].seconds),
State(attributes("state").toInt)), State(attributes("state").asInstanceOf[Int])),
Time(attributes("at").toInt.seconds)) Time(attributes("at").asInstanceOf[Int].seconds))
} }
}


case class Destroy(row: Row, at: Time) extends UnboundJob[ForwardingManager] {
def toMap = { def toMap = {
Map( Map(
"id" -> row.id, "id" -> row.id,
Expand Down
2 changes: 2 additions & 0 deletions src/test/scala/com/twitter/rowz/integration/RowzSpec.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ object RowzSpec extends Specification with Eventually {
val queryEvaluator = queryEvaluatorFactory("localhost", null, config("rowz.db.username"), config("rowz.db.password")) val queryEvaluator = queryEvaluatorFactory("localhost", null, config("rowz.db.username"), config("rowz.db.password"))


doBefore { doBefore {
queryEvaluator.execute("DROP DATABASE IF EXISTS " + config("rowz.nameserver.name"))
queryEvaluator.execute("CREATE DATABASE " + config("rowz.nameserver.name"))
state.nameServer.rebuildSchema() state.nameServer.rebuildSchema()
queryEvaluator.execute("DROP DATABASE IF EXISTS " + config("rowz.db.name")) queryEvaluator.execute("DROP DATABASE IF EXISTS " + config("rowz.db.name"))


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ object ShardManagerSpec extends Specification with Eventually {
var shardIdB = 0 var shardIdB = 0


doBefore { doBefore {
queryEvaluator.execute("DROP DATABASE IF EXISTS " + config("rowz.nameserver.name"))
queryEvaluator.execute("CREATE DATABASE " + config("rowz.nameserver.name"))
state.nameServer.rebuildSchema() state.nameServer.rebuildSchema()
queryEvaluator.execute("DROP DATABASE IF EXISTS " + config("rowz.db.name")) queryEvaluator.execute("DROP DATABASE IF EXISTS " + config("rowz.db.name"))


Expand Down

0 comments on commit 5f7ec3f

Please sign in to comment.