Skip to content

Commit

Permalink
util-core: Promote ActivitySource Out of Experimental
Browse files Browse the repository at this point in the history
Problem / Solution

`com.twitter.io.exp.ActivitySource` et al. are depended on in production at
Twitter, and have remained mostly untouched in the last five years. It no
longer makes sense for them to remain as 'experimental', so let's promote
them out of that namespace to `com.twitter.io.ActivitySource`, etc.

Differential Revision: https://phabricator.twitter.biz/D478498
  • Loading branch information
ryanoneill authored and jenkins committed May 5, 2020
1 parent 6ee7b49 commit 3e93970
Show file tree
Hide file tree
Showing 15 changed files with 350 additions and 342 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.rst
Expand Up @@ -7,6 +7,13 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com
Unreleased
----------

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

* util-core: Change the namespace of `ActivitySource` and its derivatives to
`com.twitter.io` as its no longer considered experimental since the code has
changed minimally in the past 5 years. ``PHAB_ID=D478498``

20.4.1
------

Expand Down Expand Up @@ -160,7 +167,7 @@ Breaking API Changes

* util-stats: abstract methods of StatsReceiver now take Schemas. The old APIs
are now final and cannot be overriden. For custom implementations, define
schema based methods (eg, counter(verbosity, name) is now
schema based methods (eg, counter(verbosity, name) is now
counter(CounterSchema)). NB: users can continue to call the old interface;
only implementors must migrate.``PHAB_ID=D385068``

Expand Down
2 changes: 0 additions & 2 deletions util-core/BUILD
Expand Up @@ -56,13 +56,11 @@ scala_library(
"util/util-core/src/main/scala/com/twitter/concurrent",
"util/util-core/src/main/scala/com/twitter/conversions",
"util/util-core/src/main/scala/com/twitter/io",
"util/util-core/src/main/scala/com/twitter/io/exp",
],
exports = [
":util-core-util",
"util/util-core/src/main/scala/com/twitter/concurrent",
"util/util-core/src/main/scala/com/twitter/conversions",
"util/util-core/src/main/scala/com/twitter/io",
"util/util-core/src/main/scala/com/twitter/io/exp",
],
)
63 changes: 63 additions & 0 deletions util-core/src/main/scala/com/twitter/io/ActivitySource.scala
@@ -0,0 +1,63 @@
package com.twitter.io

import com.twitter.util._

/**
* An ActivitySource provides access to observerable named variables.
*/
trait ActivitySource[+T] {

/**
* Returns an [[com.twitter.util.Activity]] for a named T-typed variable.
*/
def get(name: String): Activity[T]

/**
* Produces an ActivitySource which queries this ActivitySource, falling back to
* a secondary ActivitySource only when the primary result is ActivitySource.Failed.
*
* @param that the secondary ActivitySource
*/
def orElse[U >: T](that: ActivitySource[U]): ActivitySource[U] =
new ActivitySource.OrElse(this, that)
}

object ActivitySource {

/**
* A Singleton exception to indicate that an ActivitySource failed to find
* a named variable.
*/
object NotFound extends IllegalStateException

/**
* An ActivitySource for observing file contents. Once observed,
* each file will be polled once per period.
*/
def forFiles(
period: Duration = Duration.fromSeconds(60)
)(
implicit timer: Timer
): ActivitySource[Buf] =
new CachingActivitySource(new FilePollingActivitySource(period)(timer))

/**
* Create an ActivitySource for ClassLoader resources.
*/
def forClassLoaderResources(
cl: ClassLoader = ClassLoader.getSystemClassLoader
): ActivitySource[Buf] =
new CachingActivitySource(new ClassLoaderActivitySource(cl))

private[ActivitySource] class OrElse[T, U >: T](
primary: ActivitySource[T],
failover: ActivitySource[U])
extends ActivitySource[U] {
def get(name: String): Activity[U] = {
primary.get(name) transform {
case Activity.Failed(_) => failover.get(name)
case state => Activity(Var.value(state))
}
}
}
}
@@ -0,0 +1,47 @@
package com.twitter.io

