Permalink
Browse files

Merge branch 'release_2_2' into libkestrel

  • Loading branch information...
2 parents e9a1329 + df5ba17 commit 61ffd27c9a1dd7d9df240283e4a236344744e946 Robey Pointer committed Jan 12, 2012
View
@@ -230,3 +230,22 @@ A sample run:
You can see the journals being built and erased in the kestrel log. Like
"many-clients", this test is a load test instead of a speed test.
+## Leaky-reader
+
+This test starts a producer and several consumers, with the consumers
+occasionally "forgetting" to acknowledge an item that they've read. It
+verifies that the un-acknowledged items are eventually handed off to another
+consmer.
+
+A sample run:
+
+ $ ./dist/kestrel/scripts/load/leaky-reader -n 100000 -t 10
+ leaky-reader: 10 threads each sending 100000 items through spam
+ Flushing queues first.
+ 1000
+ 2000
+ 100000
+ Finished in 40220 msec (40.2 usec/put throughput).
+ Completed all reads
+
+Like "many-clients", it's just a load test.
@@ -0,0 +1,4 @@
+#!/bin/bash
+DIST_HOME="$(dirname $(readlink $0 || echo $0))/../.."
+TEST_JAR=$DIST_HOME/kestrel-tests-@VERSION@.jar
+java -server -classpath @DIST_CLASSPATH@:$TEST_JAR net.lag.kestrel.load.LeakyThriftReader "$@"
@@ -31,31 +31,29 @@ import com.twitter.conversions.string._
object JournalPacking extends LoadTesting {
private val DATA = "x" * 1024
- private val EXPECT = ByteBuffer.wrap("STORED\r\n".getBytes)
-
def put(socket: SocketChannel, queueName: String, n: Int, data: String, counter: Long) = {
- val buffer = ByteBuffer.allocate(EXPECT.limit)
+ val expect = client.putSuccess()
+ val buffer = ByteBuffer.allocate(expect.limit)
for (i <- 0 until n) {
val counterData = ((counter + i).toString + data).substring(0, data.length)
- val spam = ByteBuffer.wrap(("set " + queueName + " 0 0 " + data.length + "\r\n" + counterData + "\r\n").getBytes)
- send(socket, spam)
- if (receive(socket, buffer) != EXPECT) {
+ send(socket, client.put(queueName, counterData))
+ if (receive(socket, buffer) != expect) {
// the "!" is important.
throw new Exception("Unexpected response at " + i + "!")
}
}
}
def get(socket: SocketChannel, queueName: String, n: Int, data: String, counter: Long): Int = {
- val req = ByteBuffer.wrap(("get " + queueName + (if (useTransactions) "/t=1000/close/open" else "") + "\r\n").getBytes)
- val expectEnd = ByteBuffer.wrap("END\r\n".getBytes)
+ val req = client.get(queueName, Some(1000))
+ val expectEnd = client.getEmpty(queueName)
var count = 0
var misses = 0
while (count < n) {
val counterData = ((counter + count).toString + data).substring(0, data.length)
- val expectData = ByteBuffer.wrap(("VALUE " + queueName + " 0 " + data.length + "\r\n" + counterData + "\r\nEND\r\n").getBytes)
+ val expectData = client.getSuccess(queueName, counterData)
val expecting = new Expecting(expectEnd, expectData)
send(socket, req)
val got = expecting(socket)
@@ -66,9 +64,6 @@ object JournalPacking extends LoadTesting {
count += 1
}
}
- if (useTransactions) {
- send(socket, ByteBuffer.wrap(("get " + queueName + "/close\r\n").getBytes))
- }
misses
}
@@ -82,7 +77,7 @@ object JournalPacking extends LoadTesting {
producerThread = new Thread {
override def run() = {
val socket = SocketChannel.open(new InetSocketAddress(hostname, 22133))
- put(socket, qName, totalItems, data, writeCounter)
+ put(socket, queueName, totalItems, data, writeCounter)
}
}
@@ -95,7 +90,7 @@ object JournalPacking extends LoadTesting {
consumerThread = new Thread {
override def run() = {
val socket = SocketChannel.open(new InetSocketAddress(hostname, 22133))
- misses = get(socket, qName, totalItems, data, readCounter)
+ misses = get(socket, queueName, totalItems, data, readCounter)
}
}
consumerThread.start()
@@ -114,15 +109,18 @@ object JournalPacking extends LoadTesting {
}
}
- var qName = "spam"
+ var queueName = "spam"
var totalItems = 25000
var kilobytes = 1
var pause = 1
var cycles = 100
var readCounter: Long = 0
var writeCounter: Long = 0
- var useTransactions: Boolean = false
var hostname = "localhost"
+ var port = 22133
+ var client: Client = MemcacheClient
+ var flushFirst = true
+ var monitor = false
def usage() {
Console.println("usage: packing [options]")
@@ -131,7 +129,7 @@ object JournalPacking extends LoadTesting {
Console.println()
Console.println("options:")
Console.println(" -q NAME")
- Console.println(" use named queue (default: %s)".format(qName))
+ Console.println(" use named queue (default: %s)".format(queueName))
Console.println(" -n ITEMS")
Console.println(" put ITEMS items into the queue (default: %d)".format(totalItems))
Console.println(" -k KILOBYTES")
@@ -140,10 +138,16 @@ object JournalPacking extends LoadTesting {
Console.println(" pause SECONDS between cycles (default: %d)".format(pause))
Console.println(" -c CYCLES")
Console.println(" do read/writes CYCLES times (default: %d)".format(cycles))
- Console.println(" -x")
- Console.println(" use transactions when fetching")
Console.println(" -h HOSTNAME")
Console.println(" use kestrel on HOSTNAME (default: %s)".format(hostname))
+ Console.println(" -p PORT")
+ Console.println(" use kestrel on PORT (default: %d)".format(port))
+ Console.println(" --thrift")
+ Console.println(" use thrift RPC")
+ Console.println(" -F")
+ Console.println(" don't flush queue(s) before the test")
+ Console.println(" -M")
+ Console.println(" monitor queue stats during the test")
}
@tailrec
@@ -153,7 +157,7 @@ object JournalPacking extends LoadTesting {
usage()
System.exit(0)
case "-q" :: x :: xs =>
- qName = x
+ queueName = x
parseArgs(xs)
case "-n" :: x :: xs =>
totalItems = x.toInt
@@ -167,12 +171,22 @@ object JournalPacking extends LoadTesting {
case "-c" :: x :: xs =>
cycles = x.toInt
parseArgs(xs)
- case "-x" :: xs =>
- useTransactions = true
- parseArgs(xs)
case "-h" :: x :: xs =>
hostname = x
parseArgs(xs)
+ case "-p" :: x :: xs =>
+ port = x.toInt
+ parseArgs(xs)
+ case "--thrift" :: xs =>
+ client = ThriftClient
+ port = 2229
+ parseArgs(xs)
+ case "-F" :: xs =>
+ flushFirst = false
+ parseArgs(xs)
+ case "-M" :: xs =>
+ monitor = true
+ parseArgs(xs)
case _ =>
usage()
System.exit(1)
@@ -182,6 +196,18 @@ object JournalPacking extends LoadTesting {
parseArgs(args.toList)
println("packing: " + totalItems + " items of " + kilobytes + "kB with " + pause + " second pauses")
+
+ // flush queues first
+ if (flushFirst) {
+ println("Flushing queues first.")
+ val socket = tryHard { SocketChannel.open(new InetSocketAddress(hostname, port)) }
+ send(socket, client.flush(queueName))
+ expect(socket, client.flushSuccess())
+ socket.close()
+ }
+
+ if (monitor) monitorQueue(hostname, queueName)
+
cycle(false, true)
for (i <- 0 until cycles) {
println("cycle: " + (i + 1))
@@ -33,8 +33,8 @@ import org.apache.thrift.protocol.TBinaryProtocol
/**
* Have one producer generate data at a steady rate, while several clients
- * consume them with reliaable reads, occassionally failing to confirm an
- * item. Verify that all items are eventually read with confirmation.
+ * consume them with reliable reads, occassionally failing to confirm an
+ * item. Verify that all items are eventually read with confirmation.
*/
object LeakyThriftReader {
private val DATA_TEMPLATE = "%08x"
@@ -91,7 +91,7 @@ object LeakyThriftReader {
def usage() {
Console.println("usage: leaky-reader [options]")
- Console.println(" spin up a producer and consumer and deliver N items through kestrel, with occassional drops")
+ Console.println(" spin up a producer and consumer(s) and deliver N items through kestrel, with occassional drops")
Console.println()
Console.println("options:")
Console.println(" -n ITEMS")
@@ -101,7 +101,7 @@ object LeakyThriftReader {
Console.println(" -q NAME")
Console.println(" use queue NAME (default: %s)".format(queueName))
Console.println(" -t THREADS")
- Console.println(" create THREADS producers and THREADS consumers (default: %d)".format(threads))
+ Console.println(" create THREADS consumers (default: %d)".format(threads))
Console.println(" -h HOSTNAME")
Console.println(" use kestrel on HOSTNAME (default: %s)".format(hostname))
Console.println(" -p PORT")
@@ -159,15 +159,15 @@ object LeakyThriftReader {
val client = new Kestrel.FinagledClient(service, new TBinaryProtocol.Factory())
+ println("leaky-reader: %d threads each sending %d items through %s".format(
+ threads, totalItems, queueName))
+
// flush queues first
if (flushFirst) {
println("Flushing queues first.")
client.flushQueue(queueName)()
}
- println("leaky-reader: %d threads each sending %d items through %s".format(
- threads, totalItems, queueName))
-
val producerThread = new Thread {
override def run = {
var n = 0
@@ -222,5 +222,4 @@ object LeakyThriftReader {
println("Completed all reads")
}
}
-
}
@@ -92,7 +92,7 @@ object ManyClients extends LoadTesting {
while (got.get < count) {
if (socket eq null) {
- socket = SocketChannel.open(new InetSocketAddress(hostname, port))
+ socket = tryHard { SocketChannel.open(new InetSocketAddress(hostname, port)) }
}
req.rewind
while (req.position < req.limit) {
@@ -182,6 +182,8 @@ object ManyClients extends LoadTesting {
Console.println(" use CLIENTS consumers (default: %d)".format(clientCount))
Console.println(" -h HOSTNAME")
Console.println(" use kestrel on HOSTNAME (default: %s)".format(hostname))
+ Console.println(" -p PORT")
+ Console.println(" use kestrel on PORT (default: %d)".format(port))
Console.println(" -k PERCENT")
Console.println(" kill PERCENT %% of clients before they can read the response (default: %d)".format(dropPercent))
Console.println(" -x")
@@ -212,6 +214,9 @@ object ManyClients extends LoadTesting {
case "-h" :: x :: xs =>
hostname = x
parseArgs(xs)
+ case "-p" :: x :: xs =>
+ port = x.toInt
+ parseArgs(xs)
case "-k" :: x :: xs =>
killPercent = x.toInt
useTransactions = true

0 comments on commit 61ffd27

Please sign in to comment.