Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'localmode'

  • Loading branch information...
commit b6af109b79878f4e2bcd54e06baf93b08e13f441 2 parents 663e393 + 3603b6e
James Waldrop authored
View
2  README.md
@@ -24,6 +24,8 @@
## Iago Quick Start
+NOTE: This repo has only recently been made public and our velocity is high at the moment, with significant work being done on documentation in particular. Please join iago-users@googlegroups.com (https://groups.google.com/d/forum/iago-users) for updates and to ask pressing questions.
+
If you are already familiar with the Iago Load Generation tool, follow these steps to get started; otherwise, start with the <a href="#Iago Overview">Iago Overview</a>. For questions, please contact <a href="mailto:iago-users@googlegroups.com">iago-users@googlegroups.com</a>.
<a name="Iago Prerequisites"></a>
View
13 config/local-launcher.scala
@@ -0,0 +1,13 @@
+import com.twitter.parrot.config.ParrotLauncherConfig
+
+new ParrotLauncherConfig {
+ localMode = true
+ jobName = "testrun"
+ port = 80
+ victims = "twitter.com"
+ log = "config/replay.log"
+ requestRate = 5
+ maxRequests = 20
+ reuseFile = false
+}
+
View
3  src/main/resources/scripts/local-parrot.sh
@@ -0,0 +1,3 @@
+sh scripts/parrot-feeder.sh start-local &
+sh scripts/parrot-server.sh start-local &
+
View
5 src/main/resources/scripts/parrot-feeder.sh
@@ -17,7 +17,7 @@ LOG_HOME=$APP_HOME
MAIN_CLASS="com.twitter.parrot.feeder.FeederMain"
HEAP_OPTS="-Xmx2000m -Xms2000m -XX:NewSize=512m"
GC_OPTS="-XX:+UseConcMarkSweepGC -verbosegc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseParNewGC -Xloggc:$LOG_HOME/gc.log"
-DEBUG_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8008"
+DEBUG_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5004"
JAVA_OPTS="-server $GC_OPTS $HEAP_OPTS $PROFILE_OPTS" #$DEBUG_OPTS"
# Used to set JAVA_HOME sanely if not already set.
@@ -40,7 +40,8 @@ case "$1" in
# start-local is meant for development and runs your server in the foreground.
start-local)
- ${JAVA_HOME}/bin/java ${JAVA_OPTS} -cp ${APP_HOME}/${MAIN_JAR} ${MAIN_CLASS} -f ${APP_HOME}/config/dev-feeder.scala $2 $3 $4 $5 $6 $7 $8 $9 ${10}
+ ${JAVA_HOME}/bin/java ${JAVA_OPTS} -cp ${APP_HOME}/*jar ${MAIN_CLASS} -f ${APP_HOME}/config/target/local-feeder.scala $2 $3 $4 $5 $6 $7 $8 $9 ${10}
+echo "done."
;;
start-mesos)
View
6 src/main/resources/scripts/parrot-server.sh
@@ -16,7 +16,8 @@ LOG_HOME=$APP_HOME
MAIN_CLASS="com.twitter.parrot.server.ServerMain"
HEAP_OPTS="-Xmx#{serverXmx}m -Xms2000m -XX:NewSize=512m"
GC_OPTS="-XX:+UseConcMarkSweepGC -verbosegc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseParNewGC -Xloggc:$LOG_HOME/gc.log"
-JAVA_OPTS="-server $GC_OPTS $HEAP_OPTS $PROFILE_OPTS"
+DEBUG_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"
+JAVA_OPTS="-server $GC_OPTS $HEAP_OPTS $PROFILE_OPTS" # $DEBUG_OPTS"
# Used to set JAVA_HOME sanely if not already set.
function find_java() {
@@ -38,7 +39,8 @@ case "$1" in
# start-local is meant for development and runs your server in the foreground.
start-local)
- ${JAVA_HOME}/bin/java ${JAVA_OPTS} -cp ${APP_HOME}/* ${MAIN_CLASS} -f ${APP_HOME}/config/dev-server.scala
+ ${JAVA_HOME}/bin/java ${JAVA_OPTS} -cp ${APP_HOME}/*jar ${MAIN_CLASS} -f ${APP_HOME}/config/target/local-server.scala
+echo "done."
;;
start-mesos)
View
39 src/main/resources/templates/local-template-feeder.scala
@@ -0,0 +1,39 @@
+import com.twitter.parrot.config.ParrotFeederConfig
+import com.twitter.logging._
+import com.twitter.parrot.feeder.LogSourceImpl
+import com.twitter.util.Duration
+import java.util.concurrent.TimeUnit
+
+new ParrotFeederConfig {
+ val victimList = "#{victims}"
+ jobName = "#{jobName}"
+ victimHosts = victimList.split(',').toList
+ victimPort = #{port}
+ victimScheme = "#{scheme}"
+ inputLog = "#{fullLog}"
+ httpPort = 9900
+
+ zkHostName = None
+ zkPort = -1
+ zkNode = ""
+
+ linesToSkip = 0
+
+ numInstances = #{numInstances}
+ maxRequests = #{maxRequests}
+ batchSize = math.min(maxRequests, 1000)
+ duration = Duration(#{duration}, TimeUnit.#{timeUnit})
+ reuseFile = #{reuseFile}
+ requestRate = #{requestRate}
+ parser = "#{processor}"
+ #{customLogSource}
+
+ loggers = new LoggerFactory(
+ level = Level.DEBUG,
+ handlers = FileHandler(
+ filename = "parrot-feeder.log",
+ rollPolicy = Policy.Hourly,
+ rotateCount = 6
+ )
+ )
+}
View
61 src/main/resources/templates/local-template-server.scala
@@ -0,0 +1,61 @@
+import com.twitter.logging._
+import com.twitter.parrot.config.ParrotServerConfig
+import com.twitter.parrot.server._
+import com.twitter.parrot.util.ParrotClusterImpl
+#{responseTypeImport}
+
+new ParrotServerConfig[#{requestType}, #{responseType}] {
+ loggers = new LoggerFactory(
+ level = Level.INFO,
+ handlers = FileHandler(
+ filename = "parrot-server.log",
+ rollPolicy = Policy.Hourly,
+ rotateCount = 6
+ )
+ ) :: new LoggerFactory(
+ node = "stats",
+ level = Level.INFO,
+ useParents = false,
+ handlers = ScribeHandler(
+ hostname = "localhost",
+ category = "cuckoo_json",
+ maxMessagesPerTransaction = 100,
+ formatter = BareFormatter
+ )
+ ) :: loggers
+
+ statsName = "parrot_#{jobName}"
+ thinkTime = 0
+ replayTimeCheck = false
+ slopTimeInMs = 100
+ testHosts = List("api.twitter.com")
+ charEncoding = "deflate"
+ httpHostHeader = Some("#{header}")
+ thriftClientId = "#{thriftClientId}"
+ reuseConnections = #{reuseConnections}
+ hostConnectionLimit = #{hostConnectionLimit}
+ hostConnectionCoresize = #{hostConnectionCoresize}
+ hostConnectionIdleTimeInMs = #{hostConnectionIdleTimeInMs}
+ hostConnectionMaxIdleTimeInMs = #{hostConnectionMaxIdleTimeInMs}
+ hostConnectionMaxLifeTimeInMs = #{hostConnectionMaxLifeTimeInMs}
+
+ // for thrift
+ parrotPort = 9999
+ thriftName = "parrot"
+ clientIdleTimeoutInMs = 15000
+ idleTimeoutInSec = 300
+ minThriftThreads = 10
+
+ // request distribution -- default will be to do nada
+ #{createDistribution}
+
+ transport = Some(new #{transport}(this))
+ queue = Some(new RequestQueue(this))
+ thriftServer = Some(new ThriftServerImpl)
+ clusterService = Some(new ParrotClusterImpl(this))
+
+ // configure after transport so that service is valid
+ loadTestInstance = Some(#{loadTest})
+
+ // Put config options past this point at your own risk
+}
View
9 src/main/scala/com/twitter/parrot/feeder/FeederMain.scala
@@ -17,15 +17,18 @@ package com.twitter.parrot.feeder
import com.twitter.ostrich.admin.RuntimeEnvironment
import com.twitter.logging.Logger
+import com.twitter.util.Eval
+import com.twitter.parrot.config.ParrotFeederConfig
+import java.io.File
object FeederMain {
val log = Logger.get(getClass.getName)
def main(args: Array[String]) {
- val runtime = RuntimeEnvironment(this, args)
- val feeder: ParrotFeeder = runtime.loadRuntimeConfig()
- log.info("Starting Parrot Feeder...")
try {
+ val runtime = RuntimeEnvironment(this, args)
+ log.info("Starting Parrot Feeder...")
+ val feeder: ParrotFeeder = runtime.loadRuntimeConfig()
feeder.start()
} catch {
case t: Throwable =>
View
9 src/main/scala/com/twitter/parrot/feeder/ParrotFeeder.scala
@@ -117,12 +117,7 @@ class ParrotFeeder(config: ParrotFeederConfig) extends Service {
// Must call this before blocking below
cluster.connectParrots()
- // This gives our server(s) a chance to start up by waiting on a latch the Poller manages.
- // This technically could have a bug -- if a server were to start up and then disappear,
- // the latch would still potentially tick down in the poller, and we'd end up with
- // fewer servers than expected. The code further down will cover that condition.
- log.info("Awaiting %d servers to stand up and be recognized.", config.numInstances)
- allServers.await(5, TimeUnit.MINUTES)
+ allServers.await(1, TimeUnit.MINUTES)
if (cluster.runningParrots.isEmpty) {
log.error("Empty Parrots list! Is Parrot running somewhere?")
@@ -162,11 +157,11 @@ class ParrotFeeder(config: ParrotFeederConfig) extends Service {
parrots foreach { parrot =>
if (!initialized(parrot)) {
+ println("initialized parrot")
initialize(parrot)
}
val batch = readBatch(linesToRead)
-
if (batch.size > 0) {
writeBatch(parrot, batch)
View
48 src/main/scala/com/twitter/parrot/launcher/ParrotLauncher.scala
@@ -52,6 +52,7 @@ class ParrotLauncher(config: ParrotLauncherConfig) {
("diskUsed" -> diskUsed.toString),
("doAuth" -> config.doOAuth.toString),
("duration" -> config.duration.toString),
+ ("fullLog" -> config.log),
("header" -> config.header),
("hostConnectionCoresize" -> config.hostConnectionCoresize.toString),
("hostConnectionIdleTimeInMs" -> config.hostConnectionIdleTimeInMs.toString),
@@ -93,11 +94,20 @@ class ParrotLauncher(config: ParrotLauncherConfig) {
log.info("Starting Parrot job named %s", job)
try {
- handleLogFile()
- createConfigs()
- createScripts()
- pauseUntilReady()
- cleanup()
+ if(config.localMode) {
+ handleLogFile()
+ createLocalConfigs()
+ createScripts()
+ pauseUntilReady()
+ createLocalJobs()
+ cleanup()
+ } else {
+ handleLogFile()
+ createRemoteConfigs()
+ createScripts()
+ pauseUntilReady()
+ cleanup()
+ }
}
catch {
case t: Throwable => {
@@ -113,7 +123,16 @@ class ParrotLauncher(config: ParrotLauncherConfig) {
def kill() {
log.info("Killing Parrot job named: %s", job: String)
config.parrotTasks foreach { task =>
- // TBD
+ //CommandRunner("kill -9 `ps aux | grep local-%s.scala | awk '{print $2;}'`".format(task), true)
+ val commandRunner = new CommandRunner("ps aux")
+ commandRunner.run
+ val psOutput = commandRunner.getOutput
+ psOutput.split("\n").map {
+ line => if (line.contains("local-%s.scala".format(task))) {
+ val pid = line.split("\\s+")(1)
+ CommandRunner("kill -9 %s".format(pid))
+ }
+ }
}
CommandRunner.shutdown()
}
@@ -132,7 +151,16 @@ class ParrotLauncher(config: ParrotLauncherConfig) {
}
}
- private[this] def createConfigs() {
+ private[this] def createLocalConfigs() {
+ log.debug("Creating configs.")
+
+ List(("/templates/local-template-feeder.scala", targetDstFolder + "/local-feeder.scala"),
+ ("/templates/local-template-server.scala", targetDstFolder + "/local-server.scala") ) foreach {
+ case (src, dst) => templatize(src, dst, symbols)
+ }
+ }
+
+ private[this] def createRemoteConfigs() {
log.debug("Creating configs.")
List( ("/templates/template.mesos", targetDstFolder + "/config.mesos"),
@@ -148,7 +176,7 @@ class ParrotLauncher(config: ParrotLauncherConfig) {
new File(scriptsDstFolder).mkdir()
}
- List("parrot-feeder.sh", "parrot-server.sh") map { s =>
+ List("local-parrot.sh","parrot-feeder.sh", "parrot-server.sh") map { s =>
("/scripts/" + s, scriptsDstFolder + "/" + s)
} foreach { case (src, dst) =>
templatize(src, dst, symbols)
@@ -166,6 +194,10 @@ class ParrotLauncher(config: ParrotLauncherConfig) {
}
}
+ private[this] def createLocalJobs() {
+ CommandRunner("sh scripts/local-parrot.sh", true)
+ }
+
private[this] def cleanup() {
config.parrotTasks foreach { task =>
CommandRunner("rm %s-%s.zip".format(task, time))
View
1  src/main/scala/com/twitter/parrot/server/RequestQueue.scala
@@ -99,6 +99,7 @@ class RequestQueue[Req <: ParrotRequest, Rep](config: ParrotServerConfig[Req, Re
def clockError = jobs.values.foldLeft(0.0)(_+_.clockError)
def shutdown() {
+
running.set(false)
jobs.values.foreach( _.pause() )
}
View
4 src/main/scala/com/twitter/parrot/server/ServerMain.scala
@@ -17,6 +17,10 @@ package com.twitter.parrot.server
import com.twitter.logging.Logger
import com.twitter.ostrich.admin.{RuntimeEnvironment, Service}
+import com.twitter.util.Eval
+import com.twitter.parrot.config.ParrotServerConfig
+import java.io.File
+import org.jboss.netty.handler.codec.http.HttpResponse
object ServerMain {
val log = Logger.get(getClass.getName)
View
2  src/main/scala/com/twitter/parrot/util/ParrotCluster.scala
@@ -326,9 +326,11 @@ class ParrotClusterImpl(config: Option[ParrotCommonConfig] = None)
allParrots foreach { parrot =>
try {
parrot.shutdown()
+ println("shut down parrot")
}
catch {
case t: Throwable => log.error(t, "Error shutting down Parrot: %s", t.getClass.getName)
+ println(t, "Error shutting down Parrot: %s", t.getClass.getName)
}
}
}
View
2  src/main/scala/com/twitter/parrot/util/RemoteParrot.scala
@@ -84,6 +84,8 @@ class RemoteParrot(val name: String,
}
def shutdown() {
+ consumer.isShutdown.set(true)
+ println("shutting down client")
waitFor(client.shutdown())
service.release
}
Please sign in to comment.
Something went wrong with that request. Please try again.