Skip to content

Commit

Permalink
hack n slash at this point
Browse files Browse the repository at this point in the history
  • Loading branch information
freels committed Apr 12, 2011
1 parent ce8fe24 commit b7df89c
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/rowz/IdGenerator.scala
@@ -1,6 +1,6 @@
package com.twitter.rowz

import com.twitter.xrayspecs.Time
import com.twitter.util.Time


class IdGenerator(workerId: Long) extends (() => Long) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/twitter/rowz/Row.scala
@@ -1,6 +1,6 @@
package com.twitter.rowz

import com.twitter.xrayspecs.Time
import com.twitter.util.Time


case class Row(id: Long, name: String, createdAt: Time, updatedAt: Time, state: State.Value)
case class Row(id: Long, name: String, createdAt: Time, updatedAt: Time, state: State.Value)
31 changes: 18 additions & 13 deletions src/main/scala/com/twitter/rowz/Rowz.scala
Expand Up @@ -4,40 +4,45 @@ import com.twitter.gizzard.scheduler._
import com.twitter.gizzard.nameserver
import com.twitter.gizzard.GizzardServer

import com.twitter.rowz.jobs._

object Priority extends Enumeration {
val High, Medium, Low = Value
}

class Rowz(config: com.twitter.rowz.config.Rowz) extends GizzardServer[RowzShard](config) {
class Rowz(config: com.twitter.rowz.config.Rowz)
extends GizzardServer[RowzShard](config) {

// define a factory for Rowz's ReadWriteShardAdapter
val readWriteShardAdapter = { s => new RowzShardAdapter(s) }
val readWriteShardAdapter = new RowzShardAdapter(_)

val jobPriorities = List(Priority.High.id, Priority.Medium.id, Priority.Low.id)

val copyPriority = Priority.Medium.id
val copyFactory = new RowzCopyFactory(nameServer, jobScheduler(Priority.Medium.id))

shardRepo += ("RowzShard" -> new RowzShardFactory(config.queryEvaluator(), config.databaseConnection))

jobCodec += ("Ceate".r -> new CreateJobParser())
jobCodec += ("Destroy".r -> new DestroyJobParser())
// create the id generator

def idGenerator = new IdGenerator(config.nodeId)

// curry findCurrentForwarding to pass to the service and job factories.

def findForwarding(id: Long) = nameServer.findCurrentForwarding(0, id)

// create the id generator
val copyPriority = Priority.Medium.id
val copyFactory = new RowzCopyFactory(nameServer, jobScheduler(Priority.Medium.id))

shardRepo += ("RowzShard" -> new SqlShardFactory(config.rowzQueryEvaluator(), config.databaseConnection))

jobCodec += ("Ceate".r -> new CreateJobParser(findForwarding))
jobCodec += ("Destroy".r -> new DestroyJobParser(findForwarding))



def idGenerator = new IdGenerator(config.idGenWorkerId)

// set up the service listener

val rowzService = new RowzService(findForwarding, jobScheduler, idGenerator)

lazy val rowzThriftServer = {
val processor = new thrift.TestServer.Processor(rowzService)
val processor = new thrift.Rowz.Processor(rowzService)
config.server(processor)
}

Expand All @@ -46,7 +51,7 @@ class Rowz(config: com.twitter.rowz.config.Rowz) extends GizzardServer[RowzShard
new Thread(new Runnable { def run() { rowzThriftServer.serve() } }, "RowzServerThread").start()
}

def shutdown(quiesce: Boolean) {
def shutdown(quiesce: Boolean = false) {
rowzThriftServer.stop()
shutdownGizzard(quiesce)
}
Expand Down
24 changes: 14 additions & 10 deletions src/main/scala/com/twitter/rowz/RowzService.scala
@@ -1,22 +1,26 @@
package com.twitter.rowz

import net.lag.configgy.Config
import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, Priority}
import jobs.{Create, Destroy}
import com.twitter.xrayspecs.Time
import com.twitter.xrayspecs.TimeConversions._
import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler}
import jobs.{CreateJob, DestroyJob}
import com.twitter.util.Time
import com.twitter.conversions.time._
import thrift.conversions.Row._


class RowzService(findForwarding: Long => RowzShard, scheduler: PrioritizingJobScheduler, makeId: () => Long) extends thrift.Rowz.Iface {
def create(name: String, at: Int) = {
class RowzService(
findForwarding: Long => RowzShard,
scheduler: PrioritizingJobScheduler,
makeId: () => Long)
extends thrift.Rowz.Iface {

def create(name: String, at: Long) = {
val id = makeId()
scheduler(Priority.High.id)(new Create(id, name, Time(at.seconds)))
scheduler.put(Priority.High.id, new CreateJob(id, name, Time.fromMilliseconds(at), findForwarding))
id
}

def destroy(row: thrift.Row, at: Int) {
scheduler(Priority.Low.id)(new Destroy(row.fromThrift, Time(at.seconds)))
def destroy(row: thrift.Row, at: Long) {
scheduler.put(Priority.Low.id, new DestroyJob(row.fromThrift, Time.fromMilliseconds(at), findForwarding))
}

def read(id: Long) = {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/rowz/RowzShard.scala
@@ -1,7 +1,7 @@
package com.twitter.rowz

import com.twitter.gizzard.shards
import com.twitter.xrayspecs.Time
import com.twitter.util.Time


object RowzShard {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/rowz/RowzShardAdapter.scala
Expand Up @@ -5,7 +5,7 @@ import com.twitter.util.Time
import RowzShard.Cursor


class RowzShardAdapter(shard: ReadWriteShard[Shard])
class RowzShardAdapter(shard: ReadWriteShard[RowzShard])
extends ReadWriteShardAdapter(shard) with RowzShard {

def create(id: Long, name: String, at: Time) = shard.writeOperation(_.create(id, name, at))
Expand Down
15 changes: 9 additions & 6 deletions src/main/scala/com/twitter/rowz/SqlShard.scala
Expand Up @@ -4,12 +4,14 @@ import java.sql.SQLException
import java.sql.{SQLIntegrityConstraintViolationException, ResultSet}
import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator}
import com.twitter.querulous.query.SqlQueryTimeoutException
import com.twitter.gizzard.shards.{ShardException, ShardInfo}
import com.twitter.querulous.config.Connection
import com.twitter.gizzard.shards.{ShardException, ShardTimeoutException, ShardInfo, ShardFactory}
import com.twitter.gizzard.proxy.SqlExceptionWrappingProxy
import com.twitter.util.Time


class SqlShardFactory(qeFactory: QueryEvaluatorFactory, conn: Connection)
extends shards.ShardFactory[Shard] {
extends ShardFactory[RowzShard] {

val TABLE_DDL = """
CREATE TABLE IF NOT EXISTS %s (
Expand All @@ -27,14 +29,14 @@ CREATE TABLE IF NOT EXISTS %s (
new SqlShard(queryEvaluator, shardInfo, weight, children)
}

def materialize(shardInfo: shards.ShardInfo) = {
def materialize(shardInfo: ShardInfo) = {
try {
val evaluator = qeFactory(connection.withHost(shardInfo.hostname).withoutDatabase)
evaluator.execute("CREATE DATABASE IF NOT EXISTS " + conn.database)
evaluator.execute(ddl.format(conn.database +"."+ info.tablePrefix))
} catch {
case e: SQLException => throw new shards.ShardException(e.toString)
case e: SqlQueryTimeoutException => throw new shards.ShardTimeoutException
case e: SQLException => throw new ShardException(e.toString)
case e: SqlQueryTimeoutException => throw new ShardTimeoutException
}
}
}
Expand All @@ -44,7 +46,8 @@ class SqlShard(
queryEvaluator: QueryEvaluator,
val shardInfo: shards.ShardInfo,
val weight: Int,
val children: Seq[RowzShard]) extends RowzShard {
val children: Seq[RowzShard])
extends RowzShard {

private val table = shardInfo.tablePrefix

Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/com/twitter/rowz/config/Rowz.scala
Expand Up @@ -17,11 +17,13 @@ trait RowzThriftServer extends TServer {
var port = 7919
}

trait Rowz {
trait Rowz extends GizzardServer {
def server: RowzThriftServer

def databaseConnection: Connection
def rowzQueryEvaluator: QueryEvaluator

def nodeId: Int

def admin: AdminService
}
6 changes: 4 additions & 2 deletions src/main/scala/com/twitter/rowz/jobs/Copy.scala
@@ -1,8 +1,10 @@
package com.twitter.rowz.jobs
package com.twitter.rowz
package jobs

import com.twitter.gizzard.nameserver.NameServer
import com.twitter.gizzard.shards.ShardId
import com.twitter.gizzard.scheduler._
import com.twitter.rowz.RowzShard.Cursor


class RowzCopyFactory(nameServer: NameServer[RowzShard], scheduler: JobScheduler, defaultCount: Int = 500)
Expand All @@ -25,7 +27,7 @@ extends CopyJobParser[RowzShard] {
class RowzCopyJob(
sourceId: ShardId,
destinationId: ShardId,
cursor: Int,
cursor: Cursor,
count: Int,
nameServer: NameServer[RowzShard],
scheduler: JobScheduler)
Expand Down
7 changes: 4 additions & 3 deletions src/main/scala/com/twitter/rowz/jobs/Create.scala
@@ -1,10 +1,11 @@
package com.twitter.rowz.jobs
package com.twitter.rowz
package jobs

import com.twitter.gizzard.jobs.{JsonJobParser, JsonJob}
import com.twitter.gizzard.scheduler.{JsonJobParser, JsonJob}
import com.twitter.util.Time


class CreateParser(findForwarding: Long => RowzShard) extends JsonJobParser {
class CreateJobParser(findForwarding: Long => RowzShard) extends JsonJobParser {
def appy(attributes: Map[String, Any]): JsonJob = {
new CreateJob(
attributes("id").asInstanceOf[Long],
Expand Down
17 changes: 11 additions & 6 deletions src/main/scala/com/twitter/rowz/jobs/Destroy.scala
@@ -1,19 +1,23 @@
package com.twitter.rowz.jobs
package com.twitter.rowz
package jobs

import com.twitter.gizzard.jobs.{JsonJobParser, JsonJob}
import com.twitter.gizzard.scheduler.{JsonJobParser, JsonJob}
import com.twitter.util.Time


class DestroyParser(findForwarding: Long => RowzShard) extends JsonJobParser {
class DestroyJobParser(findForwarding: Long => RowzShard) extends JsonJobParser {
def apply(attributes: Map[String, Any]): JsonJob = {
new DestroyJob(
new Row(
attributes("id").asInstanceOf[Long],
attributes("name").asInstanceOf[String],
Time.fromMilliseconds(attributes("createdAt").asInstanceOf[Long]),
Time.fromMilliseconds(attributes("updatedAt").asInstanceOf[Long]),
State(attributes("state").asInstanceOf[Int])),
Time.fromMilliseconds(attributes("at").asInstanceOf[Long]))
State(attributes("state").asInstanceOf[Int])
),
Time.fromMilliseconds(attributes("at").asInstanceOf[Long]),
findForwarding
)
}
}

Expand All @@ -25,7 +29,8 @@ class DestroyJob(row: Row, at: Time, findForwarding: Long => RowzShard) extends
"createdAt" -> row.createdAt.inMilliseconds,
"updatedAt" -> row.updatedAt.inMilliseconds,
"state" -> row.state.id,
"at" -> at.inMilliseconds)
"at" -> at.inMilliseconds
)
}

def apply() {
Expand Down
25 changes: 19 additions & 6 deletions src/main/scala/com/twitter/rowz/thrift/conversions/Row.scala
@@ -1,17 +1,30 @@
package com.twitter.rowz.thrift.conversions

import com.twitter.xrayspecs.Time
import com.twitter.xrayspecs.TimeConversions._
package com.twitter.rowz
package thrift.conversions

import com.twitter.util.Time
import com.twitter.conversions.time._
import com.twitter.rowz

object Row {
class RichShardingRow(row: rowz.Row) {
def toThrift = new thrift.Row(row.id, row.name, row.createdAt.inSeconds, row.updatedAt.inSeconds, row.state.id)
def toThrift = new thrift.Row(
row.id,
row.name,
row.createdAt.inMilliseconds,
row.updatedAt.inMilliseconds,
row.state.id
)
}
implicit def shardingRowToRichShardingRow(row: rowz.Row) = new RichShardingRow(row)

class RichThriftRow(row: thrift.Row) {
def fromThrift = new rowz.Row(row.id, row.name, Time(row.created_at.seconds), Time(row.updated_at.seconds), State(row.state))
def fromThrift = new rowz.Row(
row.id,
row.name,
Time.fromMilliseconds(row.created_at),
Time.fromMilliseconds(row.updated_at),
State(row.state)
)
}
implicit def thriftRowToRichThriftRow(row: thrift.Row) = new RichThriftRow(row)
}
8 changes: 4 additions & 4 deletions src/main/thrift/Rowz.thrift
Expand Up @@ -4,8 +4,8 @@ namespace rb Rowz
struct Row {
1: i64 id
2: string name
3: i32 created_at
4: i32 updated_at
3: i64 created_at
4: i64 updated_at
5: i32 state
}

Expand All @@ -14,7 +14,7 @@ exception RowzException {
}

service Rowz {
i64 create(1: string name, 2: i32 at) throws(1: RowzException ex)
void destroy(1: Row row, 2: i32 at) throws(1: RowzException ex)
i64 create(1: string name, 2: i64 at) throws(1: RowzException ex)
void destroy(1: Row row, 2: i64 at) throws(1: RowzException ex)
Row read(1: i64 id) throws(1: RowzException ex)
}

0 comments on commit b7df89c

Please sign in to comment.