Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

jobs

  • Loading branch information...
commit 7c1423038d52867e57ded82656f44497702c8e0f 1 parent 5c6f991
Nick Kallen authored
View
30 src/main/scala/com/twitter/rowz/SqlShard.scala
@@ -1,5 +1,6 @@
package com.twitter.rowz
+import java.sql.SQLIntegrityConstraintViolationException
import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator}
import net.lag.configgy.ConfigMap
import com.twitter.gizzard.shards
@@ -51,19 +52,30 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
private val table = shardInfo.tablePrefix + "_rowz"
- def create(id: Long, name: String, at: Time) = {
- queryEvaluator.execute("INSERT INTO " + table + " (id, name, created_at, updated_at, state) VALUES (?, ?, ?, ?, ?)",
- id, name, at.inSeconds, at.inSeconds, State.Normal.id)
- }
-
- def destroy(row: Row, at: Time) = {
- queryEvaluator.execute("UPDATE " + table + " SET updated_at = ?, state = ?",
- at.inSeconds, State.Destroyed.id)
- }
+ def create(id: Long, name: String, at: Time) = write(id, name, at, State.Normal, at)
+ def destroy(row: Row, at: Time) = write(row.id, row.name, row.createdAt, State.Destroyed, at)
def read(id: Long) = {
queryEvaluator.selectOne("SELECT * FROM " + table + " WHERE id = ? AND state = ?", id, State.Normal.id) { row =>
new Row(row.getLong("id"), row.getString("name"), Time(row.getLong("created_at").seconds))
}
}
+
+ private def write(id: Long, name: String, createdAt: Time, state: State.Value, at: Time) = {
+ insertOrUpdate {
+ queryEvaluator.execute("INSERT INTO " + table + " (id, name, created_at, updated_at, state) VALUES (?, ?, ?, ?, ?)",
+ id, name, createdAt.inSeconds, at.inSeconds, state.id)
+ } {
+ queryEvaluator.execute("UPDATE " + table + " SET id = ?, name = ?, created_at = ?, updated_at = ?, state = ? WHERE updated_at < ?",
+ id, name, createdAt.inSeconds, at.inSeconds, state.id, at.inSeconds)
+ }
+ }
+
+ private def insertOrUpdate(f: => Unit)(g: => Unit) {
+ try {
+ f
+ } catch {
+ case e: SQLIntegrityConstraintViolationException => g
+ }
+ }
}
View
4 src/main/scala/com/twitter/rowz/jobs/Create.scala
@@ -9,5 +9,7 @@ case class Create(id: Long, name: String, at: Time) extends UnboundJob[Forwardin
Map("id" -> id, "name" -> name, "at" -> at.inSeconds)
}
- def apply(forwardingManager: ForwardingManager) = ()
+ def apply(forwardingManager: ForwardingManager) = {
+ forwardingManager(row.id).create(row.id, row.name, at)
+ }
}
View
4 src/main/scala/com/twitter/rowz/jobs/Destroy.scala
@@ -10,5 +10,7 @@ case class Destroy(row: Row, at: Time) extends UnboundJob[ForwardingManager] {
Map("id" -> row.id, "name" -> row.name, "createdAt" -> row.createdAt.inSeconds, "at" -> at.inSeconds)
}
- def apply(forwardingManager: ForwardingManager) = ()
+ def apply(forwardingManager: ForwardingManager) = {
+ forwardingManager(row.id).destroy(row, at)
+ }
}
View
13 src/test/scala/com/twitter/rowz/SqlShardSpec.scala
@@ -16,6 +16,7 @@ object SqlShard extends Specification with JMocker with ClassMocker {
"table_001", "localhost", "INT UNSIGNED", "INT UNSIGNED", Busy.Normal, 1)
val sqlShard = shardFactory.instantiate(shardInfo, 1, List[Shard]())
val queryEvaluator = queryEvaluatorFactory(shardInfo.hostname, null, config("rowz.db.username"), config("rowz.db.password"))
+ val row = new Row(1, "a row", Time.now)
doBefore {
queryEvaluator.execute("DROP DATABASE IF EXISTS " + config("rowz.db.name"))
@@ -23,13 +24,11 @@ object SqlShard extends Specification with JMocker with ClassMocker {
}
"create & read" in {
- val row = new Row(1, "a row", Time.now)
sqlShard.create(row.id, row.name, row.createdAt)
sqlShard.read(row.id) mustEqual Some(row)
}
"create, destroy then read" in {
- val row = new Row(1, "a row", Time.now)
sqlShard.create(row.id, row.name, row.createdAt)
sqlShard.destroy(row, row.createdAt + 1.second)
sqlShard.read(row.id) mustEqual None
@@ -37,15 +36,13 @@ object SqlShard extends Specification with JMocker with ClassMocker {
"idempotent" in {
"read a nonexistent row" in {
-
- }
-
- "destroy a nonexistent row" in {
-
+ sqlShard.read(row.id) mustEqual None
}
"destroy, create, then read" in {
-
+ sqlShard.destroy(row, row.createdAt)
+ sqlShard.create(row.id, row.name, row.createdAt - 1.second)
+ sqlShard.read(row.id) mustEqual None
}
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.