Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
stopping point; really hacky integration test infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Kallen committed Apr 4, 2010
1 parent 1527072 commit 982ac13
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 48 deletions.
14 changes: 9 additions & 5 deletions config/test.conf
@@ -1,6 +1,10 @@
admin_port = 9991
admin_http_port = 9990

host {
id = 1
}

db_default {
username = "$(DB_USERNAME)"
password = "$(DB_PASSWORD)"
Expand All @@ -13,7 +17,7 @@ errors {
}

log {
level = "fatal"
level = "info"
console = true
}

Expand Down Expand Up @@ -60,7 +64,7 @@ rowz {

nameserver (inherit="db_default") {
hostnames = ["localhost"]
database = "rowz_nameserver"
name = "rowz_nameserver"

connection_pool {
max_wait = 20
Expand Down Expand Up @@ -93,21 +97,21 @@ rowz {
queue {
path = "/tmp"
journal off
primary {
high {
job_queue = "high"
error_queue = "high_errors"
threads = 1
error_limit = 25
replay_interval = 900
}
copy {
medium {
job_queue = "medium"
error_queue = "medium_errors"
threads = 1
error_limit = 25
replay_interval = 900
}
slow {
low {
job_queue = "low"
error_queue = "low_errors"
threads = 1
Expand Down
1 change: 1 addition & 0 deletions ivy/ivy.xml
Expand Up @@ -24,6 +24,7 @@
<dependency org="org.scala-lang" name="scala-library" rev="2.7.7" />
<dependency org="org.scala-tools.testing" name="specs" rev="1.6.1" conf="test->*" />
<dependency org="org.scala-tools" name="vscaladoc" rev="1.1-md-3" conf="bootstrap->*" />
<dependency org="net.lag" name="kestrel" rev="1.2" />

<dependency org="net.lag" name="configgy" rev="1.4" />
<dependency org="commons-logging" name="commons-logging" rev="1.1" />
Expand Down
Binary file modified libs/gizzard-1.0.jar
Binary file not shown.
31 changes: 12 additions & 19 deletions src/main/scala/com/twitter/rowz/Main.scala
@@ -1,11 +1,12 @@
package com.twitter.rowz

import com.twitter.gizzard.thrift.{JobManagerService, ShardManagerService}
import net.lag.configgy.Configgy
import net.lag.logging.Logger
import com.twitter.gizzard.jobs.CopyFactory
import com.twitter.gizzard.nameserver.NameServer
import com.twitter.gizzard.scheduler.PrioritizingJobScheduler
import com.twitter.gizzard.thrift.{TSelectorServer, JobManager, JobManagerService, ShardManager, ShardManagerService}
import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, Priority}
import com.twitter.gizzard.thrift.{TSelectorServer, JobManager, ShardManager}
import com.facebook.thrift.server.{TServer, TThreadPoolServer}
import com.facebook.thrift.transport.{TServerSocket, TTransportFactory}
import com.twitter.ostrich.{W3CStats, Stats}
Expand All @@ -14,11 +15,7 @@ import com.twitter.gizzard.proxy.LoggingProxy


object Main {
var rowzService: RowzService = null
var nameServer: NameServer[Shard] = null
var scheduler: PrioritizingJobScheduler = null
var copier: CopyFactory[Shard] = null

var state: Rowz.State = null
var rowzServer: TSelectorServer = null
var jobServer: TSelectorServer = null
var shardServer: TSelectorServer = null
Expand All @@ -37,27 +34,23 @@ object Main {
))

def main(args: Array[String]) {
val state = Rowz(config, w3c)
rowzService = state._1
nameServer = state._2
scheduler = state._3
copier = state._4

state = Rowz(config, w3c)
state.start()
startThrift()
}

def startThrift() {
val timeout = config("timeout").toInt.milliseconds
val executor = TSelectorServer.makeThreadPoolExecutor(config)
val processor = new rowz.thrift.Rowz.Processor(LoggingProxy[rowz.thrift.Rowz.Iface](Stats, w3c, "Rowz", rowzService))
val processor = new rowz.thrift.Rowz.Processor(LoggingProxy[rowz.thrift.Rowz.Iface](Stats, w3c, "Rowz", state.rowzService))
rowzServer = TSelectorServer("rowz", config("port").toInt, processor, executor, timeout)

val jobService = new JobManagerService(scheduler)
val jobProcessor = new JobManager.Processor(LoggingProxy[JobManager.Iface](Stats, Main.w3c, "RowzJobs", jobService))
val jobManagerService = new JobManagerService(state.prioritizingScheduler)
val jobProcessor = new JobManager.Processor(LoggingProxy[JobManager.Iface](Stats, Main.w3c, "RowzJobs", jobManagerService))
jobServer = TSelectorServer("rowz-jobs", config("rowz.job_server_port").toInt, jobProcessor, executor, timeout)

val shardService = new ShardManagerService(nameServer, copier, scheduler(Priority.Low.id))
val shardProcessor = new ShardManager.Processor(ExceptionWrappingProxy(LoggingProxy[ShardManager.Iface](Stats, Main.w3c, "RowzShards", shardService)))
val shardManagerService = new ShardManagerService(state.nameServer, state.copyFactory, state.prioritizingScheduler(Priority.Medium.id))
val shardProcessor = new ShardManager.Processor(ExceptionWrappingProxy(LoggingProxy[ShardManager.Iface](Stats, Main.w3c, "RowzShards", shardManagerService)))
shardServer = TSelectorServer("rowz-shards", config("rowz.shard_server_port").toInt, shardProcessor, executor, timeout)

rowzServer.serve()
Expand All @@ -68,7 +61,7 @@ object Main {
def shutdown() {
rowzServer.stop()
jobServer.stop()
scheduler.shutdown()
state.shutdown()

System.exit(0)
}
Expand Down
6 changes: 0 additions & 6 deletions src/main/scala/com/twitter/rowz/Priority.scala

This file was deleted.

43 changes: 27 additions & 16 deletions src/main/scala/com/twitter/rowz/Rowz.scala
@@ -1,13 +1,13 @@
package com.twitter.rowz

import net.lag.configgy.Config
import com.twitter.querulous.database.{ApachePoolingDatabaseFactory, MemoizingDatabaseFactory}
import com.twitter.querulous.database.{ApachePoolingDatabaseFactory, MemoizingDatabaseFactory, DatabaseFactory}
import com.twitter.querulous.query.SqlQueryFactory
import com.twitter.querulous.evaluator.StandardQueryEvaluatorFactory
import com.twitter.xrayspecs.TimeConversions._
import net.lag.logging.{Logger, ThrottledLogger}
import com.twitter.gizzard.Future
import com.twitter.gizzard.scheduler.{JobScheduler, PrioritizingJobScheduler}
import com.twitter.gizzard.scheduler.{JobScheduler, PrioritizingJobScheduler, Priority}
import com.twitter.gizzard.shards._
import com.twitter.gizzard.nameserver
import com.twitter.gizzard.nameserver.{NameServer, ShardRepository, LoadBalancer}
Expand All @@ -17,20 +17,36 @@ import com.twitter.ostrich.W3CStats


object Rowz {
def apply(config: Config, w3c: W3CStats) = {
val log = Logger.get
val databaseFactory = new MemoizingDatabaseFactory(new ApachePoolingDatabaseFactory(
case class State(
rowzService: RowzService,
prioritizingScheduler: PrioritizingJobScheduler,
nameServer: NameServer[Shard],
copyFactory: gizzard.jobs.CopyFactory[Shard]
) {
def start() = {
nameServer.reload()
prioritizingScheduler.start()
}

def shutdown() = prioritizingScheduler.shutdown()
}

def apply(config: Config, w3c: W3CStats): State = apply(
config, w3c,
new MemoizingDatabaseFactory(new ApachePoolingDatabaseFactory(
config("rowz.db.connection_pool.size_min").toInt,
config("rowz.db.connection_pool.size_max").toInt,
config("rowz.db.connection_pool.test_idle_msec").toLong.millis,
config("rowz.db.connection_pool.max_wait").toLong.millis,
config("rowz.db.connection_pool.test_on_borrow").toBoolean,
config("rowz.db.connection_pool.min_evictable_idle_msec").toLong.millis))
)

def apply(config: Config, w3c: W3CStats, databaseFactory: DatabaseFactory): State = {
val queryEvaluatorFactory = new StandardQueryEvaluatorFactory(databaseFactory, new SqlQueryFactory)

val throttledLogger = new ThrottledLogger[String](Logger(), config("throttled_log.period_msec").toInt, config("throttled_log.rate").toInt)
val future = new Future("ReplicatingFuture", config.configMap("replication.future"))
val future = new Future("ReplicatingFuture", config.configMap("rowz.replication.future"))

val shardRepository = new ShardRepository[Shard]
shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config))
Expand All @@ -39,8 +55,7 @@ object Rowz {
shardRepository += ("com.twitter.gizzard.shards.WriteOnlyShard" -> new WriteOnlyShardFactory(new ReadWriteShardAdapter(_)))
shardRepository += ("com.twitter.gizzard.shards.ReplicatingShard" -> new ReplicatingShardFactory(new ReadWriteShardAdapter(_), throttledLogger, { (x, y) => }, future))


val nameServerShards = config.getList("rowz.nameserver.databases").map { hostname =>
val nameServerShards = config.getList("rowz.nameserver.hostnames").map { hostname =>
new nameserver.SqlShard(
queryEvaluatorFactory(
hostname,
Expand All @@ -55,26 +70,22 @@ object Rowz {
val nameServer = new NameServer(replicatingNameServerShard, shardRepository, Hash)
val forwardingManager = new ForwardingManager(nameServer)


val polymorphicJobParser = new PolymorphicJobParser
val schedulerMap = new mutable.HashMap[Int, JobScheduler]
List((Priority.High, "high"), (Priority.Low, "low")).foreach { case (priority, configName) =>
val queueConfig = config.configMap("edges.queue")
List((Priority.High, "high"), (Priority.Medium, "medium"), (Priority.Low, "low")).foreach { case (priority, configName) =>
val queueConfig = config.configMap("rowz.queue")
val scheduler = JobScheduler(configName, queueConfig, polymorphicJobParser, w3c)
schedulerMap(priority.id) = scheduler
}
val prioritizingScheduler = new PrioritizingJobScheduler(schedulerMap)

val copyJobParser = new BoundJobParser((nameServer, prioritizingScheduler(Priority.Low.id)))
val copyJobParser = new BoundJobParser((nameServer, prioritizingScheduler(Priority.Medium.id)))
val rowzJobParser = new BoundJobParser(forwardingManager)
polymorphicJobParser += ("rowz\\.jobs\\.(Copy|Migrate)".r, copyJobParser)
polymorphicJobParser += ("rowz\\.jobs\\.(Create|Destroy)".r, rowzJobParser)

val rowzService = new RowzService(forwardingManager, prioritizingScheduler, new IdGenerator(config("host.id").toInt))

nameServer.reload()
prioritizingScheduler.start()

(rowzService, nameServer, prioritizingScheduler, jobs.CopyFactory)
State(rowzService, prioritizingScheduler, nameServer, jobs.CopyFactory)
}
}
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/rowz/thrift/RowzService.scala
@@ -1,7 +1,7 @@
package com.twitter.rowz

import net.lag.configgy.Config
import com.twitter.gizzard.scheduler.PrioritizingJobScheduler
import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, Priority}
import jobs.{Create, Destroy}
import com.twitter.xrayspecs.Time
import com.twitter.xrayspecs.TimeConversions._
Expand Down
11 changes: 11 additions & 0 deletions src/test/scala/com/twitter/rowz/Database.scala
Expand Up @@ -5,9 +5,17 @@ import com.twitter.querulous.query.SqlQueryFactory
import com.twitter.querulous.database.{ApachePoolingDatabaseFactory, MemoizingDatabaseFactory}
import com.twitter.querulous.evaluator.StandardQueryEvaluatorFactory
import com.twitter.xrayspecs.TimeConversions._
import com.twitter.ostrich.W3CStats
import net.lag.logging.Logger


object Database {
val log = Logger.get
val w3c = new W3CStats(log, Array(
"action-timing",
"db-timing",
"connection-pool-release-timing"
))
val config = Configgy.config
val databaseFactory = new MemoizingDatabaseFactory(new ApachePoolingDatabaseFactory(
config("rowz.db.connection_pool.size_min").toInt,
Expand All @@ -17,4 +25,7 @@ object Database {
config("rowz.db.connection_pool.test_on_borrow").toBoolean,
config("rowz.db.connection_pool.min_evictable_idle_msec").toLong.millis))
val queryEvaluatorFactory = new StandardQueryEvaluatorFactory(databaseFactory, new SqlQueryFactory)
val queryEvaluator = queryEvaluatorFactory("localhost", null, "root", "") // XXX FIXME
queryEvaluator.execute("DROP DATABASE IF EXISTS rowz_nameserver")
queryEvaluator.execute("CREATE DATABASE rowz_nameserver")
}
43 changes: 43 additions & 0 deletions src/test/scala/com/twitter/rowz/integration/RowzSpec.scala
@@ -0,0 +1,43 @@
package com.twitter.rowz.integration

import org.specs.Specification
import com.twitter.xrayspecs.{Time, Eventually}
import com.twitter.xrayspecs.TimeConversions._
import net.lag.logging.Logger
import com.twitter.gizzard.shards.ShardInfo
import com.twitter.gizzard.nameserver.Forwarding


object RowzSpec extends Specification with Eventually {
"Rowz" should {
import Database._
Time.freeze()
val state = Rowz(config, w3c, databaseFactory)
val rowzService = state.rowzService

doBefore {
state.nameServer.rebuildSchema()
val shard1 = new ShardInfo("com.twitter.rowz.SqlShard", "shard_1", "localhost")
val shardId = state.nameServer.createShard(shard1)
state.nameServer.setForwarding(new Forwarding(0, 0, shardId))
// state.nameServer.createShard(shard2)
state.start()
}

"row create & read" in {
println("1")
val id = rowzService.create("row", Time.now.inSeconds)
Thread.sleep(5.seconds.inMillis)
println("2")
rowzService.read(id)
println("3")
val row = rowzService.read(id)
println("4")
row.name mustEqual "row"
println("5")
rowzService.destroy(row, 1.second.fromNow.inSeconds)
println("6")
rowzService.read(id) must eventually(throwA[thrift.RowzException])
}
}
}
@@ -1,4 +1,4 @@
package com.twitter.rowz
package com.twitter.rowz.unit

import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}
Expand Down

0 comments on commit 982ac13

Please sign in to comment.