remove use of Serialization from libkestrel

RB_ID=97752
commit c02fa91f6c13a2dc2d3f2a22c9a9523b36241f74 (1 parent: 531894c)
Stephan Zuercher authored
1  qtest
@@ -1 +0,0 @@
-java -Xms1024m -Xmx1024m -jar ./dist/libkestrel/libkestrel-1.0.0-SNAPSHOT.jar "$@"
109 src/main/scala/com/twitter/libkestrel/DeadlineWaitQueue.scala
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2011 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.twitter.libkestrel
+
+import java.util.LinkedHashSet
+import scala.collection.JavaConversions
+import com.twitter.util.{Time, Timer, TimerTask}
+
+sealed trait Waiter {
+ def awaken: () => Unit
+ def timeout: () => Unit
+ def cancel(): Unit
+}
+
+case class InfiniteWaiter(val awaken: () => Unit, val timeout: () => Unit) extends Waiter {
+ def cancel() { }
+}
+
+case class DeadlineWaiter(var timerTask: TimerTask, val awaken: () => Unit, val timeout: () => Unit)
+extends Waiter {
+ def cancel() {
+ if (timerTask ne null) timerTask.cancel()
+ }
+}
+
+/**
+ * A wait queue where each item has a timeout.
+ * On each `trigger()`, one waiter is awoken (the awaken function is called). If the timeout is
+ * triggered by the Timer, the timeout function will be called instead. The queue promises that
+ * exactly one of the functions will be called, never both.
+ */
+final class DeadlineWaitQueue(timer: Timer) {
+
+ private val queue = JavaConversions.asScalaSet(new LinkedHashSet[Waiter])
+
+ def add(deadline: Deadline, awaken: () => Unit, timeout: () => Unit) = {
+ val waiter: Waiter =
+ deadline match {
+ case Before(time) =>
+ val deadlineWaiter = DeadlineWaiter(null, awaken, timeout)
+ val timerTask = timer.schedule(time) {
+ if (synchronized { queue.remove(deadlineWaiter) }) deadlineWaiter.timeout()
+ }
+ deadlineWaiter.timerTask = timerTask
+ deadlineWaiter
+ case Forever => InfiniteWaiter(awaken, timeout)
+ }
+
+ synchronized { queue.add(waiter) }
+ waiter
+ }
+
+ def remove(waiter: Waiter) {
+ synchronized { queue.remove(waiter) }
+ waiter.cancel()
+ }
+
+ def trigger() {
+ synchronized {
+ queue.headOption.map { waiter =>
+ queue.remove(waiter)
+ waiter
+ }
+ }.foreach { waiter =>
+ waiter.cancel()
+ waiter.awaken()
+ }
+ }
+
+ def triggerAll() {
+ synchronized {
+ val rv = queue.toArray
+ queue.clear()
+ rv
+ }.foreach { waiter =>
+ waiter.cancel()
+ waiter.awaken()
+ }
+ }
+
+ def evictAll() {
+ synchronized {
+ val rv = queue.toArray
+ queue.clear()
+ rv
+ }.foreach { waiter =>
+ waiter.cancel()
+ waiter.timeout()
+ }
+ }
+
+ def size() = {
+ synchronized { queue.size }
+ }
+}
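
For context, a minimal usage sketch of the extracted DeadlineWaitQueue (not part of this commit; the JavaTimer and imports are assumptions, and Before/Forever are this library's Deadline types):

    import com.twitter.conversions.time._
    import com.twitter.util.JavaTimer

    val timer = new JavaTimer()
    val waiters = new DeadlineWaitQueue(timer)

    // Register a waiter; exactly one of the two callbacks will run, never both.
    val waiter = waiters.add(
      Before(100.milliseconds.fromNow),
      () => println("awakened by a producer"),
      () => println("timed out"))

    // A producer wakes the oldest waiter; its timer task is cancelled first,
    // so the timeout callback can no longer fire.
    waiters.trigger()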
15 src/main/scala/com/twitter/libkestrel/ItemIdList.scala
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2011 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.twitter.libkestrel
import scala.annotation.tailrec
493 src/main/scala/com/twitter/libkestrel/Journal.scala
@@ -16,13 +16,13 @@
package com.twitter.libkestrel
-import com.twitter.concurrent.Serialized
import com.twitter.conversions.storage._
import com.twitter.logging.Logger
import com.twitter.util._
import java.io.{File, FileOutputStream, IOException}
import java.nio.ByteBuffer
-import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.{ConcurrentLinkedDeque, ScheduledExecutorService}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.mutable
@@ -59,6 +59,8 @@ object Journal {
new Journal(queuePath, queueName, maxFileSize, scheduler, syncJournal, saveArchivedJournals)
}
}
+
+ private[libkestrel] val NoCondition = (_: QueueItem) => true
}
/**
@@ -74,9 +76,14 @@ class Journal(
scheduler: ScheduledExecutorService,
syncJournal: Duration,
saveArchivedJournals: Option[File]
-) extends Serialized {
+) extends ReadWriteLock {
private[this] val log = Logger.get(getClass)
+ // NOTE ON LOCKING:
+ // - take the write lock to modify the journal or data structures related to writing the journal
+ // - take the read lock to read the journal or data structures shared by the Journal and its
+ // Reader(s)
+
val prefix = new File(queuePath, queueName)
@volatile var idMap = immutable.TreeMap.empty[Long, FileInfo]
@@ -172,10 +179,9 @@ class Journal(
// find the earliest possible head id
private[this] def earliestHead = {
- if (idMap.size == 0) {
- 0L
- } else {
- idMap.head match { case (id, file) => id }
+ idMap.headOption match {
+ case Some((id, file)) => id
+ case None => 0L
}
}
@@ -235,22 +241,22 @@ class Journal(
}
private[this] def openJournal() {
- if (idMap.size > 0) {
- val (id, fileInfo) = idMap.last
- try {
- _journalFile = JournalFile.append(fileInfo.file, scheduler, syncJournal, maxFileSize)
- _tailId = fileInfo.tailId
- currentItems = fileInfo.items
- currentBytes = fileInfo.bytes
- } catch {
- case e: IOException => {
- log.error("Unable to open journal %s; aborting!", fileInfo.file)
- throw e
+ idMap.lastOption match {
+ case Some((id, fileInfo)) =>
+ try {
+ _journalFile = JournalFile.append(fileInfo.file, scheduler, syncJournal, maxFileSize)
+ _tailId = fileInfo.tailId
+ currentItems = fileInfo.items
+ currentBytes = fileInfo.bytes
+ } catch {
+ case e: IOException => {
+ log.error("Unable to open journal %s; aborting!", fileInfo.file)
+ throw e
+ }
}
- }
- } else {
- log.info("No transaction journal for '%s'; starting with empty queue.", queueName)
- rotate()
+ case None =>
+ log.info("No transaction journal for '%s'; starting with empty queue.", queueName)
+ rotate()
}
}
@@ -267,45 +273,50 @@ class Journal(
private[this] def checkOldFiles() {
val minHead = readerMap.values.foldLeft(tail) { (n, r) => n min (r.head + 1) }
// all the files that start off with unreferenced ids, minus the last. :)
- idMap.takeWhile { case (id, fileInfo) => id <= minHead }.dropRight(1).foreach { case (id, fileInfo) =>
- log.info("Erasing unused journal file for '%s': %s", queueName, fileInfo.file)
- idMap = idMap - id
- if (saveArchivedJournals.isDefined) {
- val archiveFile = new File(saveArchivedJournals.get, "archive~" + fileInfo.file.getName)
- fileInfo.file.renameTo(archiveFile)
- } else {
- fileInfo.file.delete()
+ withWriteLock {
+ idMap.takeWhile { case (id, fileInfo) => id <= minHead }.dropRight(1).foreach { case (id, fileInfo) =>
+ log.info("Erasing unused journal file for '%s': %s", queueName, fileInfo.file)
+ idMap = idMap - id
+ if (saveArchivedJournals.isDefined) {
+ val archiveFile = new File(saveArchivedJournals.get, "archive~" + fileInfo.file.getName)
+ fileInfo.file.renameTo(archiveFile)
+ } else {
+ fileInfo.file.delete()
+ }
}
}
}
private[this] def rotate() {
- if (_journalFile ne null) {
- // fix up id map to have the new item/byte count
- idMap.last match { case (id, info) =>
- idMap += (id -> FileInfo(_journalFile.file, id, _tailId, currentItems, currentBytes))
+ withWriteLock {
+ if (_journalFile ne null) {
+ // fix up id map to have the new item/byte count
+ idMap.last match { case (id, info) =>
+ idMap += (id -> FileInfo(_journalFile.file, id, _tailId, currentItems, currentBytes))
+ }
}
+ // open new file
+ var newFile = uniqueFile(new File(queuePath, queueName + "."))
+ val oldJournalFile = _journalFile
+ _journalFile = JournalFile.create(newFile, scheduler, syncJournal, maxFileSize)
+ if (oldJournalFile eq null) {
+ log.info("Rotating %s to %s", queueName, newFile)
+ } else {
+ log.info("Rotating %s from %s (%s) to %s", queueName, oldJournalFile.file,
+ oldJournalFile.position.bytes.toHuman, newFile)
+ oldJournalFile.close()
+ }
+ currentItems = 0
+ currentBytes = 0
+ idMap += (_tailId + 1 -> FileInfo(newFile, _tailId + 1, 0, 0, 0L))
+ checkOldFiles()
}
- // open new file
- var newFile = uniqueFile(new File(queuePath, queueName + "."))
- if (_journalFile eq null) {
- log.info("Rotating %s to %s", queueName, newFile)
- } else {
- log.info("Rotating %s from %s (%s) to %s", queueName, _journalFile.file,
- _journalFile.position.bytes.toHuman, newFile)
- _journalFile.close()
- }
- _journalFile = JournalFile.create(newFile, scheduler, syncJournal, maxFileSize)
- currentItems = 0
- currentBytes = 0
- idMap += (_tailId + 1 -> FileInfo(newFile, _tailId + 1, 0, 0, 0L))
- checkOldFiles()
}
def reader(name: String): Reader = {
readerMap.get(name).getOrElse {
// grab a lock so only one thread does this potentially slow thing at once
- synchronized {
+ withWriteLock {
readerMap.get(name).getOrElse {
val file = new File(queuePath, queueName + ".read." + name)
val reader = readerMap.get("") match {
@@ -314,8 +325,7 @@ class Journal(
val oldFile = r.file
r.file = file
r.name = name
- r.commit(0L)
- r.checkpoint()
+ r.forceCheckpoint()
oldFile.delete()
readerMap = readerMap - ""
r
@@ -336,7 +346,7 @@ class Journal(
// rare operation: destroy a reader.
def dropReader(name: String) {
- synchronized {
+ withWriteLock {
readerMap.get(name) foreach { reader =>
readerMap -= name
reader.file.delete()
@@ -346,47 +356,50 @@ class Journal(
}
def journalSize: Long = {
- val files = writerFiles()
+ val (files, position) = withReadLock { (writerFiles(), _journalFile.position) }
val fileSizes = files.foldLeft(0L) { (sum, file) => sum + file.length() }
- fileSizes - files.last.length() + _journalFile.position
+ fileSizes - files.last.length() + position
}
- def tail = _tailId
+ def tail = withReadLock { _tailId }
def close() {
- readerMap.values.foreach { reader =>
- reader.checkpoint()
- reader.close()
+ withWriteLock {
+ readerMap.values.foreach { reader =>
+ reader.checkpoint()
+ reader.close()
+ }
+ readerMap = immutable.Map.empty[String, Reader]
+ _journalFile.close()
}
- readerMap = immutable.Map.empty[String, Reader]
- _journalFile.close()
}
/**
* Get rid of all journal files for this queue.
*/
def erase() {
- close()
- readerFiles().foreach { _.delete() }
- writerFiles().foreach { _.delete() }
- removeTemporaryFiles()
+ withWriteLock {
+ close()
+ readerFiles().foreach { _.delete() }
+ writerFiles().foreach { _.delete() }
+ removeTemporaryFiles()
+ }
}
- def checkpoint(): Future[Unit] = {
- val futures = readerMap.map { case (name, reader) =>
+ def checkpoint() {
+ val readers = withReadLock { readerMap.toList }
+ readers.foreach { case (_, reader) =>
reader.checkpoint()
}
- serialized {
+ withWriteLock {
checkOldFiles()
}
- Future.join(futures.toSeq)
}
def put(
- data: ByteBuffer, addTime: Time, expireTime: Option[Time], errorCount: Int = 0,
- f: QueueItem => Unit = { _ => () }
- ): Future[(QueueItem, Future[Unit])] = {
- serialized {
+ data: ByteBuffer, addTime: Time, expireTime: Option[Time],
+ errorCount: Int = 0): (QueueItem, Future[Unit]) = {
+ val (item, future) = withWriteLock {
val id = _tailId + 1
val item = QueueItem(id, addTime, expireTime, data, errorCount)
if (_journalFile.position + _journalFile.storageSizeOf(item) > maxFileSize.inBytes) rotate()
@@ -395,10 +408,18 @@ class Journal(
val future = _journalFile.put(item)
currentItems += 1
currentBytes += data.remaining
- // give the caller a chance to run some other code serialized:
- f(item)
+
+ readerMap.values.foreach { reader =>
+ reader.itemCount.getAndIncrement()
+ reader.byteCount.getAndAdd(item.dataSize)
+ reader.putCount.getAndIncrement()
+ reader.putBytes.getAndAdd(item.dataSize)
+ }
+
(item, future)
}
+
+ (item, future)
}
/**
@@ -407,17 +428,28 @@ class Journal(
* have been read out of order, usually because they refer to transactional reads that were
* confirmed out of order.
*/
- case class Reader(_name: String, _file: File) extends Serialized {
- @volatile var file: File = _file
- @volatile var name: String = _name
+ class Reader(@volatile var name: String, @volatile var file: File) {
@volatile var haveReadState: Boolean = false
@volatile private[this] var dirty = true
+ // NOTE ON LOCKING: withReadLock takes the Journal's read lock. Use synchronization to
+ // protect the Reader's data from concurrent modification by multiple callers on THIS reader
+ // instance.
+
+ private[this] var journalFile: JournalFileReader = _
private[this] var _head = 0L
+ private[this] var _next = 0L
private[this] val _doneSet = new mutable.HashSet[Long]
- private[this] var readBehind: Option[Scanner] = None
- def readState() {
+ val itemCount = new AtomicInteger(0)
+ val byteCount = new AtomicLong(0)
+
+ val putCount = new AtomicInteger(0)
+ val putBytes = new AtomicLong(0)
+
+ private val rejectedQueue = new ConcurrentLinkedDeque[QueueItem]()
+
+ private[libkestrel] def readState() {
val bookmarkFile = BookmarkFile.open(file)
try {
bookmarkFile.foreach { entry =>
@@ -436,170 +468,227 @@ class Journal(
log.debug("Read checkpoint %s+%s: head=%s done=(%s)", queueName, name, _head, _doneSet.toSeq.sorted.mkString(","))
}
- /**
- * To avoid a race while setting up a new reader, call this after initialization to reset the
- * head of the queue.
- */
- def catchUp() {
- if (!haveReadState) head = _tailId
- dirty = true
- }
+ private[this] def withReadLock[A](f: => A) = Journal.this.withReadLock(f)
- /**
- * Rewrite the reader file with the current head and out-of-order committed reads.
- */
- def checkpoint(): Future[Unit] = {
- // FIXME really this should go in another thread. doesn't need to happen inline.
- serialized {
- if (dirty) {
- dirty = false
- val head = _head
- val doneSet = _doneSet
- log.debug("Checkpoint %s+%s: head=%s done=(%s)", queueName, name, head, doneSet.toSeq.sorted.mkString(","))
- val newFile = uniqueFile(new File(file.getParent, file.getName + "~~"))
- val newBookmarkFile = BookmarkFile.create(newFile)
- newBookmarkFile.readHead(head)
- newBookmarkFile.readDone(doneSet.toSeq.sorted)
- newBookmarkFile.close()
- newFile.renameTo(file)
+ def open() {
+ synchronized {
+ withReadLock {
+ close()
+ readState()
+
+ val fileInfo = fileInfoForId(_head).getOrElse { idMap(earliestHead) }
+ log.info("Reader %s+%s starting on: %s", queueName, name, fileInfo.file)
+ val jf = JournalFile.open(fileInfo.file)
+ var initialItems = 0
+ var initialBytes = 0L
+ if (_head >= earliestHead) {
+ var lastId = -1L
+ var readItems = 0
+ var readBytes = 0L
+ while (lastId < _head) {
+ jf.readNext() match {
+ case None => lastId = _head + 1 // end of last file
+ case Some(Record.Put(QueueItem(id, _, _, data, _))) =>
+ lastId = id
+ readItems += 1
+ readBytes += data.remaining
+ case _ => ()
+ }
+ }
+ initialItems = fileInfo.items - readItems
+ initialBytes = fileInfo.bytes - readBytes
+ }
+ val afterFiles = fileInfosAfter(_head + 1)
+ itemCount.set(afterFiles.foldLeft(initialItems) { case (items, fileInfo) =>
+ items + fileInfo.items })
+ byteCount.set(afterFiles.foldLeft(initialBytes) { case (bytes, fileInfo) =>
+ bytes + fileInfo.bytes })
+ _next = _head + 1
+ journalFile = jf
}
}
}
- def head: Long = this._head
- def doneSet: Set[Long] = _doneSet.toSeq.toSet
- def tail: Long = Journal.this._tailId
-
- def head_=(id: Long) {
- _head = id
- _doneSet.retain { _ > head }
- dirty = true
+ private[this] def nextFile(): Boolean = {
+ val fileInfoCandidate = withReadLock { fileInfoForId(_next) }
+ fileInfoCandidate match {
+ case Some(fileInfo) if (fileInfo.file != journalFile.file) =>
+ journalFile.close()
+ journalFile = null
+ log.info("Reader %s+%s moving to: %s", queueName, name, fileInfo.file)
+ journalFile = JournalFile.open(fileInfo.file)
+ true
+ case _ =>
+ false
+ }
}
- def commit(id: Long) {
- if (id == _head + 1) {
- _head += 1
- while (_doneSet contains _head + 1) {
- _head += 1
- _doneSet.remove(_head)
+ @tailrec
+ private[this] def next(peek: Boolean, cond: QueueItem => Boolean): Option[QueueItem] = {
+ val retryItem = Option(rejectedQueue.poll()).flatMap { item =>
+ if (peek) {
+ val result = Some(item.copy(data = item.data.duplicate()))
+ rejectedQueue.addFirst(item)
+ result
+ } else if (cond(item)) {
+ Some(item)
+ } else {
+ rejectedQueue.addFirst(item)
+ None
}
- } else if (id > _head) {
- _doneSet.add(id)
}
- dirty = true
- }
+ if (retryItem.isDefined) return retryItem
- /**
- * Discard all items and catch up with the main queue.
- */
- def flush() {
- _head = _tailId
- _doneSet.clear()
- endReadBehind()
- dirty = true
- }
+ synchronized {
+ if (withReadLock { _next > tail }) {
+ return None
+ }
- def close() {
- endReadBehind()
+ val mark = journalFile.position
+ journalFile.readNext() match {
+ case Some(Record.Put(item)) if peek =>
+ val newItem = item.copy(data = item.data.duplicate())
+ journalFile.rewind(mark)
+ return Some(newItem)
+ case Some(Record.Put(item)) =>
+ if (cond(item)) {
+ _next = item.id + 1
+ return Some(item)
+ } else {
+ journalFile.rewind(mark)
+ return None
+ }
+ case None =>
+ if (!nextFile()) {
+ return None
+ }
+ case _ => ()
+ }
+ }
+
+ next(peek, cond)
}
- def inReadBehind = readBehind.isDefined
+ def next(): Option[QueueItem] = {
+ next(false, Journal.NoCondition)
+ }
- def readBehindId = readBehind.get.id
+ def nextIf(f: QueueItem => Boolean): Option[QueueItem] = {
+ next(false, f)
+ }
- /**
- * Open the journal file containing a given item, so we can read items directly out of the
- * file. This means the queue no longer wants to try keeping every item in memory.
- */
- def startReadBehind(id: Long) {
- readBehind = Some(new Scanner(id, followFiles = true, logIt = true))
+ final def peekOldest(): Option[QueueItem] = {
+ next(true, Journal.NoCondition)
}
/**
- * Read & return the next item in the read-behind journals.
- * If we've caught up, turn off read-behind and return None.
+ * Test if this reader is caught up with the writer.
*/
- def nextReadBehind(): Option[QueueItem] = {
- val rv = readBehind.get.next()
- if (rv == None) readBehind = None
- rv
- }
+ def isCaughtUp = synchronized { withReadLock { _next > tail } }
/**
- * End read-behind mode, and close any open journal file.
+ * To avoid a race while setting up a new reader, call this after initialization to reset the
+ * head of the queue.
*/
- def endReadBehind() {
- readBehind.foreach { _.end() }
- readBehind = None
+ def catchUp() {
+ synchronized {
+ withReadLock {
+ if (!haveReadState) head = _tailId
+ dirty = true
+ }
+ }
}
/**
- * Scan forward through journals from a specific starting point.
+ * Rewrite the reader file with the current head and out-of-order committed reads.
*/
- class Scanner(startId: Long, followFiles: Boolean = true, logIt: Boolean = false) {
- private[this] var journalFile: JournalFileReader = _
- var id = 0L
-
- start()
-
- def start() {
- val fileInfo = fileInfoForId(startId).getOrElse { idMap(earliestHead) }
- val jf = JournalFile.open(fileInfo.file)
- if (startId >= earliestHead) {
- var lastId = -1L
- while (lastId < startId) {
- jf.readNext() match {
- case None => {
- // just end read-behind immediately.
- id = tail
- return
- }
- case Some(Record.Put(QueueItem(id, _, _, _, _))) => lastId = id
- case _ =>
- }
+ def checkpoint() {
+ val checkpointData =
+ synchronized {
+ if (dirty) {
+ dirty = false
+ Some(_head, _doneSet.toSeq)
+ } else {
+ None
}
}
- journalFile = jf
- id = startId
+
+ checkpointData.foreach { case (head, doneSeq) =>
+ val sortedDoneSeq = doneSeq.sorted
+ log.debug("Checkpoint %s+%s: head=%s done=(%s)", queueName, name, head, sortedDoneSeq.mkString(","))
+ val newFile = uniqueFile(new File(file.getParent, file.getName + "~~"))
+ val newBookmarkFile = BookmarkFile.create(newFile)
+ newBookmarkFile.readHead(head)
+ newBookmarkFile.readDone(sortedDoneSeq)
+ newBookmarkFile.close()
+ newFile.renameTo(file)
}
+ }
- @tailrec
- final def next(): Option[QueueItem] = {
- if (id == tail) {
- end()
- return None
- }
+ private[libkestrel] def forceCheckpoint() {
+ synchronized { dirty = true }
+ checkpoint()
+ }
- journalFile.readNext() match {
- case None => {
- journalFile.close()
- journalFile = null
- if (followFiles) {
- val fileInfo = fileInfoForId(id + 1)
- if (!fileInfo.isDefined) throw new IOException("Unknown id")
- if (logIt) log.debug("Read-behind for %s+%s moving to: %s", queueName, name, fileInfo.get.file)
- journalFile = JournalFile.open(fileInfo.get.file)
- next()
- } else {
- end()
- None
- }
+ def head: Long = this._head
+ def doneSet: Set[Long] = _doneSet.toSeq.toSet
+ def tail: Long = Journal.this._tailId
+
+ def head_=(id: Long) {
+ synchronized {
+ _head = id
+ _doneSet.retain { _ > head }
+ dirty = true
+ }
+ }
+
+ def commit(item: QueueItem) {
+ val id = item.id
+ synchronized {
+ if (id == _head + 1) {
+ _head += 1
+ while (_doneSet contains _head + 1) {
+ _head += 1
+ _doneSet.remove(_head)
}
- case Some(Record.Put(item)) => {
- id = item.id
- Some(item)
+ } else if (id > _head) {
+ _doneSet.add(id)
+ }
+ dirty = true
+ itemCount.getAndDecrement()
+ byteCount.getAndAdd(-item.dataSize)
+ }
+ }
+
+ def unget(item: QueueItem) {
+ rejectedQueue.add(item)
+ }
+
+ /**
+ * Discard all items and catch up with the main queue.
+ */
+ def flush() {
+ synchronized {
+ withReadLock {
+ val currentTail = _tailId
+ while (_head < currentTail) {
+ next().foreach { commit(_) }
}
- case _ => next()
+ dirty = true
+ checkpoint()
}
}
+ }
- def end() {
- if (logIt) log.info("Leaving read-behind for %s+%s", queueName, name)
+ def close() {
+ synchronized {
if (journalFile ne null) {
journalFile.close()
journalFile = null
}
}
}
+
+ override def toString() = "Reader(%s, %s)".format(name, file)
}
}
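
A rough sketch of the reworked Reader introduced above: instead of read-behind through a Scanner, the reader is now a disk-backed cursor with next/nextIf/unget/commit. This is illustrative only; queuePath and scheduler are assumed to be in scope, and the constructor arguments follow the signature shown in this diff:

    import com.twitter.conversions.storage._

    val journal = new Journal(queuePath, "work", 16.megabytes, scheduler,
      Duration.MaxValue, None)
    val reader = journal.reader("consumer1")

    reader.next() match {
      case Some(item) =>
        // hand item.data to a consumer, then acknowledge it ...
        reader.commit(item)
        // ... or push it back for redelivery:
        // reader.unget(item)
      case None => () // caught up with the writer
    }

    // Conditional dequeue, as used by discardExpired():
    reader.nextIf { item => item.expireTime.exists(_ < Time.now) }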
387 src/main/scala/com/twitter/libkestrel/JournaledQueue.scala
@@ -16,7 +16,6 @@
package com.twitter.libkestrel
-import com.twitter.concurrent.Serialized
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
import com.twitter.logging.Logger
@@ -24,20 +23,20 @@ import com.twitter.util._
import java.io.File
import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService}
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.JavaConverters._
import config._
/**
- * An optionally-journaled queue built on top of `ConcurrentBlockingQueue` that may have multiple
- * "readers".
+ * A journaled queue built on top of `ConcurrentBlockingQueue` that may have multiple "readers".
*
* When an item is added to a queue, it's journaled and passed on to any readers. There is always
- * at least one reader, and the reader contains the actual in-memory queue. If there are multiple
- * readers, they behave as multiple independent queues, each receiving a copy of each item added,
- * but sharing a single journal. They may have different policies on memory use, queue size
- * limits, and item expiration.
+ * at least one reader, and the reader contains the actual queue. If there are multiple readers,
+ * they behave as multiple independent queues, each receiving a copy of each item added, but
+ * sharing a single journal. They may have different policies on memory use, queue size limits,
+ * and item expiration.
*
* Items are read only from readers. When an item is available, it's set aside as an "open read",
* but not committed to the journal. A separate call is made to either commit the item or abort
@@ -57,7 +56,7 @@ import config._
*/
class JournaledQueue(
val config: JournaledQueueConfig, path: File, timer: Timer, scheduler: ScheduledExecutorService
-) extends Serialized {
+) {
private[this] val log = Logger.get(getClass)
private[this] val NAME_REGEX = """[^A-Za-z0-9:_-]""".r
@@ -65,22 +64,14 @@ class JournaledQueue(
throw new Exception("Illegal queue name: " + config.name)
}
- private[this] val journal = if (config.journaled) {
- Some(new Journal(path, config.name, config.journalSize, scheduler, config.syncJournal,
- config.saveArchivedJournals))
- } else {
- None
- }
+ private[JournaledQueue] val journal =
+ new Journal(path, config.name, config.journalSize, scheduler, config.syncJournal,
+ config.saveArchivedJournals)
@volatile private[this] var closed = false
@volatile private[this] var readerMap = immutable.Map.empty[String, Reader]
-
- @volatile private[this] var nonJournalId = 0L
-
- journal.foreach { j =>
- j.readerMap.foreach { case (name, _) => reader(name) }
- }
+ journal.readerMap.foreach { case (name, _) => reader(name) }
// checkpoint readers on a schedule.
timer.schedule(config.checkpointTimer) { checkpoint() }
@@ -105,9 +96,7 @@ class JournaledQueue(
/**
* Total number of bytes of data used by the on-disk journal.
*/
- def journalBytes = {
- journal map { _.journalSize } getOrElse(0L)
- }
+ def journalBytes = journal.journalSize
/**
* Get the named reader. If this is a normal (single reader) queue, the default reader is named
@@ -124,7 +113,7 @@ class JournaledQueue(
val readerConfig = config.readerConfigs.get(name).getOrElse(config.defaultReaderConfig)
log.info("Creating reader queue %s+%s", config.name, name)
val reader = new Reader(name, readerConfig)
- journal.foreach { j => reader.loadFromJournal(j.reader(name)) }
+ reader.loadFromJournal(journal.reader(name))
readerMap += (name -> reader)
reader.catchUp()
@@ -132,8 +121,7 @@ class JournaledQueue(
readerMap.get("") foreach { r =>
// kill the default reader.
readerMap -= ""
- r.close()
- journal.foreach { _.dropReader("") }
+ journal.dropReader("")
}
}
@@ -144,12 +132,12 @@ class JournaledQueue(
}
def dropReader(name: String) {
- readerMap.get(name) foreach { r =>
- synchronized {
+ synchronized {
+ readerMap.get(name) foreach { r =>
log.info("Destroying reader queue %s+%s", config.name, name)
readerMap -= name
r.close()
- journal.foreach { j => j.dropReader(name) }
+ journal.dropReader(name)
}
}
}
@@ -158,25 +146,31 @@ class JournaledQueue(
* Save the state of all readers.
*/
def checkpoint() {
- journal.foreach { _.checkpoint() }
+ journal.checkpoint()
}
/**
* Close this queue. Any further operations will fail.
*/
def close() {
- closed = true
- readerMap.values.foreach { _.close() }
- journal.foreach { _.close() }
+ synchronized {
+ closed = true
+ readerMap.values.foreach { _.close() }
+ journal.close()
+ readerMap = Map()
+ }
}
/**
* Close this queue and also erase any journal files.
*/
def erase() {
- closed = true
- readerMap.values.foreach { _.close() }
- journal.foreach { _.erase() }
+ synchronized {
+ closed = true
+ readerMap.values.foreach { _.close() }
+ journal.erase()
+ readerMap = Map()
+ }
}
/**
@@ -199,7 +193,7 @@ class JournaledQueue(
* of the queue. Returns the total number of items expired across all readers.
*/
def discardExpired(applyMaxExpireSweep: Boolean): Int = {
- readerMap.values.foldLeft(0) { _ + _.discardExpired(applyMaxExpireSweep)() }
+ readerMap.values.foldLeft(0) { _ + _.discardExpired(applyMaxExpireSweep) }
}
/**
@@ -221,20 +215,11 @@ class JournaledQueue(
return None
}
- Some(journal.map { j =>
- j.put(data, addTime, expireTime, errorCount, { item =>
- readerMap.values.foreach { _.put(item) }
- }).flatMap { case (item, syncFuture) =>
- syncFuture
- }
- }.getOrElse {
- // no journal
- serialized {
- nonJournalId += 1
- val item = QueueItem(nonJournalId, addTime, expireTime, data)
- readerMap.values.foreach { _.put(item) }
- }
- })
+ val (_, syncFuture) = journal.put(data, addTime, expireTime, errorCount)
+
+ readerMap.values.foreach { _.postPut() }
+
+ Some(syncFuture)
}
/**
@@ -274,20 +259,22 @@ class JournaledQueue(
/**
* A reader for this queue, which has its own head pointer and list of open reads.
*/
- class Reader(val name: String, val readerConfig: JournaledQueueReaderConfig)
- extends Serialized
- {
- val journalReader = journal.map { _.reader(name) }
- private[libkestrel] val queue = ConcurrentBlockingQueue[QueueItem](timer)
-
- @volatile var items = 0
- @volatile var bytes = 0L
- @volatile var memoryItems = 0
- @volatile var memoryBytes = 0L
+ class Reader(val name: String, val readerConfig: JournaledQueueReaderConfig) {
+ val journalReader = journal.reader(name)
private val openReads = new ConcurrentHashMap[Long, QueueItem]()
/**
+ * The current value of `itemCount`.
+ */
+ def items = journalReader.itemCount.get
+
+ /**
+ * The current value of `byteCount`.
+ */
+ def bytes = journalReader.byteCount.get
+
+ /**
* When was this reader created?
*/
val createTime = Time.now
@@ -305,12 +292,12 @@ class JournaledQueue(
/**
* Total number of items ever added to this queue.
*/
- val putCount = new AtomicLong(0)
+ def putCount = journalReader.putCount
/**
* Total number of bytes ever added to this queue.
*/
- val putBytes = new AtomicLong(0)
+ def putBytes = journalReader.putBytes
/**
* Total number of items ever successfully fetched from this queue.
@@ -328,7 +315,8 @@ class JournaledQueue(
val expiredCount = new AtomicLong(0)
/**
- * Total number of items ever discarded from this queue.
+ * Total number of items ever discarded from this queue. Discards occur when the queue
+ * reaches its configured maximum size or maximum number of items.
*/
val discardedCount = new AtomicLong(0)
@@ -338,19 +326,19 @@ class JournaledQueue(
val openedItemCount = new AtomicLong(0)
/**
- * Total number of canceled reads (transactions) on this queue.
+ * Total number of committed reads (transactions) on this queue.
*/
- val canceledItemCount = new AtomicLong(0)
+ val committedItemCount = new AtomicLong(0)
/**
- * Number of consumers waiting for an item to arrive.
+ * Total number of canceled reads (transactions) on this queue.
*/
- def waiterCount: Int = queue.waiterCount
+ val canceledItemCount = new AtomicLong(0)
/**
- * Number of items dropped because the queue was full.
+ * Number of consumers waiting for an item to arrive.
*/
- def droppedCount: Int = queue.droppedCount.get
+ def waiterCount: Int = waiters.size
/**
* Number of times this queue has been flushed.
@@ -367,66 +355,37 @@ class JournaledQueue(
def writer: JournaledQueue = JournaledQueue.this
- /**
- * Age of the oldest item in this queue or 0 if the queue is empty.
- */
- def age: Duration =
- queue.peekOldest match {
- case Some(item) => Time.now - item.addTime
- case None => 0.nanoseconds
- }
+ val waiters = new DeadlineWaitQueue(timer)
/*
* in order to reload the contents of a queue at startup, we need to:
* - read the last saved state (head id, done ids)
* - start at the head, and read in items (ignoring ones already done) until we hit the
- * memory limit for read-behind. if we hit that limit:
+ * current item and then:
* - count the items left in the current journal file
* - add in the summarized counts from the remaining journal files, if any.
*/
private[libkestrel] def loadFromJournal(j: Journal#Reader) {
- log.info("Replaying contents of %s+%s", config.name, name)
- j.readState()
- val scanner = new j.Scanner(j.head, followFiles = true, logIt = false)
-
- var inReadBehind = false
- var lastId = 0L
- var optItem = scanner.next()
- while (optItem.isDefined) {
- val item = optItem.get
- lastId = item.id
- if (!inReadBehind) {
- queue.put(item)
- memoryItems += 1
- memoryBytes += item.dataSize
- }
- items += 1
- bytes += item.dataSize
- if (bytes >= readerConfig.maxMemorySize.inBytes && !inReadBehind) {
- j.startReadBehind(item.id)
- inReadBehind = true
- }
- optItem = scanner.next()
- }
- scanner.end()
- if (inReadBehind) {
- // count items/bytes from remaining journals.
- journal.get.fileInfosAfter(lastId).foreach { info =>
- items += info.items
- bytes += info.bytes
- }
- }
-
- log.info("Replaying contents of %s+%s done: %d items, %s, %s in memory",
- config.name, name, items, bytes.bytes.toHuman, memoryBytes.bytes.toHuman)
+ log.info("Restoring state of %s+%s", config.name, name)
+ j.open()
}
+ /**
+ * Age of the oldest item in this queue or 0 if the queue is empty.
+ */
+ def age: Duration =
+ journalReader.peekOldest() match {
+ case Some(item) =>
+ Time.now - item.addTime
+ case None => 0.nanoseconds
+ }
+
def catchUp() {
- journalReader.foreach { _.catchUp() }
+ journalReader.catchUp()
}
def checkpoint() {
- journalReader.foreach { _.checkpoint() }
+ journalReader.checkpoint()
}
def canPut: Boolean = {
@@ -434,52 +393,24 @@ class JournaledQueue(
(items < readerConfig.maxItems && bytes < readerConfig.maxSize.inBytes)
}
- private[libkestrel] def dropOldest() {
+ @tailrec
+ private[libkestrel] final def dropOldest() {
if (readerConfig.fullPolicy == ConcurrentBlockingQueue.FullPolicy.DropOldest &&
(items > readerConfig.maxItems || bytes > readerConfig.maxSize.inBytes)) {
- queue.poll() map { itemOption =>
- itemOption foreach { item =>
- serialized {
- discardedCount.getAndIncrement()
- commitItem(item)
- dropOldest()
- }
- }
+ journalReader.next() match {
+ case None => ()
+ case Some(item) =>
+ commitItem(item)
+ discardedCount.getAndIncrement()
+ dropOldest()
}
}
}
- // this happens within the serialized block of a put.
- private[libkestrel] def put(item: QueueItem) {
+ private[libkestrel] def postPut() {
+ dropOldest()
discardExpiredWithoutLimit()
- serialized {
- val inReadBehind = journalReader.map { j =>
- // if item.id <= j.readBehindId, fillReadBehind already saw this item.
- if (j.inReadBehind && item.id > j.readBehindId) {
- true
- } else if (!j.inReadBehind && memoryBytes >= readerConfig.maxMemorySize.inBytes) {
- log.info("Dropping to read-behind for queue '%s+%s' (%s) @ item %d",
- config.name, name, bytes.bytes.toHuman, item.id - 1)
- j.startReadBehind(item.id - 1)
- true
- } else {
- false
- }
- }.getOrElse(false)
-
- if (!inReadBehind) {
- queue.put(item.copy(data = item.data.duplicate()))
- memoryItems += 1
- memoryBytes += item.dataSize
- }
- items += 1
- bytes += item.dataSize
- putCount.getAndIncrement()
- putBytes.getAndAdd(item.dataSize)
-
- // we've already checked canPut by here, but we may still drop the oldest item(s).
- dropOldest()
- }
+ waiters.trigger()
}
private[this] def hasExpired(startTime: Time, expireTime: Option[Time], now: Time): Boolean = {
@@ -494,60 +425,65 @@ class JournaledQueue(
def discardExpiredWithoutLimit() = discardExpired(false)
- def discardExpired(applyMaxExpireSweep: Boolean = true): Future[Int] = {
+ def discardExpired(applyMaxExpireSweep: Boolean = true): Int = {
val max = if (applyMaxExpireSweep) readerConfig.maxExpireSweep else Int.MaxValue
discardExpired(max)
}
- // check the in-memory portion of the queue and discard anything that's expired.
- private[this] def discardExpired(max: Int): Future[Int] = {
- def expireLoop(remainingAttempts: Int, numExpired: Int): Future[(Int, Int)] = {
- if (remainingAttempts == 0) return Future.value { (0, numExpired) }
- val expireFuture =
- queue.pollIf { item => hasExpired(item.addTime, item.expireTime, Time.now) } map {
- case Some(item) =>
- readerConfig.processExpiredItem(item)
- expiredCount.getAndIncrement()
- serialized {
- items -= 1
- bytes -= item.dataSize
- memoryItems -= 1
- memoryBytes -= item.dataSize
- journalReader.foreach { _.commit(item.id) }
- fillReadBehind()
- }
- (remainingAttempts - 1, numExpired + 1)
- case None =>
- (0, numExpired)
- }
-
- expireFuture flatMap { case (remaining, expired) => expireLoop(remaining, expired) }
+ // check the queue and discard anything that's expired.
+ private[this] def discardExpired(max: Int): Int = {
+ var numExpired = 0
+ var remainingAttempts = max
+ while (remainingAttempts > 0) {
+ journalReader.nextIf { item => hasExpired(item.addTime, item.expireTime, Time.now) } match {
+ case Some(item) =>
+ readerConfig.processExpiredItem(item)
+ expiredCount.getAndIncrement()
+ commitItem(item)
+ remainingAttempts -= 1
+ numExpired += 1
+ case None =>
+ remainingAttempts = 0
+ }
}
- expireLoop(max, 0) map { case (_, expired) => expired }
+ numExpired
}
- // if we're in read-behind mode, scan forward in the journal to keep memory as full as
- // possible. this amortizes the disk overhead across all reads.
- // always call this from within a serialized block.
- private[this] def fillReadBehind() {
- journalReader.foreach { j =>
- while (j.inReadBehind && memoryBytes < readerConfig.maxMemorySize.inBytes) {
- if (bytes < readerConfig.maxMemorySize.inBytes) {
- j.endReadBehind()
+ private[this] def waitNext(deadline: Deadline, peeking: Boolean): Future[Option[QueueItem]] = {
+ val startTime = Time.now
+ val promise = new Promise[Option[QueueItem]]
+ waitNext(startTime, deadline, promise, peeking)
+ promise
+ }
+
+ private[this] def waitNext(startTime: Time,
+ deadline: Deadline,
+ promise: Promise[Option[QueueItem]],
+ peeking: Boolean) {
+ val item = if (peeking) journalReader.peekOldest() else journalReader.next()
+ if (item.isDefined || closed) {
+ promise.setValue(item)
+ } else {
+ // checking future.isCancelled is a race; we assume that the caller will either commit
+ // or unget the item if we miss the cancellation
+ def onTrigger() = {
+ if (promise.isCancelled) {
+ promise.setValue(None)
+ waiters.trigger()
} else {
- j.nextReadBehind().foreach { item =>
- queue.put(item)
- memoryItems += 1
- memoryBytes += item.dataSize
- }
+ // if we get woken up, try again with the same deadline.
+ waitNext(startTime, deadline, promise, peeking)
}
}
+ def onTimeout() {
+ promise.setValue(None)
+ }
+ val w = waiters.add(deadline, onTrigger, onTimeout)
+ promise.onCancellation { waiters.remove(w) }
}
}
- def inReadBehind = journalReader.map { _.inReadBehind }.getOrElse(false)
-
/**
* Remove and return an item from the queue, if there is one.
* If no deadline is given, an item is only returned if one is immediately available.
@@ -556,9 +492,11 @@ class JournaledQueue(
if (closed) return Future.value(None)
discardExpiredWithoutLimit()
val startTime = Time.now
+
val future = deadline match {
- case Some(d) => queue.get(d)
- case None => queue.poll()
+ case Some(d) => waitNext(d, peeking)
+ case None if !peeking => Future.value(journalReader.next())
+ case None => Future.value(journalReader.peekOldest())
}
future.flatMap { optItem =>
optItem match {
@@ -574,15 +512,12 @@ class JournaledQueue(
} else {
readerConfig.deliveryLatency(this, Time.now - item.addTime)
getHitCount.getAndIncrement()
- if (peeking) {
- queue.putHead(item)
- Future.value(Some(item.copy(data = item.data.duplicate())))
- } else {
+ if (!peeking) {
openedItemCount.incrementAndGet
openReads.put(item.id, item)
item.data.mark()
- Future.value(s)
}
+ Future.value(s)
}
}
}
@@ -601,19 +536,12 @@ class JournaledQueue(
return
}
- serialized {
- commitItem(item)
- }
+ commitItem(item)
}
- // serialized
private[this] def commitItem(item: QueueItem) {
- journalReader.foreach { _.commit(item.id) }
- items -= 1
- bytes -= item.dataSize
- memoryItems -= 1
- memoryBytes -= item.dataSize
- fillReadBehind()
+ journalReader.commit(item)
+ committedItemCount.getAndIncrement()
}
/**
@@ -632,7 +560,8 @@ class JournaledQueue(
if (readerConfig.errorHandler(newItem)) {
commitItem(newItem)
} else {
- queue.putHead(newItem)
+ journalReader.unget(newItem)
+ waiters.trigger()
}
}
@@ -646,31 +575,18 @@ class JournaledQueue(
/**
* Drain all items from this reader.
*/
- def flush(): Future[Unit] = {
- serialized {
- journalReader.foreach { j =>
- j.flush()
- j.checkpoint()
- }
- queue.flush()
- items = 0
- bytes = 0
- memoryItems = 0
- memoryBytes = 0
- flushCount.getAndIncrement()
- }
+ def flush() {
+ journalReader.flush()
+ flushCount.getAndIncrement()
}
def close() {
- journalReader.foreach { j =>
- j.checkpoint() respond {
- case _ => j.close()
- }
- }
+ journalReader.checkpoint()
+ journalReader.close()
}
def evictWaiters() {
- queue.evictWaiters()
+ waiters.evictAll()
}
/**
@@ -679,13 +595,14 @@ class JournaledQueue(
*/
def isReadyForExpiration: Boolean = {
readerConfig.maxQueueAge.map { age =>
- Time.now > createTime + age && queue.size == 0
+ Time.now > createTime + age && journalReader.isCaughtUp
}.getOrElse(false)
}
def toDebug: String = {
- "<JournaledQueue#Reader: name=%s items=%d bytes=%d mem_items=%d mem_bytes=%d age=%s queue=%s open=%s>".format(
- name, items, bytes, memoryItems, memoryBytes, age, queue.toDebug, openReads.keys.asScala.toList.sorted)
+ "<JournaledQueue#Reader: name=%s items=%d bytes=%d age=%s open=%s puts=%d discarded=%d, expired=%d>".format(
+ name, items, bytes, age, openReads.keys.asScala.toList.sorted, putCount.get, discardedCount.get,
+ expiredCount.get)
}
}
}
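
To illustrate the simplified flow (every queue is now journaled, and put returns the journal's sync future directly), a hedged sketch of put and get against a reader. Construction of config, path, timer, and scheduler is assumed, as are the reader's pre-existing get(deadline)/commit(id) entry points:

    val q = new JournaledQueue(config, path, timer, scheduler)
    val consumer = q.reader("")

    // put() always journals now; it returns Some(syncFuture), or None if
    // the item was rejected (e.g. too large, or the queue is full).
    q.put(ByteBuffer.wrap("hello".getBytes), Time.now, None)

    // Wait up to one second for an item, then acknowledge it.
    consumer.get(Some(Before(1.second.fromNow))).onSuccess {
      case Some(item) => consumer.commit(item.id)
      case None       => () // timed out
    }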
64 src/main/scala/com/twitter/libkestrel/ReadWriteLock.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2011 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.twitter.libkestrel
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+trait ReadWriteLock {
+ private[this] val lock = new ReentrantReadWriteLock(true)
+
+ def withWriteLock[A](f: => A) = {
+ val writeLock = lock.writeLock
+ writeLock.lock()
+ try {
+ f
+ } finally {
+ writeLock.unlock()
+ }
+ }
+
+ def withDowngradeableWriteLock[A](f: (Function0[Unit]) => A) = {
+ val writeLock = lock.writeLock
+ val readLock = lock.readLock
+ val downgrade = () => {
+ readLock.lock()
+ writeLock.unlock()
+ }
+
+ writeLock.lock()
+ try {
+ f(downgrade)
+ } finally {
+ if (writeLock.isHeldByCurrentThread) {
+ // did not downgrade
+ writeLock.unlock()
+ } else {
+ readLock.unlock()
+ }
+ }
+ }
+
+ def withReadLock[A](f: => A) = {
+ val readLock = lock.readLock
+ readLock.lock()
+ try {
+ f
+ } finally {
+ readLock.unlock()
+ }
+ }
+}
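
A small sketch (not in this commit) of the downgrade hook: mutate state under the write lock, then atomically trade it for the read lock so a consistent snapshot can be taken without blocking writers longer than necessary:

    class Counter extends ReadWriteLock {
      private[this] var value = 0

      def incrementAndSnapshot(): Int = withDowngradeableWriteLock { downgrade =>
        value += 1    // exclusive: mutate state
        downgrade()   // acquire the read lock, then release the write lock
        value         // shared: read a consistent snapshot
      }

      def current: Int = withReadLock { value }
    }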
6 src/main/scala/com/twitter/libkestrel/RecordReader.scala
@@ -34,6 +34,12 @@ abstract class RecordReader extends Iterable[Record] {
magic
}
+ def mark: Int = reader.position
+
+ def rewind(position: Int) {
+ reader.position(position)
+ }
+
def readNext(): Option[Record] = {
val lastPosition = reader.position
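The new mark/rewind pair backs the peek path in Journal#Reader.next above; a sketch of the pattern, with the reader's construction assumed:

    // Peek at the next record without consuming it.
    val mark = recordReader.mark
    recordReader.readNext() match {
      case Some(Record.Put(item)) =>
        recordReader.rewind(mark) // cursor restored; the record will be read again
      case _ => ()
    }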
70 src/main/scala/com/twitter/libkestrel/SimpleBlockingQueue.scala
@@ -16,7 +16,7 @@ object SimpleBlockingQueue {
}
/**
- * Simple reproduction of the queue from kestrel 2.1.
+ * Simple reproduction of the queue from kestrel 2.x.
*
* Puts and gets are done within synchronized blocks, and a DeadlineWaitQueue is used to arbitrate
* timeouts and handoffs.
@@ -114,71 +114,3 @@ final class SimpleBlockingQueue[A <: AnyRef](
waiters.evictAll()
}
}
-
-/**
- * A wait queue where each item has a timeout.
- * On each `trigger()`, one waiter is awoken (the awaken function is called). If the timeout is
- * triggered by the Timer, the timeout function will be called instead. The queue promises that
- * exactly one of the functions will be called, never both.
- */
-final class DeadlineWaitQueue(timer: Timer) {
- case class Waiter(var timerTask: Option[TimerTask], awaken: () => Unit, timeout: () => Unit)
- private val queue = new LinkedHashSet[Waiter].asScala
-
- def add(deadline: Deadline, awaken: () => Unit, timeout: () => Unit) = {
- val waiter = Waiter(None, awaken, timeout)
- deadline match {
- case Before(time) =>
- val timerTask = timer.schedule(time) {
- if (synchronized { queue.remove(waiter) }) waiter.timeout()
- }
- waiter.timerTask = Some(timerTask)
- case Forever => ()
- }
- synchronized { queue.add(waiter) }
- waiter
- }
-
- def trigger() {
- synchronized {
- queue.headOption.map { waiter =>
- queue.remove(waiter)
- waiter
- }
- }.foreach { waiter =>
- waiter.timerTask.foreach { _.cancel() }
- waiter.awaken()
- }
- }
-
- def triggerAll() {
- synchronized {
- val rv = queue.toArray
- queue.clear()
- rv
- }.foreach { waiter =>
- waiter.timerTask.foreach { _.cancel() }
- waiter.awaken()
- }
- }
-
- def evictAll() {
- synchronized {
- val rv = queue.toArray
- queue.clear()
- rv
- }.foreach { waiter =>
- waiter.timerTask.foreach { _.cancel() }
- waiter.timeout()
- }
- }
-
- def remove(waiter: Waiter) {
- synchronized { queue.remove(waiter) }
- waiter.timerTask.foreach { _.cancel() }
- }
-
- def size = {
- synchronized { queue.size }
- }
-}
19 src/main/scala/com/twitter/libkestrel/config/JournaledQueueConfig.scala
@@ -28,13 +28,10 @@ import java.io.File
* and enforce policy on maximum queue size and item expiration.
*
* @param maxItems Set a hard limit on the number of items this queue can hold. When the queue is
- * full, `discardOldWhenFull` dictates the behavior when a client attempts to add another item.
+ * full, `fullPolicy` dictates the behavior when a client attempts to add another item.
* @param maxSize Set a hard limit on the number of bytes (of data in queued items) this queue can
- * hold. When the queue is full, discardOldWhenFull dictates the behavior when a client attempts
+ * hold. When the queue is full, `fullPolicy` dictates the behavior when a client attempts
* to add another item.
- * @param maxMemorySize Keep only this much of the queue in memory. The journal will be used to
- * store backlogged items, and they'll be read back into memory as the queue is drained. This
- * setting is a release valve to keep a backed-up queue from consuming all memory.
* @param maxAge Expiration time for items on this queue. Any item that has been sitting on the
* queue longer than this duration will be discarded. Clients may also attach an expiration time
* when adding items to a queue, in which case the item expires at the earlier of the two
@@ -60,7 +57,6 @@ import java.io.File
case class JournaledQueueReaderConfig(
maxItems: Int = Int.MaxValue,
maxSize: StorageUnit = Long.MaxValue.bytes,
- maxMemorySize: StorageUnit = 128.megabytes,
maxAge: Option[Duration] = None,
fullPolicy: ConcurrentBlockingQueue.FullPolicy = ConcurrentBlockingQueue.FullPolicy.RefusePuts,
processExpiredItem: (QueueItem) => Unit = { _ => },
@@ -71,9 +67,9 @@ case class JournaledQueueReaderConfig(
timeoutLatency: (JournaledQueue#Reader, Duration) => Unit = { (_, _) => }
) {
override def toString() = {
- ("maxItems=%d maxSize=%s maxMemorySize=%s maxAge=%s fullPolicy=%s maxExpireSweep=%d " +
+ ("maxItems=%d maxSize=%s maxAge=%s fullPolicy=%s maxExpireSweep=%d " +
"maxQueueAge=%s").format(
- maxItems, maxSize, maxMemorySize, maxAge, fullPolicy, maxExpireSweep, maxQueueAge)
+ maxItems, maxSize, maxAge, fullPolicy, maxExpireSweep, maxQueueAge)
}
}
@@ -84,8 +80,6 @@ case class JournaledQueueReaderConfig(
* @param name Name of the queue being configured.
* @param maxItemSize Set a hard limit on the number of bytes a single queued item can contain. A
* put request for an item larger than this will be rejected.
- * @param journaled If false, don't keep a journal file for this queue. When libkestrel exits, any
- * remaining contents in the queue will be lost.
* @param journalSize Maximum size of an individual journal file before libkestrel moves to a new
* file. In the (normal) state where a queue is usually empty, this is the amount of disk space
* a queue should consume before moving to a new file and erasing the old one.
@@ -104,7 +98,6 @@ case class JournaledQueueReaderConfig(
case class JournaledQueueConfig(
name: String,
maxItemSize: StorageUnit = Long.MaxValue.bytes,
- journaled: Boolean = true,
journalSize: StorageUnit = 16.megabytes,
syncJournal: Duration = Duration.MaxValue,
saveArchivedJournals: Option[File] = None,
@@ -114,9 +107,9 @@ case class JournaledQueueConfig(
defaultReaderConfig: JournaledQueueReaderConfig = new JournaledQueueReaderConfig()
) {
override def toString() = {
- ("name=%s maxItemSize=%s journaled=%s journalSize=%s syncJournal=%s " +
+ ("name=%s maxItemSize=%s journalSize=%s syncJournal=%s " +
"saveArchivedJournals=%s checkpointTimer=%s").format(
- name, maxItemSize, journaled, journalSize, syncJournal, saveArchivedJournals,
+ name, maxItemSize, journalSize, syncJournal, saveArchivedJournals,
checkpointTimer)
}
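
With `journaled` and `maxMemorySize` gone, a configuration after this change might look like the following sketch (values are illustrative only):

    val readerConfig = JournaledQueueReaderConfig(
      maxItems = 100000,
      maxSize = 64.megabytes,
      fullPolicy = ConcurrentBlockingQueue.FullPolicy.DropOldest)

    val queueConfig = JournaledQueueConfig(
      name = "work",
      journalSize = 16.megabytes,
      defaultReaderConfig = readerConfig)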
57 src/main/scala/com/twitter/libkestrel/tools/QueueDumper.scala
@@ -23,7 +23,13 @@ import com.twitter.conversions.storage._
import com.twitter.conversions.time._
import com.twitter.util.{Duration, Time}
-class QueueDumper(filename: String, quiet: Boolean, dump: Boolean, dumpRaw: Boolean, reader: Boolean) {
+sealed trait DumpMode
+case object DumpString extends DumpMode
+case object DumpRaw extends DumpMode
+case object DumpHex extends DumpMode
+case object DumpBase64 extends DumpMode
+
+class QueueDumper(filename: String, quiet: Boolean, dumpMode: Option[DumpMode], reader: Boolean) {
var operations = 0L
var bytes = 0L
var firstId = 0L
@@ -44,6 +50,10 @@ class QueueDumper(filename: String, quiet: Boolean, dump: Boolean, dumpRaw: Bool
}
var lastDisplay = 0L
+ val dumpRaw = dumpMode match {
+ case Some(DumpRaw) => true
+ case _ => false
+ }
var position = file.position
file.foreach { record =>
operations += 1
@@ -73,6 +83,22 @@ class QueueDumper(filename: String, quiet: Boolean, dump: Boolean, dumpRaw: Bool
}
}
+ def encodeData(data: Array[Byte]): String = {
+ dumpMode match {
+ case Some(DumpHex) =>
+ val builder =
+ data.map { byte =>
+ "%02x".format(byte.toInt & 0xFF)
+ }.foldLeft(new StringBuilder(data.length * 3)) { (b, s) =>
+ b.append(s).append(" ")
+ b
+ }
+ builder.toString.trim
+ case Some(DumpBase64) => new sun.misc.BASE64Encoder().encode(data)
+ case _ => new String(data, "ISO-8859-1") // raw, string, none
+ }
+ }
+
def dumpItem(position: Long, record: Record) {
val now = Time.now
verbose("%08x ", position & 0xffffffffL)
@@ -81,7 +107,7 @@ class QueueDumper(filename: String, quiet: Boolean, dump: Boolean, dumpRaw: Bool
if (firstId == 0) firstId = item.id
lastId = item.id
if (!quiet) {
- verbose("PUT %-6d id=%d", item.dataSize, item.id)
+ verbose("PUT %-6d @ %s id=%d", item.dataSize, item.addTime, item.id)
if (item.expireTime.isDefined) {
if (item.expireTime.get - now < 0.milliseconds) {
verbose(" expired")
@@ -94,10 +120,10 @@ class QueueDumper(filename: String, quiet: Boolean, dump: Boolean, dumpRaw: Bool
}
val data = new Array[Byte](item.dataSize)
item.data.get(data)
- if (dump) {
- println(" " + new String(data, "ISO-8859-1"))
- } else if (dumpRaw) {
- print(new String(data, "ISO-8859-1"))
+ dumpMode match {
+ case Some(DumpRaw) => print(encodeData(data))
+ case None => ()
+ case _ => println(" " + encodeData(data))
}
case Record.ReadHead(id) =>
verbose("HEAD %d\n", id)
@@ -115,6 +141,8 @@ object QueueDumper {
var quiet = false
var dump = false
var dumpRaw = false
+ var dumpHex = false
+ var dumpBase64 = false
var reader = false
def usage() {
@@ -126,6 +154,8 @@ object QueueDumper {
println(" -q quiet: don't describe every line, just the summary")
println(" -d dump contents of added items")
println(" -A dump only the raw contents of added items")
+ println(" -x dump contents as hex")
+ println(" -64 dump contents as base 64")
println(" -R file is a reader pointer")
println()
}
@@ -146,6 +176,12 @@ object QueueDumper {
dumpRaw = true
quiet = true
parseArgs(xs)
+ case "-x" :: xs =>
+ dumpHex = true
+ parseArgs(xs)
+ case "-64" :: xs =>
+ dumpBase64 = true
+ parseArgs(xs)
case "-R" :: xs =>
reader = true
parseArgs(xs)
@@ -162,9 +198,16 @@ object QueueDumper {
System.exit(0)
}
+ val dumpMode =
+ if (dumpRaw) Some(DumpRaw)
+ else if (dumpHex) Some(DumpHex)
+ else if (dumpBase64) Some(DumpBase64)
+ else if (dump) Some(DumpString)
+ else None
+
for (filename <- filenames) {
if (!quiet) println("Queue: " + filename)
- new QueueDumper(filename, quiet, dump, dumpRaw, reader)()
+ new QueueDumper(filename, quiet, dumpMode, reader)()
}
}
}
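
For reference, the new -x mode renders each byte as two hex digits separated by spaces; a sketch of the equivalent transformation:

    // e.g. for the payload "hi!":
    val data = "hi!".getBytes("ISO-8859-1")
    val hex = data.map { b => "%02x".format(b.toInt & 0xff) }.mkString(" ")
    // hex == "68 69 21"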
30 src/test/scala/com/twitter/libkestrel/DeadlineWaitQueueSpec.scala
@@ -17,7 +17,7 @@
package com.twitter.libkestrel
import com.twitter.conversions.time._
-import com.twitter.util.{MockTimer, Time, TimeControl}
+import com.twitter.util._
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.{BeforeAndAfter, FunSpec}
@@ -27,7 +27,7 @@ class DeadlineWaitQueueSpec extends FunSpec with BeforeAndAfter {
var timeouts = new AtomicInteger(0)
var awakens = new AtomicInteger(0)
- def newQueue() = new DeadlineWaitQueue(timer)
+ val deadlineWaitQueue = new DeadlineWaitQueue(timer)
before {
timeouts.set(0)
@@ -44,7 +44,6 @@ class DeadlineWaitQueueSpec extends FunSpec with BeforeAndAfter {
it("should invoke timeout function when deadline expires") {
Time.withCurrentTimeFrozen { tc =>
- val deadlineWaitQueue = newQueue()
deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
tc.advance(5.seconds)
@@ -61,7 +60,6 @@ class DeadlineWaitQueueSpec extends FunSpec with BeforeAndAfter {
it("should remove waiters after timeout") {
Time.withCurrentTimeFrozen { tc =>
- val deadlineWaitQueue = newQueue()
deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
assert(deadlineWaitQueue.size === 1)
tc.advance(11.seconds)
@@ -70,9 +68,23 @@ class DeadlineWaitQueueSpec extends FunSpec with BeforeAndAfter {
}
}
+ it("should not use the timer for infinite deadlines") {
+ val mockTimer = new Timer {
+ def schedule(when: Time)(f: => Unit): TimerTask = {
+ throw new UnsupportedOperationException
+ }
+ def schedule(when: Time, period: Duration)(f: => Unit): TimerTask = {
+ throw new UnsupportedOperationException
+ }
+ def stop() { }
+ }
+
+ val deadlineWaitQueue = new DeadlineWaitQueue(mockTimer)
+ deadlineWaitQueue.add(Forever, defaultAwakens, defaultTimeout)
+ }
+
it("should invoke the awakens function when triggered") {
Time.withCurrentTimeFrozen { tc =>
- val deadlineWaitQueue = newQueue()
deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
tc.advance(5.seconds)
@@ -88,7 +100,6 @@ class DeadlineWaitQueueSpec extends FunSpec with BeforeAndAfter {
it("should remove waiters after trigger") {
Time.withCurrentTimeFrozen { tc =>
- val deadlineWaitQueue = newQueue()
deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
assert(deadlineWaitQueue.size === 1)
deadlineWaitQueue.trigger
@@ -98,7 +109,6 @@ class DeadlineWaitQueueSpec extends FunSpec with BeforeAndAfter {
it("should awaken only a single waiter at a time") {
Time.withCurrentTimeFrozen { tc =>
- val deadlineWaitQueue = newQueue()
deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
assert(timeouts.get === 0)
@@ -120,7 +130,6 @@ class DeadlineWaitQueueSpec extends FunSpec with BeforeAndAfter {
it("should awaken all waiters when requested") {
Time.withCurrentTimeFrozen { tc =>
- val deadlineWaitQueue = newQueue()
deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
assert(timeouts.get === 0)
@@ -134,7 +143,6 @@ class DeadlineWaitQueueSpec extends FunSpec with BeforeAndAfter {
it("should remove waiters after triggering all") {
Time.withCurrentTimeFrozen { tc =>
- val deadlineWaitQueue = newQueue()
deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
assert(deadlineWaitQueue.size === 2)
@@ -145,7 +153,6 @@ class DeadlineWaitQueueSpec extends FunSpec with BeforeAndAfter {
it("should explicitly remove a waiter without awakening or timing out") {
Time.withCurrentTimeFrozen { tc =>
- val deadlineWaitQueue = newQueue()
val waiter = deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
assert(deadlineWaitQueue.size === 1)
assert(timeouts.get === 0)
@@ -159,9 +166,8 @@ class DeadlineWaitQueueSpec extends FunSpec with BeforeAndAfter {
it("should evict waiters and cancel their timer tasks") {
Time.withCurrentTimeFrozen { tc =>
- val deadlineWaitQueue = newQueue()
- deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
deadlineWaitQueue.add(Before(10.seconds.fromNow), defaultAwakens, defaultTimeout)
+ deadlineWaitQueue.add(Forever, defaultAwakens, defaultTimeout)
assert(deadlineWaitQueue.size === 2)
assert(timeouts.get === 0)
assert(awakens.get === 0)
190 src/test/scala/com/twitter/libkestrel/JournalReaderSpec.scala
@@ -25,6 +25,9 @@ import org.scalatest.{AbstractSuite, FunSpec, Suite}
import org.scalatest.matchers.{Matcher, MatchResult, ShouldMatchers}
class JournalReaderSpec extends FunSpec with ResourceCheckingSuite with ShouldMatchers with TempFolder with TestLogging {
+
+ implicit def stringToBuffer(s: String): ByteBuffer = ByteBuffer.wrap(s.getBytes)
+
def makeJournal(name: String, maxFileSize: StorageUnit): Journal =
new Journal(testFolder, name, maxFileSize, null, Duration.MaxValue, None)
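(The new stringToBuffer implicit is what lets the rewritten assertions below pass string literals straight to QueueItem — and the queueItem(id) helper added further down leans on it too. A before/after sketch of a call site:

    import java.nio.ByteBuffer
    import com.twitter.util.Time

    implicit def stringToBuffer(s: String): ByteBuffer = ByteBuffer.wrap(s.getBytes)

    // Without the implicit, every payload is wrapped by hand:
    QueueItem(100L, Time.now, None, ByteBuffer.wrap("100".getBytes))

    // With it in scope, the compiler inserts the conversion:
    QueueItem(100L, Time.now, None, "100"))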
@@ -32,26 +35,22 @@ class JournalReaderSpec extends FunSpec with ResourceCheckingSuite with ShouldMa
def makeFiles() {
val jf1 = JournalFile.create(new File(testFolder, "test.1"), null, Duration.MaxValue, 16.kilobytes)
- jf1.put(QueueItem(100L, Time.now, None, ByteBuffer.wrap("100".getBytes)))
- jf1.put(QueueItem(101L, Time.now, None, ByteBuffer.wrap("101".getBytes)))
+ jf1.put(QueueItem(100L, Time.now, None, "100"))
+ jf1.put(QueueItem(101L, Time.now, None, "101"))
jf1.close()
val jf2 = JournalFile.create(new File(testFolder, "test.2"), null, Duration.MaxValue, 16.kilobytes)
- jf2.put(QueueItem(102L, Time.now, None, ByteBuffer.wrap("102".getBytes)))
- jf2.put(QueueItem(103L, Time.now, None, ByteBuffer.wrap("103".getBytes)))
+ jf2.put(QueueItem(102L, Time.now, None, "102"))
+ jf2.put(QueueItem(103L, Time.now, None, "103"))
jf2.close()
val jf3 = JournalFile.create(new File(testFolder, "test.3"), null, Duration.MaxValue, 16.kilobytes)
- jf3.put(QueueItem(104L, Time.now, None, ByteBuffer.wrap("104".getBytes)))
- jf3.put(QueueItem(105L, Time.now, None, ByteBuffer.wrap("105".getBytes)))
+ jf3.put(QueueItem(104L, Time.now, None, "104"))
+ jf3.put(QueueItem(105L, Time.now, None, "105"))
jf3.close()
}
- def bufferToString(b: ByteBuffer) = {
- val bytes = new Array[Byte](b.remaining)
- b.get(bytes)
- new String(bytes)
- }
+ def queueItem(id: Long) = QueueItem(id, Time.now, None, "blah")
describe("Journal#Reader") {
it("created with a checkpoint file") {
@@ -67,8 +66,8 @@ class JournalReaderSpec extends FunSpec with ResourceCheckingSuite with ShouldMa
val j = makeJournal("test")
val reader = new j.Reader("1", file)
reader.head = 123L
- reader.commit(125L)
- reader.commit(130L)
+ reader.commit(queueItem(125L))
+ reader.commit(queueItem(130L))
reader.checkpoint()
assert(BookmarkFile.open(file).toList === List(
@@ -87,15 +86,16 @@ class JournalReaderSpec extends FunSpec with ResourceCheckingSuite with ShouldMa
bf.close()
val jf = JournalFile.create(new File(testFolder, "test.1"), null, Duration.MaxValue, 16.kilobytes)
- jf.put(QueueItem(890L, Time.now, None, ByteBuffer.wrap("hi".getBytes)))
- jf.put(QueueItem(910L, Time.now, None, ByteBuffer.wrap("hi".getBytes)))
+ jf.put(queueItem(890L))
+ jf.put(queueItem(910L))
jf.close()
val j = makeJournal("test")
val reader = new j.Reader("1", file)
- reader.readState()
+ reader.open()
assert(reader.head === 900L)
assert(reader.doneSet.toList.sorted === List(902L, 903L))
+ reader.close()
j.close()
}
@@ -105,22 +105,22 @@ class JournalReaderSpec extends FunSpec with ResourceCheckingSuite with ShouldMa
val reader = new j.Reader("1", file)
reader.head = 123L
- reader.commit(124L)
+ reader.commit(queueItem(124L))
assert(reader.head === 124L)
assert(reader.doneSet.toList.sorted === List())
- reader.commit(126L)
- reader.commit(127L)
- reader.commit(129L)
+ reader.commit(queueItem(126L))
+ reader.commit(queueItem(127L))
+ reader.commit(queueItem(129L))
assert(reader.head === 124L)
assert(reader.doneSet.toList.sorted === List(126L, 127L, 129L))
- reader.commit(125L)
+ reader.commit(queueItem(125L))
assert(reader.head === 127L)
assert(reader.doneSet.toList.sorted === List(129L))
- reader.commit(130L)
- reader.commit(128L)
+ reader.commit(queueItem(130L))
+ reader.commit(queueItem(128L))
assert(reader.head === 130L)
assert(reader.doneSet.toList.sorted === List())
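(Two changes run through this hunk: Reader.commit now takes the whole QueueItem — hence the queueItem(...) helper — instead of a bare id, and the assertions trace the head/doneSet bookkeeping. Annotated, the sequence the test walks (ids only, helper calls elided):

    // head = 123
    commit(124)                       // contiguous with head: head = 124, doneSet = {}
    commit(126); commit(127); commit(129)
                                      // 125 missing: head stays 124, doneSet = {126, 127, 129}
    commit(125)                       // gap filled: head rolls forward to 127, doneSet = {129}
    commit(130); commit(128)          // 128 closes the last gap: head = 130, doneSet = {}

In short, head only advances across a contiguous run of committed ids; anything committed out of order parks in doneSet until the gap closes.)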
@@ -129,10 +129,11 @@ class JournalReaderSpec extends FunSpec with ResourceCheckingSuite with ShouldMa
it("flush all items") {
val j = makeJournal("test")
- val (item1, future1) = j.put(ByteBuffer.wrap("hi".getBytes), Time.now, None)()
+ val (item1, future1) = j.put(ByteBuffer.wrap("hi".getBytes), Time.now, None)
val reader = j.reader("1")
- reader.commit(item1.id)
- val (item2, future2) = j.put(ByteBuffer.wrap("bye".getBytes), Time.now, None)()
+ reader.open()
+ reader.commit(item1)
+ val (item2, future2) = j.put(ByteBuffer.wrap("bye".getBytes), Time.now, None)
assert(reader.head === item1.id)
reader.flush()
@@ -141,92 +142,94 @@ class JournalReaderSpec extends FunSpec with ResourceCheckingSuite with ShouldMa
j.close()
}
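(Note the API change threaded through this test: j.put(...) used to return something that had to be applied — the trailing () on the removed lines — and now yields the (item, future) pair directly, presumably fallout from removing the serialized write path. Readers must also be open()ed before use, and commit takes the item itself. The new shape, sketched:

    val (item, future) = j.put(ByteBuffer.wrap("hi".getBytes), Time.now, None)
    val reader = j.reader("1")
    reader.open()
    reader.commit(item)   // the QueueItem itself, not item.id)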
- describe("read-behind") {
- it("start") {
- val jf = JournalFile.create(new File(testFolder, "test.1"), null, Duration.MaxValue, 16.kilobytes)
- jf.put(QueueItem(100L, Time.now, None, ByteBuffer.wrap("100".getBytes)))
- jf.put(QueueItem(101L, Time.now, None, ByteBuffer.wrap("101".getBytes)))
- jf.close()
+ describe("file boundaries") {
+ def createJournalFiles(journalName: String, startId: Long, idsPerFile: Int, files: Int, head: Long) {
+ for (fileNum <- 1 to files) {
+ val name = "%s.%d".format(journalName, fileNum)
+ val jf = JournalFile.create(new File(testFolder, name), null, Duration.MaxValue, 16.kilobytes)
+        for (idNum <- 1 to idsPerFile) {
+ val id = startId + ((fileNum - 1) * idsPerFile) + (idNum - 1)
+ jf.put(QueueItem(id, Time.now, None, ByteBuffer.wrap(id.toString.getBytes)))
+ }
+ jf.close()
+ }
+
+ val name = "%s.read.client".format(journalName)
+ val bf = BookmarkFile.create(new File(testFolder, name))
+ bf.readHead(head)
+ bf.close()
+ }
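(A worked example of the helper's id arithmetic — id = startId + (fileNum - 1) * idsPerFile + (idNum - 1) — matching the call the tests below make:

    createJournalFiles("test", 100L, 2, 2, 101L)
    // test.1 holds ids 100, 101; test.2 holds ids 102, 103.
    // test.read.client bookmarks head = 101, so the first unread item is
    // 102 -- the leading edge of test.2, which is what the "file edge"
    // test asserts via reader.next().)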
+ it("should start at a file edge") {
+ createJournalFiles("test", 100L, 2, 2, 101L)
val j = makeJournal("test")
- val reader = j.reader("client1")
- reader.head = 100L
- assert(!reader.inReadBehind)
- reader.startReadBehind(100L)
- assert(reader.inReadBehind)
- val item = reader.nextReadBehind()
- assert(item.map { _.id } === Some(101L))
- assert(bufferToString(item.get.data) === "101")
-
- reader.endReadBehind()
- assert(!reader.inReadBehind)
+ val reader = j.reader("client")
+ reader.open()
+
+ assert(reader.next().map { _.id } === Some(102L))
j.close()
}
- it("start at file edge") {
- val jf1 = JournalFile.create(new File(testFolder, "test.1"), null, Duration.MaxValue, 16.kilobytes)
- jf1.put(QueueItem(100L, Time.now, None, ByteBuffer.wrap("100".getBytes)))
- jf1.put(QueueItem(101L, Time.now, None, ByteBuffer.wrap("101".getBytes)))
- jf1.close()
+ it("should cross files") {
+ createJournalFiles("test", 100L, 2, 2, 100L)
+ val j = makeJournal("test")
+ val reader = j.reader("client")
+ reader.open()
- val jf2 = JournalFile.create(new File(testFolder, "test.2"), null, Duration.MaxValue, 16.kilobytes)
- jf2.put(QueueItem(102L, Time.now, None, ByteBuffer.wrap("102".getBytes)))
- jf2.put(QueueItem(103L, Time.now, None, ByteBuffer.wrap("103".getBytes)))
- jf2.close()
+ assert(reader.next().map { _.id } === Some(101L))
+ assert(reader.next().map { _.id } === Some(102L))
+ j.close()
+ }
+
+ it("should peek at leading file edge") {
+ createJournalFiles("test", 100L, 2, 2, 101L)
val j = makeJournal("test")
val reader = j.reader("client")
- reader.head = 101L
- reader.startReadBehind(101L)
- assert(reader.nextReadBehind().map { _.id } === Some(102L))
+ reader.open()
+
+ assert(reader.peekOldest().map { _.id } === Some(102L))
+ assert(reader.peekOldest().map { _.id } === Some(102L))
j.close()
}
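(The doubled peekOldest assertion is deliberate: unlike next(), a peek must not consume the item, so the same id comes back on both calls. Contrasted with the destructive read, using the ids from the test above:

    assert(reader.peekOldest().map { _.id } === Some(102L))  // non-destructive
    assert(reader.peekOldest().map { _.id } === Some(102L))  // still 102
    assert(reader.next().map { _.id } === Some(102L))        // consumes 102)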
- describe("across journal files") {
- it("when asked to") {
- makeFiles()
- val j = makeJournal("test")
- val reader = j.reader("client")
- reader.head = 102L
- reader.startReadBehind(102L)
- assert(reader.nextReadBehind().map { _.id } === Some(103L))
-
- val item = reader.nextReadBehind()
- assert(item.map { _.id } === Some(104L))
- assert(bufferToString(item.get.data) === "104")
- j.close()
- }
+ it("should peek at trailing file edge") {
+ createJournalFiles("test", 100L, 2, 2, 100L)
+ val j = makeJournal("test")
+ val reader = j.reader("client")
+ reader.open()
- it("when asked not to") {
- makeFiles()
- val j = makeJournal("test")
- val reader = j.reader("client")
- reader.head = 102L
- val scanner = new reader.Scanner(102L, followFiles = false)
- assert(scanner.next().map { _.id } === Some(103L))
- assert(scanner.next() === None)
- j.close()
- }
+ assert(reader.peekOldest().map { _.id } === Some(101L))
+ assert(reader.peekOldest().map { _.id } === Some(101L))
+
+ j.close()
}
- it("until it catches up") {
- val jf1 = JournalFile.create(new File(testFolder, "test.1"), null, Duration.MaxValue, 16.kilobytes)
- jf1.put(QueueItem(100L, Time.now, None, ByteBuffer.wrap("100".getBytes)))
- jf1.put(QueueItem(101L, Time.now, None, ByteBuffer.wrap("101".getBytes)))
- jf1.close()
+ it("should conditionally get at leading file edge") {
+ createJournalFiles("test", 100L, 2, 2, 101L)
+ val j = makeJournal("test")
+ val reader = j.reader("client")