Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Revert "Changed the query Timeout behavior to only purge the Timer qu…
Browse files Browse the repository at this point in the history
…eue intermittently."

On further inspection, we don't need to call purge() at all. We'll revert this, and remove
the call to purge in a future change.

This reverts commit 576bda7.
  • Loading branch information
Benjy Weinberger committed Jan 5, 2011
1 parent e200299 commit 89dfacd
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 57 deletions.
33 changes: 11 additions & 22 deletions src/main/scala/com/twitter/querulous/Timeout.scala
Original file line number Diff line number Diff line change
@@ -1,49 +1,38 @@
package com.twitter.querulous

import java.util.{Timer, TimerTask}
import java.util.concurrent.atomic.AtomicInteger
import com.twitter.util.Duration
import net.lag.logging.Logger


class TimeoutException extends Exception

object Timeout {
val defaultTimer = new Timer("Timer thread", true)
val defaultPurgeInterval = 1000 // TODO(benjy): Set from config?
}

class Timeout(timer: Timer, val purgeInterval: Int) {
def this(timer: Timer) = this(timer, Timeout.defaultPurgeInterval)
def this(purgeInterval: Int) = this(Timeout.defaultTimer, purgeInterval)
def this() = this(Timeout.defaultTimer, Timeout.defaultPurgeInterval)

private val log = Logger.get(getClass.getName)

private var numTimeouts = new AtomicInteger(0)
val defaultTimer = new Timer("Timer thread", true)

def apply[T](timeout: Duration)(f: => T)(onTimeout: => Unit): T = {
@volatile var timedOut = false
def apply[T](timer: Timer, timeout: Duration)(f: => T)(onTimeout: => Unit): T = {
@volatile var cancelled = false
val task = if (timeout.inMillis > 0)
Some(schedule(timer, timeout, { timedOut = true; numTimeouts.incrementAndGet(); onTimeout }))
Some(schedule(timer, timeout, { cancelled = true; onTimeout }))
else None

try {
f
} finally {
task map { t =>
t.cancel() // Cancel the timeout (regardless of whether it was reached or not).
// We purge intermittently, so that cancelled tasks can get garbage-collected.
// We don't want to purge on every cancel, as purge() is linear in the number
// of tasks scheduled in the timer, and is synchronized on the timer, so if
// many tasks are cancelled we end up with quadratic behavior.
if (numTimeouts.get() % purgeInterval == 0)
timer.purge()
t.cancel()
timer.purge()
}
if (timedOut) throw new TimeoutException
if (cancelled) throw new TimeoutException
}
}

def apply[T](timeout: Duration)(f: => T)(onTimeout: => Unit): T = {
apply(defaultTimer, timeout)(f)(onTimeout)
}

private def schedule(timer: Timer, timeout: Duration, f: => Unit) = {
val task = new TimerTask() {
override def run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class PerQueryTimingOutQueryFactory(queryFactory: QueryFactory, val timeouts: Ma

private object QueryCancellation {
val cancelTimer = new java.util.Timer("global query cancellation timer", true)
val queryCanceller = new Timeout(cancelTimer)
}

/**
Expand All @@ -60,7 +59,7 @@ class TimingOutQuery(query: Query, connection: Connection, timeout: Duration, ca

override def delegate[A](f: => A) = {
try {
queryCanceller(timeout)(f) {
Timeout(cancelTimer, timeout)(f) {
if (cancelOnTimeout) cancel()
destroyConnection(connection)
}
Expand Down
35 changes: 2 additions & 33 deletions src/test/scala/com/twitter/querulous/integration/TimeoutSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package com.twitter.querulous.integration

import java.util.concurrent.CountDownLatch
import org.specs.Specification
import org.specs.mock.{JMocker, ClassMocker}
import net.lag.configgy.Configgy
import com.twitter.util.Time
import com.twitter.util.TimeConversions._
import com.twitter.querulous.{TestEvaluator, Timeout, TimeoutException}
import com.twitter.querulous.TestEvaluator
import com.twitter.querulous.database.ApachePoolingDatabaseFactory
import com.twitter.querulous.query.{SqlQueryFactory, TimingOutQueryFactory, SqlQueryTimeoutException}
import com.twitter.querulous.evaluator.{StandardQueryEvaluatorFactory, QueryEvaluator}


class TimeoutSpec extends Specification with JMocker with ClassMocker {
class TimeoutSpec extends Specification {
Configgy.configure("config/test.conf")

import TestEvaluator._
Expand Down Expand Up @@ -52,35 +51,5 @@ class TimeoutSpec extends Specification with JMocker with ClassMocker {
thread.interrupt()
thread.join()
}

"purge intermittently" in {
val task = capturingParam[java.util.TimerTask]

class FakeTimer extends java.util.Timer {
var numPurges = 0

// Ignore the timeout and run the task immediately, to force the appearance of a timeout.
override def schedule(task: java.util.TimerTask, timeout: Long) = task.run

override def purge = { numPurges += 1; 0}
}

val fakeTimer = new FakeTimer()
val fakeTimeout = new Timeout(fakeTimer, 3)

def forceTimeout(shouldPurge: Boolean) {
val purgesBefore = fakeTimer.numPurges
fakeTimeout(1.second)(1)(()) must throwA[TimeoutException]
val purgesAfter = fakeTimer.numPurges
shouldPurge == (purgesAfter != purgesBefore) must beTrue
}

forceTimeout(false)
forceTimeout(false)
forceTimeout(true)
forceTimeout(false)
forceTimeout(false)
forceTimeout(true)
}
}
}

0 comments on commit 89dfacd

Please sign in to comment.