
Merge branch 'master' into site

commit 4105f0becb145042921be22af42ef88eb0d95291 (2 parents: 6a5d083 + 9cf4d58), committed by Robey Pointer on Jan 13, 2012
ChangeLog (14 lines changed)
@@ -1,5 +1,18 @@
+
+2.1.5
+-----
+release: 12 January 2012
+
+- Only sync to disk when there's data to be written. [Stephan Zuercher]
+- Track latency measurements better, and track the percentage of java heap
+ reserved for in-memory queues. [Stephan Zuercher]
+- Fix the startup script to work in "dash". [Matt Parlane]
+- Send the correct response back for the memcached protocol for delete. [Matt
+ Erkkila]
+
2.1.4
-----
+release: 21 November 2011
- Separate timers for journal fsync operations from those used for request
timeouts and queue expiry
@@ -9,7 +22,6 @@
bugs)
- Creating a queue with an illegal name causes an error
-
2.1.3
-----
release: 13 October 2011
config/production.scala (2 lines changed)
@@ -21,7 +21,7 @@ new KestrelConfig {
default.defaultJournalSize = 16.megabytes
default.maxMemorySize = 128.megabytes
default.maxJournalSize = 1.gigabyte
- default.syncJournal = 20.milliseconds
+ default.syncJournal = 100.milliseconds
admin.httpPort = 2223
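The default journal fsync interval is relaxed here from 20 ms to 100 ms. As a hedged sketch (not part of the commit), the same knob can be tightened for a single queue inside the KestrelConfig block, assuming KestrelConfig exposes a `queues` list of QueueBuilder; the queue name is illustrative:

    // per-queue override: syncJournal = 0.seconds forces an fsync after every write,
    // while Duration.MaxValue never fsyncs (see the PeriodicSyncFile comment below)
    queues = new QueueBuilder {
      name = "critical_writes"
      syncJournal = 0.seconds
    } :: Nil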
project/build.properties (4 lines changed)
@@ -1,9 +1,9 @@
#Project properties
-#Mon Nov 21 15:08:49 PST 2011
+#Thu Jan 12 16:30:20 PST 2012
project.organization=net.lag
project.name=kestrel
sbt.version=0.7.4
-project.version=2.1.5-SNAPSHOT
+project.version=2.1.6-SNAPSHOT
def.scala.version=2.7.7
build.scala.versions=2.8.1
project.initialize=false
project/build/KestrelProject.scala (19 lines changed)
@@ -58,7 +58,24 @@ class KestrelProject(info: ProjectInfo) extends StandardServiceProject(info) wit
lazy val packageLoadTests = packageLoadTestsAction
override def packageDistTask = packageLoadTestsAction && super.packageDistTask
-// override def fork = forkRun(List("-Xmx1024m", "-verbosegc", "-XX:+PrintGCDetails"))
+ // generate a distribution zip for release.
+ def releaseDistTask = task {
+ val releaseDistPath = "dist-release" / distName ##
+
+ releaseDistPath.asFile.mkdirs()
+ (releaseDistPath / "libs").asFile.mkdirs()
+ (releaseDistPath / "config").asFile.mkdirs()
+
+ FileUtilities.copyFlat(List(jarPath), releaseDistPath, log).left.toOption orElse
+ FileUtilities.copyFlat(List(outputPath / loadTestJarFilename), releaseDistPath, log).left.toOption orElse
+ FileUtilities.copyFlat(dependentJars.get, releaseDistPath / "libs", log).left.toOption orElse
+ FileUtilities.copy(((configPath ***) --- (configPath ** "*.class")).get, releaseDistPath / "config", log).left.toOption orElse
+ FileUtilities.copy((scriptsOutputPath ***).get, releaseDistPath, log).left.toOption orElse
+ FileUtilities.zip((("dist-release" ##) / distName).get, "dist-release" / (distName + ".zip"), true, log)
+ }
+ val ReleaseDistDescription = "Creates a deployable zip file with dependencies, config, and scripts."
+ lazy val releaseDist = releaseDistTask.dependsOn(`package`, makePom, copyScripts).describedAs(ReleaseDistDescription)
+
lazy val putMany = task { args =>
runTask(Some("net.lag.kestrel.load.PutMany"), testClasspath, args).dependsOn(testCompile)
project/release.properties (6 lines changed)
@@ -1,4 +1,4 @@
#Automatically generated by ReleaseManagement
-#Mon Nov 21 15:08:49 PST 2011
-version=2.1.4
-sha1=10d84a9dee3c23813c8fd76b55ce95d958154423
+#Thu Jan 12 16:30:20 PST 2012
+version=2.1.5
+sha1=459ec824963e31e0da78dbc425be0c7084a3e298
src/main/scala/net/lag/kestrel/Journal.scala (8 lines changed)
@@ -20,14 +20,14 @@ package net.lag.kestrel
import java.io._
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.channels.FileChannel
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{LinkedBlockingQueue, ScheduledExecutorService}
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
import com.twitter.logging.Logger
import com.twitter.ostrich.admin.BackgroundProcess
-import com.twitter.util.{Future, Duration, Timer, Time}
+import com.twitter.util.{Future, Duration, Time}
case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause)
@@ -51,7 +51,7 @@ object JournalItem {
/**
* Codes for working with the journal file for a PersistentQueue.
*/
-class Journal(queuePath: File, queueName: String, syncTimer: Timer, syncJournal: Duration) {
+class Journal(queuePath: File, queueName: String, syncScheduler: ScheduledExecutorService, syncJournal: Duration) {
import Journal._
private val log = Logger.get(getClass)
@@ -97,7 +97,7 @@ class Journal(queuePath: File, queueName: String, syncTimer: Timer, syncJournal:
def this(fullPath: String) = this(fullPath, Duration.MaxValue)
private def open(file: File) {
- writer = new PeriodicSyncFile(file, syncTimer, syncJournal)
+ writer = new PeriodicSyncFile(file, syncScheduler, syncJournal)
}
def open() {
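The net effect of this hunk is that journal fsyncs are now driven by a plain ScheduledExecutorService rather than a util Timer. A minimal construction sketch against the new signature shown above (the path and queue name are illustrative, not from the commit):

    import java.io.File
    import java.util.concurrent.Executors
    import com.twitter.conversions.time._

    val syncScheduler = Executors.newScheduledThreadPool(1)
    val journal = new Journal(new File("/var/spool/kestrel/work"), "work", syncScheduler, 100.milliseconds)
    journal.open()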
src/main/scala/net/lag/kestrel/Kestrel.scala (26 lines changed)
@@ -18,9 +18,10 @@
package net.lag.kestrel
import java.net.InetSocketAddress
-import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
+import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.{immutable, mutable}
+import com.twitter.concurrent.NamedPoolThreadFactory
import com.twitter.conversions.time._
import com.twitter.logging.Logger
import com.twitter.naggati.codec.MemcacheCodec
@@ -72,7 +73,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
var queueCollection: QueueCollection = null
var timer: Timer = null
- var journalSyncTimer: Timer = null
+ var journalSyncScheduler: ScheduledExecutorService = null
var executor: ExecutorService = null
var channelFactory: ChannelFactory = null
var memcacheAcceptor: Option[Channel] = None
@@ -93,15 +94,25 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
listenAddress, memcacheListenPort, textListenPort, queuePath, protocol,
expirationTimerFrequency, clientTimeout, maxOpenTransactions)
- // this means no timeout will be at better granularity than N ms.
- journalSyncTimer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
+ // this means no timeout will be at better granularity than 100 ms.
timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS)
- queueCollection = new QueueCollection(queuePath, new NettyTimer(timer), new NettyTimer(journalSyncTimer), defaultQueueConfig, builders)
+ journalSyncScheduler =
+ new ScheduledThreadPoolExecutor(
+ Runtime.getRuntime.availableProcessors,
+ new NamedPoolThreadFactory("journal-sync", true),
+ new RejectedExecutionHandler {
+ override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
+ log.warning("Rejected journal fsync")
+ }
+ })
+
+ queueCollection = new QueueCollection(queuePath, new NettyTimer(timer), journalSyncScheduler, defaultQueueConfig, builders)
queueCollection.loadQueues()
Stats.addGauge("items") { queueCollection.currentItems.toDouble }
Stats.addGauge("bytes") { queueCollection.currentBytes.toDouble }
+ Stats.addGauge("reserved_memory_ratio") { queueCollection.reservedMemoryRatio }
// netty setup:
executor = Executors.newCachedThreadPool()
@@ -161,8 +172,9 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
executor.awaitTermination(5, TimeUnit.SECONDS)
timer.stop()
timer = null
- journalSyncTimer.stop()
- journalSyncTimer = null
+ journalSyncScheduler.shutdown()
+ journalSyncScheduler.awaitTermination(5, TimeUnit.SECONDS)
+ journalSyncScheduler = null
log.info("Goodbye.")
}
src/main/scala/net/lag/kestrel/KestrelHandler.scala (15 lines changed)
@@ -194,21 +194,6 @@ abstract class KestrelHandler(val queues: QueueCollection, val maxOpenTransactio
waitingFor = Some(future)
future.map { itemOption =>
waitingFor = None
- timeout match {
- case None => {
- val usec = (Time.now - startTime).inMicroseconds.toInt
- val statName = if (itemOption.isDefined) "get_hit_latency_usec" else "get_miss_latency_usec"
- Stats.addMetric(statName, usec)
- Stats.addMetric("q/" + key + "/" + statName, usec)
- }
- case Some(_) => {
- if (!itemOption.isDefined) {
- val msec = (Time.now - startTime).inMilliseconds.toInt
- Stats.addMetric("get_timeout_msec", msec)
- Stats.addMetric("q/" + key + "/get_timeout_msec", msec)
- }
- }
- }
itemOption.foreach { item =>
log.debug("get <- %s", item)
if (opening) pendingTransactions.add(key, item.xid)
src/main/scala/net/lag/kestrel/MemcacheHandler.scala (3 lines changed)
@@ -85,7 +85,7 @@ extends NettyHandler[MemcacheRequest](channelGroup, queueCollection, maxOpenTran
dumpStats(request.line.drop(1))
case "delete" =>
delete(request.line(1))
- channel.write(new MemcacheResponse("END"))
+ channel.write(new MemcacheResponse("DELETED"))
case "flush_expired" =>
channel.write(new MemcacheResponse(flushExpired(request.line(1)).toString))
case "flush_all_expired" =>
@@ -189,6 +189,7 @@ extends NettyHandler[MemcacheRequest](channelGroup, queueCollection, maxOpenTran
report += (("curr_items", queues.currentItems.toString))
report += (("total_items", Stats.getCounter("total_items")().toString))
report += (("bytes", queues.currentBytes.toString))
+ report += (("reserved_memory_ratio", "%.3f".format(queues.reservedMemoryRatio)))
report += (("curr_connections", Kestrel.sessions.get().toString))
report += (("total_connections", Stats.getCounter("total_connections")().toString))
report += (("cmd_get", Stats.getCounter("cmd_get")().toString))
src/main/scala/net/lag/kestrel/PeriodicSyncFile.scala (65 lines changed)
@@ -1,47 +1,86 @@
package net.lag.kestrel
-import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentLinkedQueue
import com.twitter.conversions.time._
+import com.twitter.ostrich.stats.Stats
import com.twitter.util._
import java.io.{IOException, FileOutputStream, File}
+import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentLinkedQueue, ScheduledExecutorService, ScheduledFuture, TimeUnit}
+
+abstract class PeriodicSyncTask(val scheduler: ScheduledExecutorService, initialDelay: Duration, period: Duration)
+extends Runnable {
+ @volatile private[this] var scheduledFsync: Option[ScheduledFuture[_]] = None
+
+ def start() {
+ synchronized {
+ if (scheduledFsync.isEmpty && period > 0.seconds) {
+ val handle = scheduler.scheduleWithFixedDelay(this, initialDelay.inMilliseconds, period.inMilliseconds,
+ TimeUnit.MILLISECONDS)
+ scheduledFsync = Some(handle)
+ }
+ }
+ }
+
+ def stop() {
+ synchronized { _stop() }
+ }
+
+ def stopIf(f: => Boolean) {
+ synchronized {
+ if (f) _stop()
+ }
+ }
+
+ private[this] def _stop() {
+ scheduledFsync.foreach { _.cancel(false) }
+ scheduledFsync = None
+ }
+}
/**
* Open a file for writing, and fsync it on a schedule. The period may be 0 to force an fsync
* after every write, or `Duration.MaxValue` to never fsync.
*/
-class PeriodicSyncFile(file: File, timer: Timer, period: Duration) {
+class PeriodicSyncFile(file: File, scheduler: ScheduledExecutorService, period: Duration) {
// pre-completed future for writers who are behaving synchronously.
private final val DONE = Future(())
- val writer = new FileOutputStream(file, true).getChannel
- val promises = new ConcurrentLinkedQueue[Promise[Unit]]()
-
- @volatile var closed = false
+ case class TimestampedPromise(val promise: Promise[Unit], val time: Time)
- if (period > 0.seconds && period < Duration.MaxValue) {
- timer.schedule(Time.now, period) {
+ val writer = new FileOutputStream(file, true).getChannel
+ val promises = new ConcurrentLinkedQueue[TimestampedPromise]()
+ val periodicSyncTask = new PeriodicSyncTask(scheduler, period, period) {
+ override def run() {
if (!closed && !promises.isEmpty) fsync()
}
}
+ @volatile var closed = false
+
private def fsync() {
synchronized {
// race: we could underestimate the number of completed writes. that's okay.
val completed = promises.size
+ val fsyncStart = Time.now
try {
writer.force(false)
} catch {
case e: IOException =>
for (i <- 0 until completed) {
- promises.poll().setException(e)
+ promises.poll().promise.setException(e)
}
return;
}
for (i <- 0 until completed) {
- promises.poll().setValue(())
+ val timestampedPromise = promises.poll()
+ timestampedPromise.promise.setValue(())
+ val delaySinceWrite = fsyncStart - timestampedPromise.time
+ val durationBehind = if (delaySinceWrite > period) delaySinceWrite - period else 0.seconds
+ Stats.addMetric("fsync_delay_usec", durationBehind.inMicroseconds.toInt)
}
+
+ periodicSyncTask.stopIf { promises.isEmpty }
}
}
@@ -62,7 +101,8 @@ class PeriodicSyncFile(file: File, timer: Timer, period: Duration) {
DONE
} else {
val promise = new Promise[Unit]()
- promises.add(promise)
+ promises.add(TimestampedPromise(promise, Time.now))
+ periodicSyncTask.start()
promise
}
}
@@ -73,6 +113,7 @@ class PeriodicSyncFile(file: File, timer: Timer, period: Duration) {
*/
def close() {
closed = true
+ periodicSyncTask.stop()
fsync()
writer.close()
}
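A minimal usage sketch of the lazily scheduled sync above. It assumes PeriodicSyncFile's write method (which sits outside this hunk) returns the queued promise as a Future[Unit]; the path and payload are illustrative:

    import java.io.File
    import java.nio.ByteBuffer
    import java.util.concurrent.Executors
    import com.twitter.conversions.time._

    val scheduler = Executors.newScheduledThreadPool(1)
    val syncFile = new PeriodicSyncFile(new File("/tmp/example.journal"), scheduler, 100.milliseconds)

    val written = syncFile.write(ByteBuffer.wrap("hello".getBytes))  // first pending write starts the sync task
    written()                                                        // blocks until an fsync has covered this write
    syncFile.close()                                                 // stops the task and forces a final fsync

The design point of PeriodicSyncTask.start/stopIf is that the scheduler only runs fsyncs while promises are pending, which is the ChangeLog's "only sync to disk when there's data to be written".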
src/main/scala/net/lag/kestrel/PersistentQueue.scala (49 lines changed)
@@ -20,7 +20,7 @@ package net.lag.kestrel
import java.io._
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.channels.FileChannel
-import java.util.concurrent.{CountDownLatch, Executor}
+import java.util.concurrent.{CountDownLatch, Executor, ScheduledExecutorService}
import scala.collection.mutable
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
@@ -30,10 +30,10 @@ import com.twitter.util._
import config._
class PersistentQueue(val name: String, persistencePath: String, @volatile var config: QueueConfig,
- timer: Timer, journalSyncTimer: Timer,
+ timer: Timer, journalSyncScheduler: ScheduledExecutorService,
queueLookup: Option[(String => Option[PersistentQueue])]) {
- def this(name: String, persistencePath: String, config: QueueConfig, timer: Timer, journalSyncTimer: Timer) =
- this(name, persistencePath, config, timer, journalSyncTimer, None)
+ def this(name: String, persistencePath: String, config: QueueConfig, timer: Timer, journalSyncScheduler: ScheduledExecutorService) =
+ this(name, persistencePath, config, timer, journalSyncScheduler, None)
private val log = Logger.get(getClass.getName)
@@ -70,7 +70,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
private var paused = false
private var journal =
- new Journal(new File(persistencePath).getCanonicalFile, name, journalSyncTimer, config.syncJournal)
+ new Journal(new File(persistencePath).getCanonicalFile, name, journalSyncScheduler, config.syncJournal)
private val waiters = new DeadlineWaitQueue(timer)
@@ -82,6 +82,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
def length: Long = synchronized { queueLength }
def bytes: Long = synchronized { queueSize }
+ def maxMemoryBytes: Long = synchronized { config.maxMemorySize.inBytes }
def journalSize: Long = synchronized { journal.size }
def journalTotalSize: Long = journal.archivedSize + journalSize
def currentAge: Duration = synchronized { if (queueSize == 0) 0.milliseconds else _currentAge }
@@ -231,7 +232,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
* head of the queue)
*/
def remove(transaction: Boolean): Option[QItem] = {
- synchronized {
+ val removedItem = synchronized {
if (closed || paused || queueLength == 0) {
None
} else {
@@ -240,22 +241,38 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
if (transaction) journal.removeTentative(item.get.xid) else journal.remove()
checkRotateJournal()
}
+
item
}
}
+
+ removedItem.foreach { qItem =>
+ val msec = (Time.now - qItem.addTime).inMilliseconds.toInt max 0
+ Stats.addMetric("delivery_latency_msec", msec)
+ Stats.addMetric("q/" + name + "/delivery_latency_msec", msec)
+ }
+ removedItem
}
/**
* Remove and return an item from the queue, if there is one.
*/
def remove(): Option[QItem] = remove(false)
- private def waitOperation(op: => Option[QItem], deadline: Option[Time], future: Promise[Option[QItem]]) {
+ private def waitOperation(op: => Option[QItem], startTime: Time, deadline: Option[Time],
+ future: Promise[Option[QItem]]) {
val item = op
if (synchronized {
if (!item.isDefined && !closed && !paused && deadline.isDefined && deadline.get > Time.now) {
// if we get woken up, try again with the same deadline.
- val w = waiters.add(deadline.get, { () => waitOperation(op, deadline, future) }, { () => future.setValue(None) })
+ def onTrigger() = waitOperation(op, startTime, deadline, future)
+ def onTimeout() {
+ val msec = (Time.now - startTime).inMilliseconds.toInt
+ Stats.addMetric("get_timeout_msec", msec)
+ Stats.addMetric("q/" + name + "/get_timeout_msec", msec)
+ future.setValue(None)
+ }
+ val w = waiters.add(deadline.get, onTrigger, onTimeout)
// FIXME: use onCancellation when util-core is bumped.
future.linkTo(new CancellableSink({ waiters.remove(w) }))
false
@@ -266,20 +283,22 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
}
final def waitRemove(deadline: Option[Time], transaction: Boolean): Future[Option[QItem]] = {
+ val startTime = Time.now
val promise = new Promise[Option[QItem]]()
- waitOperation(remove(transaction), deadline, promise)
- // if an item was handed off immediately, track latency from the "put" to "get".
- if (promise.isDefined && promise().isDefined) {
- val usec = (Time.now - promise().get.addTime).inMicroseconds.toInt max 0
- Stats.addMetric("get_hit_latency_usec", usec)
- Stats.addMetric("q/" + name + "/get_hit_latency_usec", usec)
+ waitOperation(remove(transaction), startTime, deadline, promise)
+ // if an item was handed off immediately, track latency of the "get" operation
+ if (promise.isDefined) {
+ val statName = if (promise().isDefined) "get_hit_latency_usec" else "get_miss_latency_usec"
+ val usec = (Time.now - startTime).inMicroseconds.toInt max 0
+ Stats.addMetric(statName, usec)
+ Stats.addMetric("q/" + name + "/" + statName, usec)
}
promise
}
final def waitPeek(deadline: Option[Time]): Future[Option[QItem]] = {
val promise = new Promise[Option[QItem]]()
- waitOperation(peek(), deadline, promise)
+ waitOperation(peek(), Time.now, deadline, promise)
promise
}
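The per-get latency bookkeeping deleted from KestrelHandler above now lives here, inside the queue. A short sketch of the read path that exercises it, assuming `queue` is an already-constructed PersistentQueue (a hedged illustration, not part of the commit):

    import com.twitter.conversions.time._

    val result = queue.waitRemove(Some(500.milliseconds.fromNow), false)
    result.map {
      case Some(item) => new String(item.data)  // an immediate hit is recorded in get_hit_latency_usec
      case None       => "empty"                // a miss or deadline expiry shows up in get_miss_latency_usec or get_timeout_msec
    }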
src/main/scala/net/lag/kestrel/QueueCollection.scala (11 lines changed)
@@ -18,7 +18,7 @@
package net.lag.kestrel
import java.io.File
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, ScheduledExecutorService}
import scala.collection.mutable
import com.twitter.conversions.time._
import com.twitter.logging.Logger
@@ -28,7 +28,7 @@ import config._
class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable")
-class QueueCollection(queueFolder: String, timer: Timer, journalSyncTimer: Timer,
+class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: ScheduledExecutorService,
@volatile private var defaultQueueConfig: QueueConfig,
@volatile var queueBuilders: List[QueueBuilder]) {
private val log = Logger.get(getClass.getName)
@@ -54,7 +54,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncTimer: Timer
}
val config = queueConfigMap.getOrElse(name, defaultQueueConfig)
log.info("Setting up queue %s: %s", realName, config)
- new PersistentQueue(realName, path, config, timer, journalSyncTimer, Some(this.apply))
+ new PersistentQueue(realName, path, config, timer, journalSyncScheduler, Some(this.apply))
}
// preload any queues
@@ -68,6 +68,11 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncTimer: Timer
def currentItems = queues.values.foldLeft(0L) { _ + _.length }
def currentBytes = queues.values.foldLeft(0L) { _ + _.bytes }
+ def reservedMemoryRatio = {
+ val maxBytes = queues.values.foldLeft(0L) { _ + _.maxMemoryBytes }
+ maxBytes.toDouble / systemMaxHeapBytes.toDouble
+ }
+ lazy val systemMaxHeapBytes = Runtime.getRuntime.maxMemory
def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder]) {
defaultQueueConfig = newDefaultQueueConfig
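The new reserved_memory_ratio gauge is simply the sum of each queue's maxMemorySize divided by the JVM's max heap (Runtime.getRuntime.maxMemory). A worked example of the arithmetic with assumed values:

    // four queues at the default 128 MB maxMemorySize on a 2 GB max heap
    val maxBytes = 4 * 128L * 1024 * 1024                                      // 536870912
    val systemMaxHeapBytes = 2048L * 1024 * 1024                               // 2147483648
    val reservedMemoryRatio = maxBytes.toDouble / systemMaxHeapBytes.toDouble  // 0.25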
src/scripts/kcluster (42 lines changed, mode changed 100644 → 100755)
@@ -18,20 +18,28 @@ def fetch_stats(host, port, data)
while !done && line = sock.gets.chomp
if (line == 'END') then
done = true
- elsif line =~ /STAT queue_(\w+) (\d+)/
+ elsif line =~ /STAT queue_([\w+]+) (\d+)/
key = $1
value = $2.to_i
- if key =~ /(\w+)_total_items/
- data[:total_items][$1] += value
- elsif key =~ /(\w+)_items/
- data[:items][$1] += value
- elsif key =~ /(\w+)_mem_bytes/
- data[:mem_bytes][$1] += value
- elsif key =~ /(\w+)_bytes/
- data[:bytes][$1] += value
- elsif key =~ /(\w+)_age/
- data[:min_age][$1] = value if value < data[:min_age][$1]
- data[:max_age][$1] = value if value > data[:max_age][$1]
+ (stat, queue_name) = case key
+ when /([\w+]+)_total_items/ then [:total_items, $1]
+ when /([\w+]+)_expired_items/ then [:expired_items, $1]
+ when /([\w+]+)_mem_items/ then [:mem_items, $1]
+ when /([\w+]+)_items/ then [:items, $1]
+ when /([\w+]+)_mem_bytes/ then [:mem_bytes, $1]
+ when /([\w+]+)_bytes/ then [:bytes, $1]
+ when /([\w+]+)_age/ then [:age, $1]
+ end
+
+ if (queue_name)
+ queue_name = queue_name.split('+', 2).first if $options[:rollup_fanouts]
+
+ if (stat == :age)
+ data[:min_age][queue_name] = value if value < data[:min_age][queue_name]
+ data[:max_age][queue_name] = value if value > data[:max_age][queue_name]
+ else
+ data[stat][queue_name] += value
+ end
end
end
end
@@ -65,7 +73,8 @@ def report(data, key)
format = "%14s %s\n"
printf(format, key, "queue")
printf(format, "============", "====================")
- data[key].each { |queue, value| printf("%14d %s\n", value, queue) }
+ stats = data[key] || {}
+ stats.each { |queue, value| printf("%14d %s\n", value, queue) }
end
def report_all(data, keys)
@@ -131,8 +140,8 @@ def find_stale(rounds)
printf("%11s %11s %s\n", "total_items", "items", "queue")
printf("%11s %11s %s\n", "-----------", "-----------", "--------------------")
stale.each do |queue_name|
- items = sorted[:items][stale]
- total_items = sorted[:total_items][stale]
+ items = last[:items][queue_name]
+ total_items = last[:total_items][queue_name]
printf("%11d %11d %s\n", total_items, items, queue_name)
end
end
@@ -150,6 +159,9 @@ parser = OptionParser.new do |opts|
opts.on("-p", "--port=N", "use port (default: #{$options[:port]})") do |port|
$options[:port] = port.to_i
end
+ opts.on("-r", "--rollup-fanouts", "roll up stats for fanout queues into a single count") do
+ $options[:rollup_fanouts] = true
+ end
opts.separator ""
opts.separator "Commands:"
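The new --rollup-fanouts flag folds a fanout queue's stats into its parent by splitting the queue name at the first '+'. The same rule in isolation, as a hedged sketch in Scala rather than the script's Ruby:

    // "jobs+client1" and "jobs+client2" both roll up to "jobs"
    def rollupName(queueName: String): String = queueName.split('+').head

    rollupName("jobs+client1")  // "jobs"
    rollupName("jobs")          // "jobs" (non-fanout names pass through unchanged)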
src/scripts/kestrel.sh (4 lines changed)
@@ -34,11 +34,11 @@ daemon_args="--name $APP_NAME --pidfile $daemon_pidfile --core --chdir /"
daemon_start_args="--stdout=/var/log/$APP_NAME/stdout --stderr=/var/log/$APP_NAME/error"
-function running() {
+running() {
$DAEMON $daemon_args --running
}
-function find_java() {
+find_java() {
if [ ! -z "$JAVA_HOME" ]; then
return
fi
src/test/scala/net/lag/kestrel/KestrelHandlerSpec.scala (16 lines changed)
@@ -18,6 +18,7 @@
package net.lag.kestrel
import java.io.{File, FileInputStream}
+import java.util.concurrent.ScheduledThreadPoolExecutor
import scala.util.Sorting
import com.twitter.conversions.time._
import com.twitter.ostrich.stats.Stats
@@ -44,14 +45,15 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"KestrelHandler" should {
var queues: QueueCollection = null
val timer = new FakeTimer()
+ val scheduler = new ScheduledThreadPoolExecutor(1)
doAfter {
queues.shutdown()
}
"set and get" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, timer, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("test", 0, None, "one".getBytes)
handler.setItem("test", 0, None, "two".getBytes)
@@ -63,7 +65,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"track stats" in {
withTempFolder {
Stats.clearAll()
- queues = new QueueCollection(folderName, timer, timer, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 10)
Stats.getCounter("cmd_get")() mustEqual 0
@@ -91,7 +93,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"abort and confirm a transaction" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, timer, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("test", 0, None, "one".getBytes)
handler.getItem("test", None, true, false)() must beString("one")
@@ -106,7 +108,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"open several transactions" in {
"on one queue" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, timer, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("test", 0, None, "one".getBytes)
handler.setItem("test", 0, None, "two".getBytes)
@@ -124,7 +126,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"on several queues" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, timer, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("red", 0, None, "red1".getBytes)
handler.setItem("red", 0, None, "red2".getBytes)
@@ -153,7 +155,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"but not if transactions are limited" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, timer, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 1)
handler.setItem("red", 0, None, "red1".getBytes)
handler.setItem("red", 0, None, "red2".getBytes)
@@ -164,7 +166,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"close all transactions" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, timer, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 2)
handler.setItem("red", 0, None, "red1".getBytes)
handler.setItem("red", 0, None, "red2".getBytes)
src/test/scala/net/lag/kestrel/PeriodicSyncFileSpec.scala (87 lines changed)
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2011 Twitter, Inc.
+ * Copyright 2011 Robey Pointer <robeypointer@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel
+
+import com.twitter.conversions.time._
+import com.twitter.util.Duration
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import org.specs.Specification
+import org.specs.matcher.Matcher
+
+class PeriodicSyncFileSpec extends Specification
+ with TestLogging
+ with QueueMatchers
+{
+ "PeriodicSyncTask" should {
+ val scheduler = new ScheduledThreadPoolExecutor(4)
+ val invocations = new AtomicInteger(0)
+ val syncTask = new PeriodicSyncTask(scheduler, 0.milliseconds, 20.milliseconds) {
+ override def run() {
+ invocations.incrementAndGet
+ }
+ }
+
+ doAfter {
+ scheduler.shutdown()
+ scheduler.awaitTermination(5, TimeUnit.SECONDS)
+ }
+
+ "only start once" in {
+ val (_, duration) = Duration.inMilliseconds {
+ syncTask.start()
+ syncTask.start()
+ Thread.sleep(100)
+ syncTask.stop()
+ }
+
+ val expectedInvocations = duration.inMilliseconds / 20
+ (invocations.get <= expectedInvocations * 3 / 2) mustBe true
+ }
+
+ "stop" in {
+ syncTask.start()
+ Thread.sleep(100)
+ syncTask.stop()
+ val invocationsPostTermination = invocations.get
+ Thread.sleep(100)
+ invocations.get mustEqual invocationsPostTermination
+ }
+
+ "stop given a condition" in {
+ syncTask.start()
+ Thread.sleep(100)
+
+ val invocationsPreStop = invocations.get
+ syncTask.stopIf { false }
+ Thread.sleep(100)
+
+ val invocationsPostIgnoredStop = invocations.get
+ syncTask.stopIf { true }
+ Thread.sleep(100)
+
+ val invocationsPostStop = invocations.get
+ Thread.sleep(100)
+
+ (invocationsPreStop > 0) mustBe true // did something
+ (invocationsPostIgnoredStop > invocationsPreStop) mustBe true // kept going
+ (invocationsPostStop >= invocationsPostIgnoredStop) mustBe true // maybe did more
+ invocations.get mustEqual invocationsPostStop // stopped
+ }
+ }
+}
src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala (76 lines changed)
@@ -18,7 +18,7 @@
package net.lag.kestrel
import java.io.{File, FileInputStream}
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, ScheduledThreadPoolExecutor}
import scala.collection.mutable
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
@@ -35,14 +35,15 @@ class PersistentQueueSpec extends Specification
{
"PersistentQueue" should {
val timer = new FakeTimer()
+ val scheduler = new ScheduledThreadPoolExecutor(1)
doBefore {
timer.timerTask.cancelled = false
}
"add and remove one item" in {
withTempFolder {
- val q = new PersistentQueue("work", folderName, new QueueBuilder().apply(), timer, timer)
+ val q = new PersistentQueue("work", folderName, new QueueBuilder().apply(), timer, scheduler)
q.setup
q.length mustEqual 0
@@ -75,7 +76,7 @@ class PersistentQueueSpec extends Specification
val config = new QueueBuilder {
maxItemSize = 128.bytes
}.apply()
- val q = new PersistentQueue("work", folderName, config, timer, timer)
+ val q = new PersistentQueue("work", folderName, config, timer, scheduler)
q.setup()
q.length mustEqual 0
q.add(new Array[Byte](127)) mustEqual true
@@ -87,7 +88,7 @@ class PersistentQueueSpec extends Specification
"flush all items" in {
withTempFolder {
- val q = new PersistentQueue("work", folderName, new QueueBuilder().apply(), timer, timer)
+ val q = new PersistentQueue("work", folderName, new QueueBuilder().apply(), timer, scheduler)
q.setup()
q.length mustEqual 0
@@ -115,7 +116,7 @@ class PersistentQueueSpec extends Specification
val config = new QueueBuilder {
defaultJournalSize = 64.bytes
}.apply()
- val q = new PersistentQueue("rolling", folderName, config, timer, timer)
+ val q = new PersistentQueue("rolling", folderName, config, timer, scheduler)
q.setup()
q.add(new Array[Byte](32))
@@ -145,7 +146,7 @@ class PersistentQueueSpec extends Specification
val config = new QueueBuilder {
defaultJournalSize = 64.bytes
}.apply()
- val q = new PersistentQueue("rolling", folderName, config, timer, timer)
+ val q = new PersistentQueue("rolling", folderName, config, timer, scheduler)
q.setup()
q.add(new Array[Byte](32))
@@ -173,23 +174,23 @@ class PersistentQueueSpec extends Specification
"recover the journal after a restart" in {
withTempFolder {
- val q = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, timer)
+ val q = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, scheduler)
q.setup
q.add("first".getBytes)
q.add("second".getBytes)
new String(q.remove.get.data) mustEqual "first"
q.journalSize mustEqual 5 + 6 + 16 + 16 + 5 + 5 + 1
q.close
- val q2 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, timer)
+ val q2 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, scheduler)
q2.setup
q2.journalSize mustEqual 5 + 6 + 16 + 16 + 5 + 5 + 1
new String(q2.remove.get.data) mustEqual "second"
q2.journalSize mustEqual 5 + 6 + 16 + 16 + 5 + 5 + 1 + 1
q2.length mustEqual 0
q2.close
- val q3 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, timer)
+ val q3 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, scheduler)
q3.setup
q3.journalSize mustEqual 5 + 6 + 16 + 16 + 5 + 5 + 1 + 1
q3.length mustEqual 0
@@ -198,7 +199,7 @@ class PersistentQueueSpec extends Specification
"recover a journal with a rewritten transaction" in {
withTempFolder {
- val q = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, timer)
+ val q = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, scheduler)
q.setup()
q.add("zero".getBytes)
q.add("first".getBytes)
@@ -215,7 +216,7 @@ class PersistentQueueSpec extends Specification
q.confirmRemove(item.xid)
q.close()
- val q2 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, timer)
+ val q2 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, scheduler)
q2.setup()
new String(q2.remove().get.data) mustEqual "second"
q2.close()
@@ -229,7 +230,7 @@ class PersistentQueueSpec extends Specification
val config = new QueueBuilder {
maxAge = 3.seconds
}.apply()
- val q = new PersistentQueue("weather_updates", folderName, config, timer, timer)
+ val q = new PersistentQueue("weather_updates", folderName, config, timer, scheduler)
q.setup()
q.add("sunny".getBytes) mustEqual true
q.length mustEqual 1
@@ -254,13 +255,13 @@ class PersistentQueueSpec extends Specification
val config1 = new QueueBuilder {
maxMemorySize = 123.bytes
}.apply()
- val q1 = new PersistentQueue("test1", folderName, config1, timer, timer)
+ val q1 = new PersistentQueue("test1", folderName, config1, timer, scheduler)
q1.config.maxJournalSize mustEqual new QueueBuilder().maxJournalSize
q1.config.maxMemorySize mustEqual 123.bytes
val config2 = new QueueBuilder {
maxJournalSize = 123.bytes
}.apply()
- val q2 = new PersistentQueue("test1", folderName, config2, timer, timer)
+ val q2 = new PersistentQueue("test1", folderName, config2, timer, scheduler)
q2.config.maxJournalSize mustEqual 123.bytes
q2.config.maxMemorySize mustEqual new QueueBuilder().maxMemorySize
}
@@ -272,7 +273,7 @@ class PersistentQueueSpec extends Specification
val config1 = new QueueBuilder {
maxMemorySize = 1.kilobyte
}.apply()
- val q = new PersistentQueue("things", folderName, config1, timer, new FakeTimer)
+ val q = new PersistentQueue("things", folderName, config1, timer, scheduler)
q.setup
var rv: Option[String] = None
@@ -294,7 +295,7 @@ class PersistentQueueSpec extends Specification
val config1 = new QueueBuilder {
maxMemorySize = 1.kilobyte
}.apply()
- val q = new PersistentQueue("things", folderName, config1, timer, new FakeTimer)
+ val q = new PersistentQueue("things", folderName, config1, timer, scheduler)
q.setup
var rv: Option[String] = Some("foo")
@@ -316,7 +317,7 @@ class PersistentQueueSpec extends Specification
"when an item arrives" in {
withTempFolder {
- val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, new FakeTimer)
+ val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
q.setup()
var rv: Option[String] = None
@@ -336,7 +337,7 @@ class PersistentQueueSpec extends Specification
"when the connection dies" in {
withTempFolder {
- val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, new FakeTimer)
+ val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
q.setup()
var rv: Option[String] = None
@@ -360,7 +361,7 @@ class PersistentQueueSpec extends Specification
val config = new QueueBuilder {
maxMemorySize = 1.kilobyte
}.apply()
- val q = new PersistentQueue("things", folderName, config, timer, timer)
+ val q = new PersistentQueue("things", folderName, config, timer, scheduler)
q.setup
q.add("house".getBytes)
@@ -397,7 +398,7 @@ class PersistentQueueSpec extends Specification
"add(5:0:house), add(3:0:cat), remove-tentative(1), remove-tentative(2), unremove(1), confirm-remove(2), remove"
// and journal is replayed correctly.
- val q2 = new PersistentQueue("things", folderName, config, timer, timer)
+ val q2 = new PersistentQueue("things", folderName, config, timer, scheduler)
q2.setup
q2.length mustEqual 0
q2.bytes mustEqual 0
@@ -406,7 +407,7 @@ class PersistentQueueSpec extends Specification
"recover a journal with open transactions" in {
withTempFolder {
- val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, timer)
+ val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
q.setup
q.add("one".getBytes)
q.add("two".getBytes)
@@ -428,7 +429,7 @@ class PersistentQueueSpec extends Specification
q.confirmRemove(item4.get.xid)
q.close
- val q2 = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, timer)
+ val q2 = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
q2.setup
q2.length mustEqual 3
q2.openTransactionCount mustEqual 0
@@ -441,7 +442,7 @@ class PersistentQueueSpec extends Specification
"continue a queue item" in {
withTempFolder {
- val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, timer)
+ val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
q.setup
q.add("one".getBytes)
@@ -451,7 +452,7 @@ class PersistentQueueSpec extends Specification
q.continue(item1.get.xid, "two".getBytes)
q.close
- val q2 = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, timer)
+ val q2 = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
q2.setup
q2.length mustEqual 1
q2.openTransactionCount mustEqual 0
@@ -465,7 +466,7 @@ class PersistentQueueSpec extends Specification
val config = new QueueBuilder {
maxJournalSize = 3.kilobytes
}.apply()
- val q = new PersistentQueue("things", folderName, config, timer, timer)
+ val q = new PersistentQueue("things", folderName, config, timer, scheduler)
q.setup
q.add(new Array[Byte](512))
// can't roll the journal normally, cuz there's always one item left.
@@ -494,7 +495,7 @@ class PersistentQueueSpec extends Specification
maxMemorySize = 1.kilobyte
maxJournalSize = 3.kilobytes
}.apply()
- val q = new PersistentQueue("things", folderName, config, timer, timer)
+ val q = new PersistentQueue("things", folderName, config, timer, scheduler)
q.setup
for (i <- 0 until 8) {
q.add(new Array[Byte](512))
@@ -507,7 +508,7 @@ class PersistentQueueSpec extends Specification
"report an age of zero on an empty queue" in {
withTempFolder {
- val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, timer)
+ val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
q.setup
put(q, 128, 0)
Thread.sleep(10)
@@ -521,13 +522,14 @@ class PersistentQueueSpec extends Specification
"PersistentQueue with no journal" should {
val timer = new FakeTimer()
+ val scheduler = new ScheduledThreadPoolExecutor(1)
"create no journal" in {
withTempFolder {
val config = new QueueBuilder {
keepJournal = false
}.apply()
- val q = new PersistentQueue("mem", folderName, config, timer, timer)
+ val q = new PersistentQueue("mem", folderName, config, timer, scheduler)
q.setup
q.add("coffee".getBytes)
@@ -541,12 +543,12 @@ class PersistentQueueSpec extends Specification
val config = new QueueBuilder {
keepJournal = false
}.apply()
- val q = new PersistentQueue("mem", folderName, config, timer, timer)
+ val q = new PersistentQueue("mem", folderName, config, timer, scheduler)
q.setup
q.add("coffee".getBytes)
q.close
- val q2 = new PersistentQueue("mem", folderName, config, timer, timer)
+ val q2 = new PersistentQueue("mem", folderName, config, timer, scheduler)
q2.setup
q2.remove mustEqual None
}
@@ -556,13 +558,14 @@ class PersistentQueueSpec extends Specification
"PersistentQueue with item/size limit" should {
val timer = new FakeTimer()
+ val scheduler = new ScheduledThreadPoolExecutor(1)
"honor max_items" in {
withTempFolder {
val config = new QueueBuilder {
maxItems = 1
}.apply()
- val q = new PersistentQueue("weather_updates", folderName, config, timer, timer)
+ val q = new PersistentQueue("weather_updates", folderName, config, timer, scheduler)
q.setup
q.add("sunny".getBytes) mustEqual true
q.add("rainy".getBytes) mustEqual false
@@ -576,7 +579,7 @@ class PersistentQueueSpec extends Specification
val config = new QueueBuilder {
maxSize = 510.bytes
}.apply()
- val q = new PersistentQueue("weather_updates", folderName, config, timer, timer)
+ val q = new PersistentQueue("weather_updates", folderName, config, timer, scheduler)
q.setup
q.add(("a" * 256).getBytes) mustEqual true
q.add(("b" * 256).getBytes) mustEqual true
@@ -593,7 +596,7 @@ class PersistentQueueSpec extends Specification
maxItems = 3
discardOldWhenFull = true
}.apply()
- val q = new PersistentQueue("weather_updates", folderName, config, timer, timer)
+ val q = new PersistentQueue("weather_updates", folderName, config, timer, scheduler)
q.setup
q.add("sunny".getBytes) mustEqual true
q.add("rainy".getBytes) mustEqual true
@@ -607,14 +610,15 @@ class PersistentQueueSpec extends Specification
"PersistentQueue with item expiry" should {
val timer = new FakeTimer()
+ val scheduler = new ScheduledThreadPoolExecutor(1)
"expire items into the ether" in {
withTempFolder {
Time.withCurrentTimeFrozen { time =>
val config = new QueueBuilder {
keepJournal = false
}.apply()
- val q = new PersistentQueue("wu_tang", folderName, config, timer, timer)
+ val q = new PersistentQueue("wu_tang", folderName, config, timer, scheduler)
q.setup()
val expiry = Time.now + 1.second
q.add("rza".getBytes, Some(expiry)) mustEqual true
@@ -635,8 +639,8 @@ class PersistentQueueSpec extends Specification
val config = new QueueBuilder {
keepJournal = false
}.apply()
- val r = new PersistentQueue("rappers", folderName, config, timer, new FakeTimer)
- val q = new PersistentQueue("wu_tang", folderName, config, timer, new FakeTimer)
+ val r = new PersistentQueue("rappers", folderName, config, timer, scheduler)
+ val q = new PersistentQueue("wu_tang", folderName, config, timer, scheduler)
r.setup()
q.setup()
q.expireQueue = Some(r)
src/test/scala/net/lag/kestrel/QueueCollectionSpec.scala (46 lines changed)
@@ -18,9 +18,11 @@
package net.lag.kestrel
import java.io.{File, FileInputStream}
+import java.util.concurrent.ScheduledThreadPoolExecutor
import scala.util.Sorting
import com.twitter.util.{TempFolder, Time, Timer}
import com.twitter.conversions.time._
+import com.twitter.conversions.storage._
import com.twitter.ostrich.stats.Stats
import org.specs.Specification
import config._
@@ -32,6 +34,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
"QueueCollection" should {
val timer = new FakeTimer()
+ val scheduler = new ScheduledThreadPoolExecutor(1)
doAfter {
if (qc ne null) {
@@ -42,7 +45,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
"create a queue" in {
withTempFolder {
Stats.clearAll()
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.queueNames mustEqual Nil
qc.add("work1", "stuff".getBytes)
@@ -66,14 +69,29 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
"refuse to create a bad queue" in {
withTempFolder {
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.queue("hello.there") must throwA[Exception]
}
}
+ "report reserved memory usage as a fraction of max heap" in {
+ withTempFolder {
+ val maxHeapBytes = config.maxMemorySize.inBytes * 4
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil) {
+ override lazy val systemMaxHeapBytes = maxHeapBytes
+ }
+
+ (1 to 5).foreach { i =>
+ qc.queue("queue" + i)
+
+ qc.reservedMemoryRatio mustEqual (i.toDouble / 4.0)
+ }
+ }
+ }
+
"load from journal" in {
withTempFolder {
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.add("ducklings", "huey".getBytes)
qc.add("ducklings", "dewey".getBytes)
qc.add("ducklings", "louie".getBytes)
@@ -82,7 +100,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
qc.currentItems mustEqual 3
qc.shutdown
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.queueNames mustEqual Nil
qc.remove("ducklings")() must beSomeQItem("huey")
// now the queue should be suddenly instantiated:
@@ -94,7 +112,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
"queue hit/miss tracking" in {
withTempFolder {
Stats.clearAll()
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.add("ducklings", "ugly1".getBytes)
qc.add("ducklings", "ugly2".getBytes)
Stats.getCounter("get_hits")() mustEqual 0
@@ -124,7 +142,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
new File(folderName + "/apples").createNewFile()
new File(folderName + "/oranges.101").createNewFile()
new File(folderName + "/oranges.133").createNewFile()
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.loadQueues()
qc.queueNames.sorted mustEqual List("apples", "oranges")
}
@@ -135,7 +153,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
new File(folderName + "/apples").createNewFile()
new File(folderName + "/oranges").createNewFile()
new File(folderName + "/oranges~~900").createNewFile()
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.loadQueues()
qc.queueNames.sorted mustEqual List("apples", "oranges")
}
@@ -145,7 +163,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
withTempFolder {
new File(folderName + "/apples").createNewFile()
new File(folderName + "/oranges").createNewFile()
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.loadQueues()
qc.delete("oranges")
@@ -157,7 +175,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
"fanout queues" in {
"generate on the fly" in {
withTempFolder {
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.add("jobs", "job1".getBytes)
qc.remove("jobs+client1")() mustEqual None
qc.add("jobs", "job2".getBytes)
@@ -172,7 +190,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
withTempFolder {
new File(folderName + "/jobs").createNewFile()
new File(folderName + "/jobs+client1").createNewFile()
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.loadQueues()
qc.add("jobs", "job1".getBytes)
qc.remove("jobs+client1")() must beSomeQItem("job1")
@@ -188,7 +206,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
withTempFolder {
new File(folderName + "/jobs").createNewFile()
new File(folderName + "/jobs+client1").createNewFile()
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.loadQueues()
qc.add("jobs", "job1".getBytes)
@@ -210,7 +228,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
name = "jobs"
fanoutOnly = true
}
- qc = new QueueCollection(folderName, timer, timer, config, List(jobConfig))
+ qc = new QueueCollection(folderName, timer, scheduler, config, List(jobConfig))
qc.loadQueues()
qc.add("jobs", "job1".getBytes)
qc.remove("jobs")() mustEqual None
@@ -223,7 +241,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
withTempFolder {
Time.withCurrentTimeFrozen { time =>
new File(folderName + "/expired").createNewFile()
- qc = new QueueCollection(folderName, timer, timer, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.loadQueues()
qc.add("expired", "hello".getBytes, Some(5.seconds.fromNow))
@@ -245,7 +263,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
name = "jobs"
expireToQueue = "expired"
}
- qc = new QueueCollection(folderName, timer, timer, config, List(expireConfig))
+ qc = new QueueCollection(folderName, timer, scheduler, config, List(expireConfig))
qc.loadQueues()
qc.add("jobs", "hello".getBytes, Some(1.second.fromNow))
qc.queue("jobs").get.length mustEqual 1
src/test/scala/net/lag/kestrel/ReadBehindSpec.scala (19 lines changed)
@@ -18,7 +18,7 @@
package net.lag.kestrel
import java.io.{File, FileInputStream}
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, ScheduledThreadPoolExecutor}
import scala.collection.mutable
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
@@ -30,13 +30,14 @@ class ReadBehindSpec extends Specification with TempFolder with TestLogging with
"PersistentQueue read-behind" should {
val timer = new FakeTimer()
+ val scheduler = new ScheduledThreadPoolExecutor(1)
"drop into read-behind mode on insert" in {
withTempFolder {
val config1 = new QueueBuilder {
maxMemorySize = 1.kilobyte
}.apply()
- val q = new PersistentQueue("things", folderName, config1, timer, timer)
+ val q = new PersistentQueue("things", folderName, config1, timer, scheduler)
q.setup
for (i <- 0 until 10) {
@@ -97,7 +98,7 @@ class ReadBehindSpec extends Specification with TempFolder with TestLogging with
val config = new QueueBuilder {
maxMemorySize = 1.kilobyte
}.apply()
- val q = new PersistentQueue("things", folderName, config, timer, timer)
+ val q = new PersistentQueue("things", folderName, config, timer, scheduler)
q.setup
for (i <- 0 until 10) {
@@ -111,7 +112,7 @@ class ReadBehindSpec extends Specification with TempFolder with TestLogging with
q.memoryBytes mustEqual 1024
q.close
- val q2 = new PersistentQueue("things", folderName, config, timer, timer)
+ val q2 = new PersistentQueue("things", folderName, config, timer, scheduler)
q2.setup
q2.inReadBehind mustBe true
@@ -136,7 +137,7 @@ class ReadBehindSpec extends Specification with TempFolder with TestLogging with
val config = new QueueBuilder {
maxMemorySize = 1.kilobyte
}.apply()
- val q = new PersistentQueue("things", folderName, config, timer, timer)
+ val q = new PersistentQueue("things", folderName, config, timer, scheduler)
q.setup
for (i <- 0 until 10) {
@@ -158,7 +159,7 @@ class ReadBehindSpec extends Specification with TempFolder with TestLogging with
q.memoryBytes mustEqual 0
q.close
- val q2 = new PersistentQueue("things", folderName, config, timer, timer)
+ val q2 = new PersistentQueue("things", folderName, config, timer, scheduler)
q2.setup
q2.inReadBehind mustBe false
q2.length mustEqual 0
@@ -174,7 +175,7 @@ class ReadBehindSpec extends Specification with TempFolder with TestLogging with
maxMemorySize = 512.bytes
maxJournalSize = 1.kilobyte
}.apply()
- val q = new PersistentQueue("things", folderName, config, timer, timer)
+ val q = new PersistentQueue("things", folderName, config, timer, scheduler)
q.setup
for (i <- 0 until 10) {
@@ -194,7 +195,7 @@ class ReadBehindSpec extends Specification with TempFolder with TestLogging with
maxMemorySize = 1.kilobyte
maxJournalSize = 512.bytes
}.apply()
- val q = new PersistentQueue("things", folderName, config, timer, timer)
+ val q = new PersistentQueue("things", folderName, config, timer, scheduler)
q.setup
for (i <- 0 until 30) {
@@ -217,7 +218,7 @@ class ReadBehindSpec extends Specification with TempFolder with TestLogging with
q.inReadBehind mustBe false
q.close()
- val q2 = new PersistentQueue("things", folderName, config, timer, timer)
+ val q2 = new PersistentQueue("things", folderName, config, timer, scheduler)
q2.setup
q2.inReadBehind mustBe false
q2.length mustEqual 1
