Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

if a bulk insert fails in a recoverable way, recover it, and report t…

…he edges that didn't get written. this way, if some succeed (which we occasionally see), we track their counts correctly.
  • Loading branch information...
commit 2dacf2efe321046e2daf46f4146cdc24d28315d0 1 parent 74610f2
Robey Pointer authored
46 src/main/scala/com/twitter/flockdb/shards/SqlShard.scala
View
@@ -86,7 +86,7 @@ CREATE TABLE IF NOT EXISTS %s (
}
-class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardInfo,
+class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardInfo,
val weight: Int, val children: Seq[Shard], config: ConfigMap) extends Shard {
val log = Logger.get(getClass.getName)
private val tablePrefix = shardInfo.tablePrefix
@@ -464,28 +464,54 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
}
}
+ /**
+ * Mysql may throw an exception for a bulk operation that actually partially completed.
+ * If that happens, try to pick up the pieces and indicate what happened.
+ */
+ case class BurstResult(completed: Seq[Edge], failed: Seq[Edge])
+
+ def writeBurst(transaction: Transaction, state: State, edges: Seq[Edge]): BurstResult = {
+ try {
+ val modified = bulkUnsafeInsertEdges(transaction, state, edges)
+ BurstResult(edges, Nil)
+ } catch {
+ case e: BatchUpdateException =>
+ val completed = new mutable.ArrayBuffer[Edge]
+ val failed = new mutable.ArrayBuffer[Edge]
+ e.getUpdateCounts().zip(edges.toArray).foreach { case (errorCode, edge) =>
+ if (errorCode < 0) {
+ failed += edge
+ } else {
+ completed += edge
+ }
+ }
+ BurstResult(completed, failed)
+ }
+ }
+
def writeCopies(edges: Seq[Edge]) {
var remaining = edges
+ val burst = new mutable.ArrayBuffer[Edge]
while (remaining.size > 0) {
- val burst = new mutable.ArrayBuffer[Edge]
+ burst.clear()
val currentSourceId = remaining(0).sourceId
var index = 0
while (remaining.size > index && remaining(index).sourceId == currentSourceId) {
burst += remaining(index)
index += 1
}
+ Stats.addTiming("x-copy-burst", edges.size)
var countDelta = 0
atomically(currentSourceId) { (transaction, metadata) =>
try {
- try {
- countDelta += bulkUnsafeInsertEdges(transaction, metadata.state, burst)
- } catch {
- case e: BatchUpdateException =>
- Stats.incr("x-copy-fallback")
- burst.foreach { edge =>
- countDelta += writeEdge(transaction, metadata, edge, false)
- }
+ val result = writeBurst(transaction, metadata.state, burst)
+ countDelta += result.completed.size
+ if (result.failed.size > 0) {
+ Stats.incr("x-copy-fallback")
+ result.failed.foreach { edge =>
+ countDelta += writeEdge(transaction, metadata, edge, false)
+ }
}
} finally {
if (countDelta != 0) {
63 src/test/scala/com/twitter/flockdb/unit/SqlShardSpec.scala
View
@@ -20,7 +20,7 @@ import java.sql.SQLException
import scala.collection.mutable
import com.twitter.gizzard.shards.{Busy, ShardId, ShardInfo}
import com.twitter.gizzard.thrift.conversions.Sequences._
-import com.twitter.querulous.evaluator.{StandardQueryEvaluatorFactory, QueryEvaluator, QueryEvaluatorFactory}
+import com.twitter.querulous.evaluator.{QueryEvaluator, QueryEvaluatorFactory, StandardQueryEvaluatorFactory, Transaction}
import com.twitter.querulous.query.SqlQueryFactory
import com.twitter.xrayspecs.Time
import com.twitter.xrayspecs.TimeConversions._
@@ -53,13 +53,11 @@ class SqlShardSpec extends ConfiguredSpecification with JMocker with EdgesDataba
var shard: Shard = null
doBefore {
- try {
- Time.reset()
- Time.freeze()
- reset(config.configMap("db"), config("edges.db_name"))
- shardFactory.materialize(shardInfo)
- shard = shardFactory.instantiate(shardInfo, 1, List[Shard]())
- } catch { case e => e.printStackTrace() }
+ Time.reset()
+ Time.freeze()
+ reset(config.configMap("db"), config("edges.db_name"))
+ shardFactory.materialize(shardInfo)
+ shard = shardFactory.instantiate(shardInfo, 1, List[Shard]())
}
"create" in {
@@ -679,13 +677,48 @@ class SqlShardSpec extends ConfiguredSpecification with JMocker with EdgesDataba
new Edge(frank, carl, 5, Time.now, 1, State.Normal) ::
new Edge(frank, darcy, 6, Time.now, 1, State.Normal) ::
Nil
- shard.writeCopies(edges)
- shard.get(alice, bob) mustEqual Some(edges(0))
- shard.get(alice, darcy) mustEqual Some(edges(1))
- shard.get(bob, carl) mustEqual Some(edges(2))
- shard.get(frank, bob) mustEqual Some(edges(3))
- shard.get(frank, carl) mustEqual Some(edges(4))
- shard.get(frank, darcy) mustEqual Some(edges(5))
+
+ "no conflicts" in {
+ shard.writeCopies(edges)
+ shard.get(alice, bob) mustEqual Some(edges(0))
+ shard.get(alice, darcy) mustEqual Some(edges(1))
+ shard.get(bob, carl) mustEqual Some(edges(2))
+ shard.get(frank, bob) mustEqual Some(edges(3))
+ shard.get(frank, carl) mustEqual Some(edges(4))
+ shard.get(frank, darcy) mustEqual Some(edges(5))
+ }
+
+ "conflicts" in {
+ shard.add(frank, carl, 5, Time.now)
+ shard.writeCopies(edges)
+ shard.get(alice, bob) mustEqual Some(edges(0))
+ shard.get(alice, darcy) mustEqual Some(edges(1))
+ shard.get(bob, carl) mustEqual Some(edges(2))
+ shard.get(frank, bob) mustEqual Some(edges(3))
+ shard.get(frank, carl) must beSome[Edge]
+ shard.get(frank, darcy) mustEqual Some(edges(5))
+ }
+
+ "retries edges that failed a bulk-insert" in {
+ val stubShard = new SqlShard(queryEvaluator, shardInfo, 0, Nil, config) {
+ override def writeBurst(transaction: Transaction, state: State, edges: Seq[Edge]) = {
+ val completed = new mutable.ArrayBuffer[Edge]
+ val failed = new mutable.ArrayBuffer[Edge]
+ edges.foreach { edge =>
+ if (edge.destinationId == darcy) {
+ failed += edge
+ } else {
+ completed += edge
+ }
+ }
+ BurstResult(completed, failed)
+ }
+ }
+
+ stubShard.writeCopies(edges)
+ shard.get(alice, darcy) mustEqual Some(edges(1))
+ shard.get(frank, darcy) mustEqual Some(edges(5))
+ }
}
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.