Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #42 from mzalimeni/master

Default maxRequests to Integer.MAX_VALUE rather than 1000
  • Loading branch information...
commit 41cf6a3ee5872c517ac14b948cfb85985a0cb558 2 parents 86c94d7 + 7f41856
@WamBamBoozle WamBamBoozle authored
View
2  README.md
@@ -527,7 +527,7 @@ new ParrotLauncherConfig {
<td><code>maxRequests</code></td>
<td><p>An integer value that specifies the total number of requests to submit to your service.</p>
<p><b>Example: </b><code>maxRequests = 10000</code></p></td>
- <td><code>1000</code></td>
+ <td><code>Integer.MAX_VALUE</code></td>
</tr>
<tr>
View
2  src/main/scala/com/twitter/parrot/config/ParrotFeederConfig.scala
@@ -31,7 +31,7 @@ trait ParrotFeederConfig extends Config[RuntimeEnvironment => Service]
var inputLog: String = ""
var jobName = "parrot"
var linesToSkip = 0
- var maxRequests = 0
+ var maxRequests = Integer.MAX_VALUE
var numInstances = 1
var numThreads = 0
var pollInterval = Duration(1, TimeUnit.SECONDS)
View
2  src/main/scala/com/twitter/parrot/config/ParrotLauncherConfig.scala
@@ -44,7 +44,7 @@ trait ParrotLauncherConfig extends Config[ParrotLauncher] with ParrotCommonConfi
var localMode = false
var log = required[String]
var maxPerHost = 1
- var maxRequests = 1000
+ var maxRequests = Integer.MAX_VALUE
var mesosEnv = "devel"
var mesosFeederRamInMb: Option[Int] = None
var mesosServerRamInMb: Option[Int] = None
View
3  src/main/scala/com/twitter/parrot/feeder/ParrotFeeder.scala
@@ -43,7 +43,7 @@ class ParrotFeeder(config: ParrotFeederConfig) extends Service {
private[this] val log = Logger.get(getClass.getName)
val requestsRead = new AtomicLong(0)
@volatile
- private[this] var state = FeederState.RUNNING
+ protected[this] var state = FeederState.RUNNING
private[this] val initializedParrots = mutable.Set[RemoteParrot]()
@@ -61,7 +61,6 @@ class ParrotFeeder(config: ParrotFeederConfig) extends Service {
if (config.duration.inMillis > 0) {
shutdownAfter(config.duration)
- config.maxRequests = Integer.MAX_VALUE // don't terminate prematurely
}
// Poller is starting here so that we can validate that we get enough servers, ie
View
6 src/main/scala/com/twitter/parrot/util/RemoteParrot.scala
@@ -117,7 +117,7 @@ class RemoteParrot(val name: String,
def shutdown() {
consumer.shutdown
- waitFor(client.shutdown())
+ waitFor(client.shutdown(), Duration.Top)
service.close()
}
@@ -156,8 +156,8 @@ class RemoteParrot(val name: String,
(service, client)
}
- private[this] def waitFor[A](future: Future[A]): A = {
- future.get(finagleTimeout) match {
+ private[this] def waitFor[A](future: Future[A], timeout: Duration = finagleTimeout): A = {
+ future.get(timeout) match {
case Return(res) => res
case Throw(t) => throw t
}
View
65 src/test/scala/com/twitter/parrot/integration/EndToEndSpec.scala
@@ -27,19 +27,11 @@ import com.twitter.logging.Level
import com.twitter.logging.Logger
import com.twitter.parrot.config.ParrotFeederConfig
import com.twitter.parrot.config.ParrotServerConfig
-import com.twitter.parrot.feeder.InMemoryLog
-import com.twitter.parrot.feeder.ParrotFeeder
+import com.twitter.parrot.feeder.{FeederState, InMemoryLog, ParrotFeeder}
import com.twitter.parrot.server._
import com.twitter.parrot.util.ConsoleHandler
import com.twitter.parrot.util.PrettyDuration
-import com.twitter.util.Await
-import com.twitter.util.Duration
-import com.twitter.util.Eval
-import com.twitter.util.Future
-import com.twitter.util.RandomSocket
-import com.twitter.util.Stopwatch
-import com.twitter.util.Time
-import com.twitter.util.TimeoutException
+import com.twitter.util._
@RunWith(classOf[JUnitRunner])
class EndToEndSpec extends WordSpec with MustMatchers with FeederFixture {
@@ -111,7 +103,7 @@ class EndToEndSpec extends WordSpec with MustMatchers with FeederFixture {
assert(rp.properlyShutDown)
}
- "honor timouts" in {
+ "honor timeouts" in {
val serverConfig = makeServerConfig(true)
serverConfig.cachedSeconds = 1
val transport = serverConfig.transport.get.asInstanceOf[FinagleTransport]
@@ -120,20 +112,59 @@ class EndToEndSpec extends WordSpec with MustMatchers with FeederFixture {
server.start()
val rate = 1 // rps ... requests per second
- val seconds = 20 // how long we expect to take to send our requests
- val totalRequests = (rate * seconds).toInt
- val feederConfig = makeFeederConfig(serverConfig.parrotPort, twitters(totalRequests))
+ val secondsToRun = 10
+ val expectedRequests = (rate * secondsToRun) // expect the number of requests we can send before timeout
+ val feederConfig = makeFeederConfig(serverConfig.parrotPort, twitters(expectedRequests))
feederConfig.reuseFile = true
feederConfig.requestRate = rate
- feederConfig.duration = (seconds / 2).toInt.seconds
+ feederConfig.duration = secondsToRun.seconds
+ feederConfig.maxRequests = Integer.MAX_VALUE // allow for unlimited requests so that timeout can occur
+ feederConfig.batchSize = 3
+
+ val feederDone = Promise[Unit]
+ var feederState = FeederState.RUNNING
+ val feeder = new ParrotFeeder(feederConfig) {
+ override def shutdown() = {
+ feederDone.setDone() // give this test ability to wait on shutdown call
+ feederState = state // capture state at beginning of shutdown call
+ super.shutdown()
+ }
+ }
+ feeder.start()
+ try {
+ Await.ready(feederDone, (secondsToRun + 1).seconds) // time out if feeder timeout is not honored
+ } catch {
+ case e: TimeoutException =>
+ fail(String.format("Server did not time out in %s", PrettyDuration(secondsToRun.seconds)))
+ }
+ feederState must be === FeederState.TIMEOUT
+ }
+
+ "honor the max request limit" in {
+ val serverConfig = makeServerConfig(true)
+ serverConfig.cachedSeconds = 1
+ val transport = serverConfig.transport.get.asInstanceOf[FinagleTransport]
+ val server: ParrotServerImpl[ParrotRequest, HttpResponse] =
+ new ParrotServerImpl(serverConfig)
+ server.start()
+
+ val rate = 1 // rps ... requests per second
+ val requestLimit = 10 // how many requests we want to send
+ val secondsToRun = (rate * requestLimit * 2) // run much longer than needed to send the max number of requests
+ val feederConfig = makeFeederConfig(serverConfig.parrotPort, twitters(requestLimit))
+
+ feederConfig.reuseFile = true
+ feederConfig.requestRate = rate
+ feederConfig.duration = secondsToRun.seconds
+ feederConfig.maxRequests = requestLimit
feederConfig.batchSize = 3
val feeder = new ParrotFeeder(feederConfig)
feeder.start() // shuts down when maxRequests have been sent
- waitForServer(server.done, seconds * 2)
+ waitForServer(server.done, secondsToRun)
val (requestsRead, allRequests, rp) = report(feeder, transport, serverConfig)
- allRequests must be < requestsRead.toInt
+ allRequests must be === requestLimit
assert(rp.properlyShutDown)
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.