
never rewrite journals unless the queue is empty or in read-behind

RB_ID=94977
1 parent 34e37ae commit f3f5a4a3ccc8be1983993a28911accc623e5d04c Stephan Zuercher committed Nov 2, 2012
@@ -151,16 +151,18 @@ just a sequential record of each add or remove operation that's happened on
that queue. When kestrel starts up, it replays each queue's journal to build
up the in-memory queue that it uses for client queries.
-The journal file is rotated in one of two conditions:
+The journal file is compacted if the queue is empty and the journal is larger
+than `defaultJournalSize`.
-1. the queue is empty and the journal is larger than `defaultJournalSize`
-
-2. the journal is larger than `maxJournalSize`
+The current journal file is archived (or rotated) when the journal is larger
+than `maxMemorySize`. In addition, if the complete journal exceeds
+`maxJournalSize`, a checkpoint is set. The journal may then be compacted during
+read-behind (when the size of the queue exceeds `maxMemorySize`) via the
+journal packer thread.
For example, if `defaultJournalSize` is 16MB (the default), then if the queue
-is empty and the journal is larger than 16MB, it will be truncated into a new
-(empty) file. If the journal is larger than `maxJournalSize` (1GB by default),
-the journal will be rewritten periodically to contain just the live items.
+is empty and the journal is larger than 16MB, it will be compacted into a new
+(empty, if there are no open transactions) file.
You can turn the journal off for a queue (`keepJournal` = false) and the queue
will exist only in memory. If the server restarts, all enqueued items are
@@ -134,13 +134,8 @@ To keep from using an infinite amount of disk space, kestrel has a few
strategies for erasing or compressing old journal files.
- If the queue is empty, then after `defaultJournalSize` (usually 16MB) of
- journal is written, the file is erased and a new file is started. (This is
- the normal "steady-state" operation mode.)
-
-- If the queue is not in read-behind (that is, all of the queue's contents
- are in memory), and the journal files total more than `maxJournalSize`
- (usually 1GB), then the journal is rewritten from scratch. We know that
- will be no more than `maxMemorySize` (usually 128MB).
+ journal is written, the file is erased and a new file is started with only
+ open transactions. (This is the normal "steady-state" operation mode.)
- If the queue *is* in read-behind, a new journal file is started after each
`maxMemorySize` (usually 128MB). At the beginning of each new file, a
@@ -161,3 +156,15 @@ The cons to this strategy are:
- If a kestrel has N live connections, up to N open transactions will be
written out during each checkpoint. For 50k connections, each holding a 1KB
transaction, that could be 50MB.
+
+Kestrel versions prior to 2.4.1 will also attempt to compact the journal if
+the queue is not in read-behind (that is, all of the queue's contents are in
+memory) and the journal files total more than `maxJournalSize` (usually
+1GB). In this case, the journal is rewritten from scratch. We know that
+there will be no more than `maxMemorySize` (usually 128MB) worth of data to
+write. This feature was removed because in the absence of remove operations,
+it was possible to cause Kestrel to perform this rewrite on every add
+operation, rendering it unusable. The workaround for this behavior is to
+increase `maxJournalSize` such that the ratio of `maxJournalSize` to
+`maxMemorySize` is greater than the ratio of the `minimum item size + 21` to
+the `minimum item size`.
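
The workaround condition in the paragraph above can be checked numerically. The following is a minimal sketch and not part of the commit: `RewriteWorkaround` and `isSafe` are hypothetical names, the `21` is assumed to be the journal overhead in bytes for each added item (inferred from the `+ 21` term and from the size arithmetic in the removed spec), and sizes are assumed to be plain byte counts.

```scala
// Illustrative only: these names do not exist in Kestrel.
object RewriteWorkaround {
  // Assumption: each add costs the item size plus 21 bytes in the journal.
  val PerItemOverheadBytes = 21L

  // True when maxJournalSize / maxMemorySize > (minItemSize + 21) / minItemSize.
  // The inequality is cross-multiplied so it stays in integer arithmetic.
  def isSafe(maxJournalSize: Long, maxMemorySize: Long, minItemSize: Long): Boolean =
    maxJournalSize * minItemSize > maxMemorySize * (minItemSize + PerItemOverheadBytes)
}
```

Under these assumptions, with the default sizes (a `maxJournalSize` of 1GB and a `maxMemorySize` of 128MB, a ratio of 8), `isSafe` is false for a minimum item size of 3 bytes or less and true from 4 bytes upward.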
@@ -197,15 +197,11 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
private def checkRotateJournal() {
/*
* if the queue is empty, and the journal is larger than defaultJournalSize, rebuild it.
- * if the queue is smaller than maxMemorySize, and the combined journals are larger than
- * maxJournalSize, rebuild them. (we are not in read-behind.)
* if the current journal is larger than maxMemorySize, rotate to a new file. if the combined
* journals are larger than maxJournalSize, checkpoint in preparation for rebuilding the
* older files in the background.
*/
- if ((journal.size >= config.defaultJournalSize.inBytes && queueLength == 0) ||
- (journal.size + journal.archivedSize > config.maxJournalSize.inBytes &&
- queueSize < config.maxMemorySize.inBytes)) {
+ if ((journal.size >= config.defaultJournalSize.inBytes && queueLength == 0)) {
log.info("Rewriting journal file for '%s' (qsize=%d)", name, queueSize)
journal.rewrite(openTransactionIds.map { openTransactions(_) }, queue)
} else if (journal.size > config.maxMemorySize.inBytes) {
@@ -104,20 +104,21 @@ class QueueBuilder {
var maxAge: ConfigValue[Option[Duration]] = Default(None)
/**
- * If the queue is empty, truncate the journal when it reaches this size.
+ * If the queue is empty, compact the journal when it reaches this size.
*/
var defaultJournalSize: ConfigValue[StorageUnit] = Default(16.megabytes)
/**
* Keep only this much of the queue in memory. The journal will be used to store backlogged
* items, and they'll be read back into memory as the queue is drained. This setting is a release
- * valve to keep a backed-up queue from consuming all memory.
+ * valve to keep a backed-up queue from consuming all memory. Also, when the current journal file
+ * reaches this size, it is rotated.
*/
var maxMemorySize: ConfigValue[StorageUnit] = Default(128.megabytes)
/**
- * If the queue fits entirely in memory (see maxMemorySize) and the journal files get larger than
- * this, rebuild the journal.
+ * When the journal gets larger than this, set a checkpoint so that the journal may be compacted
+ * during read-behind (when the queue size in bytes exceeds `maxMemorySize`).
*/
var maxJournalSize: ConfigValue[StorageUnit] = Default(1.gigabyte)
@@ -118,7 +118,7 @@ class PersistentQueueSpec extends Specification
}
}
- "rotate journals" in {
+ "rewrite journals" in {
withTempFolder {
val config = new QueueBuilder {
defaultJournalSize = 64.bytes
@@ -139,7 +139,7 @@ class PersistentQueueSpec extends Specification
q.bytes mustEqual 64
(q.journalTotalSize > 96) mustBe true
- // now it should rotate:
+ // now it should rewrite:
q.remove()
q.length mustEqual 0
q.putItems.get mustEqual 2L
@@ -148,7 +148,7 @@ class PersistentQueueSpec extends Specification
}
}
- "rotate journals with an open transaction" in {
+ "rewrite journals with an open transaction" in {
withTempFolder {
val config = new QueueBuilder {
defaultJournalSize = 64.bytes
@@ -169,7 +169,7 @@ class PersistentQueueSpec extends Specification
q.bytes mustEqual 64
(q.journalSize > 96) mustBe true
- // now it should rotate:
+ // now it should rewrite:
q.remove(true)
q.length mustEqual 0
q.openTransactionCount mustEqual 1
@@ -179,6 +179,67 @@ class PersistentQueueSpec extends Specification
}
}
+ "rotate journals when bigger than maxMemorySize and queue is empty" in {
+ withTempFolder {
+ val config = new QueueBuilder {
+ maxMemorySize = 128.bytes
+ defaultJournalSize = 64.bytes
+ }.apply()
+ val q = new PersistentQueue("rotating", folderName, config, timer, scheduler)
+ q.setup()
+
+ q.add(new Array[Byte](96))
+ q.add(new Array[Byte](96))
+ q.length mustEqual 2
+ q.bytes mustEqual 96 + 96
+ (q.journalSize > 128) mustBe true
+ Journal.journalsForQueue(new File(folderName), "rotating").length mustEqual 1
+
+ // now it should rotate:
+ q.add(new Array[Byte](96))
+ q.length mustEqual 3
+ q.bytes mustEqual 96 + 96 + 96
+ Journal.journalsForQueue(new File(folderName), "rotating").length mustEqual 2
+ }
+ }
+
+ "rewrite rotated journals" in {
+ withTempFolder {
+ val config = new QueueBuilder {
+ maxMemorySize = 128.bytes
+ defaultJournalSize = 64.bytes
+ }.apply()
+ val q = new PersistentQueue("rotating", folderName, config, timer, scheduler)
+ q.setup()
+
+ q.add(new Array[Byte](96))
+ q.add(new Array[Byte](96))
+
+ // now it should rotate:
+ q.add(new Array[Byte](96))
+ Journal.journalsForQueue(new File(folderName), "rotating").length mustEqual 2
+
+ q.add(new Array[Byte](96))
+
+ // rotate again:
+ q.add(new Array[Byte](96))
+ Journal.journalsForQueue(new File(folderName), "rotating").length mustEqual 3
+
+ q.remove()
+ q.remove()
+ q.remove()
+ q.remove()
+ Journal.journalsForQueue(new File(folderName), "rotating").length mustEqual 3
+
+ // now it should rewrite
+ q.remove()
+ q.length mustEqual 0
+ q.bytes mustEqual 0
+ q.journalSize mustEqual 0
+ Journal.journalsForQueue(new File(folderName), "rotating").length mustEqual 1
+ }
+ }
+
"recover the journal after a restart" in {
withTempFolder {
val q = new PersistentQueue("rolling", folderName, new QueueBuilder().apply(), timer, scheduler)
@@ -468,51 +529,6 @@ class PersistentQueueSpec extends Specification
}
}
- "recreate the journal file when it gets too big" in {
- withTempFolder {
- val config = new QueueBuilder {
- maxJournalSize = 3.kilobytes
- }.apply()
- val q = new PersistentQueue("things", folderName, config, timer, scheduler)
- q.setup
- q.add(new Array[Byte](512))
- // can't roll the journal normally, cuz there's always one item left.
- for (i <- 0 until 4) {
- q.add(new Array[Byte](512))
- q.remove(false) must beSomeQItem(512)
- }
- q.add(new Array[Byte](512))
- q.length mustEqual 2
- q.journalSize mustEqual (512 * 6) + (6 * 21) + 4
-
- // next remove should force a recreate, because the queue size will be 512.
- q.remove(false) must beSomeQItem(512)
- q.length mustEqual 1
- q.journalSize mustEqual (512 + 21)
-
- // journal should contain exactly 1 item.
- q.close
- dumpJournal("things") mustEqual "add(512:0)"
- }
- }
-
- "don't recreate the journal file if the queue itself is still huge" in {
- withTempFolder {
- val config = new QueueBuilder {
- maxMemorySize = 1.kilobyte
- maxJournalSize = 3.kilobytes
- }.apply()
- val q = new PersistentQueue("things", folderName, config, timer, scheduler)
- q.setup
- for (i <- 0 until 8) {
- q.add(new Array[Byte](512))
- }
- q.length mustEqual 8
- q.bytes mustEqual 4096
- q.journalSize must be_<(q.journalTotalSize)
- }
- }
-
"report an age of zero on an empty queue" in {
withTempFolder {
val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
@@ -38,6 +38,7 @@ class ServerSpec extends Specification with TempFolder with TestLogging {
def makeServer() {
val defaultConfig = new QueueBuilder() {
+ defaultJournalSize = 12.kilobytes
maxJournalSize = 16.kilobytes
}.apply()
// make a queue specify max_items and max_age
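
The decision that `checkRotateJournal` makes after this commit can be summarized in a small standalone model. This is a sketch based only on the comment and conditions shown in the diff, not Kestrel's implementation: `JournalAction`, `Limits`, and `JournalPolicy.decide` are hypothetical names, sizes are plain byte counts, and the handling of open transactions is left out.

```scala
// Illustrative model only: none of these types exist in Kestrel.
sealed trait JournalAction
case object Rewrite extends JournalAction
final case class Rotate(setCheckpoint: Boolean) extends JournalAction
case object NoAction extends JournalAction

// The three configured sizes, in bytes.
final case class Limits(
  defaultJournalSize: Long,
  maxMemorySize: Long,
  maxJournalSize: Long
)

object JournalPolicy {
  def decide(
    limits: Limits,
    journalSize: Long,   // size of the current journal file
    archivedSize: Long,  // combined size of already-rotated files
    queueLength: Int
  ): JournalAction =
    if (journalSize >= limits.defaultJournalSize && queueLength == 0) {
      // Empty queue and a journal past defaultJournalSize: compact it.
      Rewrite
    } else if (journalSize > limits.maxMemorySize) {
      // Current file past maxMemorySize: rotate, and checkpoint only if
      // the combined journals are larger than maxJournalSize.
      Rotate(setCheckpoint = journalSize + archivedSize > limits.maxJournalSize)
    } else {
      NoAction
    }
}
```

Note that a non-empty queue whose journals exceed `maxJournalSize` no longer triggers a rewrite in this model; it only marks a checkpoint when the current file is rotated, which matches the condition removed in the diff.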
