Permalink
Browse files

allow copies to use bulk insert.

  • Loading branch information...
Robey Pointer
Robey Pointer committed Nov 19, 2010
1 parent 26b95e3 commit d111d0912f9bc5e7c30f0253a35777fe5b30604b
Showing with 20 additions and 6 deletions.
  1. +20 −6 src/main/scala/com/twitter/flockdb/shards/SqlShard.scala
@@ -16,10 +16,11 @@
package com.twitter.flockdb.shards
-import java.sql.{ResultSet, SQLException, SQLIntegrityConstraintViolationException}
+import java.sql.{BatchUpdateException, ResultSet, SQLException, SQLIntegrityConstraintViolationException}
import scala.collection.mutable
import com.twitter.gizzard.proxy.SqlExceptionWrappingProxy
import com.twitter.gizzard.shards
+import com.twitter.ostrich.Stats
import com.twitter.querulous.evaluator.{QueryEvaluator, QueryEvaluatorFactory, Transaction}
import com.twitter.querulous.query.{QueryClass, SqlQueryTimeoutException}
import com.twitter.xrayspecs.Time
@@ -345,15 +346,22 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
if (edge.state == metadata.state) insertedRows else 0
}
- def bulkUnsafeInsertEdges(edges: Seq[Edge]) = {
- if (edges.length > 0) {
+ def bulkUnsafeInsertEdges(edges: Seq[Edge]) {
+ bulkUnsafeInsertEdges(queryEvaluator, State.Normal, edges)
+ }
+
+ def bulkUnsafeInsertEdges(transaction: QueryEvaluator, currentState: State, edges: Seq[Edge]) = {
+ var count = 0
+ if (edges.size > 0) {
val query = "INSERT INTO " + tablePrefix + "_edges (source_id, position, updated_at, destination_id, count, state) VALUES (?, ?, ?, ?, ?, ?)"
- queryEvaluator.executeBatch(query) { batch =>
+ transaction.executeBatch(query) { batch =>
edges.foreach { edge =>
batch(edge.sourceId, edge.position, edge.updatedAt.inSeconds, edge.destinationId, edge.count, edge.state.id)
+ if (edge.state == currentState) count += 1
}
}
}
+ count
}
def bulkUnsafeInsertMetadata(metadatas: Seq[Metadata]) = {
@@ -470,8 +478,14 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
var countDelta = 0
atomically(currentSourceId) { (transaction, metadata) =>
try {
- burst.foreach { edge =>
- countDelta += writeEdge(transaction, metadata, edge, false)
+ try {
+ countDelta += bulkUnsafeInsertEdges(transaction, metadata.state, burst)
+ } catch {
+ case e: BatchUpdateException =>
+ Stats.incr("x-copy-fallback")
+ burst.foreach { edge =>
+ countDelta += writeEdge(transaction, metadata, edge, false)
+ }
}
} finally {
if (countDelta != 0) {

0 comments on commit d111d09

Please sign in to comment.