Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Fix bad Future.sleep substitute
Browse files Browse the repository at this point in the history
  • Loading branch information
Leigh Stewart committed Sep 21, 2015
1 parent 1d3ad64 commit 5279aeb
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions src/test/scala/net/lag/kestrel/BlockingContainer.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package net.lag.kestrel

import java.util.concurrent.{CountDownLatch, ScheduledThreadPoolExecutor}
import com.twitter.util.{Duration, Future, JavaTimer}
import com.twitter.util.{Duration, Future, JavaTimer, Promise, Return, Throw}
import java.nio.ByteBuffer

class BlockingContainer(blockPeriod: Duration) extends LocalDirectory("", new ScheduledThreadPoolExecutor(1)) {
override def getStream(streamName: String, syncPeriod: Duration): PersistentStream = { new BlockingStream(blockPeriod) }
override def getStream(streamName: String, syncPeriod: Duration): PersistentStream = { new BlockingStream(blockPeriod) }
override def listStreams(): Array[String] = {
new Array[String](0)
}
Expand All @@ -22,12 +22,23 @@ class BlockingStreamWriter(blockPeriod: Duration) extends PersistentStreamWriter
implicit val timer = new JavaTimer(true)
def write(data: ByteBuffer): Future[Unit] = {
if (blockPeriod > Duration.Bottom) {
Thread.sleep(blockPeriod.inMilliseconds)
Future.Done
sleep(blockPeriod)
} else {
Future.Done
}
}
def sleep(blockPeriod: Duration): Future[Unit] = {
if (blockPeriod <= Duration.Zero)
return Future.Done
val p = new Promise[Unit]
val task = timer.schedule(blockPeriod.fromNow) { p.setValue(Unit) }
p.setInterruptHandler {
case e =>
if (p.updateIfEmpty(Throw(e)))
task.cancel()
}
p
}
def force(metadata: Boolean) {}
def truncate(position: Long) {}
def position(): Long = { 0 }
Expand Down

0 comments on commit 5279aeb

Please sign in to comment.