Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

thrift 0.2.0 support (and updated gizzard).

  • Loading branch information...
commit eb1a4e529c6336ff25dc47fba3845901a86ad514 1 parent c523b34
Robey Pointer authored
View
1  config/development.conf
@@ -46,6 +46,7 @@ throttled_log {
rowz {
timeout_msec = 100
+ idle_timeout_sec = 60
server_port = 7919
shard_server_port = 7920
job_server_port = 7921
View
1  config/production.conf
@@ -46,6 +46,7 @@ throttled_log {
rowz {
timeout_msec = 100
+ idle_timeout_sec = 60
server_port = 7919
shard_server_port = 7920
job_server_port = 7921
View
1  config/test.conf
@@ -46,6 +46,7 @@ throttled_log {
rowz {
timeout_msec = 100
+ idle_timeout_sec = 60
server_port = 7919
shard_server_port = 7920
job_server_port = 7921
View
6 ivy/ivy.xml
@@ -26,13 +26,17 @@
<dependency org="org.scala-tools" name="vscaladoc" rev="1.1-md-3" conf="bootstrap->*" />
<dependency org="net.lag" name="kestrel" rev="1.2" />
+ <!-- thrift dependencies -->
+ <dependency org="org.slf4j" name="slf4j-jdk14" rev="1.5.2" />
+ <dependency org="org.slf4j" name="slf4j-api" rev="1.5.2" />
+ <dependency org="thrift" name="libthrift" rev="0.2.0" conf="*" /> <!--auto-->
+
<dependency org="net.lag" name="configgy" rev="1.4" />
<dependency org="commons-logging" name="commons-logging" rev="1.1" />
<dependency org="commons-lang" name="commons-lang" rev="2.2" />
<dependency org="com.twitter" name="ostrich" rev="1.1.13" conf="*" /> <!--auto-->
<dependency org="com.twitter" name="gizzard" rev="1.0" conf="*" /> <!--auto-->
<dependency org="com.twitter" name="querulous" rev="1.1.4" />
- <dependency org="thrift" name="libthrift" rev="751142" conf="*" /> <!--auto-->
<dependency org="org.jmock" name="jmock" rev="2.4.0" conf="test->*" /> <!--auto-->
<dependency org="org.hamcrest" name="hamcrest-all" rev="1.1" conf="test->*" /> <!--auto-->
<dependency org="cglib" name="cglib" rev="2.1_3" conf="test->*" /> <!--auto-->
View
BIN  libs/gizzard-1.0.jar
Binary file not shown
View
21 src/main/scala/com/twitter/rowz/Main.scala
@@ -1,17 +1,17 @@
package com.twitter.rowz
-import com.twitter.gizzard.thrift.{JobManagerService, ShardManagerService}
-import net.lag.configgy.{Configgy, RuntimeEnvironment, ConfigMap}
-import net.lag.logging.Logger
import com.twitter.gizzard.jobs.CopyFactory
import com.twitter.gizzard.nameserver.NameServer
+import com.twitter.gizzard.proxy.LoggingProxy
import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, Priority}
-import com.twitter.gizzard.thrift.{TSelectorServer, JobManager, ShardManager}
-import com.facebook.thrift.server.{TServer, TThreadPoolServer}
-import com.facebook.thrift.transport.{TServerSocket, TTransportFactory}
+import com.twitter.gizzard.thrift.{TSelectorServer, JobManager, JobManagerService, ShardManager,
+ ShardManagerService}
import com.twitter.ostrich.{W3CStats, Stats}
import com.twitter.xrayspecs.TimeConversions._
-import com.twitter.gizzard.proxy.LoggingProxy
+import net.lag.configgy.{Configgy, RuntimeEnvironment, ConfigMap}
+import net.lag.logging.Logger
+import org.apache.thrift.server.{TServer, TThreadPoolServer}
+import org.apache.thrift.transport.{TServerSocket, TTransportFactory}
object Main {
@@ -40,17 +40,18 @@ object Main {
def startThrift(w3c: W3CStats) {
val timeout = config("rowz.timeout_msec").toInt.milliseconds
+ val idleTimeout = config("rowz.idle_timeout_sec").toInt.seconds
val executor = TSelectorServer.makeThreadPoolExecutor(config.configMap("rowz"))
val processor = new rowz.thrift.Rowz.Processor(LoggingProxy[rowz.thrift.Rowz.Iface](Stats, w3c, "Rowz", state.rowzService))
- rowzServer = TSelectorServer("rowz", config("rowz.server_port").toInt, processor, executor, timeout)
+ rowzServer = TSelectorServer("rowz", config("rowz.server_port").toInt, processor, executor, timeout, idleTimeout)
val jobManagerService = new JobManagerService(state.prioritizingScheduler)
val jobProcessor = new JobManager.Processor(LoggingProxy[JobManager.Iface](Stats, w3c, "RowzJobs", jobManagerService))
- jobServer = TSelectorServer("rowz-jobs", config("rowz.job_server_port").toInt, jobProcessor, executor, timeout)
+ jobServer = TSelectorServer("rowz-jobs", config("rowz.job_server_port").toInt, jobProcessor, executor, timeout, idleTimeout)
val shardManagerService = new ShardManagerService(state.nameServer, state.copyFactory, state.prioritizingScheduler(Priority.Medium.id))
val shardProcessor = new ShardManager.Processor(ExceptionWrappingProxy(LoggingProxy[ShardManager.Iface](Stats, w3c, "RowzShards", shardManagerService)))
- shardServer = TSelectorServer("rowz-shards", config("rowz.shard_server_port").toInt, shardProcessor, executor, timeout)
+ shardServer = TSelectorServer("rowz-shards", config("rowz.shard_server_port").toInt, shardProcessor, executor, timeout, idleTimeout)
rowzServer.serve()
jobServer.serve()
View
2  src/main/scala/com/twitter/rowz/ReadWriteShardAdapter.scala
@@ -15,4 +15,4 @@ class ReadWriteShardAdapter(shard: ReadWriteShard[Shard])
def read(id: Long) = shard.readOperation(_.read(id))
def selectAll(cursor: Cursor, count: Int) = shard.readOperation(_.selectAll(cursor, count))
-}
+}
View
15 src/main/scala/com/twitter/rowz/Rowz.scala
@@ -10,7 +10,6 @@ import com.twitter.gizzard.Future
import com.twitter.gizzard.scheduler.{JobScheduler, PrioritizingJobScheduler, Priority}
import com.twitter.gizzard.shards._
import com.twitter.gizzard.nameserver
-import com.twitter.gizzard.nameserver.{NameServer, ShardRepository, LoadBalancer}
import com.twitter.gizzard.jobs.{PolymorphicJobParser, BoundJobParser}
import scala.collection.mutable
import com.twitter.ostrich.W3CStats
@@ -20,7 +19,7 @@ object Rowz {
case class State(
rowzService: RowzService,
prioritizingScheduler: PrioritizingJobScheduler,
- nameServer: NameServer[Shard],
+ nameServer: nameserver.NameServer[Shard],
copyFactory: gizzard.jobs.CopyFactory[Shard]) {
def start() = {
nameServer.reload()
@@ -47,12 +46,8 @@ object Rowz {
val throttledLogger = new ThrottledLogger[String](Logger(), config("throttled_log.period_msec").toInt, config("throttled_log.rate").toInt)
val future = new Future("ReplicatingFuture", config.configMap("rowz.replication.future"))
- val shardRepository = new ShardRepository[Shard]
- shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config))
- shardRepository += ("com.twitter.gizzard.shards.ReadOnlyShard" -> new ReadOnlyShardFactory(new ReadWriteShardAdapter(_)))
- shardRepository += ("com.twitter.gizzard.shards.BlockedShard" -> new BlockedShardFactory(new ReadWriteShardAdapter(_)))
- shardRepository += ("com.twitter.gizzard.shards.WriteOnlyShard" -> new WriteOnlyShardFactory(new ReadWriteShardAdapter(_)))
- shardRepository += ("com.twitter.gizzard.shards.ReplicatingShard" -> new ReplicatingShardFactory(new ReadWriteShardAdapter(_), throttledLogger, { (x, y) => }, future))
+ val shardRepository = new nameserver.BasicShardRepository[Shard](new ReadWriteShardAdapter(_), throttledLogger, future)
+ shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config))
val nameServerShards = config.getList("rowz.nameserver.hostnames").map { hostname =>
new nameserver.SqlShard(
@@ -65,8 +60,8 @@ object Rowz {
val replicatingNameServerShard = new nameserver.ReadWriteShardAdapter(new ReplicatingShard(
new ShardInfo("com.twitter.gizzard.shards.ReplicatingShard", "", ""),
- 1, nameServerShards, new LoadBalancer(nameServerShards), throttledLogger, future, { (x, y) => }))
- val nameServer = new NameServer(replicatingNameServerShard, shardRepository, Hash)
+ 1, nameServerShards, new nameserver.LoadBalancer(nameServerShards), throttledLogger, future))
+ val nameServer = new nameserver.NameServer(replicatingNameServerShard, shardRepository, Hash)
val forwardingManager = new ForwardingManager(nameServer)
val polymorphicJobParser = new PolymorphicJobParser
View
6 src/main/thrift/Rowz.thrift
@@ -14,7 +14,7 @@ exception RowzException {
}
service Rowz {
- i64 create(1: string name, 2: i32 at) throws(RowzException ex)
- void destroy(1: Row row, 2: i32 at) throws(RowzException ex)
- Row read(1: i64 id) throws(RowzException ex)
+ i64 create(1: string name, 2: i32 at) throws(1: RowzException ex)
+ void destroy(1: Row row, 2: i32 at) throws(1: RowzException ex)
+ Row read(1: i64 id) throws(1: RowzException ex)
}
Please sign in to comment.
Something went wrong with that request. Please try again.