Permalink
Browse files

wip, new job style, simplified service

  • Loading branch information...
1 parent 115af61 commit c1efcb74ee131bae054c4ff682906a7daf39c52f @freels freels committed Apr 11, 2011
@@ -1,89 +1,134 @@
package com.twitter.rowz
-import net.lag.configgy.ConfigMap
-import com.twitter.querulous.database.{ApachePoolingDatabaseFactory, MemoizingDatabaseFactory, DatabaseFactory}
-import com.twitter.querulous.query.SqlQueryFactory
-import com.twitter.querulous.evaluator.StandardQueryEvaluatorFactory
-import com.twitter.xrayspecs.TimeConversions._
-import net.lag.logging.{Logger, ThrottledLogger}
-import com.twitter.gizzard.Future
-import com.twitter.gizzard.scheduler.{JobScheduler, PrioritizingJobScheduler, Priority}
-import com.twitter.gizzard.shards._
-import com.twitter.gizzard.nameserver
-import com.twitter.gizzard.jobs.{PolymorphicJobParser, BoundJobParser}
-import scala.collection.mutable
-import com.twitter.ostrich.W3CStats
-
-
-object Rowz {
- case class State(
- rowzService: RowzService,
- prioritizingScheduler: PrioritizingJobScheduler,
- nameServer: nameserver.NameServer[Shard],
- copyFactory: gizzard.jobs.CopyFactory[Shard]) {
- def start() = {
- nameServer.reload()
- prioritizingScheduler.start()
- }
-
- def shutdown() = prioritizingScheduler.shutdown()
- }
-
- def apply(config: ConfigMap, w3c: W3CStats): State = apply(
- config, w3c,
- new MemoizingDatabaseFactory(new ApachePoolingDatabaseFactory(
- config("rowz.db.connection_pool.size_min").toInt,
- config("rowz.db.connection_pool.size_max").toInt,
- config("rowz.db.connection_pool.test_idle_msec").toLong.millis,
- config("rowz.db.connection_pool.max_wait").toLong.millis,
- config("rowz.db.connection_pool.test_on_borrow").toBoolean,
- config("rowz.db.connection_pool.min_evictable_idle_msec").toLong.millis))
- )
-
- def apply(config: ConfigMap, w3c: W3CStats, databaseFactory: DatabaseFactory): State = {
- val queryEvaluatorFactory = new StandardQueryEvaluatorFactory(databaseFactory, new SqlQueryFactory)
-
- val throttledLogger = new ThrottledLogger[String](Logger(), config("throttled_log.period_msec").toInt, config("throttled_log.rate").toInt)
- val future = new Future("ReplicatingFuture", config.configMap("rowz.replication.future"))
-
- val shardRepository = new nameserver.BasicShardRepository[Shard](new ReadWriteShardAdapter(_), throttledLogger, future)
- shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config))
-
- val nameServerShards = config.getList("rowz.nameserver.hostnames").map { hostname =>
- new nameserver.SqlShard(
- queryEvaluatorFactory(
- hostname,
- config("rowz.nameserver.name"),
- config("rowz.nameserver.username"),
- config("rowz.nameserver.password")))
- }
-
- val replicatingNameServerShard = new nameserver.ReadWriteShardAdapter(new ReplicatingShard(
- new ShardInfo("com.twitter.gizzard.shards.ReplicatingShard", "", ""),
- 1, nameServerShards, new nameserver.LoadBalancer(nameServerShards), throttledLogger, future))
- val nameServer = new nameserver.NameServer(replicatingNameServerShard, shardRepository, Hash)
- val forwardingManager = new ForwardingManager(nameServer)
-
- val polymorphicJobParser = new PolymorphicJobParser
- val schedulerMap = new mutable.HashMap[Int, JobScheduler]
- List((Priority.High, "high"), (Priority.Medium, "medium"), (Priority.Low, "low")).foreach { case (priority, configName) =>
- val queueConfig = config.configMap("rowz.queue")
- val scheduler = JobScheduler(configName, queueConfig, polymorphicJobParser, w3c)
- schedulerMap(priority.id) = scheduler
- }
- val prioritizingScheduler = new PrioritizingJobScheduler(schedulerMap)
-
- val copyJobParser = new BoundJobParser(jobs.CopyParser, (nameServer, prioritizingScheduler(Priority.Medium.id)))
- val migrateJobParser = new BoundJobParser(new gizzard.jobs.MigrateParser(jobs.CopyParser), (nameServer, prioritizingScheduler(Priority.Medium.id)))
- val createJobParser = new BoundJobParser(jobs.CreateParser, forwardingManager)
- val destroyJobParser = new BoundJobParser(jobs.DestroyParser, forwardingManager)
- polymorphicJobParser += ("Copy".r, copyJobParser)
- polymorphicJobParser += ("Migrate".r, migrateJobParser)
- polymorphicJobParser += ("Create".r, createJobParser)
- polymorphicJobParser += ("Destroy".r, destroyJobParser)
-
- val rowzService = new RowzService(forwardingManager, prioritizingScheduler, new IdGenerator(config("host.id").toInt))
-
- State(rowzService, prioritizingScheduler, nameServer, jobs.CopyFactory)
+import java.sql.{ResultSet, SQLException}
+import com.twitter.querulous
+import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator}
+import com.twitter.querulous.config.Connection
+import com.twitter.querulous.query.SqlQueryTimeoutException
+
+import com.twitter.gizzard
+import nameserver.NameServer
+import shards.{ShardId, ShardInfo, ShardException, ShardTimeoutException}
+import scheduler.{JobScheduler, JsonJob, CopyJob, CopyJobParser, CopyJobFactory, JsonJobParser, PrioritizingJobScheduler}
+
+object Priority extends Enumeration {
+ val High, Medium, Low = Value
+}
+
+class Rowz(config: com.twitter.rowz.config.Rowz) extends GizzardServer[RowzShard](config) {
+
+ // define a factory for Rowz's ReadWriteShardAdapter
+ val readWriteShardAdapter = { s => new RowzShardAdapter(s) }
+
+ val jobPriorities = List(Priority.High.id, Priority.Medium.id, Priority.Low.id)
+
+ val copyPriority = Priority.Medium.id
+ val copyFactory = new RowzCopyFactory(nameServer, jobScheduler(Priority.Medium.id))
+
+ shardRepo += ("RowzShard" -> new RowzShardFactory(config.queryEvaluator(), config.databaseConnection))
+
+ jobCodec += ("Create".r -> new CreateJobParser())
+ jobCodec += ("Destroy".r -> new DestroyJobParser())
+
+
+ // curry findCurrentForwarding to pass to the service and job factories.
+
+ def findForwarding(id: Long) = nameServer.findCurrentForwarding(0, id)
+
+ // create the id generator
+
+ def idGenerator = new IdGenerator(config.idGenWorkerId)
+
+ // set up the service listener
+
+
+ val rowzService = new RowzService(findForwarding, jobScheduler, idGenerator)
+
+ lazy val rowzThriftServer = {
+ val processor = new thrift.TestServer.Processor(rowzService)
+ config.thriftServer(processor)
}
-}
+
+ def start() {
+ startGizzard()
+ new Thread(new Runnable { def run() { rowzThriftServer.serve() } }, "RowzServerThread").start()
+ }
+
+ def shutdown(quiesce: Boolean) {
+ rowzThriftServer.stop()
+ shutdownGizzard(quiesce)
+ }
+}
+
+// object Rowz {
+// case class State(
+// rowzService: RowzService,
+// prioritizingScheduler: PrioritizingJobScheduler,
+// nameServer: nameserver.NameServer[Shard],
+// copyFactory: gizzard.jobs.CopyFactory[Shard]) {
+// def start() = {
+// nameServer.reload()
+// prioritizingScheduler.start()
+// }
+
+// def shutdown() = prioritizingScheduler.shutdown()
+// }
+
+// def apply(config: ConfigMap, w3c: W3CStats): State = apply(
+// config, w3c,
+// new MemoizingDatabaseFactory(new ApachePoolingDatabaseFactory(
+// config("rowz.db.connection_pool.size_min").toInt,
+// config("rowz.db.connection_pool.size_max").toInt,
+// config("rowz.db.connection_pool.test_idle_msec").toLong.millis,
+// config("rowz.db.connection_pool.max_wait").toLong.millis,
+// config("rowz.db.connection_pool.test_on_borrow").toBoolean,
+// config("rowz.db.connection_pool.min_evictable_idle_msec").toLong.millis))
+// )
+
+// def apply(config: ConfigMap, w3c: W3CStats, databaseFactory: DatabaseFactory): State = {
+// val queryEvaluatorFactory = new StandardQueryEvaluatorFactory(databaseFactory, new SqlQueryFactory)
+
+// val throttledLogger = new ThrottledLogger[String](Logger(), config("throttled_log.period_msec").toInt, config("throttled_log.rate").toInt)
+// val future = new Future("ReplicatingFuture", config.configMap("rowz.replication.future"))
+
+// val shardRepository = new nameserver.BasicShardRepository[Shard](new ReadWriteShardAdapter(_), throttledLogger, future)
+// shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config))
+
+// val nameServerShards = config.getList("rowz.nameserver.hostnames").map { hostname =>
+// new nameserver.SqlShard(
+// queryEvaluatorFactory(
+// hostname,
+// config("rowz.nameserver.name"),
+// config("rowz.nameserver.username"),
+// config("rowz.nameserver.password")))
+// }
+
+// val replicatingNameServerShard = new nameserver.ReadWriteShardAdapter(new ReplicatingShard(
+// new ShardInfo("com.twitter.gizzard.shards.ReplicatingShard", "", ""),
+// 1, nameServerShards, new nameserver.LoadBalancer(nameServerShards), throttledLogger, future))
+// val nameServer = new nameserver.NameServer(replicatingNameServerShard, shardRepository, Hash)
+// val forwardingManager = new ForwardingManager(nameServer)
+
+// val polymorphicJobParser = new PolymorphicJobParser
+// val schedulerMap = new mutable.HashMap[Int, JobScheduler]
+// List((Priority.High, "high"), (Priority.Medium, "medium"), (Priority.Low, "low")).foreach { case (priority, configName) =>
+// val queueConfig = config.configMap("rowz.queue")
+// val scheduler = JobScheduler(configName, queueConfig, polymorphicJobParser, w3c)
+// schedulerMap(priority.id) = scheduler
+// }
+// val prioritizingScheduler = new PrioritizingJobScheduler(schedulerMap)
+
+// val copyJobParser = new BoundJobParser(jobs.CopyParser, (nameServer, prioritizingScheduler(Priority.Medium.id)))
+// val migrateJobParser = new BoundJobParser(new gizzard.jobs.MigrateParser(jobs.CopyParser), (nameServer, prioritizingScheduler(Priority.Medium.id)))
+// val createJobParser = new BoundJobParser(jobs.CreateParser, forwardingManager)
+// val destroyJobParser = new BoundJobParser(jobs.DestroyParser, forwardingManager)
+// polymorphicJobParser += ("Copy".r, copyJobParser)
+// polymorphicJobParser += ("Migrate".r, migrateJobParser)
+// polymorphicJobParser += ("Create".r, createJobParser)
+// polymorphicJobParser += ("Destroy".r, destroyJobParser)
+
+// val rowzService = new RowzService(forwardingManager, prioritizingScheduler, new IdGenerator(config("host.id").toInt))
+
+// State(rowzService, prioritizingScheduler, nameServer, jobs.CopyFactory)
+// }
+// }
@@ -8,7 +8,7 @@ import com.twitter.xrayspecs.TimeConversions._
import thrift.conversions.Row._
-class RowzService(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, makeId: () => Long) extends thrift.Rowz.Iface {
+class RowzService(findForwarding: Long => RowzShard, scheduler: PrioritizingJobScheduler, makeId: () => Long) extends thrift.Rowz.Iface {
def create(name: String, at: Int) = {
val id = makeId()
scheduler(Priority.High.id)(new Create(id, name, Time(at.seconds)))
@@ -20,6 +20,6 @@ class RowzService(forwardingManager: ForwardingManager, scheduler: PrioritizingJ
}
def read(id: Long) = {
- forwardingManager(id).read(id).get.toThrift
+ findForwarding(id).read(id).get.toThrift
}
-}
+}
@@ -1,35 +1,43 @@
package com.twitter.rowz.jobs
-import com.twitter.gizzard
-import Shard.Cursor
import com.twitter.gizzard.nameserver.NameServer
-import com.twitter.gizzard.scheduler.JobScheduler
-import com.twitter.xrayspecs.TimeConversions._
+import com.twitter.gizzard.shards.ShardId
+import com.twitter.gizzard.scheduler._
-object CopyFactory extends gizzard.jobs.CopyFactory[Shard] {
- val COUNT = 500
-
- def apply(sourceShardId: Int, destinationShardId: Int) = new Copy(sourceShardId, destinationShardId, Shard.CursorStart, COUNT)
+class RowzCopyFactory(nameServer: NameServer[RowzShard], scheduler: JobScheduler, defaultCount: Int = 500)
+extends CopyJobFactory[RowzShard] {
+ def apply(source: ShardId, dest: ShardId) = {
+ new RowzCopyJob(source, dest, 0, defaultCount, nameServer, scheduler)
+ }
}
-object CopyParser extends gizzard.jobs.CopyParser[Shard] {
- def apply(attributes: Map[String, Any]) = {
- val casted = attributes.asInstanceOf[Map[String, AnyVal]]
- new Copy(
- casted("source_shard_id").toInt,
- casted("destination_shard_id").toInt,
- casted("cursor").toLong,
- casted("count").toInt)
+class RowzCopyParser(nameServer: NameServer[RowzShard], scheduler: JobScheduler)
+extends CopyJobParser[RowzShard] {
+ def deserialize(attributes: Map[String, Any], source: ShardId, dest: ShardId, count: Int) = {
+ val cursor = attributes("cursor").asInstanceOf[Int]
+ val count = attributes("count").asInstanceOf[Int]
+
+ new RowzCopyJob(source, dest, cursor, count, nameServer, scheduler)
}
}
-class Copy(sourceShardId: Int, destinationShardId: Int, cursor: Cursor, count: Int) extends gizzard.jobs.Copy[Shard](sourceShardId, destinationShardId, count) {
- def serialize = Map("cursor" -> cursor)
+class RowzCopyJob(
+ sourceId: ShardId,
+ destinationId: ShardId,
+ cursor: Int,
+ count: Int,
+ nameServer: NameServer[RowzShard],
+ scheduler: JobScheduler)
+extends CopyJob[RowzShard] {
+ def copyPage(source: RowzShard, dest: RowzShard, count: Int) = {
+ val rows = source.selectAll(cursor, count)
- def copyPage(sourceShard: Shard, destinationShard: Shard, count: Int) = {
- val (items, nextCursor) = sourceShard.selectAll(cursor, count)
- destinationShard.write(items)
- nextCursor.map(new Copy(sourceShardId, destinationShardId, _, count))
+ if (rows.isEmpty) {
+ None
+ } else {
+ dest.write(rows)
+ Some(new RowzCopyJob(sourceId, destinationId, rows.last.id, count, nameServer, scheduler))
+ }
}
}
@@ -1,25 +1,26 @@
package com.twitter.rowz.jobs
-import com.twitter.gizzard.jobs.UnboundJob
-import com.twitter.xrayspecs.Time
-import com.twitter.xrayspecs.TimeConversions._
+import com.twitter.gizzard.jobs.{JsonJobParser, JsonJob}
+import com.twitter.util.Time
-object CreateParser extends gizzard.jobs.UnboundJobParser[ForwardingManager] {
- def apply(attributes: Map[String, Any]) = {
- new Create(
+class CreateParser(findForwarding: Long => RowzShard) extends JsonJobParser {
+ def apply(attributes: Map[String, Any]): JsonJob = {
+ new CreateJob(
attributes("id").asInstanceOf[Long],
attributes("name").asInstanceOf[String],
- Time(attributes("at").asInstanceOf[Int].seconds))
+ Time.fromMilliseconds(attributes("at").asInstanceOf[Long]),
+ findForwarding
+ )
}
}
-case class Create(id: Long, name: String, at: Time) extends UnboundJob[ForwardingManager] {
+class CreateJob(id: Long, name: String, at: Time, findForwarding: Long => RowzShard) extends JsonJob {
def toMap = {
- Map("id" -> id, "name" -> name, "at" -> at.inSeconds)
+ Map("id" -> id, "name" -> name, "at" -> at.inMilliseconds)
}
- def apply(forwardingManager: ForwardingManager) = {
- forwardingManager(id).create(id, name, at)
+ def apply() {
+ findForwarding(id).create(id, name, at)
}
-}
+}
@@ -1,35 +1,34 @@
package com.twitter.rowz.jobs
-import com.twitter.xrayspecs.Time
-import com.twitter.xrayspecs.TimeConversions._
-import com.twitter.gizzard.jobs.UnboundJob
+import com.twitter.gizzard.jobs.{JsonJobParser, JsonJob}
+import com.twitter.util.Time
-object DestroyParser extends gizzard.jobs.UnboundJobParser[ForwardingManager] {
- def apply(attributes: Map[String, Any]) = {
- new Destroy(
+class DestroyParser(findForwarding: Long => RowzShard) extends JsonJobParser {
+ def apply(attributes: Map[String, Any]): JsonJob = {
+ new DestroyJob(
new Row(
attributes("id").asInstanceOf[Long],
attributes("name").asInstanceOf[String],
- Time(attributes("createdAt").asInstanceOf[Int].seconds),
- Time(attributes("updatedAt").asInstanceOf[Int].seconds),
+ Time.fromMilliseconds(attributes("createdAt").asInstanceOf[Long]),
+ Time.fromMilliseconds(attributes("updatedAt").asInstanceOf[Long]),
State(attributes("state").asInstanceOf[Int])),
- Time(attributes("at").asInstanceOf[Int].seconds))
+ Time.fromMilliseconds(attributes("at").asInstanceOf[Long]))
}
}
-case class Destroy(row: Row, at: Time) extends UnboundJob[ForwardingManager] {
+class DestroyJob(row: Row, at: Time, findForwarding: Long => RowzShard) extends JsonJob {
def toMap = {
Map(
"id" -> row.id,
"name" -> row.name,
- "createdAt" -> row.createdAt.inSeconds,
- "updatedAt" -> row.updatedAt.inSeconds,
+ "createdAt" -> row.createdAt.inMilliseconds,
+ "updatedAt" -> row.updatedAt.inMilliseconds,
"state" -> row.state.id,
- "at" -> at.inSeconds)
+ "at" -> at.inMilliseconds)
}
- def apply(forwardingManager: ForwardingManager) = {
- forwardingManager(row.id).destroy(row, at)
+ def apply() {
+ findForwarding(row.id).destroy(row, at)
}
-}
+}

0 comments on commit c1efcb7

Please sign in to comment.