import com.twitter.util._
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.HashMap

/**
* A convenient wrapper for caching the results returned by the
* underlying ActivitySource.
*/
class CachingActivitySource[T](underlying: ActivitySource[T]) extends ActivitySource[T] {

private[this] val refq = new ReferenceQueue[Activity[T]]
private[this] val forward = new HashMap[String, WeakReference[Activity[T]]]
private[this] val reverse = new HashMap[WeakReference[Activity[T]], String]

/**
* A caching proxy to the underlying ActivitySource. Vars are cached by
* name, and are tracked with WeakReferences.
*/
def get(name: String): Activity[T] = synchronized {
gc()
Option(forward.get(name)) flatMap { wr => Option(wr.get()) } match {
case Some(v) => v
case None =>
val v = underlying.get(name)
val ref = new WeakReference(v, refq)
forward.put(name, ref)
reverse.put(ref, name)
v
}
}

/**
* Remove garbage collected cache entries.
*/
def gc(): Unit = synchronized {
var ref = refq.poll()
while (ref != null) {
val key = reverse.remove(ref)
if (key != null)
forward.remove(key)

ref = refq.poll()
}
}
}
@@ -0,0 +1,47 @@
package com.twitter.io

import com.twitter.util._
import java.util.concurrent.atomic.AtomicBoolean

/**
* An ActivitySource for ClassLoader resources.
*/
class ClassLoaderActivitySource private[io] (classLoader: ClassLoader, pool: FuturePool)
extends ActivitySource[Buf] {

private[io] def this(classLoader: ClassLoader) = this(classLoader, FuturePool.unboundedPool)

def get(name: String): Activity[Buf] = {
// This Var is updated at most once since ClassLoader
// resources don't change (do they?).
val runOnce = new AtomicBoolean(false)
val p = new Promise[Activity.State[Buf]]

// Defer loading until the first observation
val v = Var.async[Activity.State[Buf]](Activity.Pending) { value =>
if (runOnce.compareAndSet(false, true)) {
pool {
classLoader.getResourceAsStream(name) match {
case null => p.setValue(Activity.Failed(ActivitySource.NotFound))
case stream =>
val reader =
new InputStreamReader(stream, InputStreamReader.DefaultMaxBufferSize, pool)
BufReader.readAll(reader) respond {
case Return(buf) =>
p.setValue(Activity.Ok(buf))
case Throw(cause) =>
p.setValue(Activity.Failed(cause))
} ensure {
// InputStreamReader ignores the deadline in close
reader.close(Time.Undefined)
}
}
}
}
p.onSuccess(value() = _)
Closable.nop
}

Activity(v)
}
}
@@ -0,0 +1,51 @@
package com.twitter.io

import com.twitter.util._
import java.io.{File, FileInputStream}

/**
* An ActivitySource for observing the contents of a file with periodic polling.
*/
class FilePollingActivitySource private[io] (
period: Duration,
pool: FuturePool
)(
implicit timer: Timer)
extends ActivitySource[Buf] {

private[io] def this(period: Duration)(implicit timer: Timer) =
this(period, FuturePool.unboundedPool)

def get(name: String): Activity[Buf] = {
val v = Var.async[Activity.State[Buf]](Activity.Pending) { value =>
val timerTask = timer.schedule(Time.now, period) {
val file = new File(name)

if (file.exists()) {
pool {
val reader = new InputStreamReader(
new FileInputStream(file),
InputStreamReader.DefaultMaxBufferSize,
pool
)
BufReader.readAll(reader) respond {
case Return(buf) =>
value() = Activity.Ok(buf)
case Throw(cause) =>
value() = Activity.Failed(cause)
} ensure {
// InputStreamReader ignores the deadline in close
reader.close(Time.Undefined)
}
}
} else {
value() = Activity.Failed(ActivitySource.NotFound)
}
}

Closable.make { _ => Future { timerTask.cancel() } }
}

Activity(v)
}
}

0 comments on commit 3e93970

Please sign in to comment.