Skip to content

Commit

Permalink
integrate new gizzard shardservice
Browse files Browse the repository at this point in the history
Conflicts:

	project/build/FlockDBProject.scala
	src/main/scala/com/twitter/flockdb/jobs/Copy.scala
	src/main/scala/com/twitter/flockdb/test/EdgesDatabase.scala
  • Loading branch information
Ed Ceaser committed Jun 3, 2010
1 parent 9f3e51c commit ee08d89
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 109 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -9,3 +9,4 @@ kestrel/*
lib_managed/
src_managed/
project/boot/
project/plugins/project/
26 changes: 26 additions & 0 deletions project/build/FlockDBProject.scala
@@ -0,0 +1,26 @@
import sbt._
import Process._
import com.twitter.sbt.StandardProject


class FlockDBProject(info: ProjectInfo) extends StandardProject(info) {
val asm = "asm" % "asm" % "1.5.3"
val cglib = "cglib" % "cglib" % "2.1_3"
val configgy = "net.lag" % "configgy" % "1.5.2"
val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2"
val gizzard = "com.twitter" % "gizzard" % "1.1.0"
val hamcrest = "org.hamcrest" % "hamcrest-all" % "1.1"
val jmock = "org.jmock" % "jmock" % "2.4.0"
val kestrel = "net.lag" % "kestrel" % "1.2"
val mysqljdbc = "mysql" % "mysql-connector-java" % "5.1.6"
val objenesis = "org.objenesis" % "objenesis" % "1.1"
val pool = "commons-pool" % "commons-pool" % "1.3"
val results = "com.twitter" % "results" % "1.0"
val slf4j = "org.slf4j" % "slf4j-jdk14" % "1.5.2"
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.5.2"
val smile = "net.lag" % "smile" % "0.8.11"
val specs = "org.scala-tools.testing" % "specs" % "1.6.2.1"
val thrift = "thrift" % "libthrift" % "0.2.0"
val xrayspecs = "com.twitter" % "xrayspecs" % "1.0.7"
val log4j = "log4j" % "log4j" % "1.2.12"
}
66 changes: 0 additions & 66 deletions project/build/Flockdb.scala

This file was deleted.

7 changes: 7 additions & 0 deletions project/plugins/Plugins.scala
@@ -0,0 +1,7 @@
import sbt._


class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
val twitterNest = "com.twitter" at "http://www.lag.net/nest"
val defaultProject = "com.twitter" % "standard-project" % "0.5.5"
}
4 changes: 2 additions & 2 deletions src/main/scala/com/twitter/flockdb/FlockDB.scala
Expand Up @@ -105,7 +105,7 @@ object FlockDB {
List((Priority.High, "primary"), (Priority.Medium, "copy"),
(Priority.Low, "slow")).foreach { case (priority, configName) =>
val queueConfig = config.configMap("edges.queue")
val scheduler = JobScheduler(configName, queueConfig, jobParser, w3c)
val scheduler = JobScheduler(configName, queueConfig, jobParser)
schedulerMap(priority.id) = scheduler
}
val scheduler = new PrioritizingJobScheduler(schedulerMap)
Expand All @@ -118,7 +118,7 @@ object FlockDB {
replicationFuture))

val nameServer = new nameserver.NameServer(replicatingNameServerShard, shardRepository,
nameserver.ByteSwapper, nameserver.RandomIdGenerator)
nameserver.ByteSwapper)
val forwardingManager = new ForwardingManager(nameServer)
val copyFactory = jobs.CopyFactory
nameServer.reload()
Expand Down
23 changes: 14 additions & 9 deletions src/main/scala/com/twitter/flockdb/jobs/Copy.scala
Expand Up @@ -2,6 +2,7 @@ package com.twitter.flockdb.jobs

import com.twitter.gizzard.jobs.BoundJobParser
import com.twitter.gizzard.scheduler.JobScheduler
import com.twitter.gizzard.shards.ShardId
import com.twitter.gizzard.nameserver.NameServer
import com.twitter.results
import com.twitter.ostrich.Stats
Expand All @@ -19,15 +20,15 @@ object Copy {
}

object CopyFactory extends gizzard.jobs.CopyFactory[Shard] {
def apply(sourceShardId: Int, destinationShardId: Int) = new MetadataCopy(sourceShardId, destinationShardId, MetadataCopy.START)
def apply(sourceShardId: ShardId, destinationShardId: ShardId) = new MetadataCopy(sourceShardId, destinationShardId, MetadataCopy.START)
}

class Copy(sourceShardId: Int, destinationShardId: Int, cursor: Copy.Cursor, count: Int) extends gizzard.jobs.Copy[Shard](sourceShardId, destinationShardId, count) {
def this(sourceShardId: Int, destinationShardId: Int, cursor: Copy.Cursor) = this(sourceShardId, destinationShardId, cursor, Copy.COUNT)
class Copy(sourceShardId: ShardId, destinationShardId: ShardId, cursor: Copy.Cursor, count: Int) extends gizzard.jobs.Copy[Shard](sourceShardId, destinationShardId, count) {
def this(sourceShardId: ShardId, destinationShardId: ShardId, cursor: Copy.Cursor) = this(sourceShardId, destinationShardId, cursor, Copy.COUNT)
def this(attributes: Map[String, AnyVal]) = {
this(
attributes("source_shard_id").toInt,
attributes("destination_shard_id").toInt,
ShardId(attributes("source_shard_hostname").toString, attributes("source_shard_table_prefix").toString),
ShardId(attributes("destination_shard_hostname").toString, attributes("destination_shard_table_prefix").toString),
(results.Cursor(attributes("cursor1").toInt), results.Cursor(attributes("cursor2").toInt)),
attributes("count").toInt)
}
Expand All @@ -51,12 +52,16 @@ object MetadataCopy {
val END = results.Cursor.End
}

class MetadataCopy(sourceShardId: Int, destinationShardId: Int, cursor: MetadataCopy.Cursor, count: Int) extends gizzard.jobs.Copy[Shard](sourceShardId, destinationShardId, count) {
def this(sourceShardId: Int, destinationShardId: Int, cursor: MetadataCopy.Cursor) = this(sourceShardId, destinationShardId, cursor, Copy.COUNT)
class MetadataCopy(sourceShardId: ShardId, destinationShardId: ShardId, cursor: MetadataCopy.Cursor,
count: Int)
extends gizzard.jobs.Copy[Shard](sourceShardId, destinationShardId, count) {
def this(sourceShardId: ShardId, destinationShardId: ShardId, cursor: MetadataCopy.Cursor) =
this(sourceShardId, destinationShardId, cursor, Copy.COUNT)

def this(attributes: Map[String, AnyVal]) = {
this(
attributes("source_shard_id").toInt,
attributes("destination_shard_id").toInt,
ShardId(attributes("source_shard_hostname").toString, attributes("source_shard_table_prefix").toString),
ShardId(attributes("destination_shard_hostname").toString, attributes("destination_shard_table_prefix").toString),
results.Cursor(attributes("cursor").toInt),
attributes("count").toInt)
}
Expand Down
31 changes: 18 additions & 13 deletions src/main/scala/com/twitter/flockdb/test/EdgesDatabase.scala
Expand Up @@ -4,7 +4,7 @@ import com.twitter.querulous.query.SqlQueryFactory
import com.twitter.querulous.evaluator.StandardQueryEvaluatorFactory
import net.lag.configgy.{ConfigMap, Configgy}
import com.twitter.gizzard.nameserver.Forwarding
import com.twitter.gizzard.shards.{Busy, ShardInfo}
import com.twitter.gizzard.shards.{Busy, ShardId, ShardInfo}
import com.twitter.gizzard.test.NameServerDatabase


Expand All @@ -18,23 +18,28 @@ trait EdgesDatabase extends NameServerDatabase {
val queryEvaluator = evaluator(config)

for (graph <- (1 until 10)) {
val forwardShardId = flock.nameServer.createShard(new ShardInfo("com.twitter.flockdb.SqlShard",
"forward_" + graph, "localhost", "INT UNSIGNED", "INT UNSIGNED"))
val backwardShardId = flock.nameServer.createShard(new ShardInfo("com.twitter.flockdb.SqlShard",
"backward_" + graph, "localhost", "INT UNSIGNED", "INT UNSIGNED"))
val forwardShardId = ShardId("localhost", "forward_" + graph)
val backwardShardId = ShardId("localhost", "backward_" + graph)

flock.nameServer.createShard(ShardInfo(forwardShardId,
"com.twitter.flockdb.SqlShard", "INT UNSIGNED", "INT UNSIGNED", Busy.Normal))
flock.nameServer.createShard(ShardInfo(backwardShardId,
"com.twitter.flockdb.SqlShard", "INT UNSIGNED", "INT UNSIGNED", Busy.Normal))
queryEvaluator.execute("DELETE FROM forward_" + graph + "_edges")
queryEvaluator.execute("DELETE FROM forward_" + graph + "_metadata")
queryEvaluator.execute("DELETE FROM backward_" + graph + "_edges")
queryEvaluator.execute("DELETE FROM backward_" + graph + "_metadata")

val replicatingForwardShardId = flock.nameServer.createShard(new ShardInfo("com.twitter.gizzard.shards.ReplicatingShard",
"replicating_forward_" + graph, "localhost", "", ""))
val replicatingBackwardShardId = flock.nameServer.createShard(new ShardInfo("com.twitter.gizzard.shards.ReplicatingShard",
"replicating_backward_" + graph, "localhost", "", ""))
flock.nameServer.addChildShard(replicatingForwardShardId, forwardShardId, 1)
flock.nameServer.addChildShard(replicatingBackwardShardId, backwardShardId, 1)
flock.nameServer.setForwarding(new Forwarding(graph, 0, replicatingForwardShardId))
flock.nameServer.setForwarding(new Forwarding(-1 * graph, 0, replicatingBackwardShardId))
val replicatingForwardShardId = ShardId("localhost", "replicating_forward_" + graph)
val replicatingBackwardShardId = ShardId("localhost", "replicating_backward_" + graph)
flock.nameServer.createShard(ShardInfo(replicatingForwardShardId,
"com.twitter.gizzard.shards.ReplicatingShard", "", "", Busy.Normal))
flock.nameServer.createShard(ShardInfo(replicatingBackwardShardId,
"com.twitter.gizzard.shards.ReplicatingShard", "", "", Busy.Normal))
flock.nameServer.addLink(replicatingForwardShardId, forwardShardId, 1)
flock.nameServer.addLink(replicatingBackwardShardId, backwardShardId, 1)
flock.nameServer.setForwarding(Forwarding(graph, 0, replicatingForwardShardId))
flock.nameServer.setForwarding(Forwarding(-1 * graph, 0, replicatingBackwardShardId))
}
flock.nameServer.reload()
} catch {
Expand Down
Expand Up @@ -4,7 +4,7 @@ import net.lag.configgy.Configgy
import org.specs.Specification

abstract class ConfiguredSpecification extends Specification {
// Configgy.configure("config/test.conf")
Configgy.configure("config/test.conf")
lazy val config = Configgy.config
}

Expand Up @@ -3,7 +3,7 @@ package com.twitter.flockdb.integration
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.results.Cursor
import com.twitter.ostrich.Stats
import com.twitter.xrayspecs.{Eventually, Time}
import com.twitter.xrayspecs.Time
import com.twitter.xrayspecs.TimeConversions._
import net.lag.smile.kestrel.{KestrelClient, MemoryStore}
import thrift._
Expand All @@ -12,7 +12,7 @@ import conversions.SelectOperation._
import test.{EdgesDatabase, StaticEdges}


object EdgesSpec extends ConfiguredSpecification with EdgesDatabase with Eventually {
object EdgesSpec extends ConfiguredSpecification with EdgesDatabase {
val poolConfig = config.configMap("db.connection_pool")

import StaticEdges._
Expand Down
Expand Up @@ -3,12 +3,11 @@ package com.twitter.flockdb.integration
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.querulous.evaluator.QueryEvaluatorFactory
import com.twitter.results.Cursor
import com.twitter.xrayspecs.Eventually
import test.{EdgesDatabase, StaticEdges}
import thrift.{Page, QueryTerm, Results, SelectOperation, SelectOperationType}


object IntersectionSpec extends ConfiguredSpecification with Eventually with EdgesDatabase {
object IntersectionSpec extends ConfiguredSpecification with EdgesDatabase {
val poolConfig = config.configMap("db.connection_pool")

import StaticEdges._
Expand Down
Expand Up @@ -3,14 +3,14 @@ package com.twitter.flockdb.integration
import scala.collection.mutable
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.results.Cursor
import com.twitter.xrayspecs.{Time, Eventually}
import com.twitter.xrayspecs.Time
import com.twitter.xrayspecs.TimeConversions._
import org.specs.mock.{ClassMocker, JMocker}
import test.{EdgesDatabase, StaticEdges}
import thrift._


object SelectCompilerSpec extends ConfiguredSpecification with Eventually with EdgesDatabase with JMocker with ClassMocker {
object SelectCompilerSpec extends ConfiguredSpecification with EdgesDatabase with JMocker with ClassMocker {
val poolConfig = config.configMap("db.connection_pool")

import StaticEdges._
Expand Down
18 changes: 11 additions & 7 deletions src/test/scala/com/twitter/flockdb/unit/CopySpec.scala
Expand Up @@ -2,7 +2,7 @@ package com.twitter.flockdb.unit

import com.twitter.gizzard.scheduler.JobScheduler
import com.twitter.gizzard.nameserver.{NameServer, ShardMigration}
import com.twitter.gizzard.shards.{Busy, ShardTimeoutException}
import com.twitter.gizzard.shards.{Busy, ShardId, ShardTimeoutException}
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.results.Cursor
import com.twitter.xrayspecs.Time
Expand All @@ -13,8 +13,8 @@ import shards.{Metadata, Shard}


object CopySpec extends ConfiguredSpecification with JMocker with ClassMocker {
val shard1Id = 10
val shard2Id = 20
val shard1Id = ShardId("test", "shard1")
val shard2Id = ShardId("test", "shard2")
val count = 2300

"Copy" should {
Expand Down Expand Up @@ -70,8 +70,10 @@ object CopySpec extends ConfiguredSpecification with JMocker with ClassMocker {
val job = new Copy(shard1Id, shard2Id, (cursor1, cursor2), count)
val json = job.toJson
json mustMatch "Copy"
json mustMatch "\"source_shard_id\":" + shard1Id
json mustMatch "\"destination_shard_id\":" + shard2Id
json mustMatch "\"source_shard_hostname\":\"%s\"".format(shard1Id.hostname)
json mustMatch "\"source_shard_table_prefix\":\"%s\"".format(shard1Id.tablePrefix)
json mustMatch "\"destination_shard_hostname\":\"%s\"".format(shard2Id.hostname)
json mustMatch "\"destination_shard_table_prefix\":\"%s\"".format(shard2Id.tablePrefix)
json mustMatch "\"count\":" + count
json mustMatch "\"cursor1\":" + cursor1.position
json mustMatch "\"cursor2\":" + cursor2.position
Expand Down Expand Up @@ -119,8 +121,10 @@ object CopySpec extends ConfiguredSpecification with JMocker with ClassMocker {
val job = new MetadataCopy(shard1Id, shard2Id, cursor, count)
val json = job.toJson
json mustMatch "MetadataCopy"
json mustMatch "\"source_shard_id\":" + shard1Id
json mustMatch "\"destination_shard_id\":" + shard2Id
json mustMatch "\"source_shard_hostname\":\"%s\"".format(shard1Id.hostname)
json mustMatch "\"source_shard_table_prefix\":\"%s\"".format(shard1Id.tablePrefix)
json mustMatch "\"destination_shard_hostname\":\"%s\"".format(shard2Id.hostname)
json mustMatch "\"destination_shard_table_prefix\":\"%s\"".format(shard2Id.tablePrefix)
json mustMatch "\"count\":" + count
json mustMatch "\"cursor\":" + cursor.position
}
Expand Down
10 changes: 5 additions & 5 deletions src/test/scala/com/twitter/flockdb/unit/SqlShardSpec.scala
Expand Up @@ -2,7 +2,7 @@ package com.twitter.flockdb.unit

import java.sql.SQLException
import scala.collection.mutable
import com.twitter.gizzard.shards.{Busy, ShardInfo}
import com.twitter.gizzard.shards.{Busy, ShardId, ShardInfo}
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.querulous.evaluator.{StandardQueryEvaluatorFactory, QueryEvaluator, QueryEvaluatorFactory}
import com.twitter.querulous.query.SqlQueryFactory
Expand Down Expand Up @@ -34,8 +34,8 @@ object SqlShardSpec extends ConfiguredSpecification with JMocker with EdgesDatab
evalConf.update("database", config("edges.db_name"))
val queryEvaluator = evaluator(evalConf)
val shardFactory = new SqlShardFactory(queryEvaluatorFactory, queryEvaluatorFactory, config)
val shardInfo = new ShardInfo("com.twitter.flockdb.SqlShard",
"table_001", "localhost", "INT UNSIGNED", "INT UNSIGNED", Busy.Normal, 1)
val shardInfo = ShardInfo(ShardId("localhost", "table_001"), "com.twitter.flockdb.SqlShard",
"INT UNSIGNED", "INT UNSIGNED", Busy.Normal)
var shard: Shard = null

doBefore {
Expand All @@ -50,8 +50,8 @@ object SqlShardSpec extends ConfiguredSpecification with JMocker with EdgesDatab

"create" in {
val createShardFactory = new SqlShardFactory(queryEvaluatorFactory, queryEvaluatorFactory, config)
val createShardInfo = new ShardInfo("com.twitter.flockdb.SqlShard",
"create_test", "localhost", "INT UNSIGNED", "INT UNSIGNED", Busy.Normal, 1)
val createShardInfo = ShardInfo(ShardId("localhost", "create_test"), "com.twitter.flockdb.SqlShard",
"INT UNSIGNED", "INT UNSIGNED", Busy.Normal)
val createShard = new SqlShard(queryEvaluator, createShardInfo, 1, Nil, config)

"when the database doesn't exist" >> {
Expand Down

0 comments on commit ee08d89

Please sign in to comment.