Skip to content

Commit

Permalink
v4.0.0-dev10
Browse files Browse the repository at this point in the history
增加事件处理结果在JVM中使用 `Collector` 的API
  • Loading branch information
ForteScarlet committed Jan 19, 2024
1 parent da33ec6 commit fe4576d
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 4 deletions.
Empty file added .changelog/v4.0.0-dev10.md
Empty file.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# v4.0.0-dev10
> [!warning]
> 这是一个尚在开发中的**预览版**,它可能不稳定,可能会频繁变更,且没有可用性保证。

> Release & Pull Notes: [v4.0.0-dev10](https://github.com/simple-robot/simpler-robot/releases/tag/v4.0.0-dev10)

# v4.0.0-dev9
> [!warning]
> 这是一个尚在开发中的**预览版**,它可能不稳定,可能会频繁变更,且没有可用性保证。
Expand Down
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/P.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ sealed class P(override val group: String) : ProjectDetail() {
val versionWithoutSnapshot: Version

init {
val mainVersion = version(4, 0, 0) - version("dev9")
val mainVersion = version(4, 0, 0) - version("dev10")

fun initVersionWithoutSnapshot(status: Version?): Version = if (status == null) {
mainVersion
Expand Down
4 changes: 4 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ ksp = "1.9.22-1.0.16"
# https://square.github.io/kotlinpoet/
kotlinPoet = "1.15.3"

reactor = "3.6.2"

[libraries]
# jetbrains-annotation
jetbrains-annotations = "org.jetbrains:annotations:24.0.1"
Expand Down Expand Up @@ -102,5 +104,7 @@ ksp = { module = "com.google.devtools.ksp:symbol-processing-api", version.ref =
# https://square.github.io/kotlinpoet/interop-ksp/
kotlinPoet-ksp = { module = "com.squareup:kotlinpoet-ksp", version.ref = "kotlinPoet" }

reactor-core = { group = "io.projectreactor", name = "reactor-core", version.ref = "reactor" }

[plugins]
ksp = { id = "com.google.devtools.ksp", version.ref = "ksp" }
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ package love.forte.simbot.event
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.toCollection
import kotlinx.coroutines.future.future
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactor.asFlux
import reactor.core.publisher.Flux
import java.util.concurrent.CompletableFuture
import java.util.function.Function
import java.util.stream.Collector
import java.util.stream.Collector.Characteristics.*

/**
* 推送事件并将结果转化为 [Flux].
Expand All @@ -43,6 +47,50 @@ import java.util.concurrent.CompletableFuture
public fun EventProcessor.pushAndAsFlux(event: Event): Flux<EventResult> =
push(event).asFlux()

@Suppress("UNCHECKED_CAST")
internal suspend fun <T, R> kotlinx.coroutines.flow.Flow<T>.collectBy(
scope: CoroutineScope,
collector: Collector<T, *, R>
): R {
val container = collector.supplier().get()
val accumulator = collector.accumulator() as java.util.function.BiConsumer<Any?, T>
val characteristics = collector.characteristics()

if (CONCURRENT in characteristics && UNORDERED in characteristics) {
// collect in launch
collect { result ->
scope.launch { accumulator.accept(container, result) }
}
} else {
collect { result ->
accumulator.accept(container, result)
}
}

return if (IDENTITY_FINISH in characteristics) {
container as R
} else {
(collector.finisher() as Function<Any?, R>).apply(container)
}
}

@Suppress("UNCHECKED_CAST")
internal suspend fun <T, R> kotlinx.coroutines.flow.Flow<T>.collectBy(collector: Collector<T, *, R>): R {
val container = collector.supplier().get()
val accumulator = collector.accumulator() as java.util.function.BiConsumer<Any?, T>
val characteristics = collector.characteristics()

collect { result ->
accumulator.accept(container, result)
}

return if (IDENTITY_FINISH in characteristics) {
container as R
} else {
(collector.finisher() as Function<Any?, R>).apply(container)
}
}

/**
* 推送事件并将结果收集为 [C] 后返回 [CompletableFuture].
*/
Expand All @@ -53,6 +101,16 @@ public fun <C : MutableCollection<in EventResult>> EventProcessor.pushAndCollect
): CompletableFuture<C> =
scope.future { push(event).toCollection(collection) }

/**
* 推送事件并将结果使用 [Collector] 收集为 [R] 后返回 [CompletableFuture].
*/
public fun <R> EventProcessor.pushAndCollectToAsync(
event: Event,
scope: CoroutineScope,
collector: Collector<EventResult, *, R>
): CompletableFuture<R> =
scope.future { push(event).collectBy(this, collector) }

/**
* 推送事件并将结果收集为 [List] 后返回 [CompletableFuture].
*/
Expand Down
96 changes: 94 additions & 2 deletions simbot-api/src/jvmTest/kotlin/DispatcherTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@
*/

import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import love.forte.simbot.event.collectBy
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.function.Function
import java.util.stream.Collector
import java.util.stream.Collectors
import kotlin.test.Test
import kotlin.test.assertContains
import kotlin.test.assertEquals

/**
*
Expand All @@ -48,7 +56,7 @@ class DispatcherTests {
}.asCoroutineDispatcher()

@Test
fun test() = runBlocking {
fun test() = runTest {
flow {
println(Thread.currentThread())
emit(1)
Expand All @@ -65,4 +73,88 @@ class DispatcherTests {

}

@Test
fun flowCollectSetByTest() = runTest {
val flow = flow {
repeat(100) {
emit(it)
kotlinx.coroutines.delay(1)
}
}

val set = flow.collectBy(Collectors.toSet())
assertEquals(100, set.size)
repeat(100) {
assertContains(set, it)
}
}

@Test
fun flowCollectSetByWithScopeTest() = runTest {
coroutineScope {
val flow = flow {
repeat(100) {
emit(it)
kotlinx.coroutines.delay(1)
}
}

val concurrentMap =
flow.collectBy(this, Collectors.toConcurrentMap(Function.identity(), Function.identity()))
val keySet = concurrentMap.keys
assertEquals(100, keySet.size)
repeat(100) {
assertContains(keySet, it)
}
}
}

@Test
fun flowCollectListByTest() = runTest {
val flow = flow {
repeat(100) {
emit(it)
kotlinx.coroutines.delay(1)
}
}

val list = flow.collectBy(Collectors.toList())
assertEquals(100, list.size)
repeat(100) {
assertEquals(it, list[it])
}
}

@Test
fun flowCollectListByWithScopeTest() = runTest {
coroutineScope {
val flow = flow {
repeat(100) {
emit(it)
kotlinx.coroutines.delay(1)
}
}

val concurrentList =
flow.collectBy(
this,
Collector.of(
{ CopyOnWriteArrayList<Int>() },
{ l, v -> l.add(v) },
{ r1, r2 ->
r1.addAll(r2)
r1
},
Collector.Characteristics.IDENTITY_FINISH,
Collector.Characteristics.CONCURRENT
)
)

assertEquals(100, concurrentList.size)
repeat(100) {
assertEquals(it, concurrentList[it])
}
}
}

}
2 changes: 2 additions & 0 deletions simbot-cores/simbot-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ kotlin {
dependencies {
implementation(project(":simbot-api"))
implementation(kotlin("test-junit5"))
implementation(libs.kotlinx.coroutines.reactor)
implementation(libs.reactor.core)
}
}

Expand Down

0 comments on commit fe4576d

Please sign in to comment.