diff --git a/config/development.conf b/config/development.conf index f603a37..e3970e3 100644 --- a/config/development.conf +++ b/config/development.conf @@ -46,6 +46,7 @@ throttled_log { rowz { timeout_msec = 100 + idle_timeout_sec = 60 server_port = 7919 shard_server_port = 7920 job_server_port = 7921 diff --git a/config/production.conf b/config/production.conf index f603a37..e3970e3 100644 --- a/config/production.conf +++ b/config/production.conf @@ -46,6 +46,7 @@ throttled_log { rowz { timeout_msec = 100 + idle_timeout_sec = 60 server_port = 7919 shard_server_port = 7920 job_server_port = 7921 diff --git a/config/test.conf b/config/test.conf index f603a37..e3970e3 100644 --- a/config/test.conf +++ b/config/test.conf @@ -46,6 +46,7 @@ throttled_log { rowz { timeout_msec = 100 + idle_timeout_sec = 60 server_port = 7919 shard_server_port = 7920 job_server_port = 7921 diff --git a/ivy/ivy.xml b/ivy/ivy.xml index f23658c..91f1440 100644 --- a/ivy/ivy.xml +++ b/ivy/ivy.xml @@ -26,13 +26,17 @@ + + + + + - diff --git a/libs/gizzard-1.0.jar b/libs/gizzard-1.0.jar index 10f3739..038c79b 100644 Binary files a/libs/gizzard-1.0.jar and b/libs/gizzard-1.0.jar differ diff --git a/src/main/scala/com/twitter/rowz/Main.scala b/src/main/scala/com/twitter/rowz/Main.scala index 86020c7..fd6f11c 100644 --- a/src/main/scala/com/twitter/rowz/Main.scala +++ b/src/main/scala/com/twitter/rowz/Main.scala @@ -1,17 +1,17 @@ package com.twitter.rowz -import com.twitter.gizzard.thrift.{JobManagerService, ShardManagerService} -import net.lag.configgy.{Configgy, RuntimeEnvironment, ConfigMap} -import net.lag.logging.Logger import com.twitter.gizzard.jobs.CopyFactory import com.twitter.gizzard.nameserver.NameServer +import com.twitter.gizzard.proxy.LoggingProxy import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, Priority} -import com.twitter.gizzard.thrift.{TSelectorServer, JobManager, ShardManager} -import com.facebook.thrift.server.{TServer, TThreadPoolServer} -import com.facebook.thrift.transport.{TServerSocket, TTransportFactory} +import com.twitter.gizzard.thrift.{TSelectorServer, JobManager, JobManagerService, ShardManager, + ShardManagerService} import com.twitter.ostrich.{W3CStats, Stats} import com.twitter.xrayspecs.TimeConversions._ -import com.twitter.gizzard.proxy.LoggingProxy +import net.lag.configgy.{Configgy, RuntimeEnvironment, ConfigMap} +import net.lag.logging.Logger +import org.apache.thrift.server.{TServer, TThreadPoolServer} +import org.apache.thrift.transport.{TServerSocket, TTransportFactory} object Main { @@ -41,17 +41,18 @@ object Main { def startThrift(w3c: W3CStats) { val timeout = config("rowz.timeout_msec").toInt.milliseconds + val idleTimeout = config("rowz.idle_timeout_sec").toInt.seconds val executor = TSelectorServer.makeThreadPoolExecutor(config.configMap("rowz")) val processor = new rowz.thrift.Rowz.Processor(LoggingProxy[rowz.thrift.Rowz.Iface](Stats, w3c, "Rowz", state.rowzService)) - rowzServer = TSelectorServer("rowz", config("rowz.server_port").toInt, processor, executor, timeout) + rowzServer = TSelectorServer("rowz", config("rowz.server_port").toInt, processor, executor, timeout, idleTimeout) val jobManagerService = new JobManagerService(state.prioritizingScheduler) val jobProcessor = new JobManager.Processor(LoggingProxy[JobManager.Iface](Stats, w3c, "RowzJobs", jobManagerService)) - jobServer = TSelectorServer("rowz-jobs", config("rowz.job_server_port").toInt, jobProcessor, executor, timeout) + jobServer = TSelectorServer("rowz-jobs", config("rowz.job_server_port").toInt, jobProcessor, executor, timeout, idleTimeout) val shardManagerService = new ShardManagerService(state.nameServer, state.copyFactory, state.prioritizingScheduler(Priority.Medium.id)) val shardProcessor = new ShardManager.Processor(ExceptionWrappingProxy(LoggingProxy[ShardManager.Iface](Stats, w3c, "RowzShards", shardManagerService))) - shardServer = TSelectorServer("rowz-shards", config("rowz.shard_server_port").toInt, shardProcessor, executor, timeout) + shardServer = TSelectorServer("rowz-shards", config("rowz.shard_server_port").toInt, shardProcessor, executor, timeout, idleTimeout) rowzServer.serve() jobServer.serve() diff --git a/src/main/scala/com/twitter/rowz/ReadWriteShardAdapter.scala b/src/main/scala/com/twitter/rowz/ReadWriteShardAdapter.scala index db4941e..9be4b0f 100644 --- a/src/main/scala/com/twitter/rowz/ReadWriteShardAdapter.scala +++ b/src/main/scala/com/twitter/rowz/ReadWriteShardAdapter.scala @@ -15,4 +15,4 @@ class ReadWriteShardAdapter(shard: ReadWriteShard[Shard]) def read(id: Long) = shard.readOperation(_.read(id)) def selectAll(cursor: Cursor, count: Int) = shard.readOperation(_.selectAll(cursor, count)) -} \ No newline at end of file +} diff --git a/src/main/scala/com/twitter/rowz/Rowz.scala b/src/main/scala/com/twitter/rowz/Rowz.scala index 46589e0..2503cd4 100644 --- a/src/main/scala/com/twitter/rowz/Rowz.scala +++ b/src/main/scala/com/twitter/rowz/Rowz.scala @@ -10,7 +10,6 @@ import com.twitter.gizzard.Future import com.twitter.gizzard.scheduler.{JobScheduler, PrioritizingJobScheduler, Priority} import com.twitter.gizzard.shards._ import com.twitter.gizzard.nameserver -import com.twitter.gizzard.nameserver.{NameServer, ShardRepository, LoadBalancer} import com.twitter.gizzard.jobs.{PolymorphicJobParser, BoundJobParser} import scala.collection.mutable import com.twitter.ostrich.W3CStats @@ -20,7 +19,7 @@ object Rowz { case class State( rowzService: RowzService, prioritizingScheduler: PrioritizingJobScheduler, - nameServer: NameServer[Shard], + nameServer: nameserver.NameServer[Shard], copyFactory: gizzard.jobs.CopyFactory[Shard]) { def start() = { nameServer.reload() @@ -47,12 +46,8 @@ object Rowz { val throttledLogger = new ThrottledLogger[String](Logger(), config("throttled_log.period_msec").toInt, config("throttled_log.rate").toInt) val future = new Future("ReplicatingFuture", config.configMap("rowz.replication.future")) - val shardRepository = new ShardRepository[Shard] - shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config)) - shardRepository += ("com.twitter.gizzard.shards.ReadOnlyShard" -> new ReadOnlyShardFactory(new ReadWriteShardAdapter(_))) - shardRepository += ("com.twitter.gizzard.shards.BlockedShard" -> new BlockedShardFactory(new ReadWriteShardAdapter(_))) - shardRepository += ("com.twitter.gizzard.shards.WriteOnlyShard" -> new WriteOnlyShardFactory(new ReadWriteShardAdapter(_))) - shardRepository += ("com.twitter.gizzard.shards.ReplicatingShard" -> new ReplicatingShardFactory(new ReadWriteShardAdapter(_), throttledLogger, { (x, y) => }, future)) + val shardRepository = new nameserver.BasicShardRepository[Shard](new ReadWriteShardAdapter(_), throttledLogger, future) + shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config)) val nameServerShards = config.getList("rowz.nameserver.hostnames").map { hostname => new nameserver.SqlShard( @@ -65,8 +60,8 @@ object Rowz { val replicatingNameServerShard = new nameserver.ReadWriteShardAdapter(new ReplicatingShard( new ShardInfo("com.twitter.gizzard.shards.ReplicatingShard", "", ""), - 1, nameServerShards, new LoadBalancer(nameServerShards), throttledLogger, future, { (x, y) => })) - val nameServer = new NameServer(replicatingNameServerShard, shardRepository, Hash) + 1, nameServerShards, new nameserver.LoadBalancer(nameServerShards), throttledLogger, future)) + val nameServer = new nameserver.NameServer(replicatingNameServerShard, shardRepository, Hash) val forwardingManager = new ForwardingManager(nameServer) val polymorphicJobParser = new PolymorphicJobParser diff --git a/src/main/thrift/Rowz.thrift b/src/main/thrift/Rowz.thrift index c3868d4..70ccb54 100644 --- a/src/main/thrift/Rowz.thrift +++ b/src/main/thrift/Rowz.thrift @@ -14,7 +14,7 @@ exception RowzException { } service Rowz { - i64 create(1: string name, 2: i32 at) throws(RowzException ex) - void destroy(1: Row row, 2: i32 at) throws(RowzException ex) - Row read(1: i64 id) throws(RowzException ex) + i64 create(1: string name, 2: i32 at) throws(1: RowzException ex) + void destroy(1: Row row, 2: i32 at) throws(1: RowzException ex) + Row read(1: i64 id) throws(1: RowzException ex) }