Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add j.u.c.Flow #4792

Merged
merged 1 commit into from Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 38 additions & 0 deletions javalib/src/main/scala/java/util/concurrent/Flow.scala
@@ -0,0 +1,38 @@
/*
* Scala.js (https://www.scala-js.org/)
*
* Copyright EPFL.
*
* Licensed under Apache License 2.0
* (https://www.apache.org/licenses/LICENSE-2.0).
*
* See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/

package java.util.concurrent

object Flow {

@inline def defaultBufferSize(): Int = 256

trait Processor[T, R] extends Subscriber[T] with Publisher[R]

@FunctionalInterface
trait Publisher[T] {
sjrd marked this conversation as resolved.
Show resolved Hide resolved
def subscribe(subscriber: Subscriber[_ >: T]): Unit
}

trait Subscriber[T] {
def onSubscribe(subscription: Subscription): Unit
def onNext(item: T): Unit
def onError(throwable: Throwable): Unit
def onComplete(): Unit
}

trait Subscription {
def request(n: Long): Unit
def cancel(): Unit
}

}
@@ -0,0 +1,90 @@
/*
* Scala.js (https://www.scala-js.org/)
*
* Copyright EPFL.
*
* Licensed under Apache License 2.0
* (https://www.apache.org/licenses/LICENSE-2.0).
*
* See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/

package org.scalajs.testsuite.javalib.util.concurrent

import java.util.concurrent.Flow

import org.junit.Test
import org.junit.Assert._

class FlowTest {
import FlowTest._

@Test def testDefaultBufferSize(): Unit =
assertEquals(256, Flow.defaultBufferSize())

@Test def testProcessor(): Unit = {
val processor = makeProcessor[Int, String]()
processor.subscribe(makeSubscriber[String]())
processor.onSubscribe(makeSubscription())
processor.onNext(42)
processor.onError(new Exception)
processor.onComplete()
}

@Test def testPublisher(): Unit = {
val publisher = makePublisher[Int]()
publisher.subscribe(makeSubscriber[Int]())
}

@Test def testSubscriber(): Unit = {
val subscriber = makeSubscriber[Int]()
subscriber.onSubscribe(makeSubscription())
subscriber.onNext(42)
subscriber.onError(new Exception)
subscriber.onComplete()
}

@Test def testSubscription(): Unit = {
val subscription = makeSubscription()
subscription.request(42)
subscription.cancel()
}

}

object FlowTest {

def makeProcessor[T, R](): Flow.Processor[T, R] = {
new Flow.Processor[T, R] {
def subscribe(subscriber: Flow.Subscriber[_ >: R]): Unit = ()
def onSubscribe(subscription: Flow.Subscription): Unit = ()
def onNext(item: T): Unit = ()
def onError(throwable: Throwable): Unit = ()
def onComplete(): Unit = ()
}
}

def makePublisher[T](): Flow.Publisher[T] = {
new Flow.Publisher[T] {
def subscribe(subscriber: Flow.Subscriber[_ >: T]): Unit = ()
}
}

def makeSubscriber[T](): Flow.Subscriber[T] = {
new Flow.Subscriber[T] {
def onSubscribe(subscription: Flow.Subscription): Unit = ()
def onNext(item: T): Unit = ()
def onError(throwable: Throwable): Unit = ()
def onComplete(): Unit = ()
}
}

def makeSubscription(): Flow.Subscription = {
new Flow.Subscription {
def request(n: Long): Unit = ()
def cancel(): Unit = ()
}
}

}