Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

match kestrel 2.x discardExpired semantics and remove duplicate stat …

…expired (c.f. expiredCount)

 - allow maxExpireSweep limit to be controlled by caller
 - maxExpireSweep does not apply to expirations due to get/put
 - report number of items discarded

RB_ID=90685
  • Loading branch information...
commit bbd313c5dead7d66f828ca0a2402b4a62b75cdad 1 parent 46819a9
@zuercher zuercher authored
View
1  .gitignore
@@ -14,3 +14,4 @@ ignore/
.scala_dependencies
.#*
*~
+[#]*[#]
View
66 src/main/scala/com/twitter/libkestrel/JournaledQueue.scala
@@ -185,10 +185,19 @@ class JournaledQueue(
}
/**
- * Do a sweep of each reader, and discard any expired items.
+ * Do a sweep of each reader, discarding all expired items up to the reader's configured
+ * `maxExpireSweep`. Returns the total number of items expired across all readers.
*/
- def discardExpired() {
- readerMap.values foreach { _.discardExpired() }
+ def discardExpired(): Int = discardExpired(true)
+
+ /**
+ * Do a sweep of each reader, discarding expired items at the head of the reader's queue. If
+ * applyMaxExpireSweep is true, the reader's currently configured `maxExpireSweep` limit is
+ * enforced, otherwise expiration continues until there are no more expired items at the head
+ * of the queue. Returns the total number of items expired across all readers.
+ */
+ def discardExpired(applyMaxExpireSweep: Boolean): Int = {
+ readerMap.values.foldLeft(0) { _ + _.discardExpired(applyMaxExpireSweep)() }
}
/**
@@ -273,7 +282,6 @@ class JournaledQueue(
@volatile var bytes = 0L
@volatile var memoryItems = 0
@volatile var memoryBytes = 0L
- @volatile var expired = 0L
private val openReads = new ConcurrentHashMap[Long, QueueItem]()
@@ -441,7 +449,7 @@ class JournaledQueue(
// this happens within the serialized block of a put.
private[libkestrel] def put(item: QueueItem) {
- discardExpired()
+ discardExpiredWithoutLimit()
serialized {
val inReadBehind = journalReader.map { j =>
// if item.id <= j.readBehindId, fillReadBehind already saw this item.
@@ -482,29 +490,39 @@ class JournaledQueue(
adjusted.isDefined && adjusted.get <= now
}
- def discardExpired() {
- discardExpired(readerConfig.maxExpireSweep)
+ def discardExpiredWithoutLimit() = discardExpired(false)
+
+ def discardExpired(applyMaxExpireSweep: Boolean = true): Future[Int] = {
+ val max = if (applyMaxExpireSweep) readerConfig.maxExpireSweep else Int.MaxValue
+ discardExpired(max)
}
// check the in-memory portion of the queue and discard anything that's expired.
- def discardExpired(max: Int) {
- if (max == 0) return
- queue.pollIf { item => hasExpired(item.addTime, item.expireTime, Time.now) } map { itemOption =>
- itemOption foreach { item =>
- readerConfig.processExpiredItem(item)
- expiredCount.getAndIncrement()
- serialized {
- items -= 1
- bytes -= item.dataSize
- memoryItems -= 1
- memoryBytes -= item.dataSize
- expired += 1
- journalReader.foreach { _.commit(item.id) }
- fillReadBehind()
+ private[this] def discardExpired(max: Int): Future[Int] = {
+ def expireLoop(remainingAttempts: Int, numExpired: Int): Future[(Int, Int)] = {
+ if (remainingAttempts == 0) return Future.value { (0, numExpired) }
+ val expireFuture =
+ queue.pollIf { item => hasExpired(item.addTime, item.expireTime, Time.now) } map {
+ case Some(item) =>
+ readerConfig.processExpiredItem(item)
+ expiredCount.getAndIncrement()
+ serialized {
+ items -= 1
+ bytes -= item.dataSize
+ memoryItems -= 1
+ memoryBytes -= item.dataSize
+ journalReader.foreach { _.commit(item.id) }
+ fillReadBehind()
+ }
+ (remainingAttempts - 1, numExpired + 1)
+ case None =>
+ (0, numExpired)
}
- discardExpired(max - 1)
- }
+
+ expireFuture flatMap { case (remaining, expired) => expireLoop(remaining, expired) }
}
+
+ expireLoop(max, 0) map { case (_, expired) => expired }
}
// if we're in read-behind mode, scan forward in the journal to keep memory as full as
@@ -534,7 +552,7 @@ class JournaledQueue(
*/
def get(deadline: Option[Deadline], peeking: Boolean = false): Future[Option[QueueItem]] = {
if (closed) return Future.value(None)
- discardExpired()
+ discardExpiredWithoutLimit()
val startTime = Time.now
val future = deadline match {
case Some(d) => queue.get(d)
View
223 src/test/scala/com/twitter/libkestrel/JournaledQueueSpec.scala
@@ -536,81 +536,200 @@ class JournaledQueueSpec extends Spec with ResourceCheckingSuite with ShouldMatc
q.close()
}
- it("expires old items") {
- setupWriteJournals(4, 1, expiredItems = 1)
- setupBookmarkFile("", 0)
- val q = makeQueue()
- val reader = q.reader("")
+ describe("old item expiration") {
+ it("expires old items on get") {
+ setupWriteJournals(4, 1, expiredItems = 1)
+ setupBookmarkFile("", 0)
+ val q = makeQueue()
+ val reader = q.reader("")
- assert(reader.expired === 0)
- val item = reader.get(None)()
- assert(item.isDefined)
- assert(item.get.id === 2L)
- assert(eventually(reader.expired == 1))
- assert(reader.expiredCount.get === 1)
- q.close()
- }
+ assert(reader.expiredCount.get === 0)
+ val item = reader.get(None)()
+ assert(item.isDefined)
+ assert(item.get.id === 2L)
+ assert(eventually(reader.expiredCount.get == 1))
+ q.close()
+ }
- it("sends expired items to the callback") {
- Time.withCurrentTimeFrozen { timeMutator =>
- var received: Option[QueueItem] = None
- def callback(item: QueueItem) {
- received = Some(item)
+ it("expires old items on put") {
+ setupWriteJournals(4, 1, expiredItems = 1)
+ setupBookmarkFile("", 0)
+ val q = makeQueue()
+ val reader = q.reader("")
+
+ assert(reader.expiredCount.get === 0)
+ q.put(stringToBuffer("new item"), Time.now, None)
+ val item = reader.get(None)()
+ assert(item.isDefined)
+ assert(item.get.id === 2L)
+ assert(eventually(reader.expiredCount.get == 1))
+ q.close()
+ }
+
+ it("sends expired items to the callback") {
+ Time.withCurrentTimeFrozen { timeMutator =>
+ var received: Option[QueueItem] = None
+ def callback(item: QueueItem) {
+ received = Some(item)
+ }
+
+ val q = makeQueue(readerConfig = makeReaderConfig.copy(processExpiredItem = callback))
+ val reader = q.reader("")
+ q.put(stringToBuffer("dead!"), Time.now, Some(1.second.fromNow))
+
+ timeMutator.advance(1.second)
+ assert(reader.get(None)() === None)
+ assert(received.isDefined)
+ assert(bufferToString(received.get.data) === "dead!")
+ q.close()
}
+ }
+
+ it("limits the number of expirations in a single sweep") {
+ Time.withCurrentTimeFrozen { timeMutator =>
+ var received: List[String] = Nil
+ def callback(item: QueueItem) {
+ received ::= bufferToString(item.data)
+ }
+
+ val q = makeQueue(readerConfig = makeReaderConfig.copy(
+ processExpiredItem = callback,
+ maxExpireSweep = 3
+ ))
+ val reader = q.reader("")
+ (1 to 10).foreach { id =>
+ q.put(stringToBuffer(id.toString), Time.now, Some(100.milliseconds.fromNow))
+ }
+
+ timeMutator.advance(100.milliseconds)
+ assert(reader.items === 10)
+ assert(received === Nil)
+
+ q.discardExpired()
+ assert(reader.items === 7)
+ assert(received === List("3", "2", "1"))
+ received = Nil
+
+ q.discardExpired()
+ assert(reader.items === 4)
+ assert(received === List("6", "5", "4"))
+ q.close()
+ }
+ }
+
+ it("ignores maxExpireSweep when expiring items on get/put") {
+ Time.withCurrentTimeFrozen { timeMutator =>
+ val q = makeQueue(readerConfig = makeReaderConfig.copy(
+ maxExpireSweep = 3
+ ))
+ val reader = q.reader("")
+ (1 to 10).foreach { id =>
+ q.put(stringToBuffer(id.toString), Time.now, Some(100.milliseconds.fromNow))
+ }
+
+ timeMutator.advance(100.milliseconds)
+ assert(reader.items === 10)
+
+ q.put(stringToBuffer("poof"), Time.now, None)
+ assert(reader.items === 1)
+ q.flush()
- val q = makeQueue(readerConfig = makeReaderConfig.copy(processExpiredItem = callback))
+ (1 to 10).foreach { id =>
+ q.put(stringToBuffer(id.toString), Time.now, Some(100.milliseconds.fromNow))
+ }
+
+ timeMutator.advance(100.milliseconds)
+ assert(reader.items === 10)
+
+ assert(!reader.get(None)().isDefined)
+ assert(reader.items === 0)
+
+ q.close()
+ }
+ }
+
+ it("defaults expiration of items with a limit") {
+ setupWriteJournals(10, 1, expiredItems = 9)
+ setupBookmarkFile("", 0)
+
+ val q = makeQueue(readerConfig = makeReaderConfig.copy(
+ maxExpireSweep = 3
+ ))
val reader = q.reader("")
- q.put(stringToBuffer("dead!"), Time.now, Some(1.second.fromNow))
- timeMutator.advance(1.second)
- assert(reader.get(None)() === None)
- assert(received.isDefined)
- assert(bufferToString(received.get.data) === "dead!")
+ assert(reader.expiredCount.get === 0)
+ assert(q.discardExpired() === 3)
+ assert(reader.expiredCount.get === 3)
+ assert(reader.items === 7)
q.close()
}
- }
- it("limits the number of expirations in a single sweep") {
- Time.withCurrentTimeFrozen { timeMutator =>
- var received: List[String] = Nil
- def callback(item: QueueItem) {
- received ::= bufferToString(item.data)
- }
+ it("allows explicit expiration of items without limit") {
+ setupWriteJournals(10, 1, expiredItems = 9)
+ setupBookmarkFile("", 0)
val q = makeQueue(readerConfig = makeReaderConfig.copy(
- processExpiredItem = callback,
maxExpireSweep = 3
))
val reader = q.reader("")
- (1 to 10).foreach { id =>
- q.put(stringToBuffer(id.toString), Time.now, Some(100.milliseconds.fromNow))
- }
- timeMutator.advance(100.milliseconds)
- assert(reader.items === 10)
- assert(received === Nil)
+ assert(reader.expiredCount.get === 0)
+ assert(q.discardExpired(false) === 9)
+ assert(reader.expiredCount.get === 9)
+ q.close()
+ }
- q.put(stringToBuffer("poof"), Time.now, None)
- assert(reader.items === 8)
- assert(received === List("3", "2", "1"))
- received = Nil
+ it("stops unlimited expiration when queue is empty") {
+ setupWriteJournals(10, 1, expiredItems = 10)
+ setupBookmarkFile("", 0)
- q.put(stringToBuffer("poof"), Time.now, None)
- assert(reader.items === 6)
- assert(received === List("6", "5", "4"))
+ val q = makeQueue(readerConfig = makeReaderConfig.copy(
+ maxExpireSweep = 3
+ ))
+ val reader = q.reader("")
+
+ assert(reader.items == 10)
+ assert(q.discardExpired(false) === 10)
+ assert(reader.items == 0)
q.close()
}
- }
- it("honors default expiration") {
- Time.withCurrentTimeFrozen { timeMutator =>
- val q = makeQueue(readerConfig = makeReaderConfig.copy(maxAge = Some(1.second)))
- q.put(stringToBuffer("hi"), Time.now, None)
+ it("reports number of expired items across readers") {
+ setupWriteJournals(10, 1, expiredItems = 9)
+ setupBookmarkFile("1", 0)
+ setupBookmarkFile("2", 0)
+ setupBookmarkFile("3", 0)
- timeMutator.advance(1.second)
- assert(q.reader("").get(None)() === None)
+ val q = makeQueue(readerConfig = makeReaderConfig.copy(
+ maxExpireSweep = 3
+ ))
+ val reader1 = q.reader("1")
+ val reader2 = q.reader("2")
+ val reader3 = q.reader("3")
+
+ assert(reader1.items === 10)
+ assert(reader2.items === 10)
+ assert(reader3.items === 10)
+ assert(reader1.expiredCount.get === 0)
+ assert(reader2.expiredCount.get === 0)
+ assert(reader3.expiredCount.get === 0)
+ assert(q.discardExpired() >= 3)
+ assert(reader1.expiredCount.get === 3)
+ assert(reader2.expiredCount.get === 3)
+ assert(reader3.expiredCount.get === 3)
q.close()
}
+
+ it("honors default expiration") {
+ Time.withCurrentTimeFrozen { timeMutator =>
+ val q = makeQueue(readerConfig = makeReaderConfig.copy(maxAge = Some(1.second)))
+ q.put(stringToBuffer("hi"), Time.now, None)
+
+ timeMutator.advance(1.second)
+ assert(q.reader("").get(None)() === None)
+ q.close()
+ }
+ }
}
it("saves archived journals") {
Please sign in to comment.
Something went wrong with that request. Please try again.