Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
eaceaser committed Jul 14, 2011
1 parent d795fc7 commit a8f8a25
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 30 deletions.
4 changes: 2 additions & 2 deletions config/development.scala
Expand Up @@ -49,8 +49,8 @@ class ProductionNameServerReplica(host: String) extends Mysql {
QueryClass.SelectModify -> QueryTimeout(3.seconds),
QueryClass.SelectSingle -> QueryTimeout(1.second),
QueryClass.SelectIntersection -> QueryTimeout(1.second),
QueryClass.SelectMetadata -> QueryTimeout(1.second),
QueryClass.SelectMetadataIntersection -> QueryTimeout(1.second)
QueryClass.SelectIntersectionSmall -> QueryTimeout(1.second),
QueryClass.SelectMetadata -> QueryTimeout(1.second)
)
}
}
Expand Down
4 changes: 2 additions & 2 deletions config/production.scala
Expand Up @@ -47,8 +47,8 @@ class ProductionNameServerReplica(host: String) extends Mysql {
QueryClass.SelectModify -> QueryTimeout(3.seconds),
QueryClass.SelectSingle -> QueryTimeout(1.second),
QueryClass.SelectIntersection -> QueryTimeout(1.second),
QueryClass.SelectMetadata -> QueryTimeout(1.second),
QueryClass.SelectMetadataIntersection -> QueryTimeout(1.second)
QueryClass.SelectIntersectionSmall -> QueryTimeout(1.second),
QueryClass.SelectMetadata -> QueryTimeout(1.second)
)
}
}
Expand Down
4 changes: 2 additions & 2 deletions config/test.scala
Expand Up @@ -50,8 +50,8 @@ class TestQueryEvaluator(label: String) extends QueryEvaluator {
QueryClass.Execute -> QueryTimeout(5.seconds),
QueryClass.SelectSingle -> QueryTimeout(100.millis),
QueryClass.SelectIntersection -> QueryTimeout(100.millis),
QueryClass.SelectMetadata -> QueryTimeout(100.millis),
QueryClass.SelectMetadataIntersection -> QueryTimeout(100.millis)
QueryClass.SelectIntersectionSmall -> QueryTimeout(100.millis),
QueryClass.SelectMetadata -> QueryTimeout(100.millis)
)

