Permalink
Browse files

remove delete job, unnecessary.

  • Loading branch information...
1 parent d72530a commit 5bbb669ed1cb8ea7f7690b9fd841afc3960e6e71 @freels freels committed Apr 13, 2011
View
@@ -11,7 +11,7 @@ trait Credentials extends Connection {
import scala.collection.JavaConversions._
val env = System.getenv().toMap
val username = env.getOrElse("DB_USERNAME", "root")
- val password = env.getOrElse("DB_PASSWORD", null)
+ val password = env.getOrElse("DB_PASSWORD", "")
}
class TestQueryEvaluator(label: String) extends QueryEvaluator {
View
@@ -11,7 +11,7 @@ trait Credentials extends Connection {
import scala.collection.JavaConversions._
val env = System.getenv().toMap
val username = env.getOrElse("DB_USERNAME", "root")
- val password = env.getOrElse("DB_PASSWORD", null)
+ val password = env.getOrElse("DB_PASSWORD", "")
}
class TestQueryEvaluator(label: String) extends QueryEvaluator {
@@ -26,15 +26,13 @@ extends GizzardServer[RowzShard](config) {
def findForwarding(id: Long) = nameServer.findCurrentForwarding(0, id)
+
+ // wire up copy job, the rowz sql shard and single job type
val copyPriority = Priority.Medium.id
val copyFactory = new RowzCopyFactory(nameServer, jobScheduler(Priority.Medium.id))
shardRepo += ("SqlShard" -> new SqlShardFactory(config.rowzQueryEvaluator(), config.databaseConnection))
-
- jobCodec += ("Set".r -> new SetJobParser(findForwarding))
- jobCodec += ("Destroy".r -> new DestroyJobParser(findForwarding))
-
-
+ jobCodec += ("Set".r -> new SetJobParser(findForwarding))
// set up the service listener
@@ -1,7 +1,7 @@
package com.twitter.rowz
import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler}
-import jobs.{SetJob, DestroyJob}
+import jobs.SetJob
import com.twitter.util.Time
import com.twitter.conversions.time._
import thrift.conversions.Row._
@@ -10,30 +10,37 @@ import thrift.conversions.Row._
class RowzService(
findForwarding: Long => RowzShard,
scheduler: PrioritizingJobScheduler,
- makeId: () => Long)
+ nextId: () => Long)
extends thrift.Rowz.Iface {
+ def read(id: Long) = {
+ findForwarding(id).read(id).get.toThrift
+ }
+
def create(name: String) = {
val at = Time.now
- val row = Row(makeId(), name, at, at, RowState.Normal)
+ val row = Row(nextId(), name, at, at, RowState.Normal)
+
+ enqueueSet(row)
- scheduler.put(Priority.High.id, new SetJob(row, findForwarding))
row.id
}
def update(row: thrift.Row) {
// set the row's updated_at, to control our own destiny.
row.setUpdated_at(Time.now.inMilliseconds)
- scheduler.put(Priority.High.id, new SetJob(row.fromThrift, findForwarding))
+ enqueueSet(row.fromThrift)
}
def destroy(id: Long) {
val at = Time.now
- scheduler.put(Priority.Low.id, new DestroyJob(id, at, findForwarding))
+ val row = Row(id, "", at, at, RowState.Destroyed)
+
+ enqueueSet(row)
}
- def read(id: Long) = {
- findForwarding(id).read(id).get.toThrift
+ private def enqueueSet(row: Row) {
+ scheduler.put(Priority.High.id, new SetJob(row, findForwarding))
}
}
@@ -13,7 +13,6 @@ trait RowzShard extends shards.Shard {
import RowzShard._
def set(rows: Seq[Row])
- def destroy(id: Long, at: Time)
def read(id: Long): Option[Row]
def selectAll(cursor: Cursor, count: Int): (Seq[Row], Option[Cursor])
@@ -9,7 +9,6 @@ class RowzShardAdapter(shard: ReadWriteShard[RowzShard])
extends ReadWriteShardAdapter(shard) with RowzShard {
def set(rows: Seq[Row]) = shard.writeOperation(_.set(rows))
- def destroy(id: Long, at: Time) = shard.writeOperation(_.destroy(id, at))
def read(id: Long) = shard.readOperation(_.read(id))
def selectAll(cursor: Cursor, count: Int) = shard.readOperation(_.selectAll(cursor, count))
@@ -62,56 +62,44 @@ extends RowzShard {
val updateSql = "UPDATE " + table + " SET id = ?, name = ?, created_at = ?, updated_at = ?, state = ? WHERE updated_at < ?"
def set(rows: Seq[Row]) = {
- rows.foreach(write)
- }
-
- def destroy(id: Long, at: Time) = {
- write(Row(id, "", Time.epoch, at, RowState.Destroyed))
+ rows.foreach { row => write(row) }
}
def read(id: Long) = {
- queryEvaluator.selectOne(readSql, id, RowState.Normal.id)(makeRow)
+ queryEvaluator.selectOne(readSql, id, RowState.Normal.id) { rs => makeRow(rs) }
}
def selectAll(cursor: Cursor, count: Int) = {
- val rows = queryEvaluator.select(selectAllSql, cursor, count + 1)(makeRow)
+ val rows = queryEvaluator.select(selectAllSql, cursor, count + 1) { rs => makeRow(rs) }
val chomped = rows.take(count)
val nextCursor = if (chomped.size < rows.size) Some(chomped.last.id) else None
(chomped, nextCursor)
}
- def write(row: Row) = {
+ def write(row: Row) {
val Row(id, name, createdAt, updatedAt, state) = row
- insertOrUpdate {
- queryEvaluator.execute(
- insertSql,
+ try {
+ queryEvaluator.execute(insertSql,
id,
name,
createdAt.inMilliseconds,
updatedAt.inMilliseconds,
state.id
)
- } {
- queryEvaluator.execute(
- updateSql,
- id,
- name,
- createdAt.inMilliseconds,
- updatedAt.inMilliseconds,
- state.id,
- updatedAt.inMilliseconds
- )
- }
- }
-
- private def insertOrUpdate(f: => Unit)(g: => Unit) {
- try {
- f
} catch {
case e: ShardException => e.getCause match {
- case cause: SQLIntegrityConstraintViolationException => g
+ case cause: SQLIntegrityConstraintViolationException => {
+ queryEvaluator.execute(updateSql,
+ id,
+ name,
+ createdAt.inMilliseconds,
+ updatedAt.inMilliseconds,
+ state.id,
+ updatedAt.inMilliseconds
+ )
+ }
case _ => throw e
}
}
@@ -1,26 +0,0 @@
-package com.twitter.rowz
-package jobs
-
-import com.twitter.gizzard.scheduler.{JsonJobParser, JsonJob}
-import com.twitter.util.Time
-
-
-class DestroyJobParser(findForwarding: Long => RowzShard) extends JsonJobParser {
- def apply(attributes: Map[String, Any]): JsonJob = {
- new DestroyJob(
- attributes("id").asInstanceOf[Long],
- Time.fromMilliseconds(attributes("at").asInstanceOf[Long]),
- findForwarding
- )
- }
-}
-
-class DestroyJob(id: Long, at: Time, findForwarding: Long => RowzShard) extends JsonJob {
- def toMap = {
- Map("id" -> id, "at" -> at.inMilliseconds)
- }
-
- def apply() {
- findForwarding(id).destroy(id, at)
- }
-}
@@ -35,7 +35,7 @@ object SqlShardSpec extends ConfiguredSpecification with JMocker with ClassMocke
"create, destroy then read" in {
sqlShard.set(Seq(row))
- sqlShard.destroy(row.id, row.createdAt + 1.second)
+ sqlShard.set(Seq(row.copy(updatedAt = row.updatedAt + 1.second, state = RowState.Destroyed)))
sqlShard.read(row.id) mustEqual None
}
@@ -45,7 +45,7 @@ object SqlShardSpec extends ConfiguredSpecification with JMocker with ClassMocke
}
"destroy, create, then read" in {
- sqlShard.destroy(row.id, row.createdAt + 1.second)
+ sqlShard.set(Seq(row.copy(updatedAt = row.updatedAt + 1.second, state = RowState.Destroyed)))
sqlShard.set(Seq(row))
sqlShard.read(row.id) mustEqual None
}

0 comments on commit 5bbb669

Please sign in to comment.