Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
re-implment
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh Hull committed Mar 2, 2011
1 parent fc3e556 commit ca5ac67
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 10 deletions.
Expand Up @@ -66,9 +66,7 @@ class KestrelJobQueue(queueName: String, val queue: PersistentQueue, codec: Json
queue.confirmRemove(qitem.xid)
}
def continue(job: JsonJob) = {
queue.confirmRemove(qitem.xid)
queue.add(codec.flatten(job))
//queue.continue(qitem.xid, codec.flatten(job)) FIXME, this needs to be implemented
queue.continue(qitem.xid, codec.flatten(job))
}
}
}
Expand Down
21 changes: 17 additions & 4 deletions src/main/scala/net/lag/kestrel/Journal.scala
Expand Up @@ -35,6 +35,7 @@ object JournalItem {
case class SavedXid(xid: Int) extends JournalItem
case class Unremove(xid: Int) extends JournalItem
case class ConfirmRemove(xid: Int) extends JournalItem
case class Continue(item: QItem, xid: Int) extends JournalItem
case object EndOfFile extends JournalItem
}

Expand Down Expand Up @@ -66,7 +67,7 @@ class Journal(queuePath: String, queueName: String, syncJournal: => Boolean, mul
private val CMD_UNREMOVE = 5
private val CMD_CONFIRM_REMOVE = 6
private val CMD_ADD_XID = 7

private val CMD_CONTINUE = 8

def this(fullPath: String, syncJournal: => Boolean) =
this(new File(fullPath).getParent(), new File(fullPath).getName(), syncJournal, false)
Expand Down Expand Up @@ -95,10 +96,15 @@ class Journal(queuePath: String, queueName: String, syncJournal: => Boolean, mul
open
}

def continue(xid: Int, item: QItem): Unit = {
item.xid = xid
addWithXidAndCommand(CMD_CONTINUE, item)
}

def dump(xid: Int, openItems: Seq[QItem], queue: Iterable[QItem]) {
size = 0
for (item <- openItems) {
addWithXid(item)
addWithXidAndCommand(CMD_ADD_XID, item)
removeTentative(false)
}
saveXid(xid)
Expand Down Expand Up @@ -140,8 +146,8 @@ class Journal(queuePath: String, queueName: String, syncJournal: => Boolean, mul
def add(item: QItem): Unit = add(true, item)

// used only to list pending transactions when recreating the journal.
private def addWithXid(item: QItem) = {
val blob = item.pack(CMD_ADD_XID.toByte, true)
private def addWithXidAndCommand(command: Int, item: QItem) = {
val blob = item.pack(command.toByte, true)
do {
writer.write(blob)
} while (blob.position < blob.limit)
Expand Down Expand Up @@ -193,6 +199,8 @@ class Journal(queuePath: String, queueName: String, syncJournal: => Boolean, mul
readJournalEntry(rj) match {
case (JournalItem.Add(item), _) =>
f(item)
case (JournalItem.Continue(item, xid), _) =>
f(item)
case (JournalItem.EndOfFile, _) =>
// move to next file and try again.
readerFilename = Journal.journalAfter(new File(queuePath), queueName, readerFilename.get)
Expand Down Expand Up @@ -298,6 +306,11 @@ class Journal(queuePath: String, queueName: String, syncJournal: => Boolean, mul
val item = QItem.unpack(data)
item.xid = xid
(JournalItem.Add(item), 9 + data.length)
case CMD_CONTINUE =>
val xid = readInt(in)
val data = readBlock(in)
val item = QItem.unpack(data)
(JournalItem.Continue(item, xid), 9 + data.length)
case n =>
throw new BrokenItemException(lastPosition, new IOException("invalid opcode in journal: " + n.toInt + " at position " + (in.position - 1)))
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/net/lag/kestrel/JournalPacker.scala
Expand Up @@ -70,6 +70,8 @@ class JournalPacker(filenames: Seq[String], newFilename: String) {
for ((item, itemsize) <- remover) {
item match {
case JournalItem.Add(qitem) =>
case JournalItem.Continue(qitem, xid) =>
openTransactions -= xid
case JournalItem.Remove =>
advanceAdder().get
case JournalItem.RemoveTentative =>
Expand Down
19 changes: 16 additions & 3 deletions src/main/scala/net/lag/kestrel/PersistentQueue.scala
Expand Up @@ -175,7 +175,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
/**
* Add a value to the end of the queue, transactionally.
*/
def add(value: Array[Byte], expiry: Option[Time]): Boolean = synchronized {
def add(value: Array[Byte], expiry: Option[Time], xid: Option[Int]): Boolean = synchronized {
if (closed || value.size > config.maxItemSize.inBytes) return false
if (config.fanoutOnly && !isFanout) return true
while (queueLength >= config.maxItems || queueSize >= config.maxSize.inBytes) {
Expand All @@ -199,15 +199,25 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
}
}
checkRotateJournal()
if (xid ne None) openTransactions.remove(xid.get)
_add(item)
if (config.keepJournal) journal.add(item)
if (config.keepJournal) {
xid match {
case None => journal.add(item)
case _ => journal.continue(xid.get, item)
}
}
if (waiters.size > 0) {
waiters.remove(0).actor ! ItemArrived
}
true
}

def add(value: Array[Byte]): Boolean = add(value, None)
def add(value: Array[Byte]): Boolean = add(value, None, None)
def add(value: Array[Byte], expiry: Option[Time]): Boolean = add(value, expiry, None)

def continue(xid: Int, value: Array[Byte]): Boolean = add(value, None, Some(xid))
def continue(xid: Int, value: Array[Byte], expiry: Option[Time]): Boolean = add(value, expiry, Some(xid))

/**
* Peek at the head item in the queue, if there is one.
Expand Down Expand Up @@ -434,6 +444,9 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
case JournalItem.SavedXid(xid) => xidCounter = xid
case JournalItem.Unremove(xid) => _unremove(xid)
case JournalItem.ConfirmRemove(xid) => openTransactions.remove(xid)
case JournalItem.Continue(item, xid) =>
openTransactions.remove(xid)
_add(item)
case x => log.error("Unexpected item in journal: %s", x)
}

Expand Down

0 comments on commit ca5ac67

Please sign in to comment.