Permalink
Browse files

hack n slash at this point

  • Loading branch information...
1 parent ce8fe24 commit b7df89cb250c032b98c66783aeede8997418a838 @freels freels committed Apr 12, 2011
@@ -1,6 +1,6 @@
package com.twitter.rowz
-import com.twitter.xrayspecs.Time
+import com.twitter.util.Time
class IdGenerator(workerId: Long) extends (() => Long) {
@@ -1,6 +1,6 @@
package com.twitter.rowz
-import com.twitter.xrayspecs.Time
+import com.twitter.util.Time
-case class Row(id: Long, name: String, createdAt: Time, updatedAt: Time, state: State.Value)
+case class Row(id: Long, name: String, createdAt: Time, updatedAt: Time, state: State.Value)
@@ -4,40 +4,45 @@ import com.twitter.gizzard.scheduler._
import com.twitter.gizzard.nameserver
import com.twitter.gizzard.GizzardServer
+import com.twitter.rowz.jobs._
+
object Priority extends Enumeration {
val High, Medium, Low = Value
}
-class Rowz(config: com.twitter.rowz.config.Rowz) extends GizzardServer[RowzShard](config) {
+class Rowz(config: com.twitter.rowz.config.Rowz)
+extends GizzardServer[RowzShard](config) {
// define a factory for Rowz's ReadWriteShardAdapter
- val readWriteShardAdapter = { s => new RowzShardAdapter(s) }
+ val readWriteShardAdapter = new RowzShardAdapter(_)
val jobPriorities = List(Priority.High.id, Priority.Medium.id, Priority.Low.id)
- val copyPriority = Priority.Medium.id
- val copyFactory = new RowzCopyFactory(nameServer, jobScheduler(Priority.Medium.id))
-
- shardRepo += ("RowzShard" -> new RowzShardFactory(config.queryEvaluator(), config.databaseConnection))
-
- jobCodec += ("Ceate".r -> new CreateJobParser())
- jobCodec += ("Destroy".r -> new DestroyJobParser())
+ // create the id generator
+ def idGenerator = new IdGenerator(config.nodeId)
// curry findCurrentForwarding to pass to the service and job factories.
def findForwarding(id: Long) = nameServer.findCurrentForwarding(0, id)
- // create the id generator
+ val copyPriority = Priority.Medium.id
+ val copyFactory = new RowzCopyFactory(nameServer, jobScheduler(Priority.Medium.id))
+
+ shardRepo += ("RowzShard" -> new SqlShardFactory(config.rowzQueryEvaluator(), config.databaseConnection))
+
+ jobCodec += ("Ceate".r -> new CreateJobParser(findForwarding))
+ jobCodec += ("Destroy".r -> new DestroyJobParser(findForwarding))
+
+
- def idGenerator = new IdGenerator(config.idGenWorkerId)
// set up the service listener
val rowzService = new RowzService(findForwarding, jobScheduler, idGenerator)
lazy val rowzThriftServer = {
- val processor = new thrift.TestServer.Processor(rowzService)
+ val processor = new thrift.Rowz.Processor(rowzService)
config.server(processor)
}
@@ -46,7 +51,7 @@ class Rowz(config: com.twitter.rowz.config.Rowz) extends GizzardServer[RowzShard
new Thread(new Runnable { def run() { rowzThriftServer.serve() } }, "RowzServerThread").start()
}
- def shutdown(quiesce: Boolean) {
+ def shutdown(quiesce: Boolean = false) {
rowzThriftServer.stop()
shutdownGizzard(quiesce)
}
@@ -1,22 +1,26 @@
package com.twitter.rowz
-import net.lag.configgy.Config
-import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, Priority}
-import jobs.{Create, Destroy}
-import com.twitter.xrayspecs.Time
-import com.twitter.xrayspecs.TimeConversions._
+import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler}
+import jobs.{CreateJob, DestroyJob}
+import com.twitter.util.Time
+import com.twitter.conversions.time._
import thrift.conversions.Row._
-class RowzService(findForwarding: Long => RowzShard, scheduler: PrioritizingJobScheduler, makeId: () => Long) extends thrift.Rowz.Iface {
- def create(name: String, at: Int) = {
+class RowzService(
+ findForwarding: Long => RowzShard,
+ scheduler: PrioritizingJobScheduler,
+ makeId: () => Long)
+extends thrift.Rowz.Iface {
+
+ def create(name: String, at: Long) = {
val id = makeId()
- scheduler(Priority.High.id)(new Create(id, name, Time(at.seconds)))
+ scheduler.put(Priority.High.id, new CreateJob(id, name, Time.fromMilliseconds(at), findForwarding))
id
}
- def destroy(row: thrift.Row, at: Int) {
- scheduler(Priority.Low.id)(new Destroy(row.fromThrift, Time(at.seconds)))
+ def destroy(row: thrift.Row, at: Long) {
+ scheduler.put(Priority.Low.id, new DestroyJob(row.fromThrift, Time.fromMilliseconds(at), findForwarding))
}
def read(id: Long) = {
@@ -1,7 +1,7 @@
package com.twitter.rowz
import com.twitter.gizzard.shards
-import com.twitter.xrayspecs.Time
+import com.twitter.util.Time
object RowzShard {
@@ -5,7 +5,7 @@ import com.twitter.util.Time
import RowzShard.Cursor
-class RowzShardAdapter(shard: ReadWriteShard[Shard])
+class RowzShardAdapter(shard: ReadWriteShard[RowzShard])
extends ReadWriteShardAdapter(shard) with RowzShard {
def create(id: Long, name: String, at: Time) = shard.writeOperation(_.create(id, name, at))
@@ -4,12 +4,14 @@ import java.sql.SQLException
import java.sql.{SQLIntegrityConstraintViolationException, ResultSet}
import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator}
import com.twitter.querulous.query.SqlQueryTimeoutException
-import com.twitter.gizzard.shards.{ShardException, ShardInfo}
+import com.twitter.querulous.config.Connection
+import com.twitter.gizzard.shards.{ShardException, ShardTimeoutException, ShardInfo, ShardFactory}
import com.twitter.gizzard.proxy.SqlExceptionWrappingProxy
+import com.twitter.util.Time
class SqlShardFactory(qeFactory: QueryEvaluatorFactory, conn: Connection)
-extends shards.ShardFactory[Shard] {
+extends ShardFactory[RowzShard] {
val TABLE_DDL = """
CREATE TABLE IF NOT EXISTS %s (
@@ -27,14 +29,14 @@ CREATE TABLE IF NOT EXISTS %s (
new SqlShard(queryEvaluator, shardInfo, weight, children)
}
- def materialize(shardInfo: shards.ShardInfo) = {
+ def materialize(shardInfo: ShardInfo) = {
try {
val evaluator = qeFactory(connection.withHost(shardInfo.hostname).withoutDatabase)
evaluator.execute("CREATE DATABASE IF NOT EXISTS " + conn.database)
evaluator.execute(ddl.format(conn.database +"."+ info.tablePrefix))
} catch {
- case e: SQLException => throw new shards.ShardException(e.toString)
- case e: SqlQueryTimeoutException => throw new shards.ShardTimeoutException
+ case e: SQLException => throw new ShardException(e.toString)
+ case e: SqlQueryTimeoutException => throw new ShardTimeoutException
}
}
}
@@ -44,7 +46,8 @@ class SqlShard(
queryEvaluator: QueryEvaluator,
val shardInfo: shards.ShardInfo,
val weight: Int,
- val children: Seq[RowzShard]) extends RowzShard {
+ val children: Seq[RowzShard])
+extends RowzShard {
private val table = shardInfo.tablePrefix
@@ -17,11 +17,13 @@ trait RowzThriftServer extends TServer {
var port = 7919
}
-trait Rowz {
+trait Rowz extends GizzardServer {
def server: RowzThriftServer
def databaseConnection: Connection
def rowzQueryEvaluator: QueryEvaluator
+ def nodeId: Int
+
def admin: AdminService
}
@@ -1,8 +1,10 @@
-package com.twitter.rowz.jobs
+package com.twitter.rowz
+package jobs
import com.twitter.gizzard.nameserver.NameServer
import com.twitter.gizzard.shards.ShardId
import com.twitter.gizzard.scheduler._
+import com.twitter.rowz.RowzShard.Cursor
class RowzCopyFactory(nameServer: NameServer[RowzShard], scheduler: JobScheduler, defaultCount: Int = 500)
@@ -25,7 +27,7 @@ extends CopyJobParser[RowzShard] {
class RowzCopyJob(
sourceId: ShardId,
destinationId: ShardId,
- cursor: Int,
+ cursor: Cursor,
count: Int,
nameServer: NameServer[RowzShard],
scheduler: JobScheduler)
@@ -1,10 +1,11 @@
-package com.twitter.rowz.jobs
+package com.twitter.rowz
+package jobs
-import com.twitter.gizzard.jobs.{JsonJobParser, JsonJob}
+import com.twitter.gizzard.scheduler.{JsonJobParser, JsonJob}
import com.twitter.util.Time
-class CreateParser(findForwarding: Long => RowzShard) extends JsonJobParser {
+class CreateJobParser(findForwarding: Long => RowzShard) extends JsonJobParser {
def appy(attributes: Map[String, Any]): JsonJob = {
new CreateJob(
attributes("id").asInstanceOf[Long],
@@ -1,19 +1,23 @@
-package com.twitter.rowz.jobs
+package com.twitter.rowz
+package jobs
-import com.twitter.gizzard.jobs.{JsonJobParser, JsonJob}
+import com.twitter.gizzard.scheduler.{JsonJobParser, JsonJob}
import com.twitter.util.Time
-class DestroyParser(findForwarding: Long => RowzShard) extends JsonJobParser {
+class DestroyJobParser(findForwarding: Long => RowzShard) extends JsonJobParser {
def apply(attributes: Map[String, Any]): JsonJob = {
new DestroyJob(
new Row(
attributes("id").asInstanceOf[Long],
attributes("name").asInstanceOf[String],
Time.fromMilliseconds(attributes("createdAt").asInstanceOf[Long]),
Time.fromMilliseconds(attributes("updatedAt").asInstanceOf[Long]),
- State(attributes("state").asInstanceOf[Int])),
- Time.fromMilliseconds(attributes("at").asInstanceOf[Long]))
+ State(attributes("state").asInstanceOf[Int])
+ ),
+ Time.fromMilliseconds(attributes("at").asInstanceOf[Long]),
+ findForwarding
+ )
}
}
@@ -25,7 +29,8 @@ class DestroyJob(row: Row, at: Time, findForwarding: Long => RowzShard) extends
"createdAt" -> row.createdAt.inMilliseconds,
"updatedAt" -> row.updatedAt.inMilliseconds,
"state" -> row.state.id,
- "at" -> at.inMilliseconds)
+ "at" -> at.inMilliseconds
+ )
}
def apply() {
@@ -1,17 +1,30 @@
-package com.twitter.rowz.thrift.conversions
-
-import com.twitter.xrayspecs.Time
-import com.twitter.xrayspecs.TimeConversions._
+package com.twitter.rowz
+package thrift.conversions
+import com.twitter.util.Time
+import com.twitter.conversions.time._
+import com.twitter.rowz
object Row {
class RichShardingRow(row: rowz.Row) {
- def toThrift = new thrift.Row(row.id, row.name, row.createdAt.inSeconds, row.updatedAt.inSeconds, row.state.id)
+ def toThrift = new thrift.Row(
+ row.id,
+ row.name,
+ row.createdAt.inMilliseconds,
+ row.updatedAt.inMilliseconds,
+ row.state.id
+ )
}
implicit def shardingRowToRichShardingRow(row: rowz.Row) = new RichShardingRow(row)
class RichThriftRow(row: thrift.Row) {
- def fromThrift = new rowz.Row(row.id, row.name, Time(row.created_at.seconds), Time(row.updated_at.seconds), State(row.state))
+ def fromThrift = new rowz.Row(
+ row.id,
+ row.name,
+ Time.fromMilliseconds(row.created_at),
+ Time.fromMilliseconds(row.updated_at),
+ State(row.state)
+ )
}
implicit def thriftRowToRichThriftRow(row: thrift.Row) = new RichThriftRow(row)
}
@@ -4,8 +4,8 @@ namespace rb Rowz
struct Row {
1: i64 id
2: string name
- 3: i32 created_at
- 4: i32 updated_at
+ 3: i64 created_at
+ 4: i64 updated_at
5: i32 state
}
@@ -14,7 +14,7 @@ exception RowzException {
}
service Rowz {
- i64 create(1: string name, 2: i32 at) throws(1: RowzException ex)
- void destroy(1: Row row, 2: i32 at) throws(1: RowzException ex)
+ i64 create(1: string name, 2: i64 at) throws(1: RowzException ex)
+ void destroy(1: Row row, 2: i64 at) throws(1: RowzException ex)
Row read(1: i64 id) throws(1: RowzException ex)
}

0 comments on commit b7df89c

Please sign in to comment.