Merge branch 'master' into site

commit 4105f0becb145042921be22af42ef88eb0d95291 (2 parents: 6a5d083 + 9cf4d58)
authored January 12, 2012
14  ChangeLog
@@ -1,5 +1,18 @@
+
+2.1.5
+-----
+release: 12 January 2012
+
+- Only sync to disk when there's data to be written. [Stephan Zuercher]
+- Track latency measurements better, and track the percentage of java heap
+  reserved for in-memory queues. [Stephan Zuercher]
+- Fix the startup script to work in "dash". [Matt Parlane]
+- Send the correct response back for the memcached protocol for delete. [Matt
+  Erkkila]
+
 2.1.4
 -----
+release: 21 November 2011
 
 - Separate timers for journal fsync operations from those used for request
   timeouts and queue expiry
@@ -9,7 +22,6 @@
   bugs)
 - Creating a queue with an illegal name causes an error
 
-
 2.1.3
 -----
 release: 13 October 2011
2  config/production.scala
@@ -21,7 +21,7 @@ new KestrelConfig {
   default.defaultJournalSize = 16.megabytes
   default.maxMemorySize = 128.megabytes
   default.maxJournalSize = 1.gigabyte
-  default.syncJournal = 20.milliseconds
+  default.syncJournal = 100.milliseconds
 
   admin.httpPort = 2223
 
4  project/build.properties
@@ -1,9 +1,9 @@
 #Project properties
-#Mon Nov 21 15:08:49 PST 2011
+#Thu Jan 12 16:30:20 PST 2012
 project.organization=net.lag
 project.name=kestrel
 sbt.version=0.7.4
-project.version=2.1.5-SNAPSHOT
+project.version=2.1.6-SNAPSHOT
 def.scala.version=2.7.7
 build.scala.versions=2.8.1
 project.initialize=false
19  project/build/KestrelProject.scala
@@ -58,7 +58,24 @@ class KestrelProject(info: ProjectInfo) extends StandardServiceProject(info) wit
   lazy val packageLoadTests = packageLoadTestsAction
   override def packageDistTask = packageLoadTestsAction && super.packageDistTask
 
-//  override def fork = forkRun(List("-Xmx1024m", "-verbosegc", "-XX:+PrintGCDetails"))
+  // generate a distribution zip for release.
+  def releaseDistTask = task {
+    val releaseDistPath = "dist-release" / distName ##
+
+    releaseDistPath.asFile.mkdirs()
+    (releaseDistPath / "libs").asFile.mkdirs()
+    (releaseDistPath / "config").asFile.mkdirs()
+
+    FileUtilities.copyFlat(List(jarPath), releaseDistPath, log).left.toOption orElse
+      FileUtilities.copyFlat(List(outputPath / loadTestJarFilename), releaseDistPath, log).left.toOption orElse
+      FileUtilities.copyFlat(dependentJars.get, releaseDistPath / "libs", log).left.toOption orElse
+      FileUtilities.copy(((configPath ***) --- (configPath ** "*.class")).get, releaseDistPath / "config", log).left.toOption orElse
+      FileUtilities.copy((scriptsOutputPath ***).get, releaseDistPath, log).left.toOption orElse
+      FileUtilities.zip((("dist-release" ##) / distName).get, "dist-release" / (distName + ".zip"), true, log)
+  }
+  val ReleaseDistDescription = "Creates a deployable zip file with dependencies, config, and scripts."
+  lazy val releaseDist = releaseDistTask.dependsOn(`package`, makePom, copyScripts).describedAs(ReleaseDistDescription)
+
 
   lazy val putMany = task { args =>
     runTask(Some("net.lag.kestrel.load.PutMany"), testClasspath, args).dependsOn(testCompile)
6  project/release.properties
@@ -1,4 +1,4 @@
 #Automatically generated by ReleaseManagement
-#Mon Nov 21 15:08:49 PST 2011
-version=2.1.4
-sha1=10d84a9dee3c23813c8fd76b55ce95d958154423
+#Thu Jan 12 16:30:20 PST 2012
+version=2.1.5
+sha1=459ec824963e31e0da78dbc425be0c7084a3e298
8  src/main/scala/net/lag/kestrel/Journal.scala
@@ -20,14 +20,14 @@ package net.lag.kestrel
 import java.io._
 import java.nio.{ByteBuffer, ByteOrder}
 import java.nio.channels.FileChannel
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{LinkedBlockingQueue, ScheduledExecutorService}
 import java.util.concurrent.atomic.AtomicInteger
 import scala.annotation.tailrec
 import com.twitter.conversions.storage._
 import com.twitter.conversions.time._
 import com.twitter.logging.Logger
 import com.twitter.ostrich.admin.BackgroundProcess
-import com.twitter.util.{Future, Duration, Timer, Time}
+import com.twitter.util.{Future, Duration, Time}
 
 case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause)
 
@@ -51,7 +51,7 @@ object JournalItem {
 /**
  * Codes for working with the journal file for a PersistentQueue.
  */
-class Journal(queuePath: File, queueName: String, syncTimer: Timer, syncJournal: Duration) {
+class Journal(queuePath: File, queueName: String, syncScheduler: ScheduledExecutorService, syncJournal: Duration) {
   import Journal._
 
   private val log = Logger.get(getClass)
@@ -97,7 +97,7 @@ class Journal(queuePath: File, queueName: String, syncTimer: Timer, syncJournal:
   def this(fullPath: String) = this(fullPath, Duration.MaxValue)
 
   private def open(file: File) {
-    writer = new PeriodicSyncFile(file, syncTimer, syncJournal)
+    writer = new PeriodicSyncFile(file, syncScheduler, syncJournal)
   }
 
   def open() {
26  src/main/scala/net/lag/kestrel/Kestrel.scala
@@ -18,9 +18,10 @@
 package net.lag.kestrel
 
 import java.net.InetSocketAddress
-import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
+import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.{immutable, mutable}
+import com.twitter.concurrent.NamedPoolThreadFactory
 import com.twitter.conversions.time._
 import com.twitter.logging.Logger
 import com.twitter.naggati.codec.MemcacheCodec
@@ -72,7 +73,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
 
   var queueCollection: QueueCollection = null
   var timer: Timer = null
-  var journalSyncTimer: Timer = null
+  var journalSyncScheduler: ScheduledExecutorService = null
   var executor: ExecutorService = null
   var channelFactory: ChannelFactory = null
   var memcacheAcceptor: Option[Channel] = None
@@ -93,15 +94,25 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
              listenAddress, memcacheListenPort, textListenPort, queuePath, protocol,
              expirationTimerFrequency, clientTimeout, maxOpenTransactions)
 
-    // this means no timeout will be at better granularity than N ms.
-    journalSyncTimer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
+    // this means no timeout will be at better granularity than 100 ms.
     timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS)
 
-    queueCollection = new QueueCollection(queuePath, new NettyTimer(timer), new NettyTimer(journalSyncTimer), defaultQueueConfig, builders)
+    journalSyncScheduler =
+      new ScheduledThreadPoolExecutor(
+        Runtime.getRuntime.availableProcessors,
+        new NamedPoolThreadFactory("journal-sync", true),
+        new RejectedExecutionHandler {
+          override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
+            log.warning("Rejected journal fsync")
+          }
+        })
+
+    queueCollection = new QueueCollection(queuePath, new NettyTimer(timer), journalSyncScheduler, defaultQueueConfig, builders)
     queueCollection.loadQueues()
 
     Stats.addGauge("items") { queueCollection.currentItems.toDouble }
     Stats.addGauge("bytes") { queueCollection.currentBytes.toDouble }
+    Stats.addGauge("reserved_memory_ratio") { queueCollection.reservedMemoryRatio }
 
     // netty setup:
     executor = Executors.newCachedThreadPool()
@@ -161,8 +172,9 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
     executor.awaitTermination(5, TimeUnit.SECONDS)
     timer.stop()
     timer = null
-    journalSyncTimer.stop()
-    journalSyncTimer = null
+    journalSyncScheduler.shutdown()
+    journalSyncScheduler.awaitTermination(5, TimeUnit.SECONDS)
+    journalSyncScheduler = null
     log.info("Goodbye.")
   }
 
15  src/main/scala/net/lag/kestrel/KestrelHandler.scala
@@ -194,21 +194,6 @@ abstract class KestrelHandler(val queues: QueueCollection, val maxOpenTransactio
     waitingFor = Some(future)
     future.map { itemOption =>
       waitingFor = None
-      timeout match {
-        case None => {
-          val usec = (Time.now - startTime).inMicroseconds.toInt
-          val statName = if (itemOption.isDefined) "get_hit_latency_usec" else "get_miss_latency_usec"
-          Stats.addMetric(statName, usec)
-          Stats.addMetric("q/" + key + "/" + statName, usec)
-        }
-        case Some(_) => {
-          if (!itemOption.isDefined) {
-            val msec = (Time.now - startTime).inMilliseconds.toInt
-            Stats.addMetric("get_timeout_msec", msec)
-            Stats.addMetric("q/" + key + "/get_timeout_msec", msec)
-          }
-        }
-      }
       itemOption.foreach { item =>
         log.debug("get <- %s", item)
         if (opening) pendingTransactions.add(key, item.xid)
3  src/main/scala/net/lag/kestrel/MemcacheHandler.scala
@@ -85,7 +85,7 @@ extends NettyHandler[MemcacheRequest](channelGroup, queueCollection, maxOpenTran
         dumpStats(request.line.drop(1))
       case "delete" =>
         delete(request.line(1))
-        channel.write(new MemcacheResponse("END"))
+        channel.write(new MemcacheResponse("DELETED"))
       case "flush_expired" =>
         channel.write(new MemcacheResponse(flushExpired(request.line(1)).toString))
       case "flush_all_expired" =>
@@ -189,6 +189,7 @@ extends NettyHandler[MemcacheRequest](channelGroup, queueCollection, maxOpenTran
     report += (("curr_items", queues.currentItems.toString))
     report += (("total_items", Stats.getCounter("total_items")().toString))
     report += (("bytes", queues.currentBytes.toString))
+    report += (("reserved_memory_ratio", "%.3f".format(queues.reservedMemoryRatio)))
     report += (("curr_connections", Kestrel.sessions.get().toString))
     report += (("total_connections", Stats.getCounter("total_connections")().toString))
     report += (("cmd_get", Stats.getCounter("cmd_get")().toString))
65  src/main/scala/net/lag/kestrel/PeriodicSyncFile.scala
@@ -1,47 +1,86 @@
 package net.lag.kestrel
 
-import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentLinkedQueue
 import com.twitter.conversions.time._
+import com.twitter.ostrich.stats.Stats
 import com.twitter.util._
 import java.io.{IOException, FileOutputStream, File}
+import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentLinkedQueue, ScheduledExecutorService, ScheduledFuture, TimeUnit}
+
+abstract class PeriodicSyncTask(val scheduler: ScheduledExecutorService, initialDelay: Duration, period: Duration)
+extends Runnable {
+  @volatile private[this] var scheduledFsync: Option[ScheduledFuture[_]] = None
+
+  def start() {
+    synchronized {
+      if (scheduledFsync.isEmpty && period > 0.seconds) {
+        val handle = scheduler.scheduleWithFixedDelay(this, initialDelay.inMilliseconds, period.inMilliseconds,
+                                                      TimeUnit.MILLISECONDS)
+        scheduledFsync = Some(handle)
+      }
+    }
+  }
+
+  def stop() {
+    synchronized { _stop() }
+  }
+
+  def stopIf(f: => Boolean) {
+    synchronized {
+      if (f) _stop()
+    }
+  }
+
+  private[this] def _stop() {
+    scheduledFsync.foreach { _.cancel(false) }
+    scheduledFsync = None
+  }
+}
 
 /**
  * Open a file for writing, and fsync it on a schedule. The period may be 0 to force an fsync
  * after every write, or `Duration.MaxValue` to never fsync.
 */
-class PeriodicSyncFile(file: File, timer: Timer, period: Duration) {
+class PeriodicSyncFile(file: File, scheduler: ScheduledExecutorService, period: Duration) {
   // pre-completed future for writers who are behaving synchronously.
   private final val DONE = Future(())
 
-  val writer = new FileOutputStream(file, true).getChannel
-  val promises = new ConcurrentLinkedQueue[Promise[Unit]]()
-
-  @volatile var closed = false
+  case class TimestampedPromise(val promise: Promise[Unit], val time: Time)
 
-  if (period > 0.seconds && period < Duration.MaxValue) {
-    timer.schedule(Time.now, period) {
+  val writer = new FileOutputStream(file, true).getChannel
+  val promises = new ConcurrentLinkedQueue[TimestampedPromise]()
+  val periodicSyncTask = new PeriodicSyncTask(scheduler, period, period) {
+    override def run() {
       if (!closed && !promises.isEmpty) fsync()
     }
   }
 
+  @volatile var closed = false
+
  private def fsync() {
    synchronized {
      // race: we could underestimate the number of completed writes. that's okay.
      val completed = promises.size
+      val fsyncStart = Time.now
      try {
        writer.force(false)
      } catch {
        case e: IOException =>
          for (i <- 0 until completed) {
-            promises.poll().setException(e)
+            promises.poll().promise.setException(e)
          }
        return;
      }

      for (i <- 0 until completed) {
-        promises.poll().setValue(())
+        val timestampedPromise = promises.poll()
+        timestampedPromise.promise.setValue(())
+        val delaySinceWrite = fsyncStart - timestampedPromise.time
+        val durationBehind = if (delaySinceWrite > period) delaySinceWrite - period else 0.seconds
+        Stats.addMetric("fsync_delay_usec", durationBehind.inMicroseconds.toInt)
      }
+
+      periodicSyncTask.stopIf { promises.isEmpty }
    }
  }

@@ -62,7 +101,8 @@ class PeriodicSyncFile(file: File, timer: Timer, period: Duration) {
      DONE
    } else {
      val promise = new Promise[Unit]()
-      promises.add(promise)
+      promises.add(TimestampedPromise(promise, Time.now))
+      periodicSyncTask.start()
      promise
    }
  }
@@ -73,6 +113,7 @@ class PeriodicSyncFile(file: File, timer: Timer, period: Duration) {
   */
  def close() {
    closed = true
+    periodicSyncTask.stop()
    fsync()
    writer.close()
  }
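
For readers of the PeriodicSyncFile hunks above: fsync_delay_usec records only the time a completed write waited beyond the configured sync period before its fsync ran. A minimal sketch of that arithmetic, using made-up numbers and assuming only util-core's time conversions (not part of this commit):

    import com.twitter.conversions.time._

    // Hypothetical: a write was queued 130 ms before the fsync started,
    // while the journal sync period (default.syncJournal) is 100 ms.
    val period = 100.milliseconds
    val delaySinceWrite = 130.milliseconds
    val durationBehind =
      if (delaySinceWrite > period) delaySinceWrite - period else 0.seconds
    println(durationBehind.inMicroseconds)  // 30000, reported as fsync_delay_usec

A write that is fsynced within one period of being queued reports 0.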
49  src/main/scala/net/lag/kestrel/PersistentQueue.scala
@@ -20,7 +20,7 @@ package net.lag.kestrel
 import java.io._
 import java.nio.{ByteBuffer, ByteOrder}
 import java.nio.channels.FileChannel
-import java.util.concurrent.{CountDownLatch, Executor}
+import java.util.concurrent.{CountDownLatch, Executor, ScheduledExecutorService}
 import scala.collection.mutable
 import com.twitter.conversions.storage._
 import com.twitter.conversions.time._
@@ -30,10 +30,10 @@ import com.twitter.util._
 import config._
 
 class PersistentQueue(val name: String, persistencePath: String, @volatile var config: QueueConfig,
-                      timer: Timer, journalSyncTimer: Timer,
+                      timer: Timer, journalSyncScheduler: ScheduledExecutorService,
                       queueLookup: Option[(String => Option[PersistentQueue])]) {
-  def this(name: String, persistencePath: String, config: QueueConfig, timer: Timer, journalSyncTimer: Timer) =
-    this(name, persistencePath, config, timer, journalSyncTimer, None)
+  def this(name: String, persistencePath: String, config: QueueConfig, timer: Timer, journalSyncScheduler: ScheduledExecutorService) =
+    this(name, persistencePath, config, timer, journalSyncScheduler, None)
 
   private val log = Logger.get(getClass.getName)
 
@@ -70,7 +70,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
   private var paused = false
 
   private var journal =
-    new Journal(new File(persistencePath).getCanonicalFile, name, journalSyncTimer, config.syncJournal)
+    new Journal(new File(persistencePath).getCanonicalFile, name, journalSyncScheduler, config.syncJournal)
 
   private val waiters = new DeadlineWaitQueue(timer)
 
@@ -82,6 +82,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
 
   def length: Long = synchronized { queueLength }
   def bytes: Long = synchronized { queueSize }
+  def maxMemoryBytes: Long = synchronized { config.maxMemorySize.inBytes }
   def journalSize: Long = synchronized { journal.size }
   def journalTotalSize: Long = journal.archivedSize + journalSize
   def currentAge: Duration = synchronized { if (queueSize == 0) 0.milliseconds else _currentAge }
@@ -231,7 +232,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
    *     head of the queue)
    */
   def remove(transaction: Boolean): Option[QItem] = {
-    synchronized {
+    val removedItem = synchronized {
      if (closed || paused || queueLength == 0) {
        None
      } else {
@@ -240,9 +241,17 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
          if (transaction) journal.removeTentative(item.get.xid) else journal.remove()
          checkRotateJournal()
        }
+
        item
      }
    }
+
+    removedItem.foreach { qItem =>
+      val usec = (Time.now - qItem.addTime).inMilliseconds.toInt max 0
+      Stats.addMetric("delivery_latency_msec", usec)
+      Stats.addMetric("q/" + name + "/delivery_latency_msec", usec)
+    }
+    removedItem
  }
 
  /**
@@ -250,12 +259,20 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
   */
  def remove(): Option[QItem] = remove(false)
 
-  private def waitOperation(op: => Option[QItem], deadline: Option[Time], future: Promise[Option[QItem]]) {
+  private def waitOperation(op: => Option[QItem], startTime: Time, deadline: Option[Time],
+                            future: Promise[Option[QItem]]) {
    val item = op
    if (synchronized {
      if (!item.isDefined && !closed && !paused && deadline.isDefined && deadline.get > Time.now) {
        // if we get woken up, try again with the same deadline.
-        val w = waiters.add(deadline.get, { () => waitOperation(op, deadline, future) }, { () => future.setValue(None) })
+        def onTrigger() = waitOperation(op, startTime, deadline, future)
+        def onTimeout() {
+          val msec = (Time.now - startTime).inMilliseconds.toInt
+          Stats.addMetric("get_timeout_msec", msec)
+          Stats.addMetric("q/" + name + "/get_timeout_msec", msec)
+          future.setValue(None)
+        }
+        val w = waiters.add(deadline.get, onTrigger, onTimeout)
        // FIXME: use onCancellation when util-core is bumped.
        future.linkTo(new CancellableSink({ waiters.remove(w) }))
        false
@@ -266,20 +283,22 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
  }
 
  final def waitRemove(deadline: Option[Time], transaction: Boolean): Future[Option[QItem]] = {
+    val startTime = Time.now
    val promise = new Promise[Option[QItem]]()
-    waitOperation(remove(transaction), deadline, promise)
-    // if an item was handed off immediately, track latency from the "put" to "get".
-    if (promise.isDefined && promise().isDefined) {
-      val usec = (Time.now - promise().get.addTime).inMicroseconds.toInt max 0
-      Stats.addMetric("get_hit_latency_usec", usec)
-      Stats.addMetric("q/" + name + "/get_hit_latency_usec", usec)
+    waitOperation(remove(transaction), startTime, deadline, promise)
+    // if an item was handed off immediately, track latency of the "get" operation
+    if (promise.isDefined) {
+      val statName = if (promise().isDefined) "get_hit_latency_usec" else "get_miss_latency_usec"
+      val usec = (Time.now - startTime).inMicroseconds.toInt max 0
+      Stats.addMetric(statName, usec)
+      Stats.addMetric("q/" + name + "/" + statName, usec)
    }
    promise
  }
 
  final def waitPeek(deadline: Option[Time]): Future[Option[QItem]] = {
    val promise = new Promise[Option[QItem]]()
-    waitOperation(peek(), deadline, promise)
+    waitOperation(peek(), Time.now, deadline, promise)
    promise
  }
 
11  src/main/scala/net/lag/kestrel/QueueCollection.scala
@@ -18,7 +18,7 @@
 package net.lag.kestrel
 
 import java.io.File
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, ScheduledExecutorService}
 import scala.collection.mutable
 import com.twitter.conversions.time._
 import com.twitter.logging.Logger
@@ -28,7 +28,7 @@ import config._
 
 class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable")
 
-class QueueCollection(queueFolder: String, timer: Timer, journalSyncTimer: Timer,
+class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: ScheduledExecutorService,
                       @volatile private var defaultQueueConfig: QueueConfig,
                       @volatile var queueBuilders: List[QueueBuilder]) {
   private val log = Logger.get(getClass.getName)
@@ -54,7 +54,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncTimer: Timer
    }
    val config = queueConfigMap.getOrElse(name, defaultQueueConfig)
    log.info("Setting up queue %s: %s", realName, config)
-    new PersistentQueue(realName, path, config, timer, journalSyncTimer, Some(this.apply))
+    new PersistentQueue(realName, path, config, timer, journalSyncScheduler, Some(this.apply))
  }
 
  // preload any queues
@@ -68,6 +68,11 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncTimer: Timer
 
  def currentItems = queues.values.foldLeft(0L) { _ + _.length }
  def currentBytes = queues.values.foldLeft(0L) { _ + _.bytes }
+  def reservedMemoryRatio = {
+    val maxBytes = queues.values.foldLeft(0L) { _ + _.maxMemoryBytes }
+    maxBytes.toDouble / systemMaxHeapBytes.toDouble
+  }
+  lazy val systemMaxHeapBytes = Runtime.getRuntime.maxMemory
 
  def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder]) {
    defaultQueueConfig = newDefaultQueueConfig
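
As a worked example of the new reservedMemoryRatio gauge above (illustrative numbers only, not taken from this commit): the gauge sums each queue's configured maxMemorySize and divides by the JVM's maximum heap, so eight queues at 128 MB each on a 4 GB heap reserve a quarter of the heap.

    // Sketch of the arithmetic behind reserved_memory_ratio, assuming
    // eight queues with maxMemorySize = 128.megabytes and a -Xmx4g heap.
    val maxBytesAcrossQueues = 8L * 128 * 1024 * 1024    // 1 GB reserved for in-memory queues
    val maxHeapBytes = 4L * 1024 * 1024 * 1024           // Runtime.getRuntime.maxMemory
    val reservedMemoryRatio = maxBytesAcrossQueues.toDouble / maxHeapBytes.toDouble
    println("%.3f".format(reservedMemoryRatio))          // 0.250, as reported in "stats"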
42  src/scripts/kcluster 100644 → 100755
@@ -18,20 +18,28 @@ def fetch_stats(host, port, data)
   while !done && line = sock.gets.chomp
     if (line == 'END') then
       done = true
-    elsif line =~ /STAT queue_(\w+) (\d+)/
+    elsif line =~ /STAT queue_([\w+]+) (\d+)/
       key = $1
       value = $2.to_i
-      if key =~ /(\w+)_total_items/
-        data[:total_items][$1] += value
-      elsif key =~ /(\w+)_items/
-        data[:items][$1] += value
-      elsif key =~ /(\w+)_mem_bytes/
-        data[:mem_bytes][$1] += value
-      elsif key =~ /(\w+)_bytes/
-        data[:bytes][$1] += value
-      elsif key =~ /(\w+)_age/
-        data[:min_age][$1] = value if value < data[:min_age][$1]
-        data[:max_age][$1] = value if value > data[:max_age][$1]
+      (stat, queue_name) = case key
+        when /([\w+]+)_total_items/   then [:total_items, $1]
+        when /([\w+]+)_expired_items/ then [:expired_items, $1]
+        when /([\w+]+)_mem_items/     then [:mem_items, $1]
+        when /([\w+]+)_items/         then [:items, $1]
+        when /([\w+]+)_mem_bytes/     then [:mem_bytes, $1]
+        when /([\w+]+)_bytes/         then [:bytes, $1]
+        when /([\w+]+)_age/           then [:age, $1]
+      end
+
+      if (queue_name)
+        queue_name = queue_name.split('+', 2).first if $options[:rollup_fanouts]
+
+        if (stat == :age)
+          data[:min_age][queue_name] = value if value < data[:min_age][queue_name]
+          data[:max_age][queue_name] = value if value > data[:max_age][queue_name]
+        else
+          data[stat][queue_name] += value
+        end
       end
     end
   end
@@ -65,7 +73,8 @@ def report(data, key)
   format = "%14s %s\n"
   printf(format, key, "queue")
   printf(format, "============", "====================")
-  data[key].each { |queue, value| printf("%14d %s\n", value, queue) }
+  stats = data[key] || {}
+  stats.each { |queue, value| printf("%14d %s\n", value, queue) }
 end
 
 def report_all(data, keys)
@@ -131,8 +140,8 @@ def find_stale(rounds)
   printf("%11s %11s %s\n", "total_items", "items", "queue")
   printf("%11s %11s %s\n", "-----------", "-----------", "--------------------")
   stale.each do |queue_name|
-    items = sorted[:items][stale]
-    total_items = sorted[:total_items][stale]
+    items = last[:items][queue_name]
+    total_items = last[:total_items][queue_name]
     printf("%11d %11d %s\n", total_items, items, queue_name)
   end
 end
@@ -150,6 +159,9 @@ parser = OptionParser.new do |opts|
   opts.on("-p", "--port=N", "use port (default: #{$options[:port]})") do |port|
     $options[:port] = port.to_i
   end
+  opts.on("-r", "--rollup-fanouts", "roll up stats for fanout queues into a single count") do
+    $options[:rollup_fanouts] = true
+  end
 
   opts.separator ""
   opts.separator "Commands:"
4  src/scripts/kestrel.sh
@@ -34,11 +34,11 @@ daemon_args="--name $APP_NAME --pidfile $daemon_pidfile --core --chdir /"
 daemon_start_args="--stdout=/var/log/$APP_NAME/stdout --stderr=/var/log/$APP_NAME/error"
 
 
-function running() {
+running() {
   $DAEMON $daemon_args --running
 }
 
-function find_java() {
+find_java() {
   if [ ! -z "$JAVA_HOME" ]; then
     return
   fi
16  src/test/scala/net/lag/kestrel/KestrelHandlerSpec.scala
@@ -18,6 +18,7 @@
 package net.lag.kestrel
 
 import java.io.{File, FileInputStream}
+import java.util.concurrent.ScheduledThreadPoolExecutor
 import scala.util.Sorting
 import com.twitter.conversions.time._
 import com.twitter.ostrich.stats.Stats
@@ -44,6 +45,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
   "KestrelHandler" should {
     var queues: QueueCollection = null
     val timer = new FakeTimer()
+    val scheduler = new ScheduledThreadPoolExecutor(1)
 
     doAfter {
       queues.shutdown()
@@ -51,7 +53,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
 
     "set and get" in {
       withTempFolder {
-        queues = new QueueCollection(folderName, timer, timer, config, Nil)
+        queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
         val handler = new FakeKestrelHandler(queues, 10)
         handler.setItem("test", 0, None, "one".getBytes)
         handler.setItem("test", 0, None, "two".getBytes)
@@ -63,7 +65,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
     "track stats" in {
       withTempFolder {
         Stats.clearAll()
-        queues = new QueueCollection(folderName, timer, timer, config, Nil)
+        queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
         val handler = new FakeKestrelHandler(queues, 10)
 
         Stats.getCounter("cmd_get")() mustEqual 0
@@ -91,7 +93,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
 
     "abort and confirm a transaction" in {
       withTempFolder {
-        queues = new QueueCollection(folderName, timer, timer, config, Nil)
+        queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
         val handler = new FakeKestrelHandler(queues, 10)
         handler.setItem("test", 0, None, "one".getBytes)
        handler.getItem("test", None, true, false)() must beString("one")
@@ -106,7 +108,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
     "open several transactions" in {
       "on one queue" in {
         withTempFolder {
-          queues = new QueueCollection(folderName, timer, timer, config, Nil)
+          queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
           val handler = new FakeKestrelHandler(queues, 10)
           handler.setItem("test", 0, None, "one".getBytes)
           handler.setItem("test", 0, None, "two".getBytes)
@@ -124,7 +126,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
 
       "on several queues" in {
         withTempFolder {
-          queues = new QueueCollection(folderName, timer, timer, config, Nil)
+          queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
           val handler = new FakeKestrelHandler(queues, 10)
           handler.setItem("red", 0, None, "red1".getBytes)
           handler.setItem("red", 0, None, "red2".getBytes)
@@ -153,7 +155,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
 
       "but not if transactions are limited" in {
        withTempFolder {
-          queues = new QueueCollection(folderName, timer, timer, config, Nil)
+          queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
          val handler = new FakeKestrelHandler(queues, 1)
          handler.setItem("red", 0, None, "red1".getBytes)
          handler.setItem("red", 0, None, "red2".getBytes)
@@ -164,7 +166,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
 
      "close all transactions" in {
        withTempFolder {
-          queues = new QueueCollection(folderName, timer, timer, config, Nil)
+          queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
          val handler = new FakeKestrelHandler(queues, 2)
          handler.setItem("red", 0, None, "red1".getBytes)
          handler.setItem("red", 0, None, "red2".getBytes)
87  src/test/scala/net/lag/kestrel/PeriodicSyncFileSpec.scala
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2011 Twitter, Inc.
+ * Copyright 2011 Robey Pointer <robeypointer@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel
+
+import com.twitter.conversions.time._
+import com.twitter.util.Duration
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import org.specs.Specification
+import org.specs.matcher.Matcher
+
+class PeriodicSyncFileSpec extends Specification
+  with TestLogging
+  with QueueMatchers
+{
+  "PeriodicSyncTask" should {
+    val scheduler = new ScheduledThreadPoolExecutor(4)
+    val invocations = new AtomicInteger(0)
+    val syncTask = new PeriodicSyncTask(scheduler, 0.milliseconds, 20.milliseconds) {
+      override def run() {
+        invocations.incrementAndGet
+      }
+    }
+
+    doAfter {
+      scheduler.shutdown()
+      scheduler.awaitTermination(5, TimeUnit.SECONDS)
+    }
+
+    "only start once" in {
+      val (_, duration) = Duration.inMilliseconds {
+        syncTask.start()
+        syncTask.start()
+        Thread.sleep(100)
+        syncTask.stop()
+      }
+
+      val expectedInvocations = duration.inMilliseconds / 20
+      (invocations.get <= expectedInvocations * 3 / 2) mustBe true
+    }
+
+    "stop" in {
+      syncTask.start()
+      Thread.sleep(100)
+      syncTask.stop()
+      val invocationsPostTermination = invocations.get
+      Thread.sleep(100)
+      invocations.get mustEqual invocationsPostTermination
+    }
+
+    "stop given a condition" in {
+      syncTask.start()
+      Thread.sleep(100)
+
+      val invocationsPreStop = invocations.get
+      syncTask.stopIf { false }
+      Thread.sleep(100)
+
+      val invocationsPostIgnoredStop = invocations.get
+      syncTask.stopIf { true }
+      Thread.sleep(100)
+
+      val invocationsPostStop = invocations.get
+      Thread.sleep(100)
+
+      (invocationsPreStop > 0) mustBe true                            // did something
+      (invocationsPostIgnoredStop > invocationsPreStop) mustBe true   // kept going
+      (invocationsPostStop >= invocationsPostIgnoredStop) mustBe true // maybe did more
+      invocations.get mustEqual invocationsPostStop                   // stopped
+    }
+  }
+}
76  src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
@@ -18,7 +18,7 @@
 package net.lag.kestrel
 
 import java.io.{File, FileInputStream}
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, ScheduledThreadPoolExecutor}
 import scala.collection.mutable
 import com.twitter.conversions.storage._
 import com.twitter.conversions.time._
@@ -35,6 +35,7 @@ class PersistentQueueSpec extends Specification
 {
   "PersistentQueue" should {
     val timer = new FakeTimer()
+    val scheduler = new ScheduledThreadPoolExecutor(1)
 
     doBefore {
       timer.timerTask.cancelled = false
@@ -42,7 +43,7 @@ class PersistentQueueSpec extends Specification
 
     "add and remove one item" in {
       withTempFolder {
-        val q = new PersistentQueue("work", folderName, new QueueBuilder().apply(), timer, timer)
+        val q = new PersistentQueue("work", folderName, new QueueBuilder().apply(), timer, scheduler)
         q.setup
 
         q.length mustEqual 0
@@ -75,7 +76,7 @@ class PersistentQueueSpec extends Specification
         val config = new QueueBuilder {
           maxItemSize = 128.bytes
         }.apply()
-        val q = new PersistentQueue("work", folderName, config, timer, timer)
+        val q = new PersistentQueue("work", folderName, config, timer, scheduler)
         q.setup()
         q.length mustEqual 0
         q.add(new Array[Byte](127)) mustEqual true
@@ -87,7 +88,7 @@ class PersistentQueueSpec extends Specification
 
     "flush all items" in {
       withTempFolder {
-        val q = new PersistentQueue("work", folderName, new QueueBuilder().apply(), timer, timer)
+        val q = new PersistentQueue("work", folderName, new QueueBuilder().apply(), timer, scheduler)
         q.setup()
 
         q.length mustEqual 0
@@ -115,7 +116,7 @@ class PersistentQueueSpec extends Specification
         val config = new QueueBuilder {
           defaultJournalSize = 64.bytes
         }.apply()
-        val q = new PersistentQueue("rolling", folderName, config, timer, timer)
+        val q = new PersistentQueue("rolling", folderName, config, timer, scheduler)
         q.setup()
 
         q.add(new Array[Byte](32))
@@ -145,7 +146,7 @@ class PersistentQueueSpec extends Specification
         val config = new QueueBuilder {
           defaultJournalSize = 64.bytes
         }.apply()
-        val q = new PersistentQueue("rolling", folderName, config, timer, timer)
+        val q = new PersistentQueue("rolling", folderName, config, timer, scheduler)
         q.setup()
 
         q.add(new Array[Byte](32))
@@ -173,7 +174,7 @@ class PersistentQueueSpec extends Specification
 
     "recover the journal after a restart" in {
       withTempFolder {
-        val q = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, timer)
+        val q = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, scheduler)
         q.setup
         q.add("first".getBytes)
         q.add("second".getBytes)
@@ -181,7 +182,7 @@ class PersistentQueueSpec extends Specification
         q.journalSize mustEqual 5 + 6 + 16 + 16 + 5 + 5 + 1
         q.close
 
-        val q2 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, timer)
+        val q2 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, scheduler)
         q2.setup
         q2.journalSize mustEqual 5 + 6 + 16 + 16 + 5 + 5 + 1
         new String(q2.remove.get.data) mustEqual "second"
@@ -189,7 +190,7 @@ class PersistentQueueSpec extends Specification
         q2.length mustEqual 0
         q2.close
 
-        val q3 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, timer)
+        val q3 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, scheduler)
         q3.setup
         q3.journalSize mustEqual 5 + 6 + 16 + 16 + 5 + 5 + 1 + 1
         q3.length mustEqual 0
@@ -198,7 +199,7 @@ class PersistentQueueSpec extends Specification
 
     "recover a journal with a rewritten transaction" in {
       withTempFolder {
-        val q = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, timer)
+        val q = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, scheduler)
         q.setup()
         q.add("zero".getBytes)
         q.add("first".getBytes)
@@ -215,7 +216,7 @@ class PersistentQueueSpec extends Specification
         q.confirmRemove(item.xid)
         q.close()
 
-        val q2 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, timer)
+        val q2 = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, scheduler)
         q2.setup()
         new String(q2.remove().get.data) mustEqual "second"
         q2.close()
@@ -229,7 +230,7 @@ class PersistentQueueSpec extends Specification
           val config = new QueueBuilder {
             maxAge = 3.seconds
           }.apply()
-          val q = new PersistentQueue("weather_updates", folderName, config, timer, timer)
+          val q = new PersistentQueue("weather_updates", folderName, config, timer, scheduler)
           q.setup()
           q.add("sunny".getBytes) mustEqual true
           q.length mustEqual 1
@@ -254,13 +255,13 @@ class PersistentQueueSpec extends Specification
         val config1 = new QueueBuilder {
           maxMemorySize = 123.bytes
         }.apply()
-        val q1 = new PersistentQueue("test1", folderName, config1, timer, timer)
+        val q1 = new PersistentQueue("test1", folderName, config1, timer, scheduler)
         q1.config.maxJournalSize mustEqual new QueueBuilder().maxJournalSize
         q1.config.maxMemorySize mustEqual 123.bytes
         val config2 = new QueueBuilder {
           maxJournalSize = 123.bytes
         }.apply()
-        val q2 = new PersistentQueue("test1", folderName, config2, timer, timer)
+        val q2 = new PersistentQueue("test1", folderName, config2, timer, scheduler)
         q2.config.maxJournalSize mustEqual 123.bytes
         q2.config.maxMemorySize mustEqual new QueueBuilder().maxMemorySize
       }
@@ -272,7 +273,7 @@ class PersistentQueueSpec extends Specification
           val config1 = new QueueBuilder {
            maxMemorySize = 1.kilobyte
          }.apply()
-          val q = new PersistentQueue("things", folderName, config1, timer, new FakeTimer)
+          val q = new PersistentQueue("things", folderName, config1, timer, scheduler)
          q.setup
 
          var rv: Option[String] = None
@@ -294,7 +295,7 @@ class PersistentQueueSpec extends Specification
          val config1 = new QueueBuilder {
            maxMemorySize = 1.kilobyte
          }.apply()
-          val q = new PersistentQueue("things", folderName, config1, timer, new FakeTimer)
+          val q = new PersistentQueue("things", folderName, config1, timer, scheduler)
          q.setup
 
          var rv: Option[String] = Some("foo")
@@ -316,7 +317,7 @@ class PersistentQueueSpec extends Specification
 
        "when an item arrives" in {
          withTempFolder {
-            val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, new FakeTimer)
+            val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
            q.setup()
 
            var rv: Option[String] = None
@@ -336,7 +337,7 @@ class PersistentQueueSpec extends Specification
 
        "when the connection dies" in {
          withTempFolder {
-            val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, new FakeTimer)
+            val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
            q.setup()
 
            var rv: Option[String] = None
@@ -360,7 +361,7 @@ class PersistentQueueSpec extends Specification
        val config = new QueueBuilder {
          maxMemorySize = 1.kilobyte
        }.apply()
-        val q = new PersistentQueue("things", folderName, config, timer, timer)
+        val q = new PersistentQueue("things", folderName, config, timer, scheduler)
 
        q.setup
        q.add("house".getBytes)
@@ -397,7 +398,7 @@ class PersistentQueueSpec extends Specification
          "add(5:0:house), add(3:0:cat), remove-tentative(1), remove-tentative(2), unremove(1), confirm-remove(2), remove"
 
        // and journal is replayed correctly.
-        val q2 = new PersistentQueue("things", folderName, config, timer, timer)
+        val q2 = new PersistentQueue("things", folderName, config, timer, scheduler)
        q2.setup
        q2.length mustEqual 0
        q2.bytes mustEqual 0
@@ -406,7 +407,7 @@ class PersistentQueueSpec extends Specification
 
    "recover a journal with open transactions" in {
      withTempFolder {
-        val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, timer)
+        val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
        q.setup
        q.add("one".getBytes)
        q.add("two".getBytes)
@@ -428,7 +429,7 @@ class PersistentQueueSpec extends Specification
        q.confirmRemove(item4.get.xid)
        q.close
 
-        val q2 = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, timer)
+        val q2 = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
        q2.setup
        q2.length mustEqual 3
        q2.openTransactionCount mustEqual 0
@@ -441,7 +442,7 @@ class PersistentQueueSpec extends Specification
 
    "continue a queue item" in {
      withTempFolder {
-        val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, timer)
+        val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
        q.setup
        q.add("one".getBytes)
 
@@ -451,7 +452,7 @@ class PersistentQueueSpec extends Specification
        q.continue(item1.get.xid, "two".getBytes)
        q.close
 
-        val q2 = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, timer)
+        val q2 = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
        q2.setup
        q2.length mustEqual 1
        q2.openTransactionCount mustEqual 0
@@ -465,7 +466,7 @@ class PersistentQueueSpec extends Specification
        val config = new QueueBuilder {
          maxJournalSize = 3.kilobytes
        }.apply()
-        val q = new PersistentQueue("things", folderName, config, timer, timer)
+        val q = new PersistentQueue("things", folderName, config, timer, scheduler)
        q.setup
        q.add(new Array[Byte](512))
        // can't roll the journal normally, cuz there's always one item left.
@@ -494,7 +495,7 @@ class PersistentQueueSpec extends Specification
          maxMemorySize = 1.kilobyte
          maxJournalSize = 3.kilobytes
        }.apply()
-        val q = new PersistentQueue("things", folderName, config, timer, timer)
+        val q = new PersistentQueue("things", folderName, config, timer, scheduler)
        q.setup
        for (i <- 0 until 8) {
          q.add(new Array[Byte](512))
@@ -507,7 +508,7 @@ class PersistentQueueSpec extends Specification
 
    "report an age of zero on an empty queue" in {
      withTempFolder {
-        val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, timer)
+        val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
        q.setup
        put(q, 128, 0)
        Thread.sleep(10)
@@ -521,13 +522,14 @@ class PersistentQueueSpec extends Specification
 
  "PersistentQueue with no journal" should {
    val timer = new FakeTimer()
+    val scheduler = new ScheduledThreadPoolExecutor(1)
 
    "create no journal" in {
      withTempFolder {
        val config = new QueueBuilder {
          keepJournal = false
        }.apply()
-        val q = new PersistentQueue("mem", folderName, config, timer, timer)
+        val q = new PersistentQueue("mem", folderName, config, timer, scheduler)
        q.setup
 
        q.add("coffee".getBytes)
@@ -541,12 +543,12 @@ class PersistentQueueSpec extends Specification
        val config = new QueueBuilder {
          keepJournal = false
        }.apply()
-        val q = new PersistentQueue("mem", folderName, config, timer, timer)
+        val q = new PersistentQueue("mem", folderName, config, timer, scheduler)
        q.setup
        q.add("coffee".getBytes)
        q.close
 
-        val q2 = new PersistentQueue("mem", folderName, config, timer, timer)
+        val q2 = new PersistentQueue("mem", folderName, config, timer, scheduler)
        q2.setup
        q2.remove mustEqual None
      }
@@ -556,13 +558,14 @@ class PersistentQueueSpec extends Specification
 
  "PersistentQueue with item/size limit" should {