Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Merge branch 'master' into generic_forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
Ed Ceaser committed Mar 15, 2010
2 parents fe7f50d + 089cf98 commit 2f2efec
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 26 deletions.
20 changes: 17 additions & 3 deletions src/main/scala/com/twitter/gizzard/Future.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.twitter.gizzard

import java.util.concurrent._
import com.twitter.xrayspecs.Duration
import com.twitter.ostrich.Stats
import com.twitter.xrayspecs.{Duration, Time}
import com.twitter.xrayspecs.TimeConversions._
import net.lag.configgy.ConfigMap

Expand All @@ -18,16 +19,29 @@ class Future(name: String, poolSize: Int, maxPoolSize: Int, keepAlive: Duration,
config("keep_alive_time_seconds").toInt.seconds,
config("timeout_seconds").toInt.seconds)

private val executor = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAlive.inSeconds,
var executor = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAlive.inSeconds,
TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable], new NamedPoolThreadFactory(name))

Stats.makeGauge("future-" + name + "-queue-size") { executor.getQueue().size() }

def apply[A](a: => A) = {
val future = new FutureTask(new Callable[A] {
def call = a
val startTime = Time.now
def call = {
if (Time.now - startTime > timeout) {
Stats.incr("future-" + name + "-timeout")
throw new TimeoutException("future spent too long in queue")
}
a
}
})
executor.execute(future)
future
}

def shutdown() {
executor.shutdown()
}
}

class ParallelSeq[A](seq: Seq[A], future: Future) extends Seq[A] {
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/twitter/gizzard/jobs/Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ case class BoundJob[E](protected val unboundJob: UnboundJob[E], protected val en
def apply() { unboundJob(environment) }
}

class LoggingJob(w3cStats: W3CStats, job: Job) extends JobProxy(job) {
def apply() { LoggingProxy(w3cStats, job.loggingName, job).apply() }
class LoggingJob(stats: Option[StatsProvider], w3cStats: W3CStats, job: Job) extends JobProxy(job) {
def apply() { LoggingProxy(stats, w3cStats, job.loggingName, job).apply() }
}

case class ErrorHandlingConfig(maxErrorCount: Int, badJobsLogger: String => Unit, stats: Option[StatsProvider])
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/twitter/gizzard/jobs/JobParser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class ErrorHandlingJobParser(jobParser: JobParser, config: ErrorHandlingConfig)
}
}

