Skip to content
Browse files

Merge remote branch 'twitter/fix_discard_expired_a_bit'

Conflicts:
	project/build/KestrelProject.scala
  • Loading branch information...
2 parents a7ef62a + 6e16cf0 commit 4cd726fbe30d3a09af6ab4dbc24c69395f72c62c Robey Pointer committed Jan 12, 2011
View
5 project/build/KestrelProject.scala
@@ -1,9 +1,10 @@
import sbt._
import com.twitter.sbt._
-class KestrelProject(info: ProjectInfo) extends StandardProject(info) with SubversionPublisher {
- val configgy = "net.lag" % "configgy" % "1.6.4"
+class KestrelProject(info: ProjectInfo) extends StandardProject(info) with SubversionPublisher with InlineDependencies {
+ inline("net.lag" % "configgy" % "1.6.4")
val naggati = "net.lag" %% "naggati" % "0.7.5"
+ val twitterActors = "com.twitter" % "twitteractors" % "1.1.0"
val mina = "org.apache.mina" % "mina-core" % "2.0.0-M6"
val slf4j_api = "org.slf4j" % "slf4j-api" % "1.5.2"
val slf4j_jdk14 = "org.slf4j" % "slf4j-jdk14" % "1.5.2"
View
2 project/plugins/Plugins.scala
@@ -2,5 +2,5 @@ import sbt._
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
val twitterMaven = "twitter.com" at "http://maven.twttr.com/"
- val defaultProject = "com.twitter" % "standard-project" % "0.7.12"
+ val defaultProject = "com.twitter" % "standard-project" % "0.7.17"
}
View
48 src/main/scala/net/lag/kestrel/PersistentQueue.scala
@@ -519,26 +519,38 @@ class PersistentQueue(persistencePath: String, val name: String,
Some(item)
}
- final def discardExpired(max: Int): Int = synchronized {
- if (queue.isEmpty || journal.isReplaying || max <= 0) {
- 0
- } else {
- val realExpiry = adjustExpiry(queue.front.addTime, queue.front.expiry)
- if ((realExpiry != 0) && (realExpiry < Time.now.inMilliseconds)) {
- _totalExpired += 1
- val item = queue.dequeue
- val len = item.data.length
- queueSize -= len
- _memoryBytes -= len
- queueLength -= 1
- fillReadBehind
- if (keepJournal()) journal.remove()
- expiredQueue().map { _.add(item.data, 0) }
- 1 + discardExpired(max - 1)
- } else {
- 0
+ final def discardExpired(max: Int): Int = {
+ val itemsToRemove = synchronized {
+ var continue = true
+ val toRemove = new mutable.ListBuffer[QItem]
+ while(continue) {
+ if (synchronized { queue.isEmpty || journal.isReplaying }) {
+ continue = false
+ } else {
+ val realExpiry = adjustExpiry(queue.front.addTime, queue.front.expiry)
+ if ((realExpiry != 0) && (realExpiry < Time.now.inMilliseconds)) {
+ _totalExpired += 1
+ val item = queue.dequeue
+ val len = item.data.length
+ queueSize -= len
+ _memoryBytes -= len
+ queueLength -= 1
+ fillReadBehind
+ toRemove += item
+ } else {
+ continue = false
+ }
+ }
+ }
+ toRemove
+ }
+
+ expiredQueue().map { expiredQueue =>
+ itemsToRemove.foreach { item =>
+ expiredQueue.add(item.data, 0)
}
}
+ itemsToRemove.size
}
private def _unremove(xid: Int) = {
View
44 src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
@@ -627,4 +627,48 @@ class PersistentQueueSpec extends Specification with TestHelper {
}
}
}
+
+ "PersistentQueue with item expiry" should {
+ doBefore {
+ Time.freeze
+ }
+
+ "expire items into the ether" in {
+ withTempFolder {
+ val q = makeQueue("wu_tang", "journal" -> "false")
+ val expiry = Time.now + 1.second
+ q.add("rza".getBytes, expiry.inMillis) mustEqual true
+ q.add("gza".getBytes, expiry.inMillis) mustEqual true
+ q.add("ol dirty bastard".getBytes, expiry.inMillis) mustEqual true
+ q.add("raekwon".getBytes) mustEqual true
+ Time.advance(2.seconds)
+ q.discardExpired(q.length.toInt) mustEqual 3
+ q.length mustEqual 1
+ q.remove must beSomeQItem("raekwon")
+ }
+ }
+
+ "expire items into a queue" in {
+ withTempFolder {
+ val r = makeQueue("rappers", "journal" -> "false")
+ val q = makeQueue("wu_tang", "journal" -> "false")
+ q.expiredQueue.set(Some(Some(r))) // :(
+ val expiry = Time.now + 1.second
+
+ q.add("method man".getBytes, expiry.inMillis) mustEqual true
+ q.add("ghostface killah".getBytes, expiry.inMillis) mustEqual true
+ q.add("u-god".getBytes, expiry.inMillis) mustEqual true
+ q.add("masta killa".getBytes) mustEqual true
+ Time.advance(2.seconds)
+ q.discardExpired(q.length.toInt) mustEqual 3
+ q.length mustEqual 1
+ q.remove must beSomeQItem("masta killa")
+
+ r.length mustEqual 3
+ r.remove must beSomeQItem("method man")
+ r.remove must beSomeQItem("ghostface killah")
+ r.remove must beSomeQItem("u-god")
+ }
+ }
+ }
}

0 comments on commit 4cd726f

Please sign in to comment.
Something went wrong with that request. Please try again.