Skip to content

Commit

Permalink
v4.0.0-dev11
Browse files Browse the repository at this point in the history
增加事件处理结果在JVM中的扩展
  • Loading branch information
ForteScarlet committed Jan 19, 2024
1 parent fe4576d commit ac2bcf3
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 3 deletions.
Empty file added .changelog/v4.0.0-dev11.md
Empty file.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# v4.0.0-dev11
> [!warning]
> 这是一个尚在开发中的**预览版**,它可能不稳定,可能会频繁变更,且没有可用性保证。

> Release & Pull Notes: [v4.0.0-dev11](https://github.com/simple-robot/simpler-robot/releases/tag/v4.0.0-dev11)

# v4.0.0-dev10
> [!warning]
> 这是一个尚在开发中的**预览版**,它可能不稳定,可能会频繁变更,且没有可用性保证。
Expand Down
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/P.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ sealed class P(override val group: String) : ProjectDetail() {
val versionWithoutSnapshot: Version

init {
val mainVersion = version(4, 0, 0) - version("dev10")
val mainVersion = version(4, 0, 0) - version("dev11")

fun initVersionWithoutSnapshot(status: Version?): Version = if (status == null) {
mainVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ import kotlinx.coroutines.flow.toCollection
import kotlinx.coroutines.future.future
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactor.asFlux
import love.forte.simbot.common.collection.asIterator
import love.forte.simbot.suspendrunner.runInNoScopeBlocking
import reactor.core.publisher.Flux
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.function.Function
import java.util.stream.Collector
import java.util.stream.Collector.Characteristics.*
import java.util.stream.Stream
import java.util.stream.StreamSupport
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* 推送事件并将结果转化为 [Flux].
Expand All @@ -50,6 +57,7 @@ public fun EventProcessor.pushAndAsFlux(event: Event): Flux<EventResult> =
@Suppress("UNCHECKED_CAST")
internal suspend fun <T, R> kotlinx.coroutines.flow.Flow<T>.collectBy(
scope: CoroutineScope,
launchContext: CoroutineContext = EmptyCoroutineContext,
collector: Collector<T, *, R>
): R {
val container = collector.supplier().get()
Expand All @@ -59,7 +67,7 @@ internal suspend fun <T, R> kotlinx.coroutines.flow.Flow<T>.collectBy(
if (CONCURRENT in characteristics && UNORDERED in characteristics) {
// collect in launch
collect { result ->
scope.launch { accumulator.accept(container, result) }
scope.launch(launchContext) { accumulator.accept(container, result) }
}
} else {
collect { result ->
Expand Down Expand Up @@ -91,6 +99,7 @@ internal suspend fun <T, R> kotlinx.coroutines.flow.Flow<T>.collectBy(collector:
}
}

//region async
/**
* 推送事件并将结果收集为 [C] 后返回 [CompletableFuture].
*/
Expand All @@ -109,7 +118,7 @@ public fun <R> EventProcessor.pushAndCollectToAsync(
scope: CoroutineScope,
collector: Collector<EventResult, *, R>
): CompletableFuture<R> =
scope.future { push(event).collectBy(this, collector) }
scope.future { push(event).collectBy(scope = this, collector = collector) }

/**
* 推送事件并将结果收集为 [List] 后返回 [CompletableFuture].
Expand All @@ -118,3 +127,40 @@ public fun EventProcessor.pushAndCollectToListAsync(
event: Event,
scope: CoroutineScope
): CompletableFuture<out List<EventResult>> = pushAndCollectToAsync(event, scope, ArrayList())
//endregion


//region block
/**
* 推送事件并将结果转化为 [Stream] 后返回。
*/
public fun EventProcessor.pushAndAsStream(event: Event, scope: CoroutineScope): Stream<EventResult> {
val iterator = push(event).asIterator(
scope,
hasNext = { runInNoScopeBlocking { hasNext() } },
next = { runInNoScopeBlocking { next() } })

return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
}


/**
* 推送事件并将结果收集为 [C] 后返回。
*/
public fun <C : MutableCollection<in EventResult>> EventProcessor.pushAndCollectToBlocking(
event: Event,
collection: C
): C = runInNoScopeBlocking { push(event).toCollection(collection) }

/**
* 推送事件并将结果使用 [Collector] 收集为 [R] 后返回。
*/
public fun <R> EventProcessor.pushAndCollectToBlocking(event: Event, collector: Collector<EventResult, *, R>): R =
runInNoScopeBlocking { push(event).collectBy(collector = collector) }

/**
* 推送事件并将结果收集为 [List] 后返回。
*/
public fun EventProcessor.pushAndCollectToListBlocking(event: Event): List<EventResult> =
pushAndCollectToBlocking(event, ArrayList())
//endregion

0 comments on commit ac2bcf3

Please sign in to comment.