Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
thrift 0.2.0 support (and updated gizzard).
Browse files Browse the repository at this point in the history
  • Loading branch information
Robey Pointer committed Apr 8, 2010
1 parent c523b34 commit eb1a4e5
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 25 deletions.
1 change: 1 addition & 0 deletions config/development.conf
Expand Up @@ -46,6 +46,7 @@ throttled_log {

rowz {
timeout_msec = 100
idle_timeout_sec = 60
server_port = 7919
shard_server_port = 7920
job_server_port = 7921
Expand Down
1 change: 1 addition & 0 deletions config/production.conf
Expand Up @@ -46,6 +46,7 @@ throttled_log {

rowz {
timeout_msec = 100
idle_timeout_sec = 60
server_port = 7919
shard_server_port = 7920
job_server_port = 7921
Expand Down
1 change: 1 addition & 0 deletions config/test.conf
Expand Up @@ -46,6 +46,7 @@ throttled_log {

rowz {
timeout_msec = 100
idle_timeout_sec = 60
server_port = 7919
shard_server_port = 7920
job_server_port = 7921
Expand Down
6 changes: 5 additions & 1 deletion ivy/ivy.xml
Expand Up @@ -26,13 +26,17 @@
<dependency org="org.scala-tools" name="vscaladoc" rev="1.1-md-3" conf="bootstrap->*" />
<dependency org="net.lag" name="kestrel" rev="1.2" />

<!-- thrift dependencies -->
<dependency org="org.slf4j" name="slf4j-jdk14" rev="1.5.2" />
<dependency org="org.slf4j" name="slf4j-api" rev="1.5.2" />
<dependency org="thrift" name="libthrift" rev="0.2.0" conf="*" /> <!--auto-->

<dependency org="net.lag" name="configgy" rev="1.4" />
<dependency org="commons-logging" name="commons-logging" rev="1.1" />
<dependency org="commons-lang" name="commons-lang" rev="2.2" />
<dependency org="com.twitter" name="ostrich" rev="1.1.13" conf="*" /> <!--auto-->
<dependency org="com.twitter" name="gizzard" rev="1.0" conf="*" /> <!--auto-->
<dependency org="com.twitter" name="querulous" rev="1.1.4" />
<dependency org="thrift" name="libthrift" rev="751142" conf="*" /> <!--auto-->
<dependency org="org.jmock" name="jmock" rev="2.4.0" conf="test->*" /> <!--auto-->
<dependency org="org.hamcrest" name="hamcrest-all" rev="1.1" conf="test->*" /> <!--auto-->
<dependency org="cglib" name="cglib" rev="2.1_3" conf="test->*" /> <!--auto-->
Expand Down
Binary file modified libs/gizzard-1.0.jar
Binary file not shown.
21 changes: 11 additions & 10 deletions src/main/scala/com/twitter/rowz/Main.scala
@@ -1,17 +1,17 @@
package com.twitter.rowz

import com.twitter.gizzard.thrift.{JobManagerService, ShardManagerService}
import net.lag.configgy.{Configgy, RuntimeEnvironment, ConfigMap}
import net.lag.logging.Logger
import com.twitter.gizzard.jobs.CopyFactory
import com.twitter.gizzard.nameserver.NameServer
import com.twitter.gizzard.proxy.LoggingProxy
import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, Priority}
import com.twitter.gizzard.thrift.{TSelectorServer, JobManager, ShardManager}
import com.facebook.thrift.server.{TServer, TThreadPoolServer}
import com.facebook.thrift.transport.{TServerSocket, TTransportFactory}
import com.twitter.gizzard.thrift.{TSelectorServer, JobManager, JobManagerService, ShardManager,
ShardManagerService}
import com.twitter.ostrich.{W3CStats, Stats}
import com.twitter.xrayspecs.TimeConversions._
import com.twitter.gizzard.proxy.LoggingProxy
import net.lag.configgy.{Configgy, RuntimeEnvironment, ConfigMap}
import net.lag.logging.Logger
import org.apache.thrift.server.{TServer, TThreadPoolServer}
import org.apache.thrift.transport.{TServerSocket, TTransportFactory}


object Main {
Expand Down Expand Up @@ -40,17 +40,18 @@ object Main {

def startThrift(w3c: W3CStats) {
val timeout = config("rowz.timeout_msec").toInt.milliseconds
val idleTimeout = config("rowz.idle_timeout_sec").toInt.seconds
val executor = TSelectorServer.makeThreadPoolExecutor(config.configMap("rowz"))
val processor = new rowz.thrift.Rowz.Processor(LoggingProxy[rowz.thrift.Rowz.Iface](Stats, w3c, "Rowz", state.rowzService))
rowzServer = TSelectorServer("rowz", config("rowz.server_port").toInt, processor, executor, timeout)
rowzServer = TSelectorServer("rowz", config("rowz.server_port").toInt, processor, executor, timeout, idleTimeout)

val jobManagerService = new JobManagerService(state.prioritizingScheduler)
val jobProcessor = new JobManager.Processor(LoggingProxy[JobManager.Iface](Stats, w3c, "RowzJobs", jobManagerService))
jobServer = TSelectorServer("rowz-jobs", config("rowz.job_server_port").toInt, jobProcessor, executor, timeout)
jobServer = TSelectorServer("rowz-jobs", config("rowz.job_server_port").toInt, jobProcessor, executor, timeout, idleTimeout)

val shardManagerService = new ShardManagerService(state.nameServer, state.copyFactory, state.prioritizingScheduler(Priority.Medium.id))
val shardProcessor = new ShardManager.Processor(ExceptionWrappingProxy(LoggingProxy[ShardManager.Iface](Stats, w3c, "RowzShards", shardManagerService)))
shardServer = TSelectorServer("rowz-shards", config("rowz.shard_server_port").toInt, shardProcessor, executor, timeout)
shardServer = TSelectorServer("rowz-shards", config("rowz.shard_server_port").toInt, shardProcessor, executor, timeout, idleTimeout)

rowzServer.serve()
jobServer.serve()
Expand Down
Expand Up @@ -15,4 +15,4 @@ class ReadWriteShardAdapter(shard: ReadWriteShard[Shard])

def read(id: Long) = shard.readOperation(_.read(id))
def selectAll(cursor: Cursor, count: Int) = shard.readOperation(_.selectAll(cursor, count))
}
}
15 changes: 5 additions & 10 deletions src/main/scala/com/twitter/rowz/Rowz.scala
Expand Up @@ -10,7 +10,6 @@ import com.twitter.gizzard.Future
import com.twitter.gizzard.scheduler.{JobScheduler, PrioritizingJobScheduler, Priority}
import com.twitter.gizzard.shards._
import com.twitter.gizzard.nameserver
import com.twitter.gizzard.nameserver.{NameServer, ShardRepository, LoadBalancer}
import com.twitter.gizzard.jobs.{PolymorphicJobParser, BoundJobParser}
import scala.collection.mutable
import com.twitter.ostrich.W3CStats
Expand All @@ -20,7 +19,7 @@ object Rowz {
case class State(
rowzService: RowzService,
prioritizingScheduler: PrioritizingJobScheduler,
nameServer: NameServer[Shard],
nameServer: nameserver.NameServer[Shard],
copyFactory: gizzard.jobs.CopyFactory[Shard]) {
def start() = {
nameServer.reload()
Expand All @@ -47,12 +46,8 @@ object Rowz {
val throttledLogger = new ThrottledLogger[String](Logger(), config("throttled_log.period_msec").toInt, config("throttled_log.rate").toInt)
val future = new Future("ReplicatingFuture", config.configMap("rowz.replication.future"))

val shardRepository = new ShardRepository[Shard]
shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config))
shardRepository += ("com.twitter.gizzard.shards.ReadOnlyShard" -> new ReadOnlyShardFactory(new ReadWriteShardAdapter(_)))
shardRepository += ("com.twitter.gizzard.shards.BlockedShard" -> new BlockedShardFactory(new ReadWriteShardAdapter(_)))
shardRepository += ("com.twitter.gizzard.shards.WriteOnlyShard" -> new WriteOnlyShardFactory(new ReadWriteShardAdapter(_)))
shardRepository += ("com.twitter.gizzard.shards.ReplicatingShard" -> new ReplicatingShardFactory(new ReadWriteShardAdapter(_), throttledLogger, { (x, y) => }, future))
val shardRepository = new nameserver.BasicShardRepository[Shard](new ReadWriteShardAdapter(_), throttledLogger, future)
shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config))

val nameServerShards = config.getList("rowz.nameserver.hostnames").map { hostname =>
new nameserver.SqlShard(
Expand All @@ -65,8 +60,8 @@ object Rowz {

val replicatingNameServerShard = new nameserver.ReadWriteShardAdapter(new ReplicatingShard(
new ShardInfo("com.twitter.gizzard.shards.ReplicatingShard", "", ""),
1, nameServerShards, new LoadBalancer(nameServerShards), throttledLogger, future, { (x, y) => }))
val nameServer = new NameServer(replicatingNameServerShard, shardRepository, Hash)
1, nameServerShards, new nameserver.LoadBalancer(nameServerShards), throttledLogger, future))
val nameServer = new nameserver.NameServer(replicatingNameServerShard, shardRepository, Hash)
val forwardingManager = new ForwardingManager(nameServer)

val polymorphicJobParser = new PolymorphicJobParser
Expand Down
6 changes: 3 additions & 3 deletions src/main/thrift/Rowz.thrift
Expand Up @@ -14,7 +14,7 @@ exception RowzException {
}

service Rowz {
i64 create(1: string name, 2: i32 at) throws(RowzException ex)
void destroy(1: Row row, 2: i32 at) throws(RowzException ex)
Row read(1: i64 id) throws(RowzException ex)
i64 create(1: string name, 2: i32 at) throws(1: RowzException ex)
void destroy(1: Row row, 2: i32 at) throws(1: RowzException ex)
Row read(1: i64 id) throws(1: RowzException ex)
}

0 comments on commit eb1a4e5

Please sign in to comment.