diff --git a/src/main/scala/net/lag/kestrel/PersistentQueue.scala b/src/main/scala/net/lag/kestrel/PersistentQueue.scala index 1eb91fcc..2daaa352 100644 --- a/src/main/scala/net/lag/kestrel/PersistentQueue.scala +++ b/src/main/scala/net/lag/kestrel/PersistentQueue.scala @@ -267,10 +267,9 @@ class PersistentQueue(persistencePath: String, val name: String, if (keepJournal()) { if (transaction) journal.removeTentative() else journal.remove() - if ((queueLength == 0) && (journal.size >= maxJournalSize()) && - (openTransactions.size == 0)) { + if ((queueLength == 0) && (journal.size >= maxJournalSize())) { log.info("Rolling journal file for '%s'", name) - journal.roll(xidCounter, Nil, Nil) + journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, Nil) } } item diff --git a/src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala b/src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala index d4693837..dee78b54 100644 --- a/src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala +++ b/src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala @@ -100,6 +100,7 @@ object PersistentQueueSpec extends Specification with TestHelper { q.totalItems mustEqual 1 q.bytes mustEqual 11 q.journalSize mustEqual 32 + new File(folderName, "work").length mustEqual 32 new String(q.remove.get.data) mustEqual "hello kitty" @@ -161,23 +162,49 @@ object PersistentQueueSpec extends Specification with TestHelper { q.length mustEqual 2 q.totalItems mustEqual 2 q.bytes mustEqual 32 + 64 - q.journalSize mustEqual 32 + 64 + 16 + 16 + 5 + 5 - new File(folderName, "rolling").length mustEqual 32 + 64 + 16 + 16 + 5 + 5 + (q.journalSize > 96) mustBe true q.remove q.length mustEqual 1 q.totalItems mustEqual 2 q.bytes mustEqual 64 - q.journalSize mustEqual 32 + 64 + 16 + 16 + 5 + 5 + 1 - new File(folderName, "rolling").length mustEqual 32 + 64 + 16 + 16 + 5 + 5 + 1 + (q.journalSize > 96) mustBe true // now it should rotate: q.remove q.length mustEqual 0 q.totalItems mustEqual 2 q.bytes mustEqual 0 - q.journalSize mustEqual 5 // saved xid. - new File(folderName, "rolling").length mustEqual 5 + (q.journalSize < 10) mustBe true + } + } + + "rotate journals with an open transaction" in { + withTempFolder { + val q = makeQueue("rolling") + q.setup + q.maxJournalSize set Some(64) + + q.add(new Array[Byte](32)) + q.add(new Array[Byte](64)) + q.length mustEqual 2 + q.totalItems mustEqual 2 + q.bytes mustEqual 32 + 64 + (q.journalSize > 96) mustBe true + + q.remove + q.length mustEqual 1 + q.totalItems mustEqual 2 + q.bytes mustEqual 64 + (q.journalSize > 96) mustBe true + + // now it should rotate: + q.remove(true) + q.length mustEqual 0 + q.openTransactionCount mustEqual 1 + q.totalItems mustEqual 2 + q.bytes mustEqual 0 + (q.journalSize < 96) mustBe true } }