Skip to content

Commit

Permalink
Fix to optionally use GrabbyHands for kestrel communication
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Reichhold committed Sep 1, 2010
1 parent 71c0453 commit 973a4c5
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -7,6 +7,9 @@ bin
.manager
.DS_Store
*.sw?
.project
.classpath
.scala_dependencies
project/boot/
project/plugins/lib_managed/
project/plugins/src_managed/
Expand Down
2 changes: 2 additions & 0 deletions project/build/NankeenProject.scala
Expand Up @@ -2,9 +2,11 @@ import sbt._
import com.twitter.sbt.StandardProject

class NankeenProject(info: ProjectInfo) extends StandardProject(info) {
val twitterRepo = "Twitter Repo" at "http://binaries.local.twitter.com/maven/"
// dependencies
val jackhammer = "com.twitter" % "jackhammer_2.7.7" % "1.0"
val smile = "net.lag" % "smile" % "0.8.12"
val grabby = "com.twitter" % "grabbyhands" % "1.0"

override def mainClass = Some("com.twitter.nankeen.Nankeen")
}
77 changes: 60 additions & 17 deletions src/main/scala/com/twitter/nankeen/Nankeen.scala
@@ -1,20 +1,24 @@
package com.twitter.nankeen

import com.twitter.jackhammer._
import com.twitter.grabbyhands.{Config => GrabbyConfig}
import com.twitter.grabbyhands.GrabbyHands
import com.twitter.grabbyhands.Write
import java.io.File
import java.util.concurrent.LinkedBlockingQueue
import net.lag.logging._
import net.lag.smile._
import scala.collection.mutable._
import java.util.concurrent.TimeUnit

class Reader(client: MemcacheClient[String], queueName: String, numMessages: Int, expectedMessages: Set[String]) extends Runnable{
class Reader(client: MemcacheClient[String], queueName: String, numMessages: Int, expectedMessages: Set[String], useGrabbyHands:Boolean, grabbyHands:GrabbyHands) extends Runnable{
val log = Logger.get("nankeen")

def run() = {
try {
while (expectedMessages.size > 0) {
get()
Thread.sleep(10)
if (!useGrabbyHands) Thread.sleep(10)
}
} catch {
case e => {
Expand All @@ -25,18 +29,28 @@ class Reader(client: MemcacheClient[String], queueName: String, numMessages: Int
}

def get() = {
client.get(queueName) match {
case Some(msg) => {
if( useGrabbyHands ) {
val value = grabbyHands.getRecvQueue(queueName).poll(10, TimeUnit.SECONDS)
if ( value != null ) {
val msg = new String(value.array)
expectedMessages.synchronized {
expectedMessages - msg
}
}
case None => //noop
} else {
client.get(queueName) match {
case Some(msg) => {
expectedMessages.synchronized {
expectedMessages - msg
}
}
case None => //noop
}
}
}
}

class Writer(client: MemcacheClient[String], queueName: String, numMessages: Int, messages: LinkedBlockingQueue[String]) extends Runnable{
class Writer(client: MemcacheClient[String], queueName: String, numMessages: Int, messages: LinkedBlockingQueue[String], useGrabbyHands:Boolean, grabbyHands:GrabbyHands) extends Runnable{
val log = Logger.get("nankeen")

def run(): Unit = {
Expand All @@ -60,12 +74,17 @@ class Writer(client: MemcacheClient[String], queueName: String, numMessages: Int
}

def put(data: String) = {
client.set(queueName, data)
if ( useGrabbyHands ) {
val write = new Write(data)
grabbyHands.getSendQueue(queueName).put(write)
} else {
client.set(queueName, data)
}
}
}

class Loader(servers: Array[String], queueName: String, numWriters: Int,
numReaders: Int, numMessages: Int) extends Runnable{
class Loader(servers: Array[String], queueName: String, numWriters: Int,
numReaders: Int, numMessages: Int, useGrabbyHands:Boolean, grabbyHands:GrabbyHands) extends Runnable{

val distribution = "ketama"
val hash = "fnv1a-64"
Expand All @@ -85,8 +104,8 @@ class Loader(servers: Array[String], queueName: String, numWriters: Int,
pool.servers = connections
client.setPool(pool)

val readers = (1 to numReaders).map(i => new Reader(client, queueName, numMessages, messagesSet)).toList
val writers = (1 to numWriters).map(i => new Writer(client, queueName, numMessages, messagesQueue)).toList
val readers = (1 to numReaders).map(i => new Reader(client, queueName, numMessages, messagesSet, useGrabbyHands, grabbyHands)).toList
val writers = (1 to numWriters).map(i => new Writer(client, queueName, numMessages, messagesQueue, useGrabbyHands, grabbyHands)).toList

def run() = {
val readerThreads = readers.map(r => new Thread(r))
Expand All @@ -95,22 +114,25 @@ class Loader(servers: Array[String], queueName: String, numWriters: Int,
writerThreads.foreach(_.start)
readerThreads.foreach(_.join)
writerThreads.foreach(_.join)
client.shutdown()
if(!useGrabbyHands) {
client.shutdown()
}
}
}

object Nankeen extends LoggingLoadTest {
val messagePrefix = "Nankeen Load Test Message "
val log = Logger.get("nankeen")
def main(args: Array[String]) = {
if (args.length != 7) {
if (args.length != 7 && args.length != 8) {
Console.println("Nankeen")
Console.println(" spin up a number of loaders that each")
Console.println(" spin up M writers and have them write N messages to a queue")
Console.println(" for Z loops")
Console.println(" spin up O writers to drain the queue")
Console.println(" (optional true/false) use grabby hands instead of smile for load test. default is false")
Console.println("usage:")
Console.println(" java -jar nankeen-0.1.jar localhost:22133 test 10 1 1 1 1")
Console.println(" java -jar nankeen-0.1.jar localhost:22133 test 10 1 1 1 1 true")
System.exit(1)
}

Expand All @@ -121,16 +143,34 @@ object Nankeen extends LoggingLoadTest {
val numReaders = args(4).toInt
val numWriters = args(5).toInt
val numMessages = args(6).toInt
var useGrabbyHands = false
if ( args.length == 8 ) {
useGrabbyHands = args(7).toBoolean
}



val timingsFile = new File("timings.log")
log.info("Using %d writers and %d readers to write %d messages to %d queues prefaced by %s".
format(numWriters, numReaders, numMessages, numQueues, queueName))
val queues = (1 to numQueues).toList

while(loops != 0) {
val grabbyHands:GrabbyHands = {
if( false ) {
val grabbyConfig = new GrabbyConfig
grabbyConfig.addServers(Array(hostName))
grabbyConfig.addQueues(queues.map { i => queueName+i } )
new GrabbyHands(grabbyConfig)
} else {
null
}
}

while(loops > 0) {
loops -= 1
val loaderThreads = queues.map {i =>
val loader = new Loader(Array(hostName), queueName + i, numWriters, numReaders, numMessages)
//Console.println(" queue " + i + " loop" + loops)
val loader = new Loader(Array(hostName), queueName + i, numWriters, numReaders, numMessages, useGrabbyHands, grabbyHands)
new Thread(loader)
}
runWithTiming {
Expand All @@ -140,6 +180,9 @@ object Nankeen extends LoggingLoadTest {
dumpLogOutput(timingsFile)
}
}
}
//gh1.1 grabbyHands.close
//Grabby hands uses daemon threads
System.exit(1)
}

}

0 comments on commit 973a4c5

Please sign in to comment.