Permalink
Browse files

fix error replay

  • Loading branch information...
1 parent bc280b6 commit c1c90ae1490d36deb6d7f93788773761270aefc1 eaceaser committed Nov 17, 2011
Showing with 29 additions and 25 deletions.
  1. +29 −25 src/main/scala/net/lag/kestrel/PersistentQueue.scala
@@ -502,36 +502,40 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
}
final def discardExpired(max: Int): Int = {
- val itemsToRemove = synchronized {
- var continue = true
- val toRemove = new mutable.ListBuffer[QItem]
- while (continue) {
- if (queue.isEmpty || journal.isReplaying) {
- continue = false
- } else {
- val realExpiry = adjustExpiry(queue.front.addTime, queue.front.expiry)
- if (realExpiry.isDefined && realExpiry.get < Time.now) {
- totalExpired.incr()
- val item = queue.dequeue
- val len = item.data.length
- queueSize -= len
- _memoryBytes -= len
- queueLength -= 1
- fillReadBehind
- if (config.keepJournal) journal.remove()
- toRemove += item
- } else {
+ if (max > 0) {
+ var count = 0
+ val itemsToRemove = synchronized {
+ var continue = true
+ val toRemove = new mutable.ListBuffer[QItem]
+ while (continue) {
+ if (queue.isEmpty || journal.isReplaying) {
continue = false
+ } else {
+ val realExpiry = adjustExpiry(queue.front.addTime, queue.front.expiry)
+ if (realExpiry.isDefined && realExpiry.get < Time.now && count < max) {
+ totalExpired.incr()
+ val item = queue.dequeue
+ val len = item.data.length
+ queueSize -= len
+ _memoryBytes -= len
+ queueLength -= 1
+ fillReadBehind
+ if (config.keepJournal) journal.remove()
+ toRemove += item
+ count += 1
+ } else {
+ continue = false
+ }
}
}
+ toRemove
}
- toRemove
- }
- expireQueue.foreach { q =>
- itemsToRemove.foreach { item => q.add(item.data, None) }
- }
- itemsToRemove.size
+ expireQueue.foreach { q =>
+ itemsToRemove.foreach { item => q.add(item.data, None) }
+ }
+ itemsToRemove.size
+ } else 0
}
private def _unremove(xid: Int) = {

0 comments on commit c1c90ae

Please sign in to comment.