Skip to content
This repository
  • 4 commits
  • 5 files changed
  • 0 comments
  • 1 contributor
4  project/build.properties
... ...
@@ -1,9 +1,9 @@
1 1
 #Project properties
2  
-#Wed Jan 12 13:36:46 PST 2011
  2
+#Tue Feb 22 15:27:40 PST 2011
3 3
 project.organization=net.lag
4 4
 project.name=kestrel
5 5
 sbt.version=0.7.4
6  
-project.version=1.2.9-SNAPSHOT
  6
+project.version=1.3.1-SNAPSHOT
7 7
 def.scala.version=2.7.7
8 8
 build.scala.versions=2.7.7
9 9
 project.initialize=false
2  project/build/KestrelProject.scala
@@ -8,7 +8,7 @@ class KestrelProject(info: ProjectInfo) extends StandardProject(info) with Subve
8 8
   val mina = "org.apache.mina" % "mina-core" % "2.0.0-M6"
9 9
   val slf4j_api = "org.slf4j" % "slf4j-api" % "1.5.2"
10 10
   val slf4j_jdk14 = "org.slf4j" % "slf4j-jdk14" % "1.5.2"
11  
-  val xrayspecs = "com.twitter" % "xrayspecs" % "1.0.7"
  11
+  val xrayspecs = "com.twitter" % "xrayspecs_2.7.7" % "2.1.2"
12 12
 
13 13
   val specs = "org.scala-tools.testing" % "specs" % "1.6.2.1" % "test"
14 14
 
21  src/main/scala/net/lag/kestrel/Journal.scala
@@ -34,6 +34,7 @@ object JournalItem {
34 34
   case class SavedXid(xid: Int) extends JournalItem
35 35
   case class Unremove(xid: Int) extends JournalItem
36 36
   case class ConfirmRemove(xid: Int) extends JournalItem
  37
+  case class Continue(item: QItem, xid: Int) extends JournalItem
37 38
   case object EndOfFile extends JournalItem
38 39
 }
39 40
 
