From 03af77c135f7c8b22b3dec2a10a16db9dbe25022 Mon Sep 17 00:00:00 2001 From: Oleg Yukhnevich Date: Sat, 3 Oct 2020 22:29:58 +0300 Subject: [PATCH] Kotlin/Native support Reworked TCP implementation Multiplatform Synchronized IntMap implementation --- .github/workflows/gradle-all.yml | 63 +++-- .github/workflows/gradle-main.yml | 58 ++-- .github/workflows/gradle-pr.yml | 31 ++- .github/workflows/gradle-release.yml | 68 +++-- build.gradle.kts | 182 ++++++++++--- examples/multiplatform-chat/build.gradle.kts | 21 ++ .../src/clientJsMain/kotlin/App.kt | 2 +- .../src/clientJvmMain/kotlin/AppTCP.kt | 34 +++ .../clientJvmMain/kotlin/{App.kt => AppWS.kt} | 4 +- .../src/clientMain/kotlin/Api.kt | 13 +- .../src/clientNativeMain/kotlin/AppTCP.kt | 35 +++ .../src/serverJvmMain/kotlin/App.kt | 35 ++- gradle.properties | 7 +- playground/build.gradle.kts | 12 + .../src/nativeMain/kotlin/TcpApp.kt | 13 +- rsocket-core/build.gradle.kts | 7 +- .../io/rsocket/kotlin/internal/IntMap.kt | 254 ++++++++++++++++++ .../rsocket/kotlin/internal/RSocketState.kt | 10 +- .../io/rsocket/kotlin/internal/StreamId.kt | 2 +- .../RequestChannelRequesterFlowCollector.kt | 7 +- .../kotlin/keepalive/KeepAliveHandler.kt | 6 +- .../io/rsocket/kotlin/logging/NoopLogger.kt | 27 ++ .../io/rsocket/kotlin/logging/PrintLogger.kt | 34 +++ .../io/rsocket/kotlin/internal/IntMapTest.kt | 246 +++++++++++++++++ .../kotlin/internal/RSocketRequesterTest.kt | 8 +- .../io/rsocket/kotlin/internal/RSocketTest.kt | 7 +- .../rsocket/kotlin/internal/StreamIdTest.kt | 2 +- .../io/rsocket/kotlin/internal/IntMap.kt | 59 ++++ .../io/rsocket/kotlin/internal/IntMap.kt | 59 ++++ .../io/rsocket/kotlin/internal/IntMap.kt | 51 ++++ .../kotlin/logging/DefaultLoggerFactory.kt} | 4 +- rsocket-test/build.gradle.kts | 7 +- .../io/rsocket/kotlin/test/SuspendTest.kt | 6 +- .../io/rsocket/kotlin/test/Test.common.kt | 10 +- .../io/rsocket/kotlin/test/TestConnection.kt | 7 +- .../io/rsocket/kotlin/test/TestPacketStore.kt | 9 +- .../io/rsocket/kotlin/test/TransportTest.kt | 17 +- .../kotlin/io/rsocket/kotlin/test/Test.kt | 12 +- .../io/rsocket/kotlin/test/TestPacketStore.kt | 29 ++ .../kotlin/io/rsocket/kotlin/test/Test.kt | 18 +- .../io/rsocket/kotlin/test/TestPacketStore.kt | 29 ++ .../kotlin/io/rsocket/kotlin/test/Test.kt | 41 +++ .../io/rsocket/kotlin/test/TestPacketStore.kt | 36 +++ rsocket-transport-ktor/build.gradle.kts | 13 +- .../build.gradle.kts | 3 - .../build.gradle.kts | 2 - .../kotlin/connection/KtorTcpConnection.kt | 48 +++- .../io/rsocket/kotlin/TcpTransportTest.kt | 56 ++++ .../io/rsocket/kotlin/JvmTcpTransportTest.kt | 29 ++ .../io/rsocket/kotlin/TcpTransportTest.kt | 50 ---- .../rsocket/kotlin/WebSocketTransportTest.kt | 18 +- .../rsocket/kotlin/NativeTcpTransportTest.kt | 29 ++ rsocket-transport-local/build.gradle.kts | 3 - settings.gradle.kts | 11 + 54 files changed, 1585 insertions(+), 259 deletions(-) create mode 100644 examples/multiplatform-chat/src/clientJvmMain/kotlin/AppTCP.kt rename examples/multiplatform-chat/src/clientJvmMain/kotlin/{App.kt => AppWS.kt} (93%) create mode 100644 examples/multiplatform-chat/src/clientNativeMain/kotlin/AppTCP.kt rename rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt => playground/src/nativeMain/kotlin/TcpApp.kt (72%) create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/NoopLogger.kt create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/PrintLogger.kt create mode 100644 rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/IntMapTest.kt create mode 100644 rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt create mode 100644 rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt create mode 100644 rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt rename rsocket-core/src/{commonMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt => nativeMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt} (86%) rename rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt => rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt (78%) create mode 100644 rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt create mode 100644 rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt create mode 100644 rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt create mode 100644 rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt create mode 100644 rsocket-transport-ktor/src/commonTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt create mode 100644 rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/JvmTcpTransportTest.kt delete mode 100644 rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt create mode 100644 rsocket-transport-ktor/src/nativeTest/kotlin/io/rsocket/kotlin/NativeTcpTransportTest.kt diff --git a/.github/workflows/gradle-all.yml b/.github/workflows/gradle-all.yml index ef44512b2..c44f4a89c 100644 --- a/.github/workflows/gradle-all.yml +++ b/.github/workflows/gradle-all.yml @@ -9,35 +9,60 @@ on: jobs: build: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ macos-latest ] + macTargetsCompilation: [ macos, ios, watchos, tvos ] + include: + - os: ubuntu-20.04 + - os: windows-latest + fail-fast: false - runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v2 + - name: Set up JDK + uses: actions/setup-java@v1 + with: + java-version: 11 + - name: Cache Gradle packages + uses: actions/cache@v2 + with: + path: | + ~/.gradle/caches/modules-2 + ~/.konan + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle.*') }} #hash based on *.gradle.kts and *.gradle.properties + restore-keys: ${{ runner.os }}-gradle + - uses: eskatos/gradle-command-action@v1 + with: + arguments: build -PmacTargetsCompilation=${{ matrix.macTargetsCompilation }} --scan --no-daemon + publish: + needs: build + runs-on: ${{ matrix.os }} strategy: matrix: - jdk: [ 1.8, 11, 14 ] + os: [ ubuntu-20.04, macos-latest, windows-latest ] fail-fast: false steps: - uses: actions/checkout@v2 - - name: Set up JDK ${{ matrix.jdk }} + - name: Set up JDK uses: actions/setup-java@v1 with: - java-version: ${{ matrix.jdk }} + java-version: 11 - name: Cache Gradle packages - uses: actions/cache@v1 + uses: actions/cache@v2 with: - path: ~/.gradle/caches - key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }} + path: | + ~/.gradle/caches/modules-2 + ~/.konan + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle.*') }} #hash based on *.gradle.kts and *.gradle.properties restore-keys: ${{ runner.os }}-gradle - - name: Grant execute permission for gradlew - run: chmod +x gradlew - - name: Build with Gradle - run: ./gradlew clean build --info - - name: Publish Packages to Artifactory - if: ${{ matrix.jdk == '1.8' }} - run: ./gradlew -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PversionSuffix="-${githubRef#refs/heads/}-SNAPSHOT" -PbuildNumber="${buildNumber}" artifactoryPublish --stacktrace - env: - bintrayUser: ${{ secrets.bintrayUser }} - bintrayKey: ${{ secrets.bintrayKey }} - githubRef: ${{ github.ref }} - buildNumber: ${{ github.run_number }} + - if: ${{ matrix.os == 'windows-latest' }} + run: echo ("::set-env name=BRANCH_NAME::" + $env:GITHUB_REF.replace('refs/heads/', '')) + - if: ${{ matrix.os != 'windows-latest' }} + run: echo "##[set-env name=BRANCH_NAME;]$(echo ${GITHUB_REF#refs/heads/})" + - uses: eskatos/gradle-command-action@v1 + with: + arguments: artifactoryPublish -PbintrayUser=${{ secrets.bintrayUser }} -PbintrayKey=${{ secrets.bintrayKey }} -PversionSuffix=-${{ env.BRANCH_NAME }}-SNAPSHOT -PbuildNumber=${{ github.run_number }} --stacktrace --no-daemon diff --git a/.github/workflows/gradle-main.yml b/.github/workflows/gradle-main.yml index c943299e1..8dc53bd3b 100644 --- a/.github/workflows/gradle-main.yml +++ b/.github/workflows/gradle-main.yml @@ -9,34 +9,56 @@ on: jobs: build: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ macos-latest ] + macTargetsCompilation: [ macos, ios, watchos, tvos ] + include: + - os: ubuntu-20.04 + - os: windows-latest + fail-fast: false - runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v2 + - name: Set up JDK + uses: actions/setup-java@v1 + with: + java-version: 11 + - name: Cache Gradle packages + uses: actions/cache@v2 + with: + path: | + ~/.gradle/caches/modules-2 + ~/.konan + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle.*') }} #hash based on *.gradle.kts and *.gradle.properties + restore-keys: ${{ runner.os }}-gradle + - uses: eskatos/gradle-command-action@v1 + with: + arguments: build -PmacTargetsCompilation=${{ matrix.macTargetsCompilation }} --scan --no-daemon + publish: + needs: build + runs-on: ${{ matrix.os }} strategy: matrix: - jdk: [ 1.8, 11, 14 ] + os: [ ubuntu-20.04, macos-latest, windows-latest ] fail-fast: false steps: - uses: actions/checkout@v2 - - name: Set up JDK ${{ matrix.jdk }} + - name: Set up JDK uses: actions/setup-java@v1 with: - java-version: ${{ matrix.jdk }} + java-version: 11 - name: Cache Gradle packages - uses: actions/cache@v1 + uses: actions/cache@v2 with: - path: ~/.gradle/caches - key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }} + path: | + ~/.gradle/caches/modules-2 + ~/.konan + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle.*') }} #hash based on *.gradle.kts and *.gradle.properties restore-keys: ${{ runner.os }}-gradle - - name: Grant execute permission for gradlew - run: chmod +x gradlew - - name: Build with Gradle - run: ./gradlew clean build - - name: Publish Packages to Artifactory - if: ${{ matrix.jdk == '1.8' }} - run: ./gradlew -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PversionSuffix="-SNAPSHOT" -PbuildNumber="${buildNumber}" artifactoryPublish --stacktrace - env: - bintrayUser: ${{ secrets.bintrayUser }} - bintrayKey: ${{ secrets.bintrayKey }} - buildNumber: ${{ github.run_number }} + - uses: eskatos/gradle-command-action@v1 + with: + arguments: artifactoryPublish -PbintrayUser=${{ secrets.bintrayUser }} -PbintrayKey=${{ secrets.bintrayKey }} -PversionSuffix=-SNAPSHOT -PbuildNumber=${{ github.run_number }} --stacktrace --no-daemon diff --git a/.github/workflows/gradle-pr.yml b/.github/workflows/gradle-pr.yml index 9b537b79d..64c0f51ee 100644 --- a/.github/workflows/gradle-pr.yml +++ b/.github/workflows/gradle-pr.yml @@ -1,30 +1,33 @@ name: Pull Request CI -on: [pull_request] +on: [ pull_request ] jobs: build: - - runs-on: ubuntu-20.04 - + runs-on: ${{ matrix.os }} strategy: matrix: - jdk: [ 1.8, 11, 14 ] + os: [ macos-latest ] + macTargetsCompilation: [ macos, ios, watchos, tvos ] + include: + - os: ubuntu-20.04 + - os: windows-latest fail-fast: false steps: - uses: actions/checkout@v2 - - name: Set up JDK ${{ matrix.jdk }} + - name: Set up JDK uses: actions/setup-java@v1 with: - java-version: ${{ matrix.jdk }} + java-version: 11 - name: Cache Gradle packages - uses: actions/cache@v1 + uses: actions/cache@v2 with: - path: ~/.gradle/caches - key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }} + path: | + ~/.gradle/caches/modules-2 + ~/.konan + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle.*') }} #hash based on *.gradle.kts and *.gradle.properties restore-keys: ${{ runner.os }}-gradle - - name: Grant execute permission for gradlew - run: chmod +x gradlew - - name: Build with Gradle - run: ./gradlew clean build + - uses: eskatos/gradle-command-action@v1 + with: + arguments: build -PmacTargetsCompilation=${{ matrix.macTargetsCompilation }} --scan --no-daemon diff --git a/.github/workflows/gradle-release.yml b/.github/workflows/gradle-release.yml index 8732fce5e..c67b5fe62 100644 --- a/.github/workflows/gradle-release.yml +++ b/.github/workflows/gradle-release.yml @@ -8,32 +8,62 @@ on: - '*' # Push events to matching *, i.e. 1.0, 20.15.10 jobs: - publish: + build: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ macos-latest ] + macTargetsCompilation: [ macos, ios, watchos, tvos ] + include: + - os: ubuntu-20.04 + - os: windows-latest + fail-fast: false + + steps: + - uses: actions/checkout@v2 + - name: Set up JDK + uses: actions/setup-java@v1 + with: + java-version: 11 + - name: Cache Gradle packages + uses: actions/cache@v2 + with: + path: | + ~/.gradle/caches/modules-2 + ~/.konan + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle.*') }} #hash based on *.gradle.kts and *.gradle.properties + restore-keys: ${{ runner.os }}-gradle + - uses: eskatos/gradle-command-action@v1 + with: + arguments: build -PmacTargetsCompilation=${{ matrix.macTargetsCompilation }} --scan --no-daemon - runs-on: ubuntu-20.04 + publish: + needs: build + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ ubuntu-20.04, macos-latest, windows-latest ] + fail-fast: false steps: - uses: actions/checkout@v2 - - name: Set up JDK 1.8 + - name: Set up JDK uses: actions/setup-java@v1 with: - java-version: 1.8 + java-version: 11 - name: Cache Gradle packages - uses: actions/cache@v1 + uses: actions/cache@v2 with: - path: ~/.gradle/caches - key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }} + path: | + ~/.gradle/caches/modules-2 + ~/.konan + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle.*') }} #hash based on *.gradle.kts and *.gradle.properties restore-keys: ${{ runner.os }}-gradle - - name: Grant execute permission for gradlew - run: chmod +x gradlew - - name: Build with Gradle - run: ./gradlew clean build + - if: ${{ matrix.os == 'windows-latest' }} + run: echo ("::set-env name=TAG_NAME::" + $env:GITHUB_REF.replace('refs/tags/', '')) + - if: ${{ matrix.os != 'windows-latest' }} + run: echo "##[set-env name=TAG_NAME;]$(echo ${GITHUB_REF#refs/tags/})" - name: Publish Packages to Bintray - run: ./gradlew -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" -Pversion="${githubRef#refs/tags/}" -PbuildNumber="${buildNumber}" bintrayUpload - env: - bintrayUser: ${{ secrets.bintrayUser }} - bintrayKey: ${{ secrets.bintrayKey }} - sonatypeUsername: ${{ secrets.sonatypeUsername }} - sonatypePassword: ${{ secrets.sonatypePassword }} - githubRef: ${{ github.ref }} - buildNumber: ${{ github.run_number }} + uses: eskatos/gradle-command-action@v1 + with: + arguments: bintrayUpload -PbintrayUser=${{ secrets.bintrayUser }} -PbintrayKey=${{ secrets.bintrayKey }} -PsonatypeUsername=${{ secrets.sonatypeUsername }} -PsonatypePassword=${{ secrets.sonatypePassword }} -Pversion=${{ env.TAG_NAME }} -PbuildNumber=${{ github.run_number }} --stacktrace --no-daemon diff --git a/build.gradle.kts b/build.gradle.kts index 0cae59533..1a18d143d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -18,8 +18,8 @@ import com.jfrog.bintray.gradle.* import com.jfrog.bintray.gradle.tasks.* import org.gradle.api.publish.maven.internal.artifact.* import org.jetbrains.kotlin.gradle.dsl.* -import org.jetbrains.kotlin.gradle.targets.js.* -import org.jetbrains.kotlin.gradle.targets.jvm.* +import org.jetbrains.kotlin.gradle.plugin.mpp.* +import org.jetbrains.kotlin.konan.target.* import org.jfrog.gradle.plugin.artifactory.dsl.* buildscript { @@ -54,43 +54,149 @@ allprojects { } } -subprojects { - tasks.withType { - jvmArgs("-Xmx1g", "-XX:+UseParallelGC") +//true when on CI, and false when local dev. Needed for native build configuration +val ciRun = System.getenv("CI") == "true" + +//configure main host, for which jvm and js tests are enabled, true if locally, or linux on CI +val isMainHost: Boolean = !ciRun || HostManager.hostIsLinux + +// can be: macos, ios, watchos, tvos. If provided, compile and test only those targets +val macTargetsCompilation: String? by project + +println("Configuration: CI=${System.getenv("CI")}, ciRun=$ciRun, isMainHost=$isMainHost, macTargetsCompilation=$macTargetsCompilation") + +val Project.publicationNames: Array + get() { + val publishing: PublishingExtension by extensions + val all = publishing.publications.names + //publish js, jvm, metadata, linuxX64 and kotlinMultiplatform only once + return when { + isMainHost -> all + else -> all - "js" - "jvm" - "metadata" - "kotlinMultiplatform" - "linuxX64" + }.toTypedArray() } +subprojects { plugins.withId("org.jetbrains.kotlin.multiplatform") { + //targets configuration extensions.configure { -// explicitApiWarning() //TODO change to strict before release - targets.all { - when (this) { - is KotlinJsTarget -> { - useCommonJs() - //configure running tests for JS - nodejs { - testTask { - useMocha { - timeout = "600s" - } - } + val isAutoConfigurable = project.name.startsWith("rsocket") //manual config of others + val jvmOnly = project.name == "rsocket-transport-ktor-server" //server is jvm only + + //windows target isn't supported by ktor-network + val supportMingw = project.name != "rsocket-transport-ktor" && project.name != "rsocket-transport-ktor-client" + + + if (!isAutoConfigurable) return@configure + + jvm { + compilations.all { + kotlinOptions.jvmTarget = "1.6" + } + testRuns.all { + executionTask.configure { + enabled = isMainHost + jvmArgs("-Xmx4g", "-XX:+UseParallelGC") + } + } + } + + if (jvmOnly) return@configure + + js { + useCommonJs() + //configure running tests for JS + nodejs { + testTask { + enabled = isMainHost + useMocha { + timeout = "600s" } - browser { - testTask { - useKarma { - useConfigDirectory(rootDir.resolve("js").resolve("karma.config.d")) - useChromeHeadless() - } - } + } + } + browser { + testTask { + enabled = isMainHost + useKarma { + useConfigDirectory(rootDir.resolve("js").resolve("karma.config.d")) + useChromeHeadless() } } - is KotlinJvmTarget -> { - compilations.all { - kotlinOptions { jvmTarget = "1.6" } + } + } + + //native targets configuration + if (ciRun) { + fun KotlinNativeTarget.disableCompilation() { + compilations.all { compileKotlinTask.enabled = false } + binaries.all { linkTask.enabled = false } + } + + val hostTargets = listOfNotNull(linuxX64(), macosX64(), if (supportMingw) mingwX64() else null) + + val iosTargets = listOf(iosArm32(), iosArm64(), iosX64()) + val tvosTargets = listOf(tvosArm64(), tvosX64()) + val watchosTargets = listOf(watchosArm32(), watchosArm64(), watchosX86()) + val nativeTargets = hostTargets + iosTargets + tvosTargets + watchosTargets + + val nativeMain by sourceSets.creating { + dependsOn(sourceSets["commonMain"]) + } + val nativeTest by sourceSets.creating { + dependsOn(sourceSets["commonTest"]) + } + + nativeTargets.forEach { + sourceSets["${it.name}Main"].dependsOn(nativeMain) + sourceSets["${it.name}Test"].dependsOn(nativeTest) + } + + //disable cross compilation of linux target on non linux hosts + if (!HostManager.hostIsLinux) linuxX64().disableCompilation() + + //disable compilation of part of mac targets + if (HostManager.hostIsMac) when (macTargetsCompilation) { + "macos" -> iosTargets + tvosTargets + watchosTargets + "ios", "watchos", "tvos" -> { + //disable test compilation for macos, but leave main to compile examples and playground + macosX64 { + compilations.all { if (name == "test") compileKotlinTask.enabled = false } + binaries.all { linkTask.enabled = false } } + when (macTargetsCompilation) { + "ios" -> tvosTargets + watchosTargets + "watchos" -> iosTargets + tvosTargets + "tvos" -> iosTargets + watchosTargets + else -> emptyList() + } + } + else -> emptyList() + }.forEach(KotlinNativeTarget::disableCompilation) + } else { + //if not on CI, use only one native target same as host, DON'T PUBLISH IN THAT MODE LOCALLY!!! + when { + HostManager.hostIsLinux -> linuxX64("native") + HostManager.hostIsMingw && supportMingw -> mingwX64("native") + HostManager.hostIsMac -> macosX64("native") + } + } + + //run tests on release + mimalloc to reduce tests execution time + //compilation is slower in that mode + targets.all { + if (this is KotlinNativeTargetWithTests<*>) { + binaries.test(listOf(RELEASE)) + testRuns.all { setExecutionSourceFrom(binaries.getTest(RELEASE)) } + compilations.all { + kotlinOptions.freeCompilerArgs += "-Xallocator=mimalloc" } } } + } + //common configuration + extensions.configure { +// explicitApiWarning() //TODO change to strict before release sourceSets.all { languageSettings.apply { progressiveMode = true @@ -113,19 +219,15 @@ subprojects { } if (project.name != "rsocket-test") { - val commonTest by sourceSets.getting { - dependencies { - implementation(project(":rsocket-test")) - } + sourceSets["commonTest"].dependencies { + implementation(project(":rsocket-test")) } } //fix atomicfu for examples and playground if ("examples" in project.path || project.name == "playground") { - val commonMain by sourceSets.getting { - dependencies { - implementation("org.jetbrains.kotlinx:atomicfu:$kotlinxAtomicfuVersion") - } + sourceSets["commonMain"].dependencies { + implementation("org.jetbrains.kotlinx:atomicfu:$kotlinxAtomicfuVersion") } } } @@ -198,8 +300,9 @@ if (bintrayUser != null && bintrayKey != null) { setProperty("password", bintrayKey) setProperty("maven", true) }) + println("Artifactory: ${publicationNames.contentToString()}") defaults(delegateClosureOf { - invokeMethod("publications", arrayOf("kotlinMultiplatform", "metadata", "jvm", "js")) + invokeMethod("publications", publicationNames) }) }) @@ -221,7 +324,8 @@ if (bintrayUser != null && bintrayKey != null) { extensions.configure { user = bintrayUser key = bintrayKey - setPublications("kotlinMultiplatform", "metadata", "jvm", "js") + println("Bintray: ${publicationNames.contentToString()}") + setPublications(*publicationNames) publish = true override = true @@ -256,10 +360,13 @@ if (bintrayUser != null && bintrayKey != null) { //workaround for https://github.com/bintray/gradle-bintray-plugin/issues/229 tasks.withType { + dependsOn("publishToMavenLocal") doFirst { val publishing: PublishingExtension by extensions + val names = publicationNames publishing.publications .filterIsInstance() + .filter { it.name in names } .forEach { publication -> val moduleFile = buildDir.resolve("publications/${publication.name}/module.json") if (moduleFile.exists()) { @@ -267,7 +374,6 @@ if (bintrayUser != null && bintrayKey != null) { override fun getDefaultExtension() = "module" }) } - } } } diff --git a/examples/multiplatform-chat/build.gradle.kts b/examples/multiplatform-chat/build.gradle.kts index 5302b7020..3be0dd128 100644 --- a/examples/multiplatform-chat/build.gradle.kts +++ b/examples/multiplatform-chat/build.gradle.kts @@ -14,6 +14,8 @@ * limitations under the License. */ +import org.jetbrains.kotlin.konan.target.* + plugins { kotlin("multiplatform") kotlin("plugin.serialization") version "1.4.10" @@ -27,6 +29,19 @@ kotlin { browser { binaries.executable() } + nodejs { + binaries.executable() + } + } + when { + HostManager.hostIsLinux -> linuxX64("clientNative") + HostManager.hostIsMingw -> null //no native support for TCP mingwX64("clientNative") + HostManager.hostIsMac -> macosX64("clientNative") + else -> null + }?.binaries { + executable { + entryPoint = "main" + } } sourceSets { @@ -65,5 +80,11 @@ kotlin { implementation("io.ktor:ktor-client-js:1.4.1") } } + + if (!HostManager.hostIsMingw) { + val clientNativeMain by getting { + dependsOn(clientMain) + } + } } } diff --git a/examples/multiplatform-chat/src/clientJsMain/kotlin/App.kt b/examples/multiplatform-chat/src/clientJsMain/kotlin/App.kt index b7ed4a463..9cf47fe4a 100644 --- a/examples/multiplatform-chat/src/clientJsMain/kotlin/App.kt +++ b/examples/multiplatform-chat/src/clientJsMain/kotlin/App.kt @@ -17,7 +17,7 @@ import kotlinx.coroutines.flow.* suspend fun main() { - val api = connectToApi("Yuri") + val api = connectToApiUsingWS("Yuri") api.users.all().forEach { println(it) diff --git a/examples/multiplatform-chat/src/clientJvmMain/kotlin/AppTCP.kt b/examples/multiplatform-chat/src/clientJvmMain/kotlin/AppTCP.kt new file mode 100644 index 000000000..f781cea39 --- /dev/null +++ b/examples/multiplatform-chat/src/clientJvmMain/kotlin/AppTCP.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import kotlinx.coroutines.flow.* + +suspend fun main() { + val api = connectToApiUsingTCP("Hanna") + + api.users.all().forEach { + println(it) + } + + val chat = api.chats.all().firstOrNull() ?: api.chats.new("rsocket-kotlin chat") + + val sentMessage = api.messages.send(chat.id, "RSocket is awesome! (from JVM TCP)") + println(sentMessage) + + api.messages.messages(chat.id, -1).collect { + println("Received: $it") + } +} diff --git a/examples/multiplatform-chat/src/clientJvmMain/kotlin/App.kt b/examples/multiplatform-chat/src/clientJvmMain/kotlin/AppWS.kt similarity index 93% rename from examples/multiplatform-chat/src/clientJvmMain/kotlin/App.kt rename to examples/multiplatform-chat/src/clientJvmMain/kotlin/AppWS.kt index 705e2f874..7ecfd1ce5 100644 --- a/examples/multiplatform-chat/src/clientJvmMain/kotlin/App.kt +++ b/examples/multiplatform-chat/src/clientJvmMain/kotlin/AppWS.kt @@ -17,7 +17,7 @@ import kotlinx.coroutines.flow.* suspend fun main() { - val api = connectToApi("Oleg") + val api = connectToApiUsingWS("Oleg") api.users.all().forEach { println(it) @@ -25,7 +25,7 @@ suspend fun main() { val chat = api.chats.all().firstOrNull() ?: api.chats.new("rsocket-kotlin chat") - val sentMessage = api.messages.send(chat.id, "RSocket is awesome! (from JVM)") + val sentMessage = api.messages.send(chat.id, "RSocket is awesome! (from JVM WS)") println(sentMessage) api.messages.messages(chat.id, -1).collect { diff --git a/examples/multiplatform-chat/src/clientMain/kotlin/Api.kt b/examples/multiplatform-chat/src/clientMain/kotlin/Api.kt index 7b0e22b43..50dcfef0a 100644 --- a/examples/multiplatform-chat/src/clientMain/kotlin/Api.kt +++ b/examples/multiplatform-chat/src/clientMain/kotlin/Api.kt @@ -16,7 +16,11 @@ import io.ktor.client.* import io.ktor.client.features.websocket.* +import io.ktor.network.selector.* +import io.ktor.network.sockets.* +import io.ktor.util.* import io.rsocket.kotlin.* +import io.rsocket.kotlin.connection.* import io.rsocket.kotlin.core.* import io.rsocket.kotlin.payload.* @@ -27,7 +31,7 @@ class Api(rSocket: RSocket) { val messages = MessageApi(rSocket, proto) } -suspend fun connectToApi(name: String): Api { +suspend fun connectToApiUsingWS(name: String): Api { val client = HttpClient { install(WebSockets) install(RSocketClientSupport) { @@ -37,3 +41,10 @@ suspend fun connectToApi(name: String): Api { return Api(client.rSocket(port = 9000)) } + +@OptIn(InternalAPI::class) +suspend fun connectToApiUsingTCP(name: String): Api { + val socket = aSocket(SelectorManager()).tcp().connect("0.0.0.0", 8000) + + return Api(socket.connection.connectClient(RSocketConnectorConfiguration(setupPayload = Payload(name)))) +} diff --git a/examples/multiplatform-chat/src/clientNativeMain/kotlin/AppTCP.kt b/examples/multiplatform-chat/src/clientNativeMain/kotlin/AppTCP.kt new file mode 100644 index 000000000..77b342847 --- /dev/null +++ b/examples/multiplatform-chat/src/clientNativeMain/kotlin/AppTCP.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* + +fun main(): Unit = runBlocking { + val api = connectToApiUsingTCP("Gloria") + + api.users.all().forEach { + println(it) + } + + val chat = api.chats.all().firstOrNull() ?: api.chats.new("rsocket-kotlin chat") + + val sentMessage = api.messages.send(chat.id, "RSocket is awesome! (from Native)") + println(sentMessage) + + api.messages.messages(chat.id, -1).collect { + println("Received: $it") + } +} diff --git a/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt b/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt index 90c0e874e..17a03959b 100644 --- a/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt +++ b/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt @@ -15,9 +15,12 @@ */ import io.ktor.application.* +import io.ktor.network.selector.* +import io.ktor.network.sockets.* import io.ktor.routing.* import io.ktor.server.cio.* import io.ktor.server.engine.* +import io.ktor.util.* import io.ktor.websocket.* import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* @@ -25,20 +28,8 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.serialization.* -//TODO add TCP server +@OptIn(KtorExperimentalAPI::class, ExperimentalSerializationApi::class) fun main() { - embeddedServer(CIO, port = 9000) { - install(WebSockets) - install(RSocketServerSupport) - - routing { - rSocketChat() - } - }.start(true) -} - -@OptIn(ExperimentalSerializationApi::class) -fun Routing.rSocketChat() { val proto = ConfiguredProtoBuf val users = Users() val chats = Chats() @@ -48,7 +39,8 @@ fun Routing.rSocketChat() { val chatsApi = ChatApi(chats, messages) val messagesApi = MessageApi(messages, chats) - rSocket { + //create acceptor + val acceptor: RSocketAcceptor = { val userName = payload.data.readText() val user = users.getOrCreate(userName) val session = Session(user.id) @@ -102,4 +94,19 @@ fun Routing.rSocketChat() { } } } + + //start TCP server + GlobalScope.launch { + aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind(port = 8000).rSocket(acceptor = acceptor) + } + + //start WS server + embeddedServer(CIO, port = 9000) { + install(WebSockets) + install(RSocketServerSupport) + + routing { + rSocket(acceptor = acceptor) + } + }.start(true) } diff --git a/gradle.properties b/gradle.properties index 7e7f286f1..8816db47b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -21,7 +21,7 @@ version=0.10.0 #Versions kotlinVersion=1.4.10 ktorVersion=1.4.1 -kotlinxCoroutinesVersion=1.3.9 +kotlinxCoroutinesVersion=1.3.9-native-mt-2 kotlinxAtomicfuVersion=0.14.4 turbineVersion=0.2.1 rsocketJavaVersion=1.1.0-M2 @@ -32,6 +32,7 @@ kotlin.caching.enabled=true kotlin.incremental=true kotlin.incremental.multiplatform=true kotlin.parallel.tasks.in.project=true +kotlin.mpp.stability.nowarn=true #HMPP support kotlin.mpp.enableGranularSourceSetsMetadata=true @@ -40,12 +41,16 @@ kotlin.mpp.enableCompatibilityMetadataVariant=true #JS and Native flags kotlin.js.compiler=both kotlin.native.enableDependencyPropagation=false +kotlin.native.ignoreIncorrectDependencies=true +kotlin.native.ignoreDisabledTargets=true +kotlin.native.distribution.type=prebuilt #Gradle org.gradle.parallel=false org.gradle.caching=true org.gradle.configureondemand=true +org.gradle.jvmargs=-Xmx4g #Bintray publish workaround systemProp.org.gradle.internal.publish.checksums.insecure=true diff --git a/playground/build.gradle.kts b/playground/build.gradle.kts index a7a77425e..63d299690 100644 --- a/playground/build.gradle.kts +++ b/playground/build.gradle.kts @@ -14,6 +14,8 @@ * limitations under the License. */ +import org.jetbrains.kotlin.konan.target.* + plugins { kotlin("multiplatform") } @@ -27,6 +29,16 @@ kotlin { binaries.executable() } } + when { + HostManager.hostIsLinux -> linuxX64("native") +// HostManager.hostIsMingw -> mingwX64("native") //no native support for TCP + HostManager.hostIsMac -> macosX64("native") + else -> null + }?.binaries { + executable { + entryPoint = "main" + } + } sourceSets { val commonMain by getting { diff --git a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt b/playground/src/nativeMain/kotlin/TcpApp.kt similarity index 72% rename from rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt rename to playground/src/nativeMain/kotlin/TcpApp.kt index e6a442de4..3f300167e 100644 --- a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt +++ b/playground/src/nativeMain/kotlin/TcpApp.kt @@ -14,6 +14,15 @@ * limitations under the License. */ -package io.rsocket.kotlin.internal +import io.ktor.util.* +import kotlinx.coroutines.* +import kotlin.coroutines.* + +@OptIn(InternalAPI::class) +fun main() { + runBlocking { + runTcpClient(EmptyCoroutineContext) +// runTcpServer(EmptyCoroutineContext) + } +} -internal actual fun concurrentMap(): MutableMap = mutableMapOf() diff --git a/rsocket-core/build.gradle.kts b/rsocket-core/build.gradle.kts index 20e40f294..9d8a29339 100644 --- a/rsocket-core/build.gradle.kts +++ b/rsocket-core/build.gradle.kts @@ -27,14 +27,13 @@ val ktorVersion: String by rootProject val kotlinxCoroutinesVersion: String by rootProject kotlin { - jvm() - js() - sourceSets { val commonMain by getting { dependencies { api("io.ktor:ktor-io:$ktorVersion") - api("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion") + api("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion") { + version { strictly(kotlinxCoroutinesVersion) } + } } } val commonTest by getting { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt new file mode 100644 index 000000000..688a3664a --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt @@ -0,0 +1,254 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal + +import kotlinx.atomicfu.locks.* +import kotlin.math.* + +private fun safeFindNextPositivePowerOfTwo(value: Int): Int = when { + value <= 0 -> 1 + value >= 0x40000000 -> 0x40000000 + else -> 1 shl 32 - (value - 1).countLeadingZeroBits() +} + +/** + * Synchronized IntMap implementation based on Netty IntObjectHashMap. + * Uses atomics on K/N to support mutation. + * On JVM and JS no atomics used. + */ +internal class IntMap(initialCapacity: Int = 8, private val loadFactor: Float = 0.5f) : SynchronizedObject() { + init { + require(loadFactor > 0.0f && loadFactor <= 1.0f) { "loadFactor must be > 0 and <= 1" } + } + + private val store = MutableValue(MapState(0, safeFindNextPositivePowerOfTwo(initialCapacity))) + + val size: Int get() = store.value.store.size + + private inline fun sync(block: MapState.() -> T): T = synchronized(this) { block(store.value) } + + operator fun get(key: Int): V? = sync { get(key) } + operator fun set(key: Int, value: V): V? = sync { setAndGrow(key, value, this@IntMap.store.update) } + fun remove(key: Int): V? = sync { remove(key) } + fun clear() = sync { clear() } + operator fun contains(key: Int): Boolean = sync { contains(key) } + fun values(): List = sync { values() } + fun keys(): Set = sync { keys() } + + private inner class MapState(size: Int, private val capacity: Int) { + private val mask = capacity - 1 + + // Clip the upper bound so that there will always be at least one available slot. + private val maxSize = min(mask, (capacity * loadFactor).toInt()) + + val store: ValueStore = ValueStore(size, capacity) + + operator fun contains(key: Int): Boolean = indexOf(key) >= 0 + + operator fun get(key: Int): V? { + val index = indexOf(key) + if (index == -1) return null + return store.value(index) + } + + fun remove(key: Int): V? { + val index = indexOf(key) + if (index == -1) return null + val prev = store.value(index) + removeAt(index) + return prev + } + + fun setAndGrow(key: Int, value: V, grow: (newState: MapState) -> Unit): V? { + val startIndex = hashIndex(key) + var index = startIndex + while (true) { + if (store.value(index) == null) { + // Found empty slot, use it. + set(index, key, value) + growSize(grow) + return null + } + if (store.key(index) == key) { + // Found existing entry with this key, just replace the value. + val previousValue = store.value(index) + store.setValue(index, value) + return previousValue + } + + // Conflict, keep probing ... + index = probeNext(index) + + // Can only happen if the map was full at MAX_ARRAY_SIZE and couldn't grow. + check(index != startIndex) { "Unable to insert" } + } + } + + fun clear() { + repeat(capacity, this::clear) + store.clearSize() + } + + @OptIn(ExperimentalStdlibApi::class) + fun values(): List = buildList { + repeat(capacity) { + store.value(it)?.let(this::add) + } + } + + @OptIn(ExperimentalStdlibApi::class) + fun keys(): Set = buildSet { + repeat(capacity) { + val key = store.key(it) + if (this@MapState.contains(key)) add(key) + } + } + + @OptIn(ExperimentalStdlibApi::class) + override fun toString(): String { + return buildMap { + repeat(capacity) { + put(store.key(it), store.value(it)?.hashCode()) + } + }.toList().sortedBy { it.first }.toString() + } + + private fun set(index: Int, key: Int, value: V?) { + store.setKey(index, key) + store.setValue(index, value) + } + + private fun clear(index: Int): Unit = set(index, 0, null) + + /** + * Returns the hashed index for the given key. + * The array lengths are always a power of two, so we can use a bitmask to stay inside the array bounds. + */ + private fun hashIndex(key: Int): Int = key and mask + + /** + * Get the next sequential index after `index` and wraps if necessary. + * The array lengths are always a power of two, so we can use a bitmask to stay inside the array bounds. + */ + private fun probeNext(index: Int): Int = index + 1 and mask + + /** + * Locates the index for the given key. This method probes using double hashing. + * + * @param key the key for an entry in the map. + * @return the index where the key was found, or `-1` if no entry is found for that key. + */ + private fun indexOf(key: Int): Int { + val startIndex = hashIndex(key) + var index = startIndex + while (true) { + // It's available, so no chance that this value exists anywhere in the map. + if (store.value(index) == null) return -1 + if (store.key(index) == key) return index + + // Conflict, keep probing ... + index = probeNext(index) + if (index == startIndex) return -1 + } + } + + /** + * Removes entry at the given index position. Also performs opportunistic, incremental rehashing + * if necessary to not break conflict chains. + * + * @param index the index position of the element to remove. + * @return `true` if the next item was moved back. `false` otherwise. + */ + private fun removeAt(index: Int): Boolean { + val s = store.decrementSize() + // Clearing the key is not strictly necessary (for GC like in a regular collection), + // but recommended for security. The memory location is still fresh in the cache anyway. + clear(index) + + // In the interval from index to the next available entry, the arrays may have entries + // that are displaced from their base position due to prior conflicts. Iterate these + // entries and move them back if possible, optimizing future lookups. + // Knuth Section 6.4 Algorithm R, also used by the JDK's IdentityHashMap. + var nextFree = index + var i = probeNext(index) + var value = store.value(i) + while (value != null) { + val key = store.key(i) + val bucket = hashIndex(key) + if (i < bucket && (bucket <= nextFree || nextFree <= i) || bucket <= nextFree && nextFree <= i) { + // Move the displaced entry "back" to the first available position. + set(nextFree, key, value) + // Put the first entry after the displaced entry + clear(i) + nextFree = i + } + i = probeNext(i) + value = store.value(i) + } + return nextFree != index + } + + /** + * Grows the map size after an insertion. If necessary, performs a rehash of the map. + */ + private fun growSize(grow: (newState: MapState) -> Unit) { + val size = store.incrementSize() + if (size <= maxSize) return + + check(capacity != Int.MAX_VALUE) { "Max capacity reached at size=$size" } + grow(rehash()) + } + + /** + * Rehashes the map for the given capacity. + * Double the capacity. + */ + private fun rehash(): MapState = MapState(store.size, capacity shl 1).also { newState -> + // Insert to the new arrays. + repeat(capacity) { + val oldValue = store.value(it) ?: return@repeat + val oldKey = store.key(it) + var index = newState.hashIndex(oldKey) + while (true) { + if (newState.store.value(index) == null) { + newState.set(index, oldKey, oldValue) + break + } + // Conflict, keep probing. Can wrap around, but never reaches startIndex again. + index = probeNext(index) + } + } + clear() + } + } +} + +internal expect class MutableValue(initial: V) { + val value: V + val update: (V) -> Unit +} + +internal expect class ValueStore(size: Int, capacity: Int) { + val size: Int + fun key(index: Int): Int + fun value(index: Int): V? + fun setKey(index: Int, key: Int) + fun setValue(index: Int, value: V?) + fun incrementSize(): Int + fun decrementSize(): Int + fun clearSize() +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt index 3a1dcd927..83ff30401 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt @@ -39,9 +39,9 @@ internal class RSocketState( private val requestScope = CoroutineScope(SupervisorJob(job)) private val scope = CoroutineScope(job) - val receivers: MutableMap> = concurrentMap() - private val senders: MutableMap = concurrentMap() - private val limits: MutableMap = concurrentMap() + val receivers: IntMap> = IntMap() + private val senders: IntMap = IntMap() + private val limits: IntMap = IntMap() private val keepAliveHandler = KeepAliveHandler(keepAlive, this::sendPrioritized) @@ -95,7 +95,7 @@ internal class RSocketState( fun launchCancelable(streamId: Int, block: suspend CoroutineScope.() -> Unit): Job { val job = launch(block) - job.invokeOnCompletion { if (isActive) senders -= streamId } + job.invokeOnCompletion { if (isActive) senders.remove(streamId) } senders[streamId] = job return job } @@ -133,7 +133,7 @@ internal class RSocketState( requestHandler.job.invokeOnCompletion { cancel("Request handled stopped", it) } job.invokeOnCompletion { error -> requestHandler.cancel("Connection closed", error) - receivers.values.forEach { it.close((error as? CancellationException)?.cause ?: error) } + receivers.values().forEach { it.close((error as? CancellationException)?.cause ?: error) } receivers.clear() limits.clear() senders.clear() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamId.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamId.kt index 3d2af2461..593ef4a6f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamId.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamId.kt @@ -21,7 +21,7 @@ import kotlinx.atomicfu.* internal class StreamId(streamId: Int) { private val streamId = atomic(streamId) - fun next(streamIds: Map): Int { + fun next(streamIds: IntMap<*>): Int { var streamId: Int do { streamId = this.streamId.addAndGet(2) and MASK diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/flow/RequestChannelRequesterFlowCollector.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/flow/RequestChannelRequesterFlowCollector.kt index a1b2b0387..201349611 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/flow/RequestChannelRequesterFlowCollector.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/flow/RequestChannelRequesterFlowCollector.kt @@ -19,6 +19,7 @@ package io.rsocket.kotlin.internal.flow import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.payload.* +import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* @@ -28,10 +29,10 @@ internal class RequestChannelRequesterFlowCollector( private val receiver: CompletableDeferred?>, private val requestSize: Int, ) : LimitingFlowCollector(1) { - private var firstRequest = true + private val firstRequest = atomic(true) //needed for K/N override suspend fun emitValue(value: Payload): Unit = with(state) { - if (firstRequest) { - firstRequest = false + if (firstRequest.value) { + firstRequest.value = false receiver.complete(createReceiverFor(streamId)) send(RequestChannelFrame(streamId, requestSize, value)) } else { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAliveHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAliveHandler.kt index 42d3608e0..0678faec8 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAliveHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAliveHandler.kt @@ -27,7 +27,7 @@ import kotlin.time.TimeSource.* @OptIn(ExperimentalTime::class) internal class KeepAliveHandler( private val keepAlive: KeepAlive, - private val offerFrame: (frame: Frame) -> Unit + private val offerFrame: (frame: Frame) -> Unit, ) { private val lastMark = atomic(null) @@ -45,7 +45,9 @@ internal class KeepAliveHandler( while (isActive) { delay(keepAlive.interval) if (lastMark.value!!.elapsedNow() >= keepAlive.maxLifetime) { - throw RSocketError.ConnectionError("No keep-alive for ${keepAlive.maxLifetime}") + //for K/N + scope.cancel("Keep alive failed", RSocketError.ConnectionError("No keep-alive for ${keepAlive.maxLifetime}")) + break } offerFrame(KeepAliveFrame(true, 0, ByteReadPacket.Empty)) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/NoopLogger.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/NoopLogger.kt new file mode 100644 index 000000000..eb43ac305 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/NoopLogger.kt @@ -0,0 +1,27 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.logging + +/** + * Logger implementation, that never print + */ +object NoopLogger : Logger, LoggerFactory { + override val tag: String get() = "noop" + override fun isLoggable(level: LoggingLevel): Boolean = false + override fun rawLog(level: LoggingLevel, throwable: Throwable?, message: Any?): Unit = Unit + override fun logger(tag: String): Logger = this +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/PrintLogger.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/PrintLogger.kt new file mode 100644 index 000000000..2f80e94fd --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/PrintLogger.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.logging + +class PrintLogger( + override val tag: String, + private val minLevel: LoggingLevel = LoggingLevel.INFO, +) : Logger { + override fun isLoggable(level: LoggingLevel): Boolean = level >= minLevel + override fun rawLog(level: LoggingLevel, throwable: Throwable?, message: Any?) { + val error = throwable?.stackTraceToString()?.let { "Error: $it" } ?: "" + println("[$level] ($tag) $message $error") + } + + companion object : LoggerFactory { + override fun logger(tag: String): Logger = PrintLogger(tag) + + fun withLevel(minLevel: LoggingLevel): LoggerFactory = LoggerFactory { PrintLogger(it, minLevel) } + } +} diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/IntMapTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/IntMapTest.kt new file mode 100644 index 000000000..24dc84c0d --- /dev/null +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/IntMapTest.kt @@ -0,0 +1,246 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal + +import kotlin.random.* +import kotlin.test.* + +class IntMapTest { + private data class Value(private val name: String?) + + private val map = IntMap() + + @Test + fun putNewMappingShouldSucceed() { + val v = Value("v") + val key = 1 + assertNull(map.set(key, v)) + assertEquals(1, map.size) + assertTrue(map.contains(key)) + assertEquals(v, map[key]) + } + + @Test + fun putShouldReplaceValue() { + val v1 = Value("v1") + val key = 1 + assertNull(map.set(key, v1)) + + // Replace the value. + val v2 = Value("v2") + assertSame(v1, map.set(key, v2)) + assertEquals(1, map.size.toLong()) + assertTrue(map.contains(key)) + assertEquals(v2, map[key]) + } + + @Test + fun setShouldGrowMap() { + for (key in 0 until 255) { + val v = Value(key.toString()) + assertNull(map.set(key, v)) + assertEquals(key + 1.toLong(), map.size.toLong()) + assertTrue(map.contains(key)) + assertEquals(v, map[key]) + } + } + + @Test + fun negativeKeyShouldSucceed() { + val v = Value("v") + map[-3] = v + assertEquals(1, map.size.toLong()) + assertEquals(v, map[-3]) + } + + @Test + fun removeMissingValueShouldReturnNull() { + assertNull(map.remove(1)) + assertEquals(0, map.size.toLong()) + } + + @Test + fun removeShouldReturnPreviousValue() { + val v = Value("v") + val key = 1 + map[key] = v + assertSame(v, map.remove(key)) + } + + /** + * This test is a bit internal-centric. We're just forcing a rehash to occur based on no longer + * having any FREE slots available. We do this by adding and then removing several keys up to + * the capacity, so that no rehash is done. We then add one more, which will cause the rehash + * due to a lack of free slots and verify that everything is still behaving properly + */ + @Test + fun noFreeSlotsShouldRehash() { + for (i in 0..9) { + map[i] = Value(i.toString()) + // Now mark it as REMOVED so that size won't cause the rehash. + map.remove(i) + assertEquals(0, map.size.toLong()) + } + + // Now add an entry to force the rehash since no FREE slots are available in the map. + val v = Value("v") + val key = 1 + map[key] = v + assertEquals(1, map.size.toLong()) + assertSame(v, map[key]) + } + + @Test + fun clearShouldSucceed() { + val v1 = Value("v1") + val v2 = Value("v2") + val v3 = Value("v3") + map[1] = v1 + map[2] = v2 + map[3] = v3 + map.clear() + assertEquals(0, map.size.toLong()) + assertTrue(map.size == 0) + } + + @Test + fun keysShouldBeReturned() { + val k1 = 1 + val k2 = 2 + val k3 = 3 + val k4 = 4 + map[k1] = Value("v1") + map[k2] = Value("v2") + map[k3] = Value("v3") + + // Add and then immediately remove another entry. + map[k4] = Value("v4") + map.remove(k4) + val keys = map.keys() + assertEquals(3, keys.size) + val expected = setOf(k1, k2, k3) + val found = mutableSetOf() + for (key in keys) { + assertTrue(found.add(key)) + } + assertEquals(expected, found) + } + + @Test + fun valuesShouldBeReturned() { + val k1 = 1 + val k2 = 2 + val k3 = 3 + val k4 = 4 + val v1 = Value("v1") + val v2 = Value("v2") + val v3 = Value("v3") + map[k1] = v1 + map[k2] = v2 + map[k3] = v3 + + // Add and then immediately remove another entry. + map[k4] = Value("v4") + map.remove(k4) + + // Ensure values() return all values. + val expected = setOf(v1, v2, v3) + val actual = map.values().toSet() + assertEquals(expected, actual) + } + + @Test + fun mapShouldSupportHashingConflicts() { + for (mod in 0..9) { + var sz = 1 + while (sz <= 101) { + val map = IntMap(sz) + for (i in 0..99) map[(i * mod)] = "" + sz += 2 + } + } + } + + @Test + fun fuzzTest() { + // This test is so extremely internals-dependent that I'm not even trying to + // minimize that. Any internal changes will not fail the test (so it's not flaky per se) + // but will possibly make it less effective (not test interesting scenarios anymore). + + // The RNG algorithm is specified and stable, so this will cause the same exact dataset + // to be used in every run and every JVM implementation. + val rnd = Random(0) + val baseSize = 1000 + // Empirically-determined size to expand the capacity exactly once, and before + // the step that creates the long conflict chain. We need to test rehash(), + // but also control when rehash happens because it cleans up the REMOVED entries. + // This size is also chosen so after the single rehash, the map will be densely + // populated, getting close to a second rehash but not triggering it. + val startTableSize = 1105 + val map = IntMap(startTableSize) + // Reference map which implementation we trust to be correct, will mirror all operations. + val goodMap = HashMap() + + // Add initial population. + for (i in 0 until baseSize / 4) { + var key = rnd.nextInt(baseSize) + assertEquals(goodMap.put(key, key), map.set(key, key)) + // 50% elements are multiple of a divisor of startTableSize => more conflicts. + key = (rnd.nextInt(baseSize) * 17) + assertEquals(goodMap.put(key, key), map.set(key, key)) + } + + // Now do some mixed adds and removes for further fuzzing + // Rehash will happen here, but only once, and the final size will be closer to max. + for (i in 0 until baseSize * 1000) { + val key = rnd.nextInt(baseSize) + if (rnd.nextDouble() >= 0.2) { + assertEquals(goodMap.put(key, key), map.set(key, key)) + } else { + assertEquals(goodMap.remove(key), map.remove(key)) + } + } + + // Final batch of fuzzing, only searches and removes. + var removeSize: Int = map.size / 2 + while (removeSize > 0) { + val key = rnd.nextInt(baseSize) + val found = goodMap.contains(key) + assertEquals(found, map.contains(key)) + assertEquals(goodMap.remove(key), map.remove(key)) + if (found) { + --removeSize + } + } + + // Now gotta write some code to compare the final maps, as equals() won't work. + assertEquals(goodMap.size, map.size) + val goodKeys = goodMap.keys.toTypedArray().sortedArray() + val keys = map.keys().toTypedArray().sortedArray() + assertEquals(goodKeys.size, keys.size) + for (i in goodKeys.indices) { + assertEquals(goodKeys[i], keys[i]) + } + + // Finally drain the map. + for (key in keys) { + assertEquals(goodMap.remove(key), map.remove(key)) + } + assertTrue(map.size == 0) + } + +} diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt index ceff568ef..9dacd7493 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt @@ -132,8 +132,8 @@ class RSocketRequesterTest : TestWithConnection() { } } - @Test - fun testStreamBufferWithAnotherDispatcher() = test { + @Test //ignored on native because of dispatcher switching + fun testStreamBufferWithAnotherDispatcher() = test(ignoreNative = true) { val flow = requester.requestStream(Payload.Empty) .buffer(2) @@ -233,8 +233,8 @@ class RSocketRequesterTest : TestWithConnection() { assertEquals(sent, connection.sentFrames.size) } - @Test - fun testChannelRequestServerSideCancellation() = test { + @Test //ignored on native because of coroutines bug with channels + fun testChannelRequestServerSideCancellation() = test(ignoreNative = true) { var ch: SendChannel? = null val request = channelFlow { ch = this diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketTest.kt index 49b26285e..75b31887b 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketTest.kt @@ -68,7 +68,12 @@ class RSocketTest : SuspendTest { } } } - clientConnection.connectClient(RSocketConnectorConfiguration(keepAlive = KeepAlive(1000.seconds, 1000.seconds))) + clientConnection.connectClient( + RSocketConnectorConfiguration( + keepAlive = KeepAlive(1000.seconds, 1000.seconds), + loggerFactory = TestLoggerFactory + ) + ) } @Test diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/StreamIdTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/StreamIdTest.kt index e8d594dc0..5ff8bf57e 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/StreamIdTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/StreamIdTest.kt @@ -19,7 +19,7 @@ package io.rsocket.kotlin.internal import kotlin.test.* class StreamIdTest { - private val map = mutableMapOf() + private val map = IntMap() @Test fun testClientSequence() { diff --git a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt new file mode 100644 index 000000000..c033c962e --- /dev/null +++ b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt @@ -0,0 +1,59 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal + +internal actual class MutableValue actual constructor(initial: V) { + private var _value = initial + actual val value: V get() = _value + actual val update: (V) -> Unit = { _value = it } +} + +internal actual class ValueStore actual constructor(size: Int, capacity: Int) { + private var _size = size + private val keys: IntArray = IntArray(capacity) + + @Suppress("UNCHECKED_CAST") + private val values: Array = arrayOfNulls(capacity) as Array + + actual val size: Int get() = _size + + actual fun key(index: Int): Int = keys[index] + + actual fun value(index: Int): V? = values[index] + + actual fun setKey(index: Int, key: Int) { + keys[index] = key + } + + actual fun setValue(index: Int, value: V?) { + values[index] = value + } + + actual fun incrementSize(): Int { + _size += 1 + return _size + } + + actual fun decrementSize(): Int { + _size -= 1 + return _size + } + + actual fun clearSize() { + _size = 0 + } +} diff --git a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt b/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt new file mode 100644 index 000000000..c033c962e --- /dev/null +++ b/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt @@ -0,0 +1,59 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal + +internal actual class MutableValue actual constructor(initial: V) { + private var _value = initial + actual val value: V get() = _value + actual val update: (V) -> Unit = { _value = it } +} + +internal actual class ValueStore actual constructor(size: Int, capacity: Int) { + private var _size = size + private val keys: IntArray = IntArray(capacity) + + @Suppress("UNCHECKED_CAST") + private val values: Array = arrayOfNulls(capacity) as Array + + actual val size: Int get() = _size + + actual fun key(index: Int): Int = keys[index] + + actual fun value(index: Int): V? = values[index] + + actual fun setKey(index: Int, key: Int) { + keys[index] = key + } + + actual fun setValue(index: Int, value: V?) { + values[index] = value + } + + actual fun incrementSize(): Int { + _size += 1 + return _size + } + + actual fun decrementSize(): Int { + _size -= 1 + return _size + } + + actual fun clearSize() { + _size = 0 + } +} diff --git a/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt new file mode 100644 index 000000000..fef058edc --- /dev/null +++ b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal + +import kotlinx.atomicfu.* + +internal actual class MutableValue actual constructor(initial: V) { + private val _value = atomic(initial) + actual val value: V get() = _value.value + actual val update: (V) -> Unit = { _value.value = it } +} + +internal actual class ValueStore actual constructor(size: Int, capacity: Int) { + private val _size: AtomicInt = atomic(size) + private val keys: AtomicIntArray = AtomicIntArray(capacity) + private val values: AtomicArray = atomicArrayOfNulls(capacity) + + actual val size: Int get() = _size.value + + actual fun key(index: Int): Int = keys[index].value + + actual fun value(index: Int): V? = values[index].value + + actual fun setKey(index: Int, key: Int) { + keys[index].value = key + } + + actual fun setValue(index: Int, value: V?) { + values[index].value = value + } + + actual fun incrementSize(): Int = _size.incrementAndGet() + actual fun decrementSize(): Int = _size.decrementAndGet() + actual fun clearSize() { + _size.value = 0 + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt similarity index 86% rename from rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt rename to rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt index a5d8665ce..eff845a55 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt +++ b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt @@ -14,6 +14,6 @@ * limitations under the License. */ -package io.rsocket.kotlin.internal +package io.rsocket.kotlin.logging -internal expect fun concurrentMap(): MutableMap +actual val DefaultLoggerFactory: LoggerFactory get() = PrintLogger diff --git a/rsocket-test/build.gradle.kts b/rsocket-test/build.gradle.kts index 8ec5e75c9..ded9dccf2 100644 --- a/rsocket-test/build.gradle.kts +++ b/rsocket-test/build.gradle.kts @@ -23,9 +23,6 @@ val kotlinxCoroutinesVersion: String by rootProject val turbineVersion: String by rootProject kotlin { - jvm() - js() - sourceSets { val commonMain by getting { dependencies { @@ -33,7 +30,9 @@ kotlin { api(kotlin("test-common")) api(kotlin("test-annotations-common")) - api("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion") + api("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion") { + version { strictly(kotlinxCoroutinesVersion) } + } api("io.ktor:ktor-utils:$ktorVersion") api("app.cash.turbine:turbine:$turbineVersion") } diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt index 9853ae664..c0b533306 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt @@ -31,7 +31,11 @@ interface SuspendTest { suspend fun before(): Unit = Unit suspend fun after(): Unit = Unit - fun test(timeout: Duration = testTimeout, block: suspend CoroutineScope.() -> Unit) = runTest { + fun test( + timeout: Duration = testTimeout, + ignoreNative: Boolean = false, + block: suspend CoroutineScope.() -> Unit, + ) = runTest(ignoreNative = ignoreNative) { runCatching { if (debug) println("[TEST] BEFORE started") diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt index 854ec5924..1d6c52a03 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt @@ -16,8 +16,16 @@ package io.rsocket.kotlin.test +import io.rsocket.kotlin.logging.* import kotlinx.coroutines.* +import kotlin.time.* -internal expect fun runTest(block: suspend CoroutineScope.() -> Unit) +internal expect fun runTest(ignoreNative: Boolean, block: suspend CoroutineScope.() -> Unit) expect val anotherDispatcher: CoroutineDispatcher + +expect val TestLoggerFactory: LoggerFactory + +expect val TransportTestLongDuration: Duration + +expect val TransportTestDefaultDuration: Duration diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt index 8da4a2a5e..60f2ac923 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt @@ -27,8 +27,9 @@ class TestConnection : Connection { override val job: Job = Job() private val sender = Channel(Channel.UNLIMITED) private val receiver = Channel(Channel.UNLIMITED) - private val _sentFrames = mutableListOf() - val sentFrames: List get() = _sentFrames.map { it.copy().toFrame() } + + private val store = TestPacketStore() + val sentFrames: List get() = store.stored.map { it.copy().toFrame() } init { job.invokeOnCompletion { @@ -39,7 +40,7 @@ class TestConnection : Connection { override suspend fun send(packet: ByteReadPacket) { sender.send(packet) - _sentFrames += packet.copy() + store.store(packet.copy()) } override suspend fun receive(): ByteReadPacket { diff --git a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt similarity index 78% rename from rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt rename to rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt index 9f1f91a49..488fea129 100644 --- a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt @@ -14,8 +14,11 @@ * limitations under the License. */ -package io.rsocket.kotlin.internal +package io.rsocket.kotlin.test -import java.util.concurrent.* +import io.ktor.utils.io.core.* -internal actual fun concurrentMap(): MutableMap = ConcurrentHashMap() +expect class TestPacketStore() { + val stored: List + fun store(packet: ByteReadPacket) +} diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt index ca15ab3df..a084f4f5e 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt @@ -20,6 +20,7 @@ import io.ktor.utils.io.core.* import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* import io.rsocket.kotlin.keepalive.* +import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* @@ -28,7 +29,7 @@ import kotlin.time.* @OptIn(ExperimentalTime::class) abstract class TransportTest : SuspendTest { - override val testTimeout: Duration = 10.minutes + override val testTimeout: Duration = TransportTestDefaultDuration lateinit var client: RSocket //should be assigned in `before` @@ -78,7 +79,7 @@ abstract class TransportTest : SuspendTest { } @Test - fun largePayloadRequestChannel200() = test { + fun largePayloadRequestChannel200() = test(TransportTestLongDuration) { val request = flow { repeat(200) { emit(LARGE_PAYLOAD) } } @@ -87,7 +88,7 @@ abstract class TransportTest : SuspendTest { } @Test - fun requestChannel20000() = test { + fun requestChannel20000() = test(TransportTestLongDuration) { val request = flow { repeat(20_000) { emit(Payload(7)) } } @@ -99,7 +100,7 @@ abstract class TransportTest : SuspendTest { } @Test - fun requestChannel200000() = test { + fun requestChannel200000() = test(TransportTestLongDuration) { val request = flow { repeat(200_000) { emit(Payload(it)) } } @@ -108,7 +109,7 @@ abstract class TransportTest : SuspendTest { } @Test - fun requestChannel256x512() = test { + fun requestChannel256x512() = test(TransportTestLongDuration) { val request = flow { repeat(512) { emit(Payload(it)) @@ -148,7 +149,7 @@ abstract class TransportTest : SuspendTest { } @Test - fun requestResponse100000() = test { + fun requestResponse100000() = test(TransportTestLongDuration) { repeat(100000) { client.requestResponse(Payload(3)).let(Companion::checkPayload) } } @@ -167,8 +168,8 @@ abstract class TransportTest : SuspendTest { companion object { val ACCEPTOR: RSocketAcceptor = { TestRSocket() } - val CONNECTOR_CONFIG = RSocketConnectorConfiguration(keepAlive = KeepAlive(10.minutes, 100.minutes)) - val SERVER_CONFIG = RSocketServerConfiguration() + val CONNECTOR_CONFIG = RSocketConnectorConfiguration(keepAlive = KeepAlive(10.minutes, 100.minutes), loggerFactory = NoopLogger) + val SERVER_CONFIG = RSocketServerConfiguration(loggerFactory = NoopLogger) val MOCK_DATA: String = "test-data" val MOCK_METADATA: String = "metadata" diff --git a/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt b/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt index f4f97196e..8e21b49c1 100644 --- a/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt +++ b/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt @@ -16,9 +16,19 @@ package io.rsocket.kotlin.test +import io.rsocket.kotlin.logging.* import kotlinx.coroutines.* +import kotlin.time.* -internal actual fun runTest(block: suspend CoroutineScope.() -> Unit): dynamic = GlobalScope.promise(block = block) +internal actual fun runTest( + ignoreNative: Boolean, + block: suspend CoroutineScope.() -> Unit, +): dynamic = GlobalScope.promise(block = block) //JS is single threaded, so it have only one dispatcher backed by one threed actual val anotherDispatcher: CoroutineDispatcher get() = Dispatchers.Default + +actual val TestLoggerFactory: LoggerFactory = ConsoleLogger + +actual val TransportTestLongDuration: Duration = 10.minutes +actual val TransportTestDefaultDuration: Duration = 5.minutes diff --git a/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt b/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt new file mode 100644 index 000000000..33f6e2f3a --- /dev/null +++ b/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt @@ -0,0 +1,29 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.test + +import io.ktor.utils.io.core.* + +actual class TestPacketStore actual constructor() { + private val _stored = mutableListOf() + + actual val stored: List get() = _stored + + actual fun store(packet: ByteReadPacket) { + _stored += packet + } +} diff --git a/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt index fd7645189..79ba04fb8 100644 --- a/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt +++ b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt @@ -16,16 +16,28 @@ package io.rsocket.kotlin.test +import io.rsocket.kotlin.logging.* import kotlinx.coroutines.* import java.io.* import java.util.logging.* +import kotlin.time.* -internal actual fun runTest(block: suspend CoroutineScope.() -> Unit) { +internal actual fun runTest( + ignoreNative: Boolean, + block: suspend CoroutineScope.() -> Unit, +) { + runBlocking(block = block) +} + +actual val anotherDispatcher: CoroutineDispatcher get() = Dispatchers.IO + +actual val TestLoggerFactory: LoggerFactory = run { //init logger val file = File("src/jvmTest/resources/logging.properties") if (file.exists()) LogManager.getLogManager().readConfiguration(file.inputStream()) - runBlocking(block = block) + JavaLogger } -actual val anotherDispatcher: CoroutineDispatcher get() = Dispatchers.IO +actual val TransportTestLongDuration: Duration = 10.minutes +actual val TransportTestDefaultDuration: Duration = 5.minutes diff --git a/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt new file mode 100644 index 000000000..33f6e2f3a --- /dev/null +++ b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt @@ -0,0 +1,29 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.test + +import io.ktor.utils.io.core.* + +actual class TestPacketStore actual constructor() { + private val _stored = mutableListOf() + + actual val stored: List get() = _stored + + actual fun store(packet: ByteReadPacket) { + _stored += packet + } +} diff --git a/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt b/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt new file mode 100644 index 000000000..5ca674bcc --- /dev/null +++ b/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.test + +import io.rsocket.kotlin.logging.* +import kotlinx.coroutines.* +import kotlin.time.* + +internal actual fun runTest( + ignoreNative: Boolean, + block: suspend CoroutineScope.() -> Unit, +) { + if (ignoreNative) return + + runBlocking(block = block) +} + +actual val anotherDispatcher: CoroutineDispatcher get() = newSingleThreadContext("another") + +@SharedImmutable +actual val TestLoggerFactory: LoggerFactory = PrintLogger + +@SharedImmutable +actual val TransportTestLongDuration: Duration = 100.minutes + +@SharedImmutable +actual val TransportTestDefaultDuration: Duration = 10.minutes diff --git a/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt b/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt new file mode 100644 index 000000000..2cad54faf --- /dev/null +++ b/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/TestPacketStore.kt @@ -0,0 +1,36 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.test + +import io.ktor.utils.io.core.* +import kotlinx.atomicfu.* + +actual class TestPacketStore { + private val sentIndex = atomic(0) + private val _stored = atomicArrayOfNulls(100) //max 100 in cache + + actual val stored: List + get() = buildList { + repeat(sentIndex.value) { + add(_stored[it].value!!) + } + } + + actual fun store(packet: ByteReadPacket) { + _stored[sentIndex.getAndIncrement()].value = packet + } +} diff --git a/rsocket-transport-ktor/build.gradle.kts b/rsocket-transport-ktor/build.gradle.kts index 79bd49db9..218c37596 100644 --- a/rsocket-transport-ktor/build.gradle.kts +++ b/rsocket-transport-ktor/build.gradle.kts @@ -23,26 +23,29 @@ plugins { } val ktorVersion: String by rootProject +val kotlinxAtomicfuVersion: String by rootProject kotlin { - jvm() - js() - sourceSets { val commonMain by getting { dependencies { + api(project(":rsocket-core")) + api("io.ktor:ktor-network:$ktorVersion") api("io.ktor:ktor-http-cio:$ktorVersion") - api(project(":rsocket-core")) } } val commonTest by getting { dependencies { implementation(project(":rsocket-transport-ktor-client")) + + implementation("org.jetbrains.kotlinx:atomicfu:$kotlinxAtomicfuVersion") } } val jvmTest by getting { dependencies { + implementation(project(":rsocket-transport-ktor-server")) + implementation("io.ktor:ktor-client-cio:$ktorVersion") implementation("io.ktor:ktor-client-okhttp:$ktorVersion") @@ -50,8 +53,6 @@ kotlin { implementation("io.ktor:ktor-server-netty:$ktorVersion") implementation("io.ktor:ktor-server-jetty:$ktorVersion") implementation("io.ktor:ktor-server-tomcat:$ktorVersion") - - implementation(project(":rsocket-transport-ktor-server")) } } } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-client/build.gradle.kts b/rsocket-transport-ktor/rsocket-transport-ktor-client/build.gradle.kts index 45c95da39..3946c6f87 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-client/build.gradle.kts +++ b/rsocket-transport-ktor/rsocket-transport-ktor-client/build.gradle.kts @@ -25,9 +25,6 @@ plugins { val ktorVersion: String by rootProject kotlin { - jvm() - js() - sourceSets { val commonMain by getting { dependencies { diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-server/build.gradle.kts b/rsocket-transport-ktor/rsocket-transport-ktor-server/build.gradle.kts index 52bb42d4e..2646d9d4a 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-server/build.gradle.kts +++ b/rsocket-transport-ktor/rsocket-transport-ktor-server/build.gradle.kts @@ -25,8 +25,6 @@ plugins { val ktorVersion: String by rootProject kotlin { - jvm() - sourceSets { val commonMain by getting { dependencies { diff --git a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/connection/KtorTcpConnection.kt b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/connection/KtorTcpConnection.kt index 79f32f828..8e2f55872 100644 --- a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/connection/KtorTcpConnection.kt +++ b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/connection/KtorTcpConnection.kt @@ -18,29 +18,49 @@ package io.rsocket.kotlin.connection import io.ktor.network.sockets.* import io.ktor.util.* +import io.ktor.util.cio.* import io.ktor.utils.io.* import io.ktor.utils.io.core.* import io.rsocket.kotlin.frame.io.* import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.coroutines.* val Socket.connection: Connection get() = KtorTcpConnection(this) -//TODO need to check and extract length support!! -@OptIn(KtorExperimentalAPI::class) -private class KtorTcpConnection(private val socket: Socket) : Connection { - override val job: Job get() = socket.socketContext +@OptIn(KtorExperimentalAPI::class, ExperimentalCoroutinesApi::class) +private class KtorTcpConnection(private val socket: Socket) : Connection, CoroutineScope { + override val job: Job = Job(socket.socketContext) + override val coroutineContext: CoroutineContext = job + Dispatchers.Unconfined - private val readChannel = socket.openReadChannel() - private val writeChannel = socket.openWriteChannel(true) + private val sendChannel = Channel(8) + private val receiveChannel = Channel(8) - override suspend fun send(packet: ByteReadPacket): Unit = writeChannel.run { - val length = packet.remaining.toInt() - writePacket { writeLength(length) } - writePacket(packet) + init { + launch { + socket.openWriteChannel(autoFlush = true).use { + while (isActive) { + val packet = sendChannel.receive() + val length = packet.remaining.toInt() + writePacket { + writeLength(length) + writePacket(packet) + } + } + } + } + launch { + socket.openReadChannel().apply { + while (isActive) { + val length = readPacket(3).readLength() + val packet = readPacket(length) + receiveChannel.send(packet) + } + } + } } - override suspend fun receive(): ByteReadPacket = readChannel.run { - val length = readPacket(3).readLength() - readPacket(length) - } + override suspend fun send(packet: ByteReadPacket): Unit = sendChannel.send(packet) + + override suspend fun receive(): ByteReadPacket = receiveChannel.receive() } diff --git a/rsocket-transport-ktor/src/commonTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt b/rsocket-transport-ktor/src/commonTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt new file mode 100644 index 000000000..b5c4dcf42 --- /dev/null +++ b/rsocket-transport-ktor/src/commonTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt @@ -0,0 +1,56 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin + +import io.ktor.network.selector.* +import io.ktor.network.sockets.* +import io.rsocket.kotlin.connection.* +import io.rsocket.kotlin.test.* +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlin.random.* + +abstract class TcpTransportTest : TransportTest() { + abstract val clientSelector: SelectorManager + abstract val serverSelector: SelectorManager + + private lateinit var server: ServerSocket + private lateinit var serverJob: Job + + override suspend fun before(): Unit = coroutineScope { + val tempServer = aSocket(serverSelector).tcp().bind("0.0.0.0", port.incrementAndGet()) + val serverDef = async { tempServer.accept() } + val clientSocket = aSocket(clientSelector).tcp().connect(tempServer.localAddress) + val serverSocket = serverDef.await() + val serverJobDef = async { serverSocket.connection.startServer(SERVER_CONFIG, ACCEPTOR) } + client = clientSocket.connection.connectClient(CONNECTOR_CONFIG) + serverJob = serverJobDef.await() + server = tempServer + } + + override suspend fun after() { + serverJob.cancel() + client.cancel() + server.close() + serverJob.join() + client.job.join() + } + + companion object { + private val port = atomic(Random.nextInt(20, 90) * 100) + } +} diff --git a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/JvmTcpTransportTest.kt b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/JvmTcpTransportTest.kt new file mode 100644 index 000000000..a6e4d5469 --- /dev/null +++ b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/JvmTcpTransportTest.kt @@ -0,0 +1,29 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin + +import io.ktor.network.selector.* +import kotlinx.coroutines.* + +class JvmTcpTransportTest : TcpTransportTest() { + override val clientSelector: SelectorManager get() = selector + override val serverSelector: SelectorManager get() = selector + + companion object { + private val selector = SelectorManager(Dispatchers.IO) + } +} diff --git a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt deleted file mode 100644 index 326abeeed..000000000 --- a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin - -import io.ktor.network.selector.* -import io.ktor.network.sockets.* -import io.ktor.util.* -import io.rsocket.kotlin.connection.* -import io.rsocket.kotlin.test.* -import kotlinx.coroutines.* - -class TcpTransportTest : TransportTest() { - @OptIn(InternalAPI::class) - private val selector = SelectorManager(Dispatchers.IO) - private val builder = aSocket(selector).tcp() - private val server = builder.bind() - - override suspend fun before() { - super.before() - - GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) { - server.accept().connection.startServer(SERVER_CONFIG, ACCEPTOR) - } - - client = builder.connect(server.localAddress).connection.connectClient(CONNECTOR_CONFIG) - } - - override suspend fun after() { - super.after() - - server.close() - selector.close() - server.socketContext.join() - } - -} diff --git a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/WebSocketTransportTest.kt b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/WebSocketTransportTest.kt index c59ed6ca6..12560b5d0 100644 --- a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/WebSocketTransportTest.kt +++ b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/WebSocketTransportTest.kt @@ -23,7 +23,9 @@ import io.ktor.routing.* import io.ktor.server.engine.* import io.rsocket.kotlin.core.* import io.rsocket.kotlin.test.* +import kotlinx.atomicfu.* import kotlinx.coroutines.* +import kotlin.random.* import io.ktor.client.features.websocket.WebSockets as ClientWebSockets import io.ktor.websocket.WebSockets as ServerWebSockets @@ -39,7 +41,9 @@ abstract class WebSocketTransportTest( } } - private val server = embeddedServer(serverEngine, port = 9000) { + private val currentPort = port.incrementAndGet() + + private val server = embeddedServer(serverEngine, currentPort) { install(ServerWebSockets) install(RSocketServerSupport) { fromConfig(SERVER_CONFIG) @@ -52,8 +56,8 @@ abstract class WebSocketTransportTest( override suspend fun before() { super.before() - trySeveralTimes { server.start() } - client = trySeveralTimes { httpClient.rSocket(port = 9000) } + server.start() + client = trySeveralTimes { httpClient.rSocket(port = currentPort) } } override suspend fun after() { @@ -64,14 +68,18 @@ abstract class WebSocketTransportTest( private suspend inline fun trySeveralTimes(block: () -> R): R { lateinit var error: Throwable - repeat(5) { + repeat(10) { try { return block() } catch (e: Throwable) { error = e - delay(500) //sometimes address isn't yet available + delay(500) //sometimes address isn't yet available (server isn't started) } } throw error } + + companion object { + private val port = atomic(Random.nextInt(20, 90) * 100) + } } diff --git a/rsocket-transport-ktor/src/nativeTest/kotlin/io/rsocket/kotlin/NativeTcpTransportTest.kt b/rsocket-transport-ktor/src/nativeTest/kotlin/io/rsocket/kotlin/NativeTcpTransportTest.kt new file mode 100644 index 000000000..6eea0d6e9 --- /dev/null +++ b/rsocket-transport-ktor/src/nativeTest/kotlin/io/rsocket/kotlin/NativeTcpTransportTest.kt @@ -0,0 +1,29 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin + +import io.ktor.network.selector.* + +class NativeTcpTransportTest : TcpTransportTest() { + override val clientSelector: SelectorManager get() = selectorClient + override val serverSelector: SelectorManager get() = selectorServer + + companion object { + private val selectorClient = SelectorManager() + private val selectorServer = SelectorManager() + } +} diff --git a/rsocket-transport-local/build.gradle.kts b/rsocket-transport-local/build.gradle.kts index c6fd80329..09bf44c9b 100644 --- a/rsocket-transport-local/build.gradle.kts +++ b/rsocket-transport-local/build.gradle.kts @@ -23,9 +23,6 @@ plugins { } kotlin { - jvm() - js() - sourceSets { val commonMain by getting { dependencies { diff --git a/settings.gradle.kts b/settings.gradle.kts index 404fbb82b..273e3686b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -28,6 +28,10 @@ pluginManagement { } } +plugins { + id("com.gradle.enterprise") version "3.4.1" +} + rootProject.name = "rsocket-kotlin" include("benchmarks") @@ -51,3 +55,10 @@ fun includeExample(name: String) { includeExample("nodejs-tcp-transport") includeExample("interactions") includeExample("multiplatform-chat") + +gradleEnterprise { + buildScan { + termsOfServiceUrl = "https://gradle.com/terms-of-service" + termsOfServiceAgree = "yes" + } +}