Skip to content

Commit

Permalink
Add j.u.c.Flow
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Jan 19, 2023
1 parent 0f1fcb2 commit f177c4e
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 0 deletions.
36 changes: 36 additions & 0 deletions javalib/src/main/scala/java/util/concurrent/Flow.scala
@@ -0,0 +1,36 @@
/*
* Scala.js (https://www.scala-js.org/)
*
* Copyright EPFL.
*
* Licensed under Apache License 2.0
* (https://www.apache.org/licenses/LICENSE-2.0).
*
* See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/

package java.util.concurrent

object Flow {

trait Processor[T, R] extends Subscriber[T] with Publisher[R]

@FunctionalInterface
trait Publisher[T] {
def subscribe(subscriber: Subscriber[_ >: T]): Unit
}

trait Subscriber[T] {
def onSubscribe(subscription: Subscription): Unit
def onNext(item: T): Unit
def onError(throwable: Throwable): Unit
def onComplete(): Unit
}

trait Subscription {
def request(n: Long): Unit
def cancel(): Unit
}

}
@@ -0,0 +1,83 @@
/*
* Scala.js (https://www.scala-js.org/)
*
* Copyright EPFL.
*
* Licensed under Apache License 2.0
* (https://www.apache.org/licenses/LICENSE-2.0).
*
* See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/

package org.scalajs.testsuite.javalib.util.concurrent

import java.util.concurrent.Flow

import org.junit.Test
import org.junit.Assert._

class FlowTest {
import FlowTest._

@Test def testProcessor(): Unit = {
val processor = makeProcessor[Int, String]()
processor.subscribe(makeSubscriber[String]())
processor.onSubscribe(makeSubscription())
processor.onNext(42)
processor.onError(new Exception)
processor.onComplete()
}

@Test def testPublisher(): Unit = {
val publisher = makePublisher[Int]()
publisher.subscribe(makeSubscriber[Int]())
}

@Test def testSubscriber(): Unit = {
val subscriber = makeSubscriber[Int]()
subscriber.onSubscribe(makeSubscription())
subscriber.onNext(42)
subscriber.onError(new Exception)
subscriber.onComplete()
}

@Test def testSubscription(): Unit = {
val subscription = makeSubscription()
subscription.request(42)
subscription.cancel()
}

}

object FlowTest {

def makeProcessor[T, R](): Flow.Processor[T, R] =
new Flow.Processor[T, R] {
def subscribe(subscriber: Flow.Subscriber[_ >: R]): Unit = ()
def onSubscribe(subscription: Flow.Subscription): Unit = ()
def onNext(item: T): Unit = ()
def onError(throwable: Throwable): Unit = ()
def onComplete(): Unit = ()
}

def makePublisher[T](): Flow.Publisher[T] =
new Flow.Publisher[T] {
def subscribe(subscriber: Flow.Subscriber[_ >: T]): Unit = ()
}

def makeSubscriber[T](): Flow.Subscriber[T] =
new Flow.Subscriber[T] {
def onSubscribe(subscription: Flow.Subscription): Unit = ()
def onNext(item: T): Unit = ()
def onError(throwable: Throwable): Unit = ()
def onComplete(): Unit = ()
}

def makeSubscription(): Flow.Subscription =
new Flow.Subscription {
def request(n: Long): Unit = ()
def cancel(): Unit = ()
}

}

0 comments on commit f177c4e

Please sign in to comment.