Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,29 @@
*/

plugins {
id 'com.gradle.build-scan' version '1.16'
id 'io.spring.dependency-management' version '1.0.6.RELEASE' apply false
id 'com.gradle.build-scan' version '2.3'
id 'io.spring.dependency-management' version '1.0.8.RELEASE' apply false
id 'com.jfrog.bintray' version '1.8.4' apply false
id 'com.jfrog.artifactory' version '4.7.3' apply false
id 'org.jetbrains.kotlin.jvm' version '1.2.71' apply false
id 'com.jfrog.artifactory' version '4.9.7' apply false
id 'org.jetbrains.kotlin.jvm' version '1.3.31' apply false
}

subprojects {
apply plugin: 'java-library'
apply plugin: 'kotlin'
apply plugin: 'io.spring.dependency-management'

ext['netty-buffer.version'] = '4.1.30.Final'
ext['netty-buffer.version'] = '4.1.37.Final'
ext['jsr305.version'] = '3.0.2'
ext['kotlin-stdlib-jdk7.version'] = '1.2.71'
ext['rxjava.version'] = '2.2.2'
ext['kotlin-stdlib-jdk7.version'] = '1.3.41'
ext['rxjava.version'] = '2.2.10'
ext['slf4j-api.version'] = '1.7.25'
ext['junit.version'] = '4.12'
ext['mockito.version'] = '2.23.0'
ext['mockito.version'] = '2.28.2'
ext['hamcrest-library.version'] = '1.3'
ext['slf4j-log4j.version'] = '1.7.25'
ext['reactor-netty.version'] = '0.7.10.RELEASE'
ext['okhttp.version'] = '3.11.0'
ext['reactor-netty.version'] = '0.7.15.RELEASE'
ext['okhttp.version'] = '3.14.2'

dependencyManagement {

Expand Down Expand Up @@ -104,6 +104,6 @@ apply from: 'gradle/bintray.gradle'
description = 'RSocket-kotlin: Reactive Streams over network boundary with Kotlin/Rxjava'

buildScan {
licenseAgreementUrl = 'https://gradle.com/terms-of-service'
licenseAgree = 'yes'
termsOfServiceUrl = 'https://gradle.com/terms-of-service'
termsOfServiceAgree = 'yes'
}
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@

release.scope=patch
version=0.9.7-SNAPSHOT
org.gradle.parallel=false
org.gradle.configureondemand=false
org.gradle.parallel=true
org.gradle.configureondemand=true

2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.7-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.5-all.zip
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import io.reactivex.Single
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
import io.reactivex.processors.UnicastProcessor
import io.reactivex.schedulers.Schedulers
import io.rsocket.kotlin.*
import io.rsocket.kotlin.transport.netty.client.TcpClientTransport
import io.rsocket.kotlin.transport.netty.server.NettyContextCloseable
Expand All @@ -33,6 +32,7 @@ import org.junit.Test
import org.reactivestreams.Publisher
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
import kotlin.math.max

class InteractionsStressTest {
private lateinit var server: NettyContextCloseable
Expand Down Expand Up @@ -75,10 +75,10 @@ class InteractionsStressTest {
interaction(
{ payload -> payload.matches("response") },
{
it.flatMapSingle { num ->
it.flatMapSingle( { num ->
client.requestResponse(
DefaultPayload.text("response$num"))
}
}, false, interactionConcurrency)
})
}

Expand All @@ -87,10 +87,10 @@ class InteractionsStressTest {
interaction(
{ payload -> payload.matches("stream") },
{
it.flatMap { num ->
it.flatMap ({ num ->
client.requestStream(
DefaultPayload.text("stream$num"))
}
}, false, interactionConcurrency)
})
}

Expand All @@ -99,10 +99,10 @@ class InteractionsStressTest {
interaction(
{ payload -> payload.matches("channel") },
{
it.flatMap { num ->
it.flatMap ({ num ->
client.requestChannel(
Flowable.just(DefaultPayload.text("channel$num")))
}
},false, interactionConcurrency)
})
}

Expand All @@ -116,8 +116,8 @@ class InteractionsStressTest {

val errors = UnicastProcessor.create<Long>()
val disposable = CompositeDisposable()
repeat(threadsNum()) {
disposable += interaction(source().observeOn(Schedulers.io()))
repeat(interactionCount) {
disposable += interaction(source())
.subscribe({ res ->
if (!pred(res)) {
errors.onError(
Expand All @@ -142,8 +142,6 @@ class InteractionsStressTest {

}

private fun threadsNum() = Runtime.getRuntime().availableProcessors()

internal class TestHandler
: AbstractRSocket() {

Expand All @@ -170,11 +168,15 @@ class InteractionsStressTest {

companion object {
private fun source() =
Flowable.interval(intervalMillis, TimeUnit.MICROSECONDS)
Flowable.interval(intervalMillis, TimeUnit.MILLISECONDS)
.onBackpressureDrop()

private const val intervalMillis: Long = 100
private const val intervalMillis: Long = 1

private const val testDuration = 20L

private val interactionCount = max(1, Runtime.getRuntime().availableProcessors() / 2)

private const val interactionConcurrency = 4
}
}