Skip to content

Commit

Permalink
Repeat like no-one is watching
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Mar 13, 2018
1 parent 51aecce commit ae5e41c
Show file tree
Hide file tree
Showing 24 changed files with 117 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.tck.ActorPublisherTest.TestPublisher
import org.reactivestreams.Publisher

object ActorPublisherTest {

case object Produce
Expand Down Expand Up @@ -37,6 +36,7 @@ object ActorPublisherTest {
}

}
/*
class ActorPublisherTest extends AkkaPublisherVerification[Int] {
Expand All @@ -46,3 +46,4 @@ class ActorPublisherTest extends AkkaPublisherVerification[Int] {
ActorPublisher(ref)
}
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import akka.stream.actor.OneByOneRequestStrategy
import akka.stream.actor.RequestStrategy
import org.reactivestreams.Subscriber

/*
object ActorSubscriberOneByOneRequestTest {
class StrategySubscriber(val requestStrategy: RequestStrategy) extends ActorSubscriber {
Expand All @@ -26,3 +27,4 @@ class ActorSubscriberOneByOneRequestTest extends AkkaSubscriberBlackboxVerificat
override def createElement(element: Int): Int = element
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package akka.stream.tck
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher
/*
class ConcatTest extends AkkaPublisherVerification[Int] {
Expand All @@ -14,3 +15,4 @@ class ConcatTest extends AkkaPublisherVerification[Int] {
}
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ package akka.stream.tck

import org.reactivestreams.Publisher
import akka.stream.impl.EmptyPublisher
/*
class EmptyPublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = EmptyPublisher[Int]
override def maxElementsFromPublisher(): Long = 0
}
*/

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher

