diff --git a/kotlin-java-client/.gitignore b/kotlin-java-client/.gitignore new file mode 100644 index 00000000..e51bc30b --- /dev/null +++ b/kotlin-java-client/.gitignore @@ -0,0 +1,2 @@ +build/* +.gradle/* diff --git a/kotlin-java-client/README.md b/kotlin-java-client/README.md new file mode 100644 index 00000000..91b51dec --- /dev/null +++ b/kotlin-java-client/README.md @@ -0,0 +1,95 @@ +# RabbitMQ Tutorials in Kotlin + +This is a minimalistic Kotlin port of the [RabbitMQ tutorials in Java](https://www.rabbitmq.com/getstarted.html). +The port is admittedly quite close to Java in terms of code style. + + +## Compiling the Code + +``` shell +gradle clean compileKotlin +``` + +## Running the Tutorials + +### Tutorial 1 + +Execute the following command to start a Hello, world consumer + +``` shell +gradle run -P main=Recv +``` + +Execute the following in a separate shell to publish a Hello, world messge: + +``` shell +gradle run -P main=Send +``` + +### Tutorial 2 + +Send a task message. The task will be completed immediately + +``` shell +gradle run -P main=NewTask +``` + +To start a worker (run in a separate shell): + +``` shell +gradle run -P main=Worker +``` + +Send a task message. It will wait for 1 second for each dot in the payload. + +``` shell +gradle run -P main=NewTask -P argv="rabbit1 ...." +``` + +Add more workers to the same queue, message will be distributed in the +round robin manner. + +### Tutorial 3 + +``` shell +gradle run -P main=ReceiveLogs +``` + + +``` shell +gradle run -P main=EmitLog -P argv="rabbit1, msg1" +``` + +### Tutorial 4 + +``` shell +gradle run -P main="ReceiveLogsDirect" -P argv="info,error" +``` + +``` shell +gradle run -P main=EmitLogDirect" +``` + +### Tutorial 5 + +``` shell +gradle run -P main=ReceiveLogsTopic -P argv="anonymous.*" +``` + +``` shell +gradle run -P main=EmitLogTopic -P argv="anonymous.info" +``` + +### Tutorial 6 + +In one shell: + +``` shell +gradle run -P main=RPCServer +``` + +In another shell: + +``` shell +gradle run -P main=RPCClient +``` diff --git a/kotlin/build.gradle b/kotlin-java-client/build.gradle similarity index 100% rename from kotlin/build.gradle rename to kotlin-java-client/build.gradle diff --git a/kotlin-java-client/gradle.properties b/kotlin-java-client/gradle.properties new file mode 100644 index 00000000..19b3681c --- /dev/null +++ b/kotlin-java-client/gradle.properties @@ -0,0 +1,2 @@ +main = +argv = diff --git a/kotlin/settings.gradle b/kotlin-java-client/settings.gradle similarity index 100% rename from kotlin/settings.gradle rename to kotlin-java-client/settings.gradle diff --git a/kotlin/src/main/kotlin/EmitLog.kt b/kotlin-java-client/src/main/kotlin/EmitLog.kt similarity index 100% rename from kotlin/src/main/kotlin/EmitLog.kt rename to kotlin-java-client/src/main/kotlin/EmitLog.kt diff --git a/kotlin/src/main/kotlin/EmitLogDirect.kt b/kotlin-java-client/src/main/kotlin/EmitLogDirect.kt similarity index 100% rename from kotlin/src/main/kotlin/EmitLogDirect.kt rename to kotlin-java-client/src/main/kotlin/EmitLogDirect.kt diff --git a/kotlin/src/main/kotlin/EmitLogHeader.kt b/kotlin-java-client/src/main/kotlin/EmitLogHeader.kt similarity index 100% rename from kotlin/src/main/kotlin/EmitLogHeader.kt rename to kotlin-java-client/src/main/kotlin/EmitLogHeader.kt diff --git a/kotlin/src/main/kotlin/EmitLogTopic.kt b/kotlin-java-client/src/main/kotlin/EmitLogTopic.kt similarity index 100% rename from kotlin/src/main/kotlin/EmitLogTopic.kt rename to kotlin-java-client/src/main/kotlin/EmitLogTopic.kt diff --git a/kotlin/src/main/kotlin/NewTask.kt b/kotlin-java-client/src/main/kotlin/NewTask.kt similarity index 100% rename from kotlin/src/main/kotlin/NewTask.kt rename to kotlin-java-client/src/main/kotlin/NewTask.kt diff --git a/kotlin/src/main/kotlin/RPCClient.kt b/kotlin-java-client/src/main/kotlin/RPCClient.kt similarity index 100% rename from kotlin/src/main/kotlin/RPCClient.kt rename to kotlin-java-client/src/main/kotlin/RPCClient.kt diff --git a/kotlin/src/main/kotlin/RPCServer.kt b/kotlin-java-client/src/main/kotlin/RPCServer.kt similarity index 100% rename from kotlin/src/main/kotlin/RPCServer.kt rename to kotlin-java-client/src/main/kotlin/RPCServer.kt diff --git a/kotlin/src/main/kotlin/ReceiveLogHeader.kt b/kotlin-java-client/src/main/kotlin/ReceiveLogHeader.kt similarity index 100% rename from kotlin/src/main/kotlin/ReceiveLogHeader.kt rename to kotlin-java-client/src/main/kotlin/ReceiveLogHeader.kt diff --git a/kotlin/src/main/kotlin/ReceiveLogs.kt b/kotlin-java-client/src/main/kotlin/ReceiveLogs.kt similarity index 100% rename from kotlin/src/main/kotlin/ReceiveLogs.kt rename to kotlin-java-client/src/main/kotlin/ReceiveLogs.kt diff --git a/kotlin/src/main/kotlin/ReceiveLogsDirect.kt b/kotlin-java-client/src/main/kotlin/ReceiveLogsDirect.kt similarity index 100% rename from kotlin/src/main/kotlin/ReceiveLogsDirect.kt rename to kotlin-java-client/src/main/kotlin/ReceiveLogsDirect.kt diff --git a/kotlin/src/main/kotlin/ReceiveLogsTopic.kt b/kotlin-java-client/src/main/kotlin/ReceiveLogsTopic.kt similarity index 100% rename from kotlin/src/main/kotlin/ReceiveLogsTopic.kt rename to kotlin-java-client/src/main/kotlin/ReceiveLogsTopic.kt diff --git a/kotlin/src/main/kotlin/Recv.kt b/kotlin-java-client/src/main/kotlin/Recv.kt similarity index 100% rename from kotlin/src/main/kotlin/Recv.kt rename to kotlin-java-client/src/main/kotlin/Recv.kt diff --git a/kotlin/src/main/kotlin/Send.kt b/kotlin-java-client/src/main/kotlin/Send.kt similarity index 100% rename from kotlin/src/main/kotlin/Send.kt rename to kotlin-java-client/src/main/kotlin/Send.kt diff --git a/kotlin/src/main/kotlin/Worker.kt b/kotlin-java-client/src/main/kotlin/Worker.kt similarity index 100% rename from kotlin/src/main/kotlin/Worker.kt rename to kotlin-java-client/src/main/kotlin/Worker.kt diff --git a/kotlin/.gitignore b/kotlin/.gitignore index e51bc30b..ba8d33ea 100644 --- a/kotlin/.gitignore +++ b/kotlin/.gitignore @@ -1,2 +1,3 @@ build/* .gradle/* +.kotlin diff --git a/kotlin/README.md b/kotlin/README.md index 91b51dec..44e270ef 100644 --- a/kotlin/README.md +++ b/kotlin/README.md @@ -1,95 +1,116 @@ -# RabbitMQ Tutorials in Kotlin +# RabbitMQ Tutorials in Kotlin (JVM & Native) -This is a minimalistic Kotlin port of the [RabbitMQ tutorials in Java](https://www.rabbitmq.com/getstarted.html). -The port is admittedly quite close to Java in terms of code style. +This repository contains RabbitMQ tutorials implemented using [Kourier](https://kourier.dev), a pure Kotlin multiplatform AMQP client. +## About -## Compiling the Code +These tutorials demonstrate the core concepts of RabbitMQ using Kotlin and the Kourier AMQP client library. All examples work on JVM, macOS (ARM64), Linux (x64), and Windows (x64) platforms through Kotlin Multiplatform. -``` shell -gradle clean compileKotlin +## Prerequisites + +- RabbitMQ server running on localhost (default port 5672) +- Kotlin 2.1.21 +- Gradle + +## Building + +```bash +./gradlew build ``` -## Running the Tutorials +## Tutorials -### Tutorial 1 +### 1. Hello World (HelloWorld.kt) -Execute the following command to start a Hello, world consumer +The simplest thing that does something - sending and receiving messages from a named queue. -``` shell -gradle run -P main=Recv -``` +Functions: +- `send()` - Sends "Hello World!" message to the queue +- `receive()` - Receives and prints messages from the queue -Execute the following in a separate shell to publish a Hello, world messge: +### 2. Work Queues (WorkQueues.kt) -``` shell -gradle run -P main=Send -``` +Distributing time-consuming tasks among multiple workers. -### Tutorial 2 +Functions: +- `newTask(message)` - Sends a task to the work queue (dots represent work time) +- `worker(workerName)` - Processes tasks with fair dispatch and manual acknowledgment -Send a task message. The task will be completed immediately +Key concepts: +- Message durability +- Fair dispatch with `basicQos` +- Manual acknowledgments -``` shell -gradle run -P main=NewTask -``` +### 3. Publish/Subscribe (PublishSubscribe.kt) -To start a worker (run in a separate shell): +Sending messages to many consumers at once using fanout exchanges. -``` shell -gradle run -P main=Worker -``` +Functions: +- `emitLog(message)` - Publishes log messages to all subscribers +- `receiveLogs(subscriberName)` - Subscribes to all log messages -Send a task message. It will wait for 1 second for each dot in the payload. +Key concepts: +- Fanout exchanges +- Temporary queues +- Broadcasting messages -``` shell -gradle run -P main=NewTask -P argv="rabbit1 ...." -``` +### 4. Routing (Routing.kt) -Add more workers to the same queue, message will be distributed in the -round robin manner. +Receiving messages selectively using direct exchanges and routing keys. -### Tutorial 3 +Functions: +- `emitLogDirect(severity, message)` - Publishes log with specific severity +- `receiveLogsDirect(subscriberName, severities)` - Subscribes to specific severities -``` shell -gradle run -P main=ReceiveLogs -``` +Key concepts: +- Direct exchanges +- Routing keys +- Multiple bindings per queue +### 5. Topics (Topics.kt) -``` shell -gradle run -P main=EmitLog -P argv="rabbit1, msg1" -``` +Receiving messages based on patterns using topic exchanges. -### Tutorial 4 +Functions: +- `emitLogTopic(routingKey, message)` - Publishes with topic routing key +- `receiveLogsTopic(subscriberName, bindingKeys)` - Subscribes using patterns -``` shell -gradle run -P main="ReceiveLogsDirect" -P argv="info,error" -``` +Key concepts: +- Topic exchanges +- Wildcard patterns (`*` = one word, `#` = zero or more words) +- Pattern-based routing -``` shell -gradle run -P main=EmitLogDirect" -``` +### 6. RPC (RPC.kt) -### Tutorial 5 +Request/reply pattern for remote procedure calls. -``` shell -gradle run -P main=ReceiveLogsTopic -P argv="anonymous.*" -``` +Functions: +- `rpcServer()` - Processes Fibonacci number requests +- `rpcClient(n)` - Sends RPC request and waits for response -``` shell -gradle run -P main=EmitLogTopic -P argv="anonymous.info" -``` +Key concepts: +- Callback queues +- Correlation IDs +- Reply-to pattern -### Tutorial 6 +## Running Examples -In one shell: +These tutorials are designed as library functions. You can call them from your own code or tests. For example: -``` shell -gradle run -P main=RPCServer -``` +```kotlin +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + // Example: Run work queue tutorial + launch { worker(this, "Worker-1") } + launch { worker(this, "Worker-2") } -In another shell: + delay(1000) // Give workers time to start -``` shell -gradle run -P main=RPCClient + newTask(this, "Task with work...") +} ``` + +## More Information + +For detailed explanations of each tutorial, see the [Kourier documentation](https://kourier.dev/tutorials/). diff --git a/kotlin/build.gradle.kts b/kotlin/build.gradle.kts new file mode 100644 index 00000000..f33985ee --- /dev/null +++ b/kotlin/build.gradle.kts @@ -0,0 +1,34 @@ +plugins { + kotlin("multiplatform") version "2.1.21" +} + +repositories { + mavenCentral() +} + +kotlin { + // Examples can be executed with JVM or natively for macOS, Linux, or Windows + jvm() + macosArm64() + linuxX64() + mingwX64() + + applyDefaultHierarchyTemplate() + sourceSets { + all { + languageSettings.apply { + optIn("kotlin.uuid.ExperimentalUuidApi") + } + } + val commonMain by getting { + dependencies { + api("dev.kourier:amqp-client:0.3.1") + } + } + val commonTest by getting { + dependencies { + implementation(kotlin("test")) + } + } + } +} diff --git a/kotlin/gradle/wrapper/gradle-wrapper.jar b/kotlin/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 00000000..1b33c55b Binary files /dev/null and b/kotlin/gradle/wrapper/gradle-wrapper.jar differ diff --git a/kotlin/gradle/wrapper/gradle-wrapper.properties b/kotlin/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 00000000..ca025c83 --- /dev/null +++ b/kotlin/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/kotlin/gradlew b/kotlin/gradlew new file mode 100755 index 00000000..23d15a93 --- /dev/null +++ b/kotlin/gradlew @@ -0,0 +1,251 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH="\\\"\\\"" + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/kotlin/gradlew.bat b/kotlin/gradlew.bat new file mode 100644 index 00000000..db3a6ac2 --- /dev/null +++ b/kotlin/gradlew.bat @@ -0,0 +1,94 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem +@rem SPDX-License-Identifier: Apache-2.0 +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH= + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/kotlin/settings.gradle.kts b/kotlin/settings.gradle.kts new file mode 100644 index 00000000..fc6d81fb --- /dev/null +++ b/kotlin/settings.gradle.kts @@ -0,0 +1 @@ +rootProject.name = "rabbitmq-tutorial" diff --git a/kotlin/src/commonMain/kotlin/HelloWorld.kt b/kotlin/src/commonMain/kotlin/HelloWorld.kt new file mode 100644 index 00000000..8d1e1cac --- /dev/null +++ b/kotlin/src/commonMain/kotlin/HelloWorld.kt @@ -0,0 +1,60 @@ +import dev.kourier.amqp.Properties +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import io.ktor.utils.io.core.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking + +val queueName = "hello" + +suspend fun send(coroutineScope: CoroutineScope) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.queueDeclare(queueName, durable = false, exclusive = false, autoDelete = false, arguments = emptyMap()) + val message = "Hello World!" + channel.basicPublish(message.toByteArray(), exchange = "", routingKey = queueName, properties = Properties()) + println("[x] Sent '$message'") + + channel.close() + connection.close() +} + +suspend fun receive(coroutineScope: CoroutineScope) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.queueDeclare(queueName, durable = false, exclusive = false, autoDelete = false, arguments = emptyMap()) + println("[*] Waiting for messages. To exit press CTRL+C") + + val consumer = channel.basicConsume(queueName, noAck = true) + for (delivery in consumer) { + val message: String = delivery.message.body.decodeToString() + println("[x] Received '$message'") + } + + channel.close() + connection.close() +} + +fun main() = runBlocking { + val coroutineScope = this + + launch { send(coroutineScope) } + launch { receive(coroutineScope) } + + delay(Long.MAX_VALUE) // Keep the main thread alive to allow the consumer to run +} + diff --git a/kotlin/src/commonMain/kotlin/PublishSubscribe.kt b/kotlin/src/commonMain/kotlin/PublishSubscribe.kt new file mode 100644 index 00000000..fcd04eba --- /dev/null +++ b/kotlin/src/commonMain/kotlin/PublishSubscribe.kt @@ -0,0 +1,89 @@ +import dev.kourier.amqp.BuiltinExchangeType +import dev.kourier.amqp.Properties +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import io.ktor.utils.io.core.* +import kotlinx.coroutines.* + +suspend fun emitLog(coroutineScope: CoroutineScope, message: String) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + // Declare a fanout exchange - broadcasts to all bound queues + channel.exchangeDeclare( + "logs", + BuiltinExchangeType.FANOUT, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + // Publish to the exchange (routing key is ignored for fanout) + channel.basicPublish( + message.toByteArray(), + exchange = "logs", + routingKey = "", // Routing key is ignored by fanout exchanges + properties = Properties() + ) + println(" [x] Sent '$message'") + + channel.close() + connection.close() +} + +suspend fun receiveLogs(coroutineScope: CoroutineScope, subscriberName: String) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + // Declare the same fanout exchange + channel.exchangeDeclare( + "logs", + BuiltinExchangeType.FANOUT, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + // Declare a temporary, exclusive, auto-delete queue + // The server generates a unique name for us + val queueDeclared = channel.queueDeclare( + name = "", // Empty name = server generates a unique name + durable = false, + exclusive = true, // Queue is deleted when connection closes + autoDelete = true, // Queue is deleted when no consumers + arguments = emptyMap() + ) + val queueName = queueDeclared.queueName + println(" [$subscriberName] Created temporary queue: $queueName") + + // Bind the queue to the exchange + channel.queueBind( + queue = queueName, + exchange = "logs", + routingKey = "" // Routing key is ignored for fanout + ) + println(" [$subscriberName] Waiting for logs. To exit press CTRL+C") + + // Consume messages with auto-ack (since these are just logs) + val consumer = channel.basicConsume(queueName, noAck = true) + + for (delivery in consumer) { + val message = delivery.message.body.decodeToString() + println(" [$subscriberName] $message") + } + + channel.close() + connection.close() +} diff --git a/kotlin/src/commonMain/kotlin/RPC.kt b/kotlin/src/commonMain/kotlin/RPC.kt new file mode 100644 index 00000000..460d49b8 --- /dev/null +++ b/kotlin/src/commonMain/kotlin/RPC.kt @@ -0,0 +1,161 @@ +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import dev.kourier.amqp.properties +import io.ktor.utils.io.core.* +import kotlinx.coroutines.* +import kotlin.uuid.Uuid + +/** + * Fibonacci function - calculates Fibonacci number recursively. + * Note: This is a simple recursive implementation for demonstration. + * Not suitable for large numbers in production. + */ +private fun fib(n: Int): Int { + return when { + n == 0 -> 0 + n == 1 -> 1 + else -> fib(n - 1) + fib(n - 2) + } +} + +/** + * RPC Server - Processes Fibonacci requests and sends responses. + */ +suspend fun rpcServer(coroutineScope: CoroutineScope) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + try { + // Declare the RPC queue + channel.queueDeclare( + "rpc_queue", + durable = false, + exclusive = false, + autoDelete = false, + arguments = emptyMap() + ) + + // Fair dispatch - don't give more than one message at a time + channel.basicQos(count = 1u, global = false) + + println(" [x] Awaiting RPC requests") + + // Consume RPC requests + val consumer = channel.basicConsume("rpc_queue", noAck = false) + + for (delivery in consumer) { + try { + val props = delivery.message.properties + val correlationId = props.correlationId + val replyTo = props.replyTo + + // Parse request (expecting an integer) + val requestMessage = delivery.message.body.decodeToString() + val n = requestMessage.toIntOrNull() ?: 0 + + println(" [.] fib($n)") + + // Calculate Fibonacci + val response = fib(n) + + // Build response properties with correlation ID + val replyProps = properties { + this.correlationId = correlationId + } + + // Send response to the callback queue + if (replyTo != null) { + channel.basicPublish( + response.toString().toByteArray(), + exchange = "", + routingKey = replyTo, + properties = replyProps + ) + } + + // Acknowledge the request + channel.basicAck(delivery.message, multiple = false) + } catch (e: Exception) { + println(" [.] Error processing request: ${e.message}") + e.printStackTrace() + channel.basicAck(delivery.message, multiple = false) + } + } + } finally { + // Ensure cleanup happens even when coroutine is cancelled + channel.close() + connection.close() + } +} + +/** + * RPC Client - Sends Fibonacci requests and waits for responses. + */ +suspend fun rpcClient(coroutineScope: CoroutineScope, n: Int): Int { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + try { + // Create an exclusive callback queue for receiving responses + val callbackQueueDeclared = channel.queueDeclare( + name = "", + durable = false, + exclusive = true, // Exclusive to this connection + autoDelete = true, + arguments = emptyMap() + ) + val callbackQueueName = callbackQueueDeclared.queueName + + // Generate a unique correlation ID for this request + val correlationId = Uuid.random().toString() + + // Start consuming BEFORE sending the request to avoid race condition + val consumer = channel.basicConsume(callbackQueueName, noAck = true) + var result = 0 + + // Build request properties + val requestProps = properties { + this.correlationId = correlationId + this.replyTo = callbackQueueName + } + + // Send the RPC request + channel.basicPublish( + n.toString().toByteArray(), + exchange = "", + routingKey = "rpc_queue", + properties = requestProps + ) + println(" [x] Requesting fib($n)") + + withTimeout(10000) { // 10 second timeout + for (delivery in consumer) { + val responseCorrelationId = delivery.message.properties.correlationId + + if (responseCorrelationId == correlationId) { + // Found matching response + val responseMessage = delivery.message.body.decodeToString() + result = responseMessage.toInt() + println(" [.] Got $result") + break + } + } + } + + return result + } finally { + // Ensure cleanup happens even on timeout or error + channel.close() + connection.close() + } +} diff --git a/kotlin/src/commonMain/kotlin/Routing.kt b/kotlin/src/commonMain/kotlin/Routing.kt new file mode 100644 index 00000000..df33a1bb --- /dev/null +++ b/kotlin/src/commonMain/kotlin/Routing.kt @@ -0,0 +1,92 @@ +import dev.kourier.amqp.BuiltinExchangeType +import dev.kourier.amqp.Properties +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import io.ktor.utils.io.core.* +import kotlinx.coroutines.* + +suspend fun emitLogDirect(coroutineScope: CoroutineScope, severity: String, message: String) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + // Declare a direct exchange - routes based on exact routing key match + channel.exchangeDeclare( + "direct_logs", + BuiltinExchangeType.DIRECT, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + // Publish with severity as the routing key + channel.basicPublish( + message.toByteArray(), + exchange = "direct_logs", + routingKey = severity, // Routing key determines which queues receive the message + properties = Properties() + ) + println(" [x] Sent '$severity':'$message'") + + channel.close() + connection.close() +} + +suspend fun receiveLogsDirect(coroutineScope: CoroutineScope, subscriberName: String, severities: List) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + // Declare the same direct exchange + channel.exchangeDeclare( + "direct_logs", + BuiltinExchangeType.DIRECT, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + // Declare a temporary queue + val queueDeclared = channel.queueDeclare( + name = "", + durable = false, + exclusive = true, + autoDelete = true, + arguments = emptyMap() + ) + val queueName = queueDeclared.queueName + println(" [$subscriberName] Created temporary queue: $queueName") + + // Bind the queue to the exchange with each severity + for (severity in severities) { + channel.queueBind( + queue = queueName, + exchange = "direct_logs", + routingKey = severity // Only receive messages with this routing key + ) + println(" [$subscriberName] Binding queue to '$severity'") + } + println(" [$subscriberName] Waiting for ${severities.joinToString(", ")} logs. To exit press CTRL+C") + + // Consume messages + val consumer = channel.basicConsume(queueName, noAck = true) + + for (delivery in consumer) { + val routingKey = delivery.message.routingKey + val message = delivery.message.body.decodeToString() + println(" [$subscriberName] Received '$routingKey':'$message'") + } + + channel.close() + connection.close() +} diff --git a/kotlin/src/commonMain/kotlin/Topics.kt b/kotlin/src/commonMain/kotlin/Topics.kt new file mode 100644 index 00000000..aa158ee7 --- /dev/null +++ b/kotlin/src/commonMain/kotlin/Topics.kt @@ -0,0 +1,92 @@ +import dev.kourier.amqp.BuiltinExchangeType +import dev.kourier.amqp.Properties +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import io.ktor.utils.io.core.* +import kotlinx.coroutines.* + +suspend fun emitLogTopic(coroutineScope: CoroutineScope, routingKey: String, message: String) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + // Declare a topic exchange - routes based on pattern matching + channel.exchangeDeclare( + "topic_logs", + BuiltinExchangeType.TOPIC, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + // Publish with a topic routing key (dot-separated words) + channel.basicPublish( + message.toByteArray(), + exchange = "topic_logs", + routingKey = routingKey, // Format: . or similar + properties = Properties() + ) + println(" [x] Sent '$routingKey':'$message'") + + channel.close() + connection.close() +} + +suspend fun receiveLogsTopic(coroutineScope: CoroutineScope, subscriberName: String, bindingKeys: List) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + // Declare the same topic exchange + channel.exchangeDeclare( + "topic_logs", + BuiltinExchangeType.TOPIC, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + // Declare a temporary queue + val queueDeclared = channel.queueDeclare( + name = "", + durable = false, + exclusive = true, + autoDelete = true, + arguments = emptyMap() + ) + val queueName = queueDeclared.queueName + println(" [$subscriberName] Created temporary queue: $queueName") + + // Bind the queue with topic patterns + for (bindingKey in bindingKeys) { + channel.queueBind( + queue = queueName, + exchange = "topic_logs", + routingKey = bindingKey // Pattern with wildcards: * or # + ) + println(" [$subscriberName] Binding queue to pattern '$bindingKey'") + } + println(" [$subscriberName] Waiting for messages matching ${bindingKeys.joinToString(", ")}") + + // Consume messages + val consumer = channel.basicConsume(queueName, noAck = true) + + for (delivery in consumer) { + val routingKey = delivery.message.routingKey + val message = delivery.message.body.decodeToString() + println(" [$subscriberName] Received '$routingKey':'$message'") + } + + channel.close() + connection.close() +} diff --git a/kotlin/src/commonMain/kotlin/WorkQueues.kt b/kotlin/src/commonMain/kotlin/WorkQueues.kt new file mode 100644 index 00000000..4d53e8a7 --- /dev/null +++ b/kotlin/src/commonMain/kotlin/WorkQueues.kt @@ -0,0 +1,95 @@ +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import dev.kourier.amqp.properties +import io.ktor.utils.io.core.* +import kotlinx.coroutines.* + +suspend fun newTask(coroutineScope: CoroutineScope, message: String) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + // Declare a durable queue to survive broker restarts + channel.queueDeclare( + "task_queue", + durable = true, // Queue survives broker restart + exclusive = false, + autoDelete = false, + arguments = emptyMap() + ) + + // Mark messages as persistent (deliveryMode = 2) + val properties = properties { + deliveryMode = 2u // Persistent message + } + + channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = "task_queue", + properties = properties + ) + println(" [x] Sent '$message'") + + channel.close() + connection.close() +} + +suspend fun worker(coroutineScope: CoroutineScope, workerName: String) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + // Declare the same durable queue + channel.queueDeclare( + "task_queue", + durable = true, + exclusive = false, + autoDelete = false, + arguments = emptyMap() + ) + println(" [$workerName] Waiting for messages. To exit press CTRL+C") + + // Fair dispatch: don't give more than one message to a worker at a time + channel.basicQos(count = 1u, global = false) + + // Consume with manual acknowledgment (noAck = false) + val consumer = channel.basicConsume("task_queue", noAck = false) + + for (delivery in consumer) { + val message = delivery.message.body.decodeToString() + println(" [$workerName] Received '$message'") + + try { + // Simulate work - each dot represents 1 second of work + doWork(message) + println(" [$workerName] Done") + } finally { + // Manual acknowledgment - message is removed from queue + channel.basicAck(delivery.message, multiple = false) + } + } + + channel.close() + connection.close() +} + +/** + * Simulates time-consuming work. + * Each dot in the task string represents 1 second of work. + */ +private suspend fun doWork(task: String) { + for (ch in task) { + if (ch == '.') { + delay(1000) // Sleep for 1 second per dot + } + } +}