Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://git.twitter.biz/kestrel
Browse files Browse the repository at this point in the history
  • Loading branch information
Robey Pointer committed Mar 2, 2012
2 parents 834c4e3 + 35e4ac1 commit 8bd2821
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 15 deletions.
2 changes: 1 addition & 1 deletion loadtest/project/build/LoadtestProject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class LoadtestProject(info: ProjectInfo) extends StandardServiceProject(info)
def runParrot(config: String) = runParrotTask(Array("-f", config))
def killParrot(config: String) = runParrotTask(Array("-f", config, "-k"))

val parrot = "com.twitter" % "parrot" % "0.4.3"
val parrot = "com.twitter" % "parrot" % "0.4.5"

override def ivyXML =
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package net.lag.kestrel.loadtest

import util.Random
import com.twitter.conversions.time._
import com.twitter.parrot.util.RequestDistribution
import com.twitter.util.{Duration, JavaTimer, Time}
import org.apache.commons.math.distribution.ExponentialDistributionImpl
import java.util.concurrent.TimeUnit
import java.lang.IllegalArgumentException

case class ArrivalRate(val arrivalsPerSecond: Int, val duration: Duration) {
if (arrivalsPerSecond <= 0) {
throw new IllegalArgumentException("arrivalsPerSecond must be >= 1: " + arrivalsPerSecond)
}

if (duration < 1.millisecond) {
throw new IllegalArgumentException("duration must be >= 1 millisecond: " + duration)
}
}

class VaryingPoissonProcess(val arrivalRates: Seq[ArrivalRate]) extends RequestDistribution {
if (arrivalRates.isEmpty) {
throw new IllegalArgumentException("must have at least one arrival rate")
}

private[this] val rand = new Random(Time.now.inMillis)
private[this] val timer = new JavaTimer(true)

private[this] var iter: Iterator[ArrivalRate] = Iterator.empty
private[this] var dist: Option[ExponentialDistributionImpl] = None

private def updateDistribution() {
synchronized {
if (!iter.hasNext) {
iter = arrivalRates.iterator
}

val nextArrivalRate = iter.next
dist = Some(new ExponentialDistributionImpl(1000000000.0 / nextArrivalRate.arrivalsPerSecond))
timer.schedule(Time.now + nextArrivalRate.duration) {
updateDistribution()
}
}
}

def timeToNextArrival(): Duration = {
if (!dist.isDefined) {
updateDistribution()
}

val nanosToNextArrival = dist.get.inverseCumulativeProbability(rand.nextDouble())
Duration(nanosToNextArrival.toLong, TimeUnit.NANOSECONDS)
}
}
8 changes: 4 additions & 4 deletions project/build/KestrelProject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ class KestrelProject(info: ProjectInfo) extends StandardServiceProject(info) wit
with PublishSourcesAndJavadocs
with PublishSite
{
val utilCore = "com.twitter" % "util-core" % "1.12.4"
val utilEval = "com.twitter" % "util-eval" % "1.12.4"
val utilLogging = "com.twitter" % "util-logging" % "1.12.4"
val utilCore = "com.twitter" % "util-core" % "1.12.13"
val utilEval = "com.twitter" % "util-eval" % "1.12.13"
val utilLogging = "com.twitter" % "util-logging" % "1.12.13"

val ostrich = "com.twitter" % "ostrich" % "4.10.0"
val ostrich = "com.twitter" % "ostrich_2.8.1" % "4.10.6"
val naggati = "com.twitter" % "naggati" % "2.1.1" intransitive() // allow custom netty
val netty = "org.jboss.netty" % "netty" % "3.2.6.Final"

Expand Down
42 changes: 32 additions & 10 deletions src/main/scala/net/lag/kestrel/Kestrel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,33 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],

memcacheAcceptor.foreach { _.close().awaitUninterruptibly() }
textAcceptor.foreach { _.close().awaitUninterruptibly() }
queueCollection.shutdown()

if (queueCollection ne null) {
queueCollection.shutdown()
queueCollection = null
}

channelGroup.close().awaitUninterruptibly()
channelFactory.releaseExternalResources()

executor.shutdown()
executor.awaitTermination(5, TimeUnit.SECONDS)
timer.stop()
timer = null
journalSyncScheduler.shutdown()
journalSyncScheduler.awaitTermination(5, TimeUnit.SECONDS)
journalSyncScheduler = null
if (channelFactory ne null) {
channelFactory.releaseExternalResources()
channelFactory = null
}

if (executor ne null) {
executor.shutdown()
executor.awaitTermination(5, TimeUnit.SECONDS)
executor = null
}
if (timer ne null) {
timer.stop()
timer = null
}
if (journalSyncScheduler ne null) {
journalSyncScheduler.shutdown()
journalSyncScheduler.awaitTermination(5, TimeUnit.SECONDS)
journalSyncScheduler = null
}

log.info("Goodbye.")
}

Expand Down Expand Up @@ -227,6 +243,12 @@ object Kestrel {
} catch {
case e =>
log.error(e, "Exception during startup; exiting!")

// Shut down all registered services such as AdminHttpService properly
// so that SBT does not refuse to shut itself down when 'sbt run -f ...'
// fails.
ServiceTracker.shutdown()

System.exit(1)
}
log.info("Kestrel %s started.", runtime.jarVersion)
Expand Down

0 comments on commit 8bd2821

Please sign in to comment.