Skip to content
This repository was archived by the owner on Jan 20, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,10 @@ object TimeExtractor {
}
}

trait TimeExtractor[T] extends (T => Long) with java.io.Serializable
/** This cannot be a subclass of function and use the pattern
* of implicit dependencies, since then you get an implicit function.
* Not good
*/
trait TimeExtractor[T] extends java.io.Serializable {
def apply(t: T): Long
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
Copyright 2013 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird.online

import com.twitter.util.{ Await, Duration, Future, Try }

import java.util.{Queue => JQueue}
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
/**
*
* @author Oscar Boykin
*/

object Queue {
/**
* By default, don't block on put
*/
def apply[T]() = linkedNonBlocking[T]

/** Use this for blocking puts when the size is reached
*/
def arrayBlocking[T](size: Int): Queue[T] =
fromBlocking(new ArrayBlockingQueue(size))

def linkedBlocking[T]: Queue[T] =
fromBlocking(new LinkedBlockingQueue())

def linkedNonBlocking[T]: Queue[T] =
fromQueue(new ConcurrentLinkedQueue())

def fromBlocking[T](bq: BlockingQueue[T]): Queue[T] = {
new Queue[T] {
override def add(t: T) = bq.put(t)
override def pollNonBlocking = Option(bq.poll())
}
}

// Uses Queue.add to put. This will fail for full blocking queues
def fromQueue[T](q: JQueue[T]): Queue[T] = {
new Queue[T] {
override def add(t: T) = q.add(t)
override def pollNonBlocking = Option(q.poll())
}
}
}

/**
* Use this class with a thread-safe queue to receive
* results from futures in one thread.
* Storm needs us to touch it's code in one event path (via
* the execute method in bolts)
*/
abstract class Queue[T] {

/** These are the only two methods to implement.
* these must be thread-safe.
*/
protected def add(t: T): Unit
protected def pollNonBlocking: Option[T]

private val count = new AtomicInteger(0)

def put(item: T): Int = {
add(item)
count.incrementAndGet
}

/** Returns the size immediately after the put */
def putAll(items: TraversableOnce[T]): Int = {
val added = items.foldLeft(0) { (cnt, item) =>
add(item)
cnt + 1
}
count.addAndGet(added)
}

/**
* check if something is ready now
*/
def poll: Option[T] = {
val res = pollNonBlocking
// This is for performance sensitive code. Prefering if to match defensively
if(res.isDefined) count.decrementAndGet
res
}

/**
* Obviously, this might not be the same by the time you
* call trimTo or poll
*/
def size: Int = count.get

// Do something on all the elements ready:
@annotation.tailrec
final def foreach(fn: T => Unit): Unit =
poll match {
case None => ()
case Some(it) =>
fn(it)
foreach(fn)
}

// fold on all the elements ready:
@annotation.tailrec
final def foldLeft[V](init: V)(fn: (V, T) => V): V =
poll match {
case None => init
case Some(it) => foldLeft(fn(init, it))(fn)
}

/**
* Take enough elements to get .size == maxLength
*/
def trimTo(maxLength: Int): Seq[T] = {
require(maxLength >= 0, "maxLength must be >= 0.")

@annotation.tailrec
def loop(size: Int, acc: List[T] = Nil): List[T] = {
if(size > maxLength) {
pollNonBlocking match {
case None => acc.reverse // someone else cleared us out
case Some(item) =>
loop(count.decrementAndGet, item::acc)
}
}
else acc.reverse
}
loop(count.get)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
Copyright 2013 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird.online

import org.scalacheck._
import Gen._
import Arbitrary._
import org.scalacheck.Prop._

import com.twitter.util.{Return, Throw, Future, Try}

object QueueLaws extends Properties("Queue") {

property("Putting into a BoundedQueue gets size right") = forAll { (items: List[String]) =>
val q = Queue[String]()
q.putAll(items)
q.size == items.size
}
property("not spill if capacity is enough") = forAll { (items: List[Int]) =>
val q = Queue[Int]()
q.putAll(items)
q.trimTo(items.size).size == 0
}
property("Work with indepent additions") = forAll { (items: List[Int]) =>
val q = Queue[Int]()
items.map(q.put(_)) == (1 to items.size).toList
}
property("spill all with zero capacity") = forAll { (items: List[Int]) =>
val q = Queue[Int]()
q.putAll(items)
q.trimTo(0) == items
}
property("Queue works with finished futures") = forAll { (items: List[Int]) =>
val q = Queue.linkedBlocking[(Int,Try[Int])]
items.foreach { i => q.put((i, Try(i*i))) }
q.foldLeft((0, true)) { case ((cnt, good), (i, ti)) =>
ti match {
case Return(ii) => (cnt + 1, good)
case Throw(e) => (cnt + 1, false)
}
} == (items.size, true)
}
property("Queue.linkedNonBlocking works") = forAll { (items: List[Int]) =>
val q = Queue.linkedNonBlocking[(Int,Try[Int])]
items.foreach { i => q.put((i, Try(i*i))) }
q.foldLeft((0, true)) { case ((cnt, good), (i, ti)) =>
ti match {
case Return(ii) => (cnt + 1, good)
case Throw(e) => (cnt + 1, false)
}
} == (items.size, true)
}
property("Queue foreach works") = forAll { (items: List[Int]) =>
// Make sure we can fit everything
val q = Queue.arrayBlocking[(Int,Try[Int])](items.size + 1)
items.foreach { i => q.put((i,Try(i*i))) }
var works = true
q.foreach { case (i, Return(ii)) =>
works = works && (ii == i*i)
}
works && (q.size == 0)
}
property("Queue foldLeft works") = forAll { (items: List[Int]) =>
// Make sure we can fit everything
val q = Queue.arrayBlocking[(Int,Try[Int])](items.size + 1)
items.foreach { i => q.put((i,Try(i*i))) }
q.foldLeft(true) { case (works, (i, Return(ii))) =>
(ii == i*i)
} && (q.size == 0)
}

property("Queue poll + size is correct") = forAll { (items: List[Int]) =>
// Make sure we can fit everything
val q = Queue[Int]()
items.map { i =>
q.put(i)
val size = q.size
if(i % 2 == 0) {
// do a poll test
q.poll match {
case None => q.size == 0
case Some(_) => q.size == (size - 1)
}
}
else true
}.forall(identity)
}
property("Queue is fifo") = forAll { (items: List[Int]) =>
val q = Queue[Int]()
q.putAll(items)
(q.trimTo(0).toList == items) && {
val q2 = Queue[Int]()
q2.putAll(items)
q2.foldLeft(List[Int]()) { (l, it) => it :: l }.reverse == items
}
}
}
Loading