Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
insert or insert-ignore strategy for edges
Browse files Browse the repository at this point in the history
  • Loading branch information
freels committed Jan 8, 2011
1 parent 3cdc483 commit 285b095
Showing 1 changed file with 30 additions and 27 deletions.
57 changes: 30 additions & 27 deletions src/main/scala/com/twitter/flockdb/shards/SqlShard.scala
Expand Up @@ -192,12 +192,6 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
(edges, returnedCursor)
}

private def existingMetadata(ids: Collection[Long]): Seq[Long] = {
queryEvaluator.select("SELECT source_id FROM " + tablePrefix + "_metadata WHERE source_id IN (?)", ids.toList) { row =>
row.getLong("source_id")
}
}

private def statePriority(state: String): String = "-IF(" + state + "=0, 4, " + state + ")"

private def initializeMetadata(queryEvaluator: QueryEvaluator, sourceIds: Set[Long]): Unit = {
Expand All @@ -222,8 +216,18 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
if (!edges.isEmpty) {
// FIXME: WTF DIY SQL
val values = edges.map{ edge => "(" + edge.sourceId + ", " + edge.destinationId + ", 0, 0, "+edge.position+", -1)"}.mkString(",")
val query = "INSERT IGNORE INTO " + tablePrefix + "_edges (source_id, destination_id, updated_at, count, position, state) VALUES " + values
queryEvaluator.execute(query)

val query = if (edges.size == 1) {
"INSERT INTO " + tablePrefix + "_edges (source_id, destination_id, updated_at, count, position, state) VALUES " + values
} else {
"INSERT IGNORE INTO " + tablePrefix + "_edges (source_id, destination_id, updated_at, count, position, state) VALUES " + values
}

try {
queryEvaluator.execute(query)
} catch {
case e: SQLIntegrityConstraintViolationException => () // ignore duplicate key exception
}
}
}

Expand All @@ -242,26 +246,25 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
// Initialize metadata
initializeMetadata(queryEvaluator, Set(edges.map(_.sourceId): _*))

queryEvaluator.transaction { transaction =>

// Initialize edges
initializeEdges(transaction, edges)

val query = "UPDATE " + tablePrefix + "_metadata AS metadata, " + tablePrefix + "_edges AS edges " +
"SET " +
" metadata.count = metadata.count + " + incr("?") + "," +
" edges.state = ?, " +
" edges.position = IF(metadata.state = 1, ?, edges.position), " +
" edges.updated_at = ? " +
"WHERE (edges.updated_at < ? OR (edges.updated_at = ? AND " +
"(" + statePriority("edges.state") + " < " + statePriority("?") + ")))" +
" AND edges.source_id = ? " +
" AND edges.destination_id = ? " +
" AND metadata.source_id = ? "
edges.foreach { edge =>
transaction.execute(query, edge.state.id, edge.state.id, edge.state.id, edge.position, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.state.id, edge.state.id, edge.sourceId, edge.destinationId, edge.sourceId)
}
// Initialize edges
initializeEdges(queryEvaluator, edges)

val query = "UPDATE " + tablePrefix + "_metadata AS metadata, " + tablePrefix + "_edges AS edges " +
"SET " +
" metadata.count = metadata.count + " + incr("?") + "," +
" edges.state = ?, " +
" edges.position = IF(metadata.state = 1, ?, edges.position), " +
" edges.updated_at = ? " +
"WHERE (edges.updated_at < ? OR (edges.updated_at = ? AND " +
"(" + statePriority("edges.state") + " < " + statePriority("?") + ")))" +
" AND edges.source_id = ? " +
" AND edges.destination_id = ? " +
" AND metadata.source_id = ? "
edges.foreach { edge =>
queryEvaluator.execute(query, edge.state.id, edge.state.id, edge.state.id, edge.position, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.state.id, edge.state.id, edge.sourceId, edge.destinationId, edge.sourceId)
}

Stats.incr("edges-written", edges.size)
} catch {
case e: MySQLTransactionRollbackException if (tries > 0) =>
write(edges, tries - 1)
Expand Down

0 comments on commit 285b095

Please sign in to comment.