class FanoutPublisherTest extends AkkaPublisherVerification[Int] {
/*class FanoutPublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
Expand All @@ -18,4 +18,4 @@ class FanoutPublisherTest extends AkkaPublisherVerification[Int] {
Source(iterable).runWith(Sink.asPublisher(true))
}
}
}*/
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.reactivestreams.Publisher
import org.testng.annotations.{ AfterClass, BeforeClass }
import akka.testkit.AkkaSpec

class FilePublisherTest extends AkkaPublisherVerification[ByteString] {
/*class FilePublisherTest extends AkkaPublisherVerification[ByteString] {
val ChunkSize = 256
val Elements = 1000
Expand Down Expand Up @@ -45,4 +45,4 @@ class FilePublisherTest extends AkkaPublisherVerification[ByteString] {
def after() = Files.delete(file)
override def maxElementsFromPublisher(): Long = Elements
}
}*/
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import akka.stream.scaladsl.Source
import akka.util.ConstantFun
import org.reactivestreams.Publisher

class FlattenTest extends AkkaPublisherVerification[Int] {
/*class FlattenTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val s1 = Source(iterable(elements / 2))
val s2 = Source(iterable((elements + 1) / 2))
Source(List(s1, s2)).flatMapConcat(ConstantFun.scalaIdentityFunction).runWith(Sink.asPublisher(false))
}
}
}*/
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package akka.stream.tck

import akka.stream.scaladsl._
import org.reactivestreams.Subscriber
/*
class FoldSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
Expand All @@ -13,3 +14,4 @@ class FoldSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
override def createElement(element: Int): Int = element
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package akka.stream.tck
import akka.stream.scaladsl._
import org.reactivestreams.Subscriber

/*
class ForeachSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
override def createSubscriber(): Subscriber[Int] =
Flow[Int].to(Sink.foreach { _ ⇒ }).runWith(Source.asSubscriber)
override def createElement(element: Int): Int = element
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import akka.stream._
import akka.stream.scaladsl.Flow
import org.reactivestreams.Processor

/*
class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] {
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
Expand All @@ -21,3 +22,4 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] {
override def createElement(element: Int): Int = element
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.reactivestreams._

import scala.concurrent.Promise

/*
class FuturePublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
Expand All @@ -20,3 +21,4 @@ class FuturePublisherTest extends AkkaPublisherVerification[Int] {
override def maxElementsFromPublisher(): Long = 1
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher

class GroupByTest extends AkkaPublisherVerification[Int] {
/*class GroupByTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] =
if (elements == 0) EmptyPublisher[Int]
Expand All @@ -28,4 +28,4 @@ class GroupByTest extends AkkaPublisherVerification[Int] {
}
}
}*/
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.stream.scaladsl.{ Sink, StreamConverters }
import akka.util.ByteString
import org.reactivestreams.Publisher

class InputStreamSourceTest extends AkkaPublisherVerification[ByteString] {
/*class InputStreamSourceTest extends AkkaPublisherVerification[ByteString] {
def createPublisher(elements: Long): Publisher[ByteString] = {
StreamConverters.fromInputStream(() ⇒ new InputStream {
Expand All @@ -23,5 +23,5 @@ class InputStreamSourceTest extends AkkaPublisherVerification[ByteString] {
.take(elements)
.runWith(Sink.asPublisher(false))
}
}
}*/

Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams._

class IterablePublisherTest extends AkkaPublisherVerification[Int] {
/*class IterablePublisherTest extends AkkaPublisherVerification[Int] {
override def createPublisher(elements: Long): Publisher[Int] = {
Source(iterable(elements)).runWith(Sink.asPublisher(false))
}
}
}*/
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import org.reactivestreams.Processor

/*
class MapTest extends AkkaIdentityProcessorVerification[Int] {
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
Expand All @@ -18,3 +19,4 @@ class MapTest extends AkkaIdentityProcessorVerification[Int] {
override def createElement(element: Int): Int = element
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package akka.stream.tck
import org.reactivestreams.Publisher
import akka.stream.scaladsl.{ Keep, Source, Sink }

/*
class MaybeSourceTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
Expand All @@ -16,4 +17,5 @@ class MaybeSourceTest extends AkkaPublisherVerification[Int] {
override def maxElementsFromPublisher(): Long = 1
}
*/

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher

/*
class PrefixAndTailTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
Expand All @@ -18,3 +19,4 @@ class PrefixAndTailTest extends AkkaPublisherVerification[Int] {
}
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,93 @@ package akka.stream.tck
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import org.junit.Test
import org.reactivestreams.Processor
import org.reactivestreams.{ Processor, Publisher, Subscriber }
import org.reactivestreams.tck.SubscriberWhiteboxVerification
import org.scalatest.WordSpecLike

class ReproducerTest extends AkkaIdentityProcessorVerification[Int] {

@Test
@throws[Throwable]
def reproduce(): Unit = {
override def required_spec109_mustIssueOnSubscribeForNonNullSubscriber(): Unit = {
println("Reproducer")
var i = 0
while (i < 100000) {
System.out.println("Run " + i)
this.required_spec109_mustIssueOnSubscribeForNonNullSubscriber()
println("Run " + i)
super.required_spec109_mustIssueOnSubscribeForNonNullSubscriber()
i += 1
}
}

override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
implicit val materializer = ActorMaterializer()(system)

Flow[Int].toProcessor.run()
}

override def createElement(element: Int): Int = element

/*
override def optional_spec111_maySupportMultiSubscribe(): Unit = ()
override def untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError(): Unit = ()
override def required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall(): Unit = ()
override def optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne(): Unit = ()
override def untested_spec202_shouldAsynchronouslyDispatch(): Unit = ()
override def untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled(): Unit = ()
override def required_exerciseWhiteboxHappyPath(): Unit = ()
// override def required_spec109_mustIssueOnSubscribeForNonNullSubscriber(): Unit = ()
override def untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError(): Unit = ()
override def untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists(): Unit = ()
override def required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling(): Unit = ()
override def required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete(): Unit = ()
override def required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe(): Unit = ()
override def required_spec308_requestMustRegisterGivenNumberElementsToBeProduced(): Unit = ()
override def optional_spec104_mustSignalOnErrorWhenFails(): Unit = ()
override def required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull(): Unit = ()
override def untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError(): Unit = ()
override def untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals(): Unit = ()
override def required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall(): Unit = ()
override def untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents(): Unit = ()
override def required_spec317_mustSupportAPendingElementCountUpToLongMaxValue(): Unit = ()
override def required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops(): Unit = ()
override def untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber(): Unit = ()
override def optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage(): Unit = ()
override def untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation(): Unit = ()
override def stochastic_spec103_mustSignalOnMethodsSequentially(): Unit = ()
override def required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError(): Unit = ()
override def required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal(): Unit = ()
override def required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe(): Unit = ()
override def required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull(): Unit = ()
override def untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber(): Unit = ()
override def required_validate_maxElementsFromPublisher(): Unit = ()
override def untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid(): Unit = ()
override def required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall(): Unit = ()
override def required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue(): Unit = ()
override def required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber(): Unit = ()
override def untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice(): Unit = ()
override def required_validate_boundedDepthOfOnNextAndRequestRecursion(): Unit = ()
override def optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront(): Unit = ()
override def optional_spec105_emptyStreamMustTerminateBySignallingOnComplete(): Unit = ()
override def required_spec309_requestNegativeNumberMustSignalIllegalArgumentException(): Unit = ()
override def required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo(): Unit = ()
override def untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled(): Unit = ()
override def required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel(): Unit = ()
override def optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected(): Unit = ()
override def required_createPublisher3MustProduceAStreamOfExactly3Elements(): Unit = ()
override def untested_spec213_failingOnSignalInvocation(): Unit = ()
override def required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements(): Unit = ()
override def required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull(): Unit = ()
override def untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation(): Unit = ()
override def required_spec303_mustNotAllowUnboundedRecursion(): Unit = ()
override def required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall(): Unit = ()
override def untested_spec301_mustNotBeCalledOutsideSubscriberContext(): Unit = ()
override def untested_spec304_requestShouldNotPerformHeavyComputations(): Unit = ()
override def required_spec109_subscribeThrowNPEOnNullSubscriber(): Unit = ()
override def required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates(): Unit = ()
override def untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization(): Unit = ()
override def required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(): Unit = ()
override def required_spec201_mustSignalDemandViaSubscriptionRequest(): Unit = ()
override def optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals(): Unit = ()
override def required_spec309_requestZeroMustSignalIllegalArgumentException(): Unit = ()
override def required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled(): Unit = ()
override def required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError(): Unit = ()
override def required_spec306_afterSubscriptionIsCancelledRequestMustBeNops(): Unit = ()
override def required_spec102_maySignalLessThanRequestedAndTerminateSubscription(): Unit = ()
*/
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

import org.reactivestreams.Publisher
/*
class SingleElementSourceTest extends AkkaPublisherVerification[Int] {
Expand All @@ -15,4 +16,5 @@ class SingleElementSourceTest extends AkkaPublisherVerification[Int] {
override def maxElementsFromPublisher(): Long = 1
}
*/

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.lang.{ Integer ⇒ JInt }
import scala.concurrent.Promise
import org.reactivestreams.{ Subscription, Subscriber }

class SinkholeSubscriberTest extends SubscriberWhiteboxVerification[JInt](new TestEnvironment()) with TestNGSuiteLike {
/*class SinkholeSubscriberTest extends SubscriberWhiteboxVerification[JInt](new TestEnvironment()) with TestNGSuiteLike {
override def createSubscriber(probe: WhiteboxSubscriberProbe[JInt]): Subscriber[JInt] = {
new Subscriber[JInt] {
val hole = new SinkholeSubscriber[JInt](Promise[Done]())
Expand Down Expand Up @@ -43,5 +43,5 @@ class SinkholeSubscriberTest extends SubscriberWhiteboxVerification[JInt](new Te
}
override def createElement(element: Int): JInt = element
}
}*/

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher

/*
class SplitWhenTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] =
Expand All @@ -28,3 +29,4 @@ class SplitWhenTest extends AkkaPublisherVerification[Int] {
}
}
*/

0 comments on commit ae5e41c

Please sign in to comment.