override def apply(stats: StatsCollector, dbStatsFactory: Option[DatabaseFactory => DatabaseFactory], queryStatsFactory: Option[QueryFactory => QueryFactory]) = {
Expand Down
Expand Up @@ -77,8 +77,8 @@ class SelectCompiler(forwardingManager: ForwardingManager, intersectionConfig: c
val name = if (complexity > 0) {
"select-complex-"+complexity
} else {
"select-" + (rv match {
case query: WhereInQuery => if (query.sizeEstimate() == 1) "single" else "simple"
"select" + (rv match {
case query: WhereInQuery => if (query.sizeEstimate() == 1) "-single" else "-simple"
case query: SimpleQuery => if (program.head.term.get.states.size > 1) "-multistate" else ""
})
}
Expand Down
Expand Up @@ -36,7 +36,6 @@ class ReadWriteShardAdapter(shard: shards.ReadWriteShard[Shard])
def selectAllMetadata(cursor: Cursor, count: Int) = shard.readOperation(_.selectAllMetadata(cursor, count))
def get(sourceId: Long, destinationId: Long) = shard.readOperation(_.get(sourceId, destinationId))
def count(sourceId: Long, states: Seq[State]) = shard.readOperation(_.count(sourceId, states))
def counts(sourceIds: Seq[Long], results: mutable.Map[Long, Int]) = shard.readOperation(_.counts(sourceIds, results))

def bulkUnsafeInsertEdges(edges: Seq[Edge]) = shard.writeOperation(_.bulkUnsafeInsertEdges(edges))
def bulkUnsafeInsertMetadata(metadata: Seq[Metadata]) = shard.writeOperation(_.bulkUnsafeInsertMetadata(metadata))
Expand Down
1 change: 0 additions & 1 deletion src/main/scala/com/twitter/flockdb/shards/Shard.scala
Expand Up @@ -35,7 +35,6 @@ trait Shard extends shards.Shard {
@throws(classOf[shards.ShardException]) def optimistically(sourceId: Long)(f: State => Unit)

@throws(classOf[shards.ShardException]) def count(sourceId: Long, states: Seq[State]): Int
@throws(classOf[shards.ShardException]) def counts(sourceIds: Seq[Long], results: mutable.Map[Long, Int])

@throws(classOf[shards.ShardException]) def selectAll(cursor: (Cursor, Cursor), count: Int): (Seq[Edge], (Cursor, Cursor))
@throws(classOf[shards.ShardException]) def selectAllMetadata(cursor: Cursor, count: Int): (Seq[Metadata], Cursor)
Expand Down
24 changes: 13 additions & 11 deletions src/main/scala/com/twitter/flockdb/shards/SqlShard.scala
Expand Up @@ -40,8 +40,8 @@ object QueryClass {
val SelectModify = QuerulousQueryClass("select_modify")
val SelectCopy = QuerulousQueryClass("select_copy")
val SelectIntersection = QuerulousQueryClass("select_intersection")
val SelectIntersectionSmall = QuerulousQueryClass("select_intersection_small")
val SelectMetadata = QuerulousQueryClass("select_metadata")
val SelectMetadataIntersection = QuerulousQueryClass("select_metadata_intersection")
}

object FlockExceptionWrappingProxyFactory extends SqlExceptionWrappingProxyFactory[Shard]
Expand Down Expand Up @@ -110,7 +110,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val lowLatencyQueryEvaluator:
}

def getMetadata(sourceId: Long): Option[Metadata] = {
queryEvaluator.selectOne(SelectMetadata, "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row =>
lowLatencyQueryEvaluator.selectOne(SelectMetadata, "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row =>
new Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at")))
}
}
Expand Down Expand Up @@ -139,7 +139,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val lowLatencyQueryEvaluator:
}

def count(sourceId: Long, states: Seq[State]): Int = {
queryEvaluator.selectOne(SelectMetadata, "SELECT state, `count` FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row =>
lowLatencyQueryEvaluator.selectOne(SelectMetadata, "SELECT state, `count` FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row =>
states.foldLeft(0) { (result, state) =>
result + (if (state == State(row.getInt("state"))) row.getInt("count") else 0)
}
Expand All @@ -149,12 +149,6 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val lowLatencyQueryEvaluator:
}
}

def counts(sourceIds: Seq[Long], results: mutable.Map[Long, Int]) {
queryEvaluator.select(SelectMetadataIntersection, "SELECT source_id, `count` FROM " + tablePrefix + "_metadata WHERE source_id IN (?)", sourceIds) { row =>
results(row.getLong("source_id")) = row.getInt("count")
}
}

private def populateMetadata(sourceId: Long, state: State) { populateMetadata(sourceId, state, Time.epoch) }

private def populateMetadata(sourceId: Long, state: State, updatedAt: Time) {
Expand Down Expand Up @@ -285,7 +279,11 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val lowLatencyQueryEvaluator:

def intersect(sourceId: Long, states: Seq[State], destinationIds: Seq[Long]) = {
if (destinationIds.size == 0) Nil else {
val (evaluator, queryClass) = if (destinationIds.size == 1) (lowLatencyQueryEvaluator, SelectSingle) else (queryEvaluator, SelectIntersection)
val (evaluator, queryClass) = destinationIds.size match {
case s if s == 1 => (lowLatencyQueryEvaluator, SelectSingle)
case s if s <= 50 => (lowLatencyQueryEvaluator, SelectIntersectionSmall)
case s => (queryEvaluator, SelectIntersection)
}
evaluator.select(queryClass, "SELECT destination_id FROM " + tablePrefix + "_edges USE INDEX (unique_source_id_destination_id) WHERE source_id = ? AND state IN (?) AND destination_id IN (?) ORDER BY destination_id DESC",
sourceId, states.map(_.id), destinationIds) { row =>
row.getLong("destination_id")
Expand All @@ -295,7 +293,11 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val lowLatencyQueryEvaluator:

def intersectEdges(sourceId: Long, states: Seq[State], destinationIds: Seq[Long]) = {
if (destinationIds.size == 0) Nil else {
val (evaluator, queryClass) = if (destinationIds.size == 1) (lowLatencyQueryEvaluator, SelectSingle) else (queryEvaluator, SelectIntersection)
val (evaluator, queryClass) = destinationIds.size match {
case s if s == 1 => (lowLatencyQueryEvaluator, SelectSingle)
case s if s <= 50 => (lowLatencyQueryEvaluator, SelectIntersectionSmall)
case s => (queryEvaluator, SelectIntersection)
}
evaluator.select(queryClass, "SELECT * FROM " + tablePrefix + "_edges USE INDEX (unique_source_id_destination_id) WHERE source_id = ? AND state IN (?) AND destination_id IN (?) ORDER BY destination_id DESC",
sourceId, states.map(_.id), destinationIds) { row =>
makeEdge(row)
Expand Down
9 changes: 0 additions & 9 deletions src/test/scala/com/twitter/flockdb/unit/SqlShardSpec.scala
Expand Up @@ -93,15 +93,6 @@ class SqlShardSpec extends IntegrationSpecification with JMocker {
shard.count(carl, List(State.Normal)) mustEqual 1
}

"multiple counts" >> {
val results = new mutable.HashMap[Long, Int]
shard.add(alice, bob, 1, now)
shard.add(alice, carl, 2, now)
shard.add(carl, alice, 1, now)
shard.counts(List(alice, carl), results)
results mustEqual Map(alice -> 2, carl -> 1)
}

"when the user does not exist yet" >> {
shard.count(bob, List(State.Normal)) mustEqual 0
}
Expand Down

0 comments on commit a8f8a25

Please sign in to comment.