class LoggingJobParser(w3cStats: W3CStats, jobParser: JobParser) extends JobParser {
def apply(json: Map[String, Map[String, Any]]) = new LoggingJob(w3cStats, jobParser(json))
class LoggingJobParser(stats: Option[StatsProvider], w3cStats: W3CStats, jobParser: JobParser) extends JobParser {
def apply(json: Map[String, Map[String, Any]]) = new LoggingJob(stats, w3cStats, jobParser(json))
}

class JournaledJobParser(jobParser: JobParser, journaller: String => Unit) extends JobParser {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.twitter.gizzard.nameserver
import jobs.{CopyMachine, JobScheduler}
import shards.Shard


trait CopyManager[S <: Shard] {
/** Return a scheduler to be used for running copy/migrate jobs. */
def scheduler: JobScheduler
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/com/twitter/gizzard/proxy/LoggingProxy.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.twitter.gizzard.proxy

import scala.reflect.Manifest
import com.twitter.ostrich.W3CStats
import com.twitter.ostrich.{StatsProvider, W3CStats}


/**
Expand All @@ -11,13 +11,14 @@ import com.twitter.ostrich.W3CStats
object LoggingProxy {
var counter = 0

def apply[T <: AnyRef](logger: W3CStats, name: String, obj: T)(implicit manifest: Manifest[T]): T = {
def apply[T <: AnyRef](stats: Option[StatsProvider], logger: W3CStats, name: String, obj: T)(implicit manifest: Manifest[T]): T = {
Proxy(obj) { method =>
val shortName = name.lastIndexOf('.') match {
case -1 => name
case n => name.substring(n + 1)
}

stats.map { _.incr("x-operation-" + shortName + ":" + method.name) }
logger.transaction {
logger.log("operation", shortName + ":" + method.name)
val arguments = (if (method.args != null) method.args.mkString(",") else "").replace(' ', '_')
Expand Down
36 changes: 25 additions & 11 deletions src/main/scala/com/twitter/gizzard/thrift/TSelectorServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package com.twitter.gizzard.thrift
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.channels._
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, ThreadPoolExecutor,
TimeoutException, TimeUnit}
import scala.collection.jcl
import scala.collection.mutable
import com.facebook.thrift._
import com.facebook.thrift.protocol._
import com.facebook.thrift.transport._
import com.facebook.thrift.server._
import com.twitter.ostrich.Stats
import com.twitter.xrayspecs.{Duration, Time}
import net.lag.configgy.ConfigMap
import net.lag.logging.Logger

Expand All @@ -26,16 +28,18 @@ object TSelectorServer {
new ThreadPoolExecutor(minThreads, maxThreads, stopTimeout, TimeUnit.SECONDS, queue)
}

def apply(port: Int, processor: TProcessor, executor: ThreadPoolExecutor) = {
def apply(name: String, port: Int, processor: TProcessor, executor: ThreadPoolExecutor,
timeout: Duration) = {
val socket = ServerSocketChannel.open()
socket.socket().setReuseAddress(true)
socket.socket().bind(new InetSocketAddress(port), 8192)
log.info("Starting %s on port %d", processor.getClass.getName, port)
new TSelectorServer(processor, socket, executor)
log.info("Starting %s (%s) on port %d", name, processor.getClass.getName, port)
new TSelectorServer(name, processor, socket, executor, timeout)
}
}

class TSelectorServer(processor: TProcessor, serverSocket: ServerSocketChannel, executor: ThreadPoolExecutor) extends TServer(null, null) {
class TSelectorServer(name: String, processor: TProcessor, serverSocket: ServerSocketChannel,
executor: ThreadPoolExecutor, timeout: Duration) extends TServer(null, null) {
val log = Logger.get(getClass.getName)

val processorFactory = new TProcessorFactory(processor)
Expand All @@ -54,14 +58,24 @@ class TSelectorServer(processor: TProcessor, serverSocket: ServerSocketChannel,
val registerQueue = new ConcurrentLinkedQueue[SocketChannel]


Stats.makeGauge("thrift-worker-threads") { executor.getPoolSize().toDouble }
Stats.makeGauge("thrift-connections") { clientMap.synchronized { clientMap.size } }
Stats.makeGauge("thrift-queue-size") { executor.getQueue().size() }
Stats.makeGauge("thrift-" + name + "-worker-threads") { executor.getPoolSize().toDouble }
Stats.makeGauge("thrift-" + name + "-connections") { clientMap.synchronized { clientMap.size } }
Stats.makeGauge("thrift-" + name + "-queue-size") { executor.getQueue().size() }

def isRunning = running

def execute(f: => Unit) {
executor.execute(new Runnable() { def run() { f } })
executor.execute(new Runnable() {
val startTime = Time.now

def run() {
if (Time.now - startTime > timeout) {
Stats.incr("thrift-" + name + "-timeout")
throw new TimeoutException("thrift connection spent too long in queue")
}
f
}
})
}

def serve() {
Expand Down Expand Up @@ -150,7 +164,7 @@ class TSelectorServer(processor: TProcessor, serverSocket: ServerSocketChannel,
try {
client.socketChannel.configureBlocking(true)
client.processor.process(client.inputProtocol, client.outputProtocol)
Stats.incr("thrift-calls")
Stats.incr("thrift-" + name + "-calls")
registerQueue.add(client.socketChannel)
selector.wakeup()
} catch {
Expand All @@ -163,7 +177,7 @@ class TSelectorServer(processor: TProcessor, serverSocket: ServerSocketChannel,
}
}
if (duration > 50) {
Stats.incr("thrift-work-50")
Stats.incr("thrift-" + name + "-work-50")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ object ShardInfo {
def toThrift = new thrift.ShardInfo(shardInfo.className, shardInfo.tablePrefix, shardInfo.hostname,
shardInfo.sourceType, shardInfo.destinationType,
shardInfo.busy.toThrift, shardInfo.shardId)

}
implicit def shardingShardInfoToRichShardingShardInfo(shardInfo: shards.ShardInfo) = new RichShardingShardInfo(shardInfo)

class RichThriftShardInfo(shardInfo: thrift.ShardInfo) {
def fromThrift = new shards.ShardInfo(shardInfo.class_name, shardInfo.table_prefix, shardInfo.hostname,
shardInfo.source_type, shardInfo.destination_type,
shardInfo.busy.fromThrift, shardInfo.shard_id)

}
implicit def thriftShardInfoToRichThriftShardInfo(shardInfo: thrift.ShardInfo) = new RichThriftShardInfo(shardInfo)
}
}
43 changes: 43 additions & 0 deletions src/test/scala/com/twitter/gizzard/FutureSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.twitter.gizzard

import java.util.concurrent.{CountDownLatch, ExecutionException, SynchronousQueue,
ThreadPoolExecutor, TimeoutException, TimeUnit}
import scala.collection.mutable
import com.twitter.xrayspecs.TimeConversions._
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}


object FutureSpec extends Specification with JMocker with ClassMocker {

"Future" should {
var future: Future = null

doBefore {
future = new Future("test", 1, 1, 1.hour, 10.milliseconds)
}

doAfter {
future.shutdown()
}

"execute in the future" in {
future { 3 * 4 }.get mustEqual 12
}

"timeout appropriately" in {
future { Thread.sleep(20) }.get(10, TimeUnit.MILLISECONDS) must throwA[TimeoutException]
}

"timeout a stuffed-up queue" in {
val startFlag = new CountDownLatch(1)
new Thread() {
override def run() {
future { startFlag.countDown(); Thread.sleep(20) }
}
}.start()
startFlag.await()
future { 3 * 4 }.get must throwA[ExecutionException]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ object LoggingProxySpec extends Specification with JMocker with ClassMocker {
"log stats on a proxied object" in {
val logger = mock[Logger]
val bob = new Named { def name = "bob" }
val bobProxy = LoggingProxy[Named](new W3CStats(logger, Array("operation", "arguments", "action-timing")), "Bob", bob)
val w3cStats = new W3CStats(logger, Array("operation", "arguments", "action-timing"))
val bobProxy = LoggingProxy[Named](None, w3cStats, "Bob", bob)

val line = capturingParam[String]
expect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ object ReplicatingShardSpec extends Specification with JMocker with ClassMocker
val exception = new ShardException("o noes")
expect {
allowing(shard1).weight willReturn 1
allowing(shard2).weight willReturn 0
allowing(shard2).weight willReturn 1
one(shard1).shardInfo willReturn shard1Info
one(shard2).shardInfo willReturn shard1Info
one(shard1).getName().willThrow(exception) then
one(shard1).getName().willThrow(exception)
one(shard2).getName().willThrow(exception)
}
replicatingShard.getName() must throwA[ShardException]
Expand Down

0 comments on commit 2f2efec

Please sign in to comment.