@@ -65,7 +66,7 @@ class Journal(queuePath: String, syncJournal: => Boolean) {
65 66
   private val CMD_UNREMOVE = 5
66 67
   private val CMD_CONFIRM_REMOVE = 6
67 68
   private val CMD_ADD_XID = 7
68  
-
  69
+  private val CMD_CONTINUE = 8
69 70
 
70 71
   private def open(file: File): Unit = {
71 72
     writer = new FileOutputStream(file, true).getChannel
@@ -81,7 +82,7 @@ class Journal(queuePath: String, syncJournal: => Boolean) {
81 82
     open(tmpFile)
82 83
     size = 0
83 84
     for (item <- openItems) {
84  
-      addWithXid(item)
  85
+      addWithXidAndCommand(CMD_ADD_XID, item)
85 86
       removeTentative(false)
86 87
     }
87 88
     saveXid(xid)
@@ -124,9 +125,13 @@ class Journal(queuePath: String, syncJournal: => Boolean) {
124 125
 
125 126
   def add(item: QItem): Unit = add(true, item)
126 127
 
127  
-  // used only to list pending transactions when recreating the journal.
128  
-  private def addWithXid(item: QItem) = {
129  
-    val blob = item.pack(CMD_ADD_XID.toByte, true)
  128
+  def continue(xid: Int, item: QItem): Unit = {
  129
+    item.xid = xid
  130
+    addWithXidAndCommand(CMD_CONTINUE, item)
  131
+  }
  132
+
  133
+  private def addWithXidAndCommand(command: Int, item: QItem) = {
  134
+    val blob = item.pack(command.toByte, true)
130 135
     do {
131 136
       writer.write(blob)
132 137
     } while (blob.position < blob.limit)
@@ -174,6 +179,7 @@ class Journal(queuePath: String, syncJournal: => Boolean) {
174 179
       } else {
175 180
         readJournalEntry(rj) match {
176 181
           case (JournalItem.Add(item), _) => f(item)
  182
+          case (JournalItem.Continue(item, xid), _) => f(item)
177 183
           case (_, _) =>
178 184
         }
179 185
       }
@@ -267,6 +273,11 @@ class Journal(queuePath: String, syncJournal: => Boolean) {
267 273
             val item = QItem.unpack(data)
268 274
             item.xid = xid
269 275
             (JournalItem.Add(item), 9 + data.length)
  276
+          case CMD_CONTINUE =>
  277
+            val xid = readInt(in)
  278
+            val data = readBlock(in)
  279
+            val item = QItem.unpack(data)
  280
+            (JournalItem.Continue(item, xid), 9 + data.length)
270 281
           case n =>
271 282
             throw new BrokenItemException(lastPosition, new IOException("invalid opcode in journal: " + n.toInt + " at position " + (in.position - 1)))
272 283
         }
19  src/main/scala/net/lag/kestrel/PersistentQueue.scala
@@ -210,7 +210,7 @@ class PersistentQueue(persistencePath: String, val name: String,
210 210
   /**
211 211
    * Add a value to the end of the queue, transactionally.
212 212
    */
213  
-  def add(value: Array[Byte], expiry: Long): Boolean = synchronized {
  213
+  def add(value: Array[Byte], expiry: Long, xid: Option[Int]): Boolean = synchronized {
214 214
     if (closed || value.size > maxItemSize()) return false
215 215
     while (queueLength >= maxItems() || queueSize >= maxSize()) {
216 216
       if (!discardOldWhenFull()) return false
@@ -232,15 +232,25 @@ class PersistentQueue(persistencePath: String, val name: String,
232 232
         journal.startReadBehind
233 233
       }
234 234
     }
  235
+    if (xid != None) openTransactions.removeKey(xid.get)
235 236
     _add(item)
236  
-    if (keepJournal()) journal.add(item)
  237
+    if (keepJournal()) {
  238
+      xid match {
  239
+        case None => journal.add(item)
  240
+        case _    => journal.continue(xid.get, item)
  241
+      }
  242
+    }
237 243
     if (waiters.size > 0) {
238 244
       waiters.remove(0).actor ! ItemArrived
239 245
     }
240 246
     true
241 247
   }
242 248
 
243  
-  def add(value: Array[Byte]): Boolean = add(value, 0)
  249
+  def add(value: Array[Byte]): Boolean = add(value, 0, None)
  250
+  def add(value: Array[Byte], expiry: Long): Boolean = add(value, expiry, None)
  251
+
  252
+  def continue(xid: Int, value: Array[Byte]): Boolean = add(value, 0, Some(xid))
  253
+  def continue(xid: Int, value: Array[Byte], expiry: Long): Boolean = add(value, expiry, Some(xid))
244 254
 
245 255
   /**
246 256
    * Peek at the head item in the queue, if there is one.
@@ -465,6 +475,9 @@ class PersistentQueue(persistencePath: String, val name: String,
465 475
       case JournalItem.SavedXid(xid) => xidCounter = xid
466 476
       case JournalItem.Unremove(xid) => _unremove(xid)
467 477
       case JournalItem.ConfirmRemove(xid) => openTransactions.removeKey(xid)
  478
+      case JournalItem.Continue(item, xid) =>
  479
+        openTransactions.removeKey(xid)
  480
+        _add(item)
468 481
       case x => log.error("Unexpected item in journal: %s", x)
469 482
     }
470 483
 
21  src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
@@ -507,6 +507,27 @@ class PersistentQueueSpec extends Specification with TestHelper {
507 507
       }
508 508
     }
509 509
 
  510
+    "continue a queue item" in {
  511
+      withTempFolder {
  512
+        val q = new PersistentQueue(folderName, "things", Config.fromMap(Map.empty))
  513
+        q.setup
  514
+        q.add("one".getBytes)
  515
+
  516
+        val item1 = q.remove(true)
  517
+        new String(item1.get.data) mustEqual "one"
  518
+
  519
+        q.continue(item1.get.xid, "two".getBytes)
  520
+        q.close
  521
+
  522
+        val q2 = new PersistentQueue(folderName, "things", Config.fromMap(Map.empty))
  523
+        q2.setup
  524
+        q2.length mustEqual 1
  525
+        q2.openTransactionCount mustEqual 0
  526
+        new String(q2.remove.get.data) mustEqual "two"
  527
+        q2.length mustEqual 0
  528
+      }
  529
+    }
  530
+
510 531
     "recreate the journal file when it gets too big" in {
511 532
       withTempFolder {
512 533
         val q = makeQueue("things", "max_journal_size" -> "1024", "max_journal_overflow" -> "3")

No commit comments for this range

Something went wrong with that request. Please try again.