Skip to content

Commit

Permalink
!str akka#19710 Use Java types in Attributes Java API
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Feb 8, 2016
1 parent f817130 commit 50aba6c
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 19 deletions.
Expand Up @@ -7,6 +7,7 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -47,4 +48,32 @@ public void mustGetAttributeByClass() {
attributes.getAttribute(Attributes.Name.class, new Attributes.Name("default")));
}

@Test
public void mustGetMissingAttributeByClass() {
assertEquals(
Optional.empty(),
attributes.getAttribute(Attributes.LogLevels.class));
}

@Test
public void mustGetPossiblyMissingAttributeByClass() {
assertEquals(
Optional.of(new Attributes.Name("b")),
attributes.getAttribute(Attributes.Name.class));
}

@Test
public void mustGetPossiblyMissingFirstAttributeByClass() {
assertEquals(
Optional.of(new Attributes.Name("b")),
attributes.getFirstAttribute(Attributes.Name.class));
}

@Test
public void mustGetMissingFirstAttributeByClass() {
assertEquals(
Optional.empty(),
attributes.getFirstAttribute(Attributes.LogLevels.class));
}

}
Expand Up @@ -60,6 +60,17 @@ class AttributesSpec extends AkkaSpec with ScalaFutures {
attributes.get[Name] should contain(Name("attributesSink"))
}
}

val attributes = Attributes.name("a") and Attributes.name("b") and Attributes.inputBuffer(1, 2)

"give access to first attribute" in {
attributes.getFirst[Name] should ===(Some(new Attributes.Name("a")))
}

"give access to attribute byt type" in {
attributes.get[Name] should ===(Some(new Attributes.Name("b")))
}

}

}
37 changes: 21 additions & 16 deletions akka-stream/src/main/scala/akka/stream/Attributes.scala
Expand Up @@ -3,12 +3,15 @@
*/
package akka.stream

import java.util.Optional

import akka.event.Logging
import scala.annotation.tailrec
import scala.reflect.{ classTag, ClassTag }
import akka.japi.function
import akka.stream.impl.StreamLayout._
import java.net.URLEncoder
import scala.compat.java8.OptionConverters._

/**
* Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]]
Expand Down Expand Up @@ -48,32 +51,30 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
* If no such attribute exists the `default` value is returned.
*/
def getAttribute[T <: Attribute](c: Class[T], default: T): T =
getAttribute(c) match {
case Some(a) a
case None default
}
getAttribute(c).orElse(default)

/**
* Java API: Get the first (least specific) attribute of a given `Class` or subclass thereof.
* If no such attribute exists the `default` value is returned.
*/
def getFirstAttribute[T <: Attribute](c: Class[T], default: T): T =
getFirstAttribute(c) match {
case Some(a) a
case None default
}
getFirstAttribute(c).orElse(default)

/**
* Java API: Get the last (most specific) attribute of a given `Class` or subclass thereof.
*/
def getAttribute[T <: Attribute](c: Class[T]): Option[T] =
Option(attributeList.foldLeft(null.asInstanceOf[T])((acc, attr) if (c.isInstance(attr)) c.cast(attr) else acc))
def getAttribute[T <: Attribute](c: Class[T]): Optional[T] =
Optional.ofNullable(attributeList.foldLeft(
null.asInstanceOf[T]
)(
(acc, attr) if (c.isInstance(attr)) c.cast(attr) else acc)
)

/**
* Java API: Get the first (least specific) attribute of a given `Class` or subclass thereof.
*/
def getFirstAttribute[T <: Attribute](c: Class[T]): Option[T] =
attributeList.find(c isInstance _).map(c cast _)
def getFirstAttribute[T <: Attribute](c: Class[T]): Optional[T] =
attributeList.find(c isInstance _).map(c cast _).asJava

/**
* Scala API: get all attributes of a given type (or subtypes thereof).
Expand Down Expand Up @@ -102,14 +103,18 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
/**
* Scala API: Get the last (most specific) attribute of a given type parameter T `Class` or subclass thereof.
*/
def get[T <: Attribute: ClassTag]: Option[T] =
getAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]])
def get[T <: Attribute: ClassTag]: Option[T] = {
val c = classTag[T].runtimeClass.asInstanceOf[Class[T]]
attributeList.reverseIterator.collectFirst[T] { case attr if c.isInstance(attr) => c.cast(attr) }
}

/**
* Scala API: Get the first (least specific) attribute of a given type parameter T `Class` or subclass thereof.
*/
def getFirst[T <: Attribute: ClassTag]: Option[T] =
getFirstAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]])
def getFirst[T <: Attribute: ClassTag]: Option[T] = {
val c = classTag[T].runtimeClass.asInstanceOf[Class[T]]
attributeList.find(c isInstance _).map(c cast _)
}

/**
* Test whether the given attribute is contained within this attributes list.
Expand Down
6 changes: 3 additions & 3 deletions akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
Expand Up @@ -632,7 +632,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
override def toString = s"MapAsync.Logic(buffer=$buffer)"

//FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync?
val decider = inheritedAttributes.getAttribute(classOf[SupervisionStrategy]).map(_.decider).getOrElse(Supervision.stoppingDecider)
val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)

val buffer = new BoundedBuffer[Holder[Try[Out]]](parallelism)
def todo = buffer.used
Expand Down Expand Up @@ -710,7 +710,7 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"

val decider =
inheritedAttributes.getAttribute(classOf[SupervisionStrategy])
inheritedAttributes.get[SupervisionStrategy]
.map(_.decider).getOrElse(Supervision.stoppingDecider)

var inFlight = 0
Expand Down Expand Up @@ -948,7 +948,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
override def initialAttributes: Attributes = DefaultAttributes.delay
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
val size =
inheritedAttributes.getAttribute(classOf[InputBuffer]) match {
inheritedAttributes.get[InputBuffer] match {
case None throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this")
case Some(InputBuffer(min, max)) max
}
Expand Down

0 comments on commit 50aba6c

Please sign in to comment.