Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: twitter/kestrel
...
head fork: twitter/kestrel
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 4 files changed
  • 0 commit comments
  • 1 contributor
Commits on Feb 22, 2011
Robey Pointer port the "continue" branch to 1.2 and label it as 1.3. b4fc37b
Robey Pointer 1.3.0 9a9b686
View
4 project/build.properties
@@ -1,9 +1,9 @@
#Project properties
-#Wed Jan 12 13:36:46 PST 2011
+#Tue Feb 22 15:26:42 PST 2011
project.organization=net.lag
project.name=kestrel
sbt.version=0.7.4
-project.version=1.2.9-SNAPSHOT
+project.version=1.3.0
def.scala.version=2.7.7
build.scala.versions=2.7.7
project.initialize=false
View
21 src/main/scala/net/lag/kestrel/Journal.scala
@@ -34,6 +34,7 @@ object JournalItem {
case class SavedXid(xid: Int) extends JournalItem
case class Unremove(xid: Int) extends JournalItem
case class ConfirmRemove(xid: Int) extends JournalItem
+ case class Continue(item: QItem, xid: Int) extends JournalItem
case object EndOfFile extends JournalItem
}
@@ -65,7 +66,7 @@ class Journal(queuePath: String, syncJournal: => Boolean) {
private val CMD_UNREMOVE = 5
private val CMD_CONFIRM_REMOVE = 6
private val CMD_ADD_XID = 7
-
+ private val CMD_CONTINUE = 8
private def open(file: File): Unit = {
writer = new FileOutputStream(file, true).getChannel
@@ -81,7 +82,7 @@ class Journal(queuePath: String, syncJournal: => Boolean) {
open(tmpFile)
size = 0
for (item <- openItems) {
- addWithXid(item)
+ addWithXidAndCommand(CMD_ADD_XID, item)
removeTentative(false)
}
saveXid(xid)
@@ -124,9 +125,13 @@ class Journal(queuePath: String, syncJournal: => Boolean) {
def add(item: QItem): Unit = add(true, item)
- // used only to list pending transactions when recreating the journal.
- private def addWithXid(item: QItem) = {
- val blob = item.pack(CMD_ADD_XID.toByte, true)
+ def continue(xid: Int, item: QItem): Unit = {
+ item.xid = xid
+ addWithXidAndCommand(CMD_CONTINUE, item)
+ }
+
+ private def addWithXidAndCommand(command: Int, item: QItem) = {
+ val blob = item.pack(command.toByte, true)
do {
writer.write(blob)
} while (blob.position < blob.limit)
@@ -174,6 +179,7 @@ class Journal(queuePath: String, syncJournal: => Boolean) {
} else {
readJournalEntry(rj) match {
case (JournalItem.Add(item), _) => f(item)
+ case (JournalItem.Continue(item, xid), _) => f(item)
case (_, _) =>
}
}
@@ -267,6 +273,11 @@ class Journal(queuePath: String, syncJournal: => Boolean) {
val item = QItem.unpack(data)
item.xid = xid
(JournalItem.Add(item), 9 + data.length)
+ case CMD_CONTINUE =>
+ val xid = readInt(in)
+ val data = readBlock(in)
+ val item = QItem.unpack(data)
+ (JournalItem.Continue(item, xid), 9 + data.length)
case n =>
throw new BrokenItemException(lastPosition, new IOException("invalid opcode in journal: " + n.toInt + " at position " + (in.position - 1)))
}
View
19 src/main/scala/net/lag/kestrel/PersistentQueue.scala
@@ -210,7 +210,7 @@ class PersistentQueue(persistencePath: String, val name: String,
/**
* Add a value to the end of the queue, transactionally.
*/
- def add(value: Array[Byte], expiry: Long): Boolean = synchronized {
+ def add(value: Array[Byte], expiry: Long, xid: Option[Int]): Boolean = synchronized {
if (closed || value.size > maxItemSize()) return false
while (queueLength >= maxItems() || queueSize >= maxSize()) {
if (!discardOldWhenFull()) return false
@@ -232,15 +232,25 @@ class PersistentQueue(persistencePath: String, val name: String,
journal.startReadBehind
}
}
+ if (xid != None) openTransactions.removeKey(xid.get)
_add(item)
- if (keepJournal()) journal.add(item)
+ if (keepJournal()) {
+ xid match {
+ case None => journal.add(item)
+ case _ => journal.continue(xid.get, item)
+ }
+ }
if (waiters.size > 0) {
waiters.remove(0).actor ! ItemArrived
}
true
}
- def add(value: Array[Byte]): Boolean = add(value, 0)
+ def add(value: Array[Byte]): Boolean = add(value, 0, None)
+ def add(value: Array[Byte], expiry: Long): Boolean = add(value, expiry, None)
+
+ def continue(xid: Int, value: Array[Byte]): Boolean = add(value, 0, Some(xid))
+ def continue(xid: Int, value: Array[Byte], expiry: Long): Boolean = add(value, expiry, Some(xid))
/**
* Peek at the head item in the queue, if there is one.
@@ -465,6 +475,9 @@ class PersistentQueue(persistencePath: String, val name: String,
case JournalItem.SavedXid(xid) => xidCounter = xid
case JournalItem.Unremove(xid) => _unremove(xid)
case JournalItem.ConfirmRemove(xid) => openTransactions.removeKey(xid)
+ case JournalItem.Continue(item, xid) =>
+ openTransactions.removeKey(xid)
+ _add(item)
case x => log.error("Unexpected item in journal: %s", x)
}
View
21 src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
@@ -507,6 +507,27 @@ class PersistentQueueSpec extends Specification with TestHelper {
}
}
+ "continue a queue item" in {
+ withTempFolder {
+ val q = new PersistentQueue(folderName, "things", Config.fromMap(Map.empty))
+ q.setup
+ q.add("one".getBytes)
+
+ val item1 = q.remove(true)
+ new String(item1.get.data) mustEqual "one"
+
+ q.continue(item1.get.xid, "two".getBytes)
+ q.close
+
+ val q2 = new PersistentQueue(folderName, "things", Config.fromMap(Map.empty))
+ q2.setup
+ q2.length mustEqual 1
+ q2.openTransactionCount mustEqual 0
+ new String(q2.remove.get.data) mustEqual "two"
+ q2.length mustEqual 0
+ }
+ }
+
"recreate the journal file when it gets too big" in {
withTempFolder {
val q = makeQueue("things", "max_journal_size" -> "1024", "max_journal_overflow" -> "3")

No commit comments for this range

Something went wrong with that request. Please try again.