Permalink
Browse files

insert or insert-ignore strategy for edges

  • Loading branch information...
1 parent 3cdc483 commit 285b095872c4d254a640e53b9324ece7120aebea @freels freels committed Jan 8, 2011
Showing with 30 additions and 27 deletions.
  1. +30 −27 src/main/scala/com/twitter/flockdb/shards/SqlShard.scala
@@ -192,12 +192,6 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
(edges, returnedCursor)
}
- private def existingMetadata(ids: Collection[Long]): Seq[Long] = {
- queryEvaluator.select("SELECT source_id FROM " + tablePrefix + "_metadata WHERE source_id IN (?)", ids.toList) { row =>
- row.getLong("source_id")
- }
- }
-
private def statePriority(state: String): String = "-IF(" + state + "=0, 4, " + state + ")"
private def initializeMetadata(queryEvaluator: QueryEvaluator, sourceIds: Set[Long]): Unit = {
@@ -222,8 +216,18 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
if (!edges.isEmpty) {
// FIXME: WTF DIY SQL
val values = edges.map{ edge => "(" + edge.sourceId + ", " + edge.destinationId + ", 0, 0, "+edge.position+", -1)"}.mkString(",")
- val query = "INSERT IGNORE INTO " + tablePrefix + "_edges (source_id, destination_id, updated_at, count, position, state) VALUES " + values
- queryEvaluator.execute(query)
+
+ val query = if (edges.size == 1) {
+ "INSERT INTO " + tablePrefix + "_edges (source_id, destination_id, updated_at, count, position, state) VALUES " + values
+ } else {
+ "INSERT IGNORE INTO " + tablePrefix + "_edges (source_id, destination_id, updated_at, count, position, state) VALUES " + values
+ }
+
+ try {
+ queryEvaluator.execute(query)
+ } catch {
+ case e: SQLIntegrityConstraintViolationException => () // ignore duplicate key exception
+ }
}
}
@@ -242,26 +246,25 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
// Initialize metadata
initializeMetadata(queryEvaluator, Set(edges.map(_.sourceId): _*))
- queryEvaluator.transaction { transaction =>
-
- // Initialize edges
- initializeEdges(transaction, edges)
-
- val query = "UPDATE " + tablePrefix + "_metadata AS metadata, " + tablePrefix + "_edges AS edges " +
- "SET " +
- " metadata.count = metadata.count + " + incr("?") + "," +
- " edges.state = ?, " +
- " edges.position = IF(metadata.state = 1, ?, edges.position), " +
- " edges.updated_at = ? " +
- "WHERE (edges.updated_at < ? OR (edges.updated_at = ? AND " +
- "(" + statePriority("edges.state") + " < " + statePriority("?") + ")))" +
- " AND edges.source_id = ? " +
- " AND edges.destination_id = ? " +
- " AND metadata.source_id = ? "
- edges.foreach { edge =>
- transaction.execute(query, edge.state.id, edge.state.id, edge.state.id, edge.position, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.state.id, edge.state.id, edge.sourceId, edge.destinationId, edge.sourceId)
- }
+ // Initialize edges
+ initializeEdges(queryEvaluator, edges)
+
+ val query = "UPDATE " + tablePrefix + "_metadata AS metadata, " + tablePrefix + "_edges AS edges " +
+ "SET " +
+ " metadata.count = metadata.count + " + incr("?") + "," +
+ " edges.state = ?, " +
+ " edges.position = IF(metadata.state = 1, ?, edges.position), " +
+ " edges.updated_at = ? " +
+ "WHERE (edges.updated_at < ? OR (edges.updated_at = ? AND " +
+ "(" + statePriority("edges.state") + " < " + statePriority("?") + ")))" +
+ " AND edges.source_id = ? " +
+ " AND edges.destination_id = ? " +
+ " AND metadata.source_id = ? "
+ edges.foreach { edge =>
+ queryEvaluator.execute(query, edge.state.id, edge.state.id, edge.state.id, edge.position, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.state.id, edge.state.id, edge.sourceId, edge.destinationId, edge.sourceId)
}
+
+ Stats.incr("edges-written", edges.size)
} catch {
case e: MySQLTransactionRollbackException if (tries > 0) =>
write(edges, tries - 1)

0 comments on commit 285b095

Please sign in to comment.