From c93c7b097cf76057571267c4ed4fdb0938d8ad97 Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Thu, 27 Feb 2025 16:03:34 +0100 Subject: [PATCH 1/9] Add event processing patterns for Kotlin --- kotlin/patterns-use-cases/README.md | 174 ++++++++++++++++++ kotlin/patterns-use-cases/docker-compose.yaml | 39 ++++ kotlin/patterns-use-cases/restate.toml | 3 + .../example/eventenrichment/PackageTracker.kt | 49 +++++ .../my/example/eventtransactions/UserFeed.kt | 37 ++++ .../example/eventtransactions/utils/stubs.kt | 28 +++ 6 files changed, 330 insertions(+) create mode 100644 kotlin/patterns-use-cases/docker-compose.yaml create mode 100644 kotlin/patterns-use-cases/restate.toml create mode 100644 kotlin/patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt create mode 100644 kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt create mode 100644 kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/utils/stubs.kt diff --git a/kotlin/patterns-use-cases/README.md b/kotlin/patterns-use-cases/README.md index 3c905378..850774cd 100644 --- a/kotlin/patterns-use-cases/README.md +++ b/kotlin/patterns-use-cases/README.md @@ -73,4 +73,178 @@ dev.restate.sdk.common.TerminalException: Failed to reserve the trip: 👻 Payme ``` + + +## Transactional Event Processing +[](src/main/kotlin/my/example/eventtransactions/UserFeed.kt) + +Processing events (from Kafka) to update various downstream systems. +- Durable side effects with retries and recovery of partial progress +- Events get sent to objects based on the Kafka key. + For each key, Restate ensures that events are processed sequentially and in order. + Slow events on other keys do not block processing (high fan-out, no head-of-line waiting). +- Ability to delay events when the downstream systems are busy, without blocking + entire partitions. + +
+Running the example + +1. Start the Kafka broker via Docker Compose: `docker compose up -d`. +2. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml` +3. Start the service: `./gradlew -PmainClass=my.example.eventtransactions.UserFeedKt run` +4. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080` +5. Let Restate subscribe to the Kafka topic `social-media-posts` and invoke `UserFeed/processPost` on each message. + ```shell + curl localhost:9070/subscriptions -H 'content-type: application/json' \ + -d '{ + "source": "kafka://my-cluster/social-media-posts", + "sink": "service://UserFeed/processPost", + "options": {"auto.offset.reset": "earliest"} + }' + ``` + +Start a Kafka producer and send some messages to the `social-media-posts` topic: +```shell +docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic social-media-posts --property parse.key=true --property key.separator=: +``` + +Let's submit some posts for two different users: +``` +userid1:{"content": "Hi! This is my first post!", "metadata": "public"} +userid2:{"content": "Hi! This is my first post!", "metadata": "public"} +userid1:{"content": "Hi! This is my second post!", "metadata": "public"} +``` + +Our Kafka broker only has a single partition so all these messages end up on the same partition. +You can see in the logs how events for different users are processed in parallel, but events for the same user are processed sequentially. + + +
+View logs + +```shell +2025-02-27 15:53:37 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Creating post ee5b9dde-fc81-4819-a411-916e5c2b0c0d for user userid2 +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Creating post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca for user userid1 +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Content moderation for post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca is still pending... Will check again in 5 seconds +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Content moderation for post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca is done +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Updating user feed for user userid1 with post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] dev.restate.sdk.core.InvocationStateMachine - End invocation +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Creating post 382f3687-fb11-49fa-912c-18a886dd1ecd for user userid1 +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Content moderation for post 382f3687-fb11-49fa-912c-18a886dd1ecd is still pending... Will check again in 5 seconds +2025-02-27 15:53:48 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:23 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Content moderation for post 382f3687-fb11-49fa-912c-18a886dd1ecd is done +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Updating user feed for user userid1 with post 382f3687-fb11-49fa-912c-18a886dd1ecd +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] dev.restate.sdk.core.InvocationStateMachine - End invocation +2025-02-27 15:54:33 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is done +2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Updating user feed for user userid2 with post ee5b9dde-fc81-4819-a411-916e5c2b0c0d +2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] dev.restate.sdk.core.InvocationStateMachine - End invocation +``` + +As you see, slow events do not block other slow events. +Restate effectively created a queue per user ID. + +The handler creates the social media post and waits for content moderation to finish. +If the moderation takes long, and there is an infrastructure crash, then Restate will trigger a retry. +The handler will fast-forward to where it was, will recover the post ID and will continue waiting for moderation to finish. + +You can try it out by killing Restate or the service halfway through processing a post. + +
+
+ +## Event Enrichment / Joins +[](src/main/kotlin/my/example/eventenrichment/PackageTracker.kt) + +This example shows an example of: +- **Event enrichment** over different sources: RPC and Kafka +- **Stateful actors / Digital twins** updated over Kafka +- **Streaming join** +- Populating state from events and making it queryable via RPC handlers. + +The example implements a package delivery tracking service. +Packages are registered via an RPC handler, and their location is updated via Kafka events. +The Package Tracker Virtual Object tracks the package details and its location history. + +
+Running the example + +1. Start the Kafka broker via Docker Compose: `docker compose up -d`. + +2. Start Restate Server with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml` + +3. Start the service: `./gradlew -PmainClass=my.example.eventenrichment.PackageTrackerKt run` + +4. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080` + +5. Let Restate subscribe to the Kafka topic `package-location-updates` and invoke `PackageTracker/updateLocation` on each message. + ```shell + curl localhost:9070/subscriptions -H 'content-type: application/json' \ + -d '{ + "source": "kafka://my-cluster/package-location-updates", + "sink": "service://PackageTracker/updateLocation", + "options": {"auto.offset.reset": "earliest"} + }' + ``` + +6. Register a new package via the RPC handler: + ```shell + curl localhost:8080/PackageTracker/package1/registerPackage \ + -H 'content-type: application/json' -d '{"finalDestination": "Bridge 6, Amsterdam"}' + ``` + +7. Start a Kafka producer and publish some messages to update the location of the package on the `package-location-updates` topic: + ```shell + docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic package-location-updates --property parse.key=true --property key.separator=: + ``` + Send messages like + ``` + package1:{"timestamp": "2024-10-10 13:00", "location": "Pinetree Road 5, Paris"} + package1:{"timestamp": "2024-10-10 14:00", "location": "Mountain Road 155, Brussels"} + ``` + +8. Query the package location via the RPC handler: + ```shell + curl localhost:8080/PackageTracker/package1/getPackageInfo + ``` + or via the CLI: `restate kv get PackageTracker package1` + + You can see how the state was enriched by the initial RPC event and the subsequent Kafka events: + +
+ See Output + + ``` + 🤖 State: + ――――――――― + + Service PackageTracker + Key package1 + + KEY VALUE + package-info { + "finalDestination": "Bridge 6, Amsterdam", + "locations": [ + { + "location": "Pinetree Road 5, Paris", + "timestamp": "2024-10-10 13:00" + }, + { + "location": "Mountain Road 155, Brussels", + "timestamp": "2024-10-10 14:00" + } + ] + } + ``` + +
+
\ No newline at end of file diff --git a/kotlin/patterns-use-cases/docker-compose.yaml b/kotlin/patterns-use-cases/docker-compose.yaml new file mode 100644 index 00000000..2e33e605 --- /dev/null +++ b/kotlin/patterns-use-cases/docker-compose.yaml @@ -0,0 +1,39 @@ +version: '3' +services: + broker: + image: confluentinc/cp-kafka:7.5.0 + container_name: broker + ports: + - "9092:9092" + - "9101:9101" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093 + KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk + + init-kafka: + image: confluentinc/cp-kafka:7.5.0 + depends_on: + - broker + entrypoint: [ '/bin/sh', '-c' ] + command: | + "# blocks until kafka is reachable + kafka-topics --bootstrap-server broker:29092 --list + echo -e 'Creating kafka topics' + kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic social-media-posts --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic package-location-updates --replication-factor 1 --partitions 1 + + echo -e 'Successfully created the following topics:' + kafka-topics --bootstrap-server broker:29092 --list" \ No newline at end of file diff --git a/kotlin/patterns-use-cases/restate.toml b/kotlin/patterns-use-cases/restate.toml new file mode 100644 index 00000000..8a0bde1c --- /dev/null +++ b/kotlin/patterns-use-cases/restate.toml @@ -0,0 +1,3 @@ +[[ingress.kafka-clusters]] +name = "my-cluster" +brokers = ["PLAINTEXT://localhost:9092"] \ No newline at end of file diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt new file mode 100644 index 00000000..bf69c5e8 --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt @@ -0,0 +1,49 @@ +package my.example.eventenrichment + +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.Shared +import dev.restate.sdk.annotation.VirtualObject +import dev.restate.sdk.common.StateKey +import dev.restate.sdk.common.TerminalException +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder +import dev.restate.sdk.kotlin.KtSerdes +import dev.restate.sdk.kotlin.ObjectContext +import dev.restate.sdk.kotlin.SharedObjectContext +import kotlinx.serialization.Serializable + +@Serializable +data class PackageInfo(val finalDestination: String, val locations: MutableList = mutableListOf()) +@Serializable +data class LocationUpdate(val timestamp: String, val location: String) + +@VirtualObject +class PackageTracker { + + companion object { + private val PACKAGE_INFO = StateKey.of("package-info", KtSerdes.json()) + } + + @Handler + suspend fun registerPackage(ctx: ObjectContext, packageInfo: PackageInfo) { + ctx.set(PACKAGE_INFO, packageInfo) + } + + @Handler + suspend fun updateLocation(ctx: ObjectContext, locationUpdate: LocationUpdate) { + val packageInfo = ctx.get(PACKAGE_INFO) + ?: throw TerminalException("Package not found") + + packageInfo.locations.add(locationUpdate) + ctx.set(PACKAGE_INFO, packageInfo) + } + + @Shared + suspend fun getPackageInfo(ctx: SharedObjectContext): PackageInfo { + return ctx.get(PACKAGE_INFO) + ?: throw TerminalException("Package not found") + } +} + +fun main() { + RestateHttpEndpointBuilder.builder().bind(PackageTracker()).buildAndListen() +} diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt new file mode 100644 index 00000000..7dd04515 --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt @@ -0,0 +1,37 @@ +package my.example.eventtransactions + +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.VirtualObject +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder +import dev.restate.sdk.kotlin.ObjectContext +import dev.restate.sdk.kotlin.runBlock +import kotlinx.serialization.Serializable +import my.example.eventtransactions.utils.createPost +import my.example.eventtransactions.utils.getPostStatus +import my.example.eventtransactions.utils.updateUserFeed +import kotlin.time.Duration.Companion.milliseconds + +@VirtualObject +class UserFeed { + + @Serializable + data class SocialMediaPost(val content: String, val metadata: String) + + @Handler + suspend fun processPost(ctx: ObjectContext, post: SocialMediaPost) { + val userId = ctx.key() + + val postId = ctx.runBlock { createPost(userId, post) } + + while (ctx.runBlock { getPostStatus(postId) } == "PENDING") { + ctx.sleep(5000.milliseconds) + } + + ctx.runBlock { updateUserFeed(userId, postId) } + } +} + +fun main() { + RestateHttpEndpointBuilder.builder().bind(UserFeed()).buildAndListen() +} + diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/utils/stubs.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/utils/stubs.kt new file mode 100644 index 00000000..e940cbb1 --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/utils/stubs.kt @@ -0,0 +1,28 @@ +package my.example.eventtransactions.utils + +import my.example.eventtransactions.UserFeed +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger +import java.util.UUID + +private val logger: Logger = LogManager.getLogger("UserFeed") + +fun createPost(userId: String, post: UserFeed.SocialMediaPost): String { + val postId = UUID.randomUUID().toString() + logger.info("Creating post {} for user {}", postId, userId) + return postId +} + +fun getPostStatus(postId: String): String { + return if (Math.random() < 0.8) { + logger.info("Content moderation for post {} is still pending... Will check again in 5 seconds", postId) + "PENDING" + } else { + logger.info("Content moderation for post {} is done", postId) + "DONE" + } +} + +fun updateUserFeed(userId: String, postId: String) { + logger.info("Updating user feed for user {} with post {}", userId, postId) +} \ No newline at end of file From b57fefc82f797db2c0224a82d1f39f9c1011419b Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Thu, 27 Feb 2025 16:11:50 +0100 Subject: [PATCH 2/9] Add Kotlin event processing examples to the readmes --- kotlin/patterns-use-cases/README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kotlin/patterns-use-cases/README.md b/kotlin/patterns-use-cases/README.md index 850774cd..48a9eb6c 100644 --- a/kotlin/patterns-use-cases/README.md +++ b/kotlin/patterns-use-cases/README.md @@ -1,7 +1,12 @@ # Kotlin Patterns and Use Cases +#### Orchestration patterns - **[Sagas](README.md#sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [](src/main/kotlin/my/example/sagas/BookingWorkflow.kt) +#### Event processing +- **[Transactional Event Processing](README.md#transactional-event-processing)**: Processing events (from Kafka) to update various downstream systems in a transactional way. [](src/main/kotlin/my/example/eventtransactions/UserFeed.kt) +- **[Event Enrichment / Joins](README.md#event-enrichment--joins)**: Stateful functions/actors connected to Kafka and callable over RPC. [](src/main/kotlin/my/example/eventenrichment/PackageTracker.kt) + ## Sagas [](src/main/kotlin/my/example/sagas/BookingWorkflow.kt) From 2a3d67e1918166a7a46a7d6dad6d1f0eecb736f3 Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Thu, 27 Feb 2025 16:12:37 +0100 Subject: [PATCH 3/9] Add Kotlin event processing examples to the readmes --- README.md | 34 +++++++++++++++++----------------- kotlin/README.md | 5 +++++ 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 59c6e14a..d604f4d3 100644 --- a/README.md +++ b/README.md @@ -52,23 +52,23 @@ Or have a look at the general catalog below: #### Use Cases and Patterns -| Example Name | Languages | -|---------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| Durable RPC, Idempotency & Concurrency | [](typescript/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](go/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](python/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](java/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) | -| \(Delayed\) Message Queue | [](typescript/patterns-use-cases/README.md#delayed-message-queue) [](go/patterns-use-cases/README.md#delayed-message-queue) [](python/patterns-use-cases/README.md#delayed-message-queue) [](java/patterns-use-cases/README.md#delayed-message-queue) | -| Webhook Callbacks | [](typescript/patterns-use-cases/README.md#webhook-callbacks) [](go/patterns-use-cases/README.md#webhook-callbacks) | -| Database Interaction Patterns | [](typescript/patterns-use-cases/README.md#database-interaction-patterns) | -| Convert Sync Tasks to Async | [](typescript/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](go/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](python/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](java/patterns-use-cases/README.md#convert-sync-tasks-to-async) | -| Payments Signals \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-signals) [](python/patterns-use-cases/README.md#payment-signals) [](java/patterns-use-cases/README.md#payment-signals) | -| Sagas | [](typescript/patterns-use-cases/README.md#sagas) [](go/patterns-use-cases/README.md#sagas) [](python/patterns-use-cases/README.md#sagas) [](java/patterns-use-cases/README.md#sagas) [](kotlin/patterns-use-cases/README.md#sagas) | -| Stateful Actors and State Machines | [](typescript/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](go/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](python/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](java/patterns-use-cases/README.md#stateful-actors-and-state-machines) | -| Payment State Machines \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-state-machines) [](python/patterns-use-cases/README.md#payment-state-machines) [](java/patterns-use-cases/README.md#payment-state-machines) | -| Scheduling Tasks | [](typescript/patterns-use-cases/README.md#scheduling-tasks) [](go/patterns-use-cases/README.md#scheduling-tasks) | -| Parallelizing Work | [](typescript/patterns-use-cases/README.md#parallelizing-work) [](go/patterns-use-cases/README.md#parallelizing-work) [](python/patterns-use-cases/README.md#parallelizing-work) [](java/patterns-use-cases/README.md#parallelizing-work) | -| Transactional Event Processing | [](typescript/patterns-use-cases/README.md#transactional-event-processing) [](go/patterns-use-cases/README.md#transactional-event-processing) [](python/patterns-use-cases/README.md#transactional-event-processing) [](java/patterns-use-cases/README.md#transactional-event-processing) | -| Event Enrichment / Joins | [](typescript/patterns-use-cases/README.md#event-enrichment--joins) [](go/patterns-use-cases/README.md#event-enrichment--joins) [](python/patterns-use-cases/README.md#event-enrichment--joins) [](java/patterns-use-cases/README.md#event-enrichment--joins) | -| Durable Promises as a Service | [](typescript/patterns-use-cases/README.md#durable-promises-as-a-service) | -| Priority Queue | [](typescript/patterns-use-cases/README.md#priority-queue) | +| Example Name | Languages | +|---------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Durable RPC, Idempotency & Concurrency | [](typescript/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](go/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](python/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](java/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) | +| \(Delayed\) Message Queue | [](typescript/patterns-use-cases/README.md#delayed-message-queue) [](go/patterns-use-cases/README.md#delayed-message-queue) [](python/patterns-use-cases/README.md#delayed-message-queue) [](java/patterns-use-cases/README.md#delayed-message-queue) | +| Webhook Callbacks | [](typescript/patterns-use-cases/README.md#webhook-callbacks) [](go/patterns-use-cases/README.md#webhook-callbacks) | +| Database Interaction Patterns | [](typescript/patterns-use-cases/README.md#database-interaction-patterns) | +| Convert Sync Tasks to Async | [](typescript/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](go/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](python/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](java/patterns-use-cases/README.md#convert-sync-tasks-to-async) | +| Payments Signals \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-signals) [](python/patterns-use-cases/README.md#payment-signals) [](java/patterns-use-cases/README.md#payment-signals) | +| Sagas | [](typescript/patterns-use-cases/README.md#sagas) [](go/patterns-use-cases/README.md#sagas) [](python/patterns-use-cases/README.md#sagas) [](java/patterns-use-cases/README.md#sagas) [](kotlin/patterns-use-cases/README.md#sagas) | +| Stateful Actors and State Machines | [](typescript/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](go/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](python/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](java/patterns-use-cases/README.md#stateful-actors-and-state-machines) | +| Payment State Machines \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-state-machines) [](python/patterns-use-cases/README.md#payment-state-machines) [](java/patterns-use-cases/README.md#payment-state-machines) | +| Scheduling Tasks | [](typescript/patterns-use-cases/README.md#scheduling-tasks) [](go/patterns-use-cases/README.md#scheduling-tasks) | +| Parallelizing Work | [](typescript/patterns-use-cases/README.md#parallelizing-work) [](go/patterns-use-cases/README.md#parallelizing-work) [](python/patterns-use-cases/README.md#parallelizing-work) [](java/patterns-use-cases/README.md#parallelizing-work) | +| Transactional Event Processing | [](typescript/patterns-use-cases/README.md#transactional-event-processing) [](go/patterns-use-cases/README.md#transactional-event-processing) [](python/patterns-use-cases/README.md#transactional-event-processing) [](java/patterns-use-cases/README.md#transactional-event-processing) [](kotlin/patterns-use-cases/README.md#transactional-event-processing) | +| Event Enrichment / Joins | [](typescript/patterns-use-cases/README.md#event-enrichment--joins) [](go/patterns-use-cases/README.md#event-enrichment--joins) [](python/patterns-use-cases/README.md#event-enrichment--joins) [](java/patterns-use-cases/README.md#event-enrichment--joins) [](kotlin/patterns-use-cases/README.md#event-enrichment--joins) | +| Durable Promises as a Service | [](typescript/patterns-use-cases/README.md#durable-promises-as-a-service) | +| Priority Queue | [](typescript/patterns-use-cases/README.md#priority-queue) | #### Integrations diff --git a/kotlin/README.md b/kotlin/README.md index fb65446f..7c7ec5d8 100644 --- a/kotlin/README.md +++ b/kotlin/README.md @@ -9,8 +9,13 @@ ## Use Cases and Patterns +#### Orchestration patterns - **[Sagas](patterns-use-cases/README.md#sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [](patterns-use-cases/src/main/kotlin/my/example/sagas/BookingWorkflow.kt) +#### Event processing +- **[Transactional Event Processing](patterns-use-cases/README.md#transactional-event-processing)**: Processing events (from Kafka) to update various downstream systems in a transactional way. [](patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt) +- **[Event Enrichment / Joins](patterns-use-cases/README.md#event-enrichment--joins)**: Stateful functions/actors connected to Kafka and callable over RPC. [](patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt) + ## Integrations Examples integrating Restate with other tools and frameworks: From c819360e8fb7e30c67b783ac9c670a3810dd1a41 Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Thu, 27 Feb 2025 16:14:08 +0100 Subject: [PATCH 4/9] Fix kotlin side effect run to runBlock --- .../src/main/kotlin/durable_execution/SubscriptionService.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt b/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt index aa395783..8d9c2ed1 100644 --- a/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt +++ b/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt @@ -45,7 +45,7 @@ class SubscriptionService { } for (subscription in req.subscriptions) { - ctx.run { + ctx.runBlock { createSubscription(req.userId, subscription, payRef) } } From afc20cf6e56df2be3c787a703161cfd3c41d0965 Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Mon, 3 Mar 2025 10:59:09 +0100 Subject: [PATCH 5/9] Add queue example for Kotlin --- kotlin/patterns-use-cases/README.md | 14 ++++++ .../my/example/queue/AsyncTaskService.kt | 26 +++++++++++ .../kotlin/my/example/queue/TaskSubmitter.kt | 46 +++++++++++++++++++ 3 files changed, 86 insertions(+) create mode 100644 kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/AsyncTaskService.kt create mode 100644 kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/TaskSubmitter.kt diff --git a/kotlin/patterns-use-cases/README.md b/kotlin/patterns-use-cases/README.md index 48a9eb6c..c99cb663 100644 --- a/kotlin/patterns-use-cases/README.md +++ b/kotlin/patterns-use-cases/README.md @@ -1,5 +1,8 @@ # Kotlin Patterns and Use Cases +### Communication +- **[(Delayed) Message Queue](README.md#delayed-message-queue)**: Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once. [](src/main/kotlin/my/example/queue/TaskSubmitter.kt) + #### Orchestration patterns - **[Sagas](README.md#sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [](src/main/kotlin/my/example/sagas/BookingWorkflow.kt) @@ -7,6 +10,17 @@ - **[Transactional Event Processing](README.md#transactional-event-processing)**: Processing events (from Kafka) to update various downstream systems in a transactional way. [](src/main/kotlin/my/example/eventtransactions/UserFeed.kt) - **[Event Enrichment / Joins](README.md#event-enrichment--joins)**: Stateful functions/actors connected to Kafka and callable over RPC. [](src/main/kotlin/my/example/eventenrichment/PackageTracker.kt) +## (Delayed) Message Queue +[](src/main/kotlin/my/example/queue/TaskSubmitter.kt) + +Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once. + +- [Task Submitter](src/main/kotlin/my/example/queue/TaskSubmitter.kt): schedules tasks via send requests with and idempotency key. + - The **send requests** put the tasks in Restate's queue. The task submitter does not wait for the task response. + - The **idempotency key** in the header is used by Restate to deduplicate requests. + - If a delay is set, the task will be executed later and Restate will track the timer durably, like a **delayed task queue**. +- [Async Task Worker](src/main/kotlin/my/example/queue/AsyncTaskWorker.kt): gets invoked by Restate for each task in the queue. + ## Sagas [](src/main/kotlin/my/example/sagas/BookingWorkflow.kt) diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/AsyncTaskService.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/AsyncTaskService.kt new file mode 100644 index 00000000..150a7fcc --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/AsyncTaskService.kt @@ -0,0 +1,26 @@ +package my.example.queue + +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.Service +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder +import dev.restate.sdk.kotlin.Context +import kotlinx.serialization.Serializable + +@Service +class AsyncTaskService { + @Handler + suspend fun runTask(ctx: Context, params: TaskOpts): String { + return someHeavyWork(params) + } +} + +fun main() { + RestateHttpEndpointBuilder.builder().bind(AsyncTaskService()).buildAndListen() +} + +@Serializable +class TaskOpts + +fun someHeavyWork(task: TaskOpts): String { + return "someHeavyWork" +} diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/TaskSubmitter.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/TaskSubmitter.kt new file mode 100644 index 00000000..19e26e87 --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/TaskSubmitter.kt @@ -0,0 +1,46 @@ +package my.example.queue + +import dev.restate.sdk.client.CallRequestOptions +import dev.restate.sdk.client.Client +import dev.restate.sdk.kotlin.KtSerdes + +/* + * Restate is as a sophisticated task queue, with extra features like: + * - delaying execution and reliable timers + * - stateful tasks + * - queues per key (>< per partition; slow tasks for a key don't block others) + * - retries and recovery upon failures + * + * Every handler in Restate is executed asynchronously and can be treated + * as a reliable asynchronous task. + */ +class TaskSubmitter { + companion object { + private val restateClient: Client = Client.connect("http://localhost:8080") + } + + suspend fun scheduleTask(taskOpts: TaskOpts) { + // submit the task; similar to publishing a message to a queue + // Restate ensures the task is executed exactly once + val handle = + AsyncTaskServiceClient.fromClient(restateClient) + // optionally add a delay to execute the task later + .send(/*5.days*/) + .runTask( + taskOpts, + // use a stable uuid as an idempotency key; Restate deduplicates for us + CallRequestOptions.DEFAULT.withIdempotency("dQw4w9WgXcQ"), + ) + + + // ... do other things while the task is being processed ... + + // await the handler's result; optionally from another process + val result = + restateClient.invocationHandle( + handle.invocationId, + KtSerdes.json(), + ) + .attach() + } +} From 3515ab79194c09740e5266ca56d341ac1f10072e Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Mon, 3 Mar 2025 11:07:55 +0100 Subject: [PATCH 6/9] Add to readmes --- README.md | 2 +- kotlin/README.md | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index d604f4d3..24267560 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ Or have a look at the general catalog below: | Example Name | Languages | |---------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Durable RPC, Idempotency & Concurrency | [](typescript/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](go/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](python/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](java/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) | -| \(Delayed\) Message Queue | [](typescript/patterns-use-cases/README.md#delayed-message-queue) [](go/patterns-use-cases/README.md#delayed-message-queue) [](python/patterns-use-cases/README.md#delayed-message-queue) [](java/patterns-use-cases/README.md#delayed-message-queue) | +| \(Delayed\) Message Queue | [](typescript/patterns-use-cases/README.md#delayed-message-queue) [](go/patterns-use-cases/README.md#delayed-message-queue) [](python/patterns-use-cases/README.md#delayed-message-queue) [](java/patterns-use-cases/README.md#delayed-message-queue) [](kotlin/patterns-use-cases/README.md#delayed-message-queue) | | Webhook Callbacks | [](typescript/patterns-use-cases/README.md#webhook-callbacks) [](go/patterns-use-cases/README.md#webhook-callbacks) | | Database Interaction Patterns | [](typescript/patterns-use-cases/README.md#database-interaction-patterns) | | Convert Sync Tasks to Async | [](typescript/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](go/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](python/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](java/patterns-use-cases/README.md#convert-sync-tasks-to-async) | diff --git a/kotlin/README.md b/kotlin/README.md index 7c7ec5d8..094f414d 100644 --- a/kotlin/README.md +++ b/kotlin/README.md @@ -9,6 +9,9 @@ ## Use Cases and Patterns +#### Communication +- **[(Delayed) Message Queue](patterns-use-cases/README.md#delayed-message-queue)**: Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once. [](patterns-use-cases/src/main/kotlin/my/example/queue/TaskSubmitter.kt) + #### Orchestration patterns - **[Sagas](patterns-use-cases/README.md#sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [](patterns-use-cases/src/main/kotlin/my/example/sagas/BookingWorkflow.kt) From 0926a037cad6ad6444910fdee976dd689231b3af Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Wed, 5 Mar 2025 09:16:53 +0100 Subject: [PATCH 7/9] Add prerequisites --- go/README.md | 3 +++ java/README.md | 3 +++ kotlin/README.md | 3 +++ python/README.md | 3 +++ rust/README.md | 3 +++ typescript/README.md | 4 ++++ 6 files changed, 19 insertions(+) diff --git a/go/README.md b/go/README.md index 848fe43c..cc0c69f4 100644 --- a/go/README.md +++ b/go/README.md @@ -1,5 +1,8 @@ # Go Example Catalog +## Prerequisites +- Go >= 1.21.0 + ## Basics - **[Services - Durable Execution](basics)**: Making code resilient to failures via automatic retries and recovery of previously finished actions. [](basics/part0/durableexecution.go) diff --git a/java/README.md b/java/README.md index 193d13c9..849c26b8 100644 --- a/java/README.md +++ b/java/README.md @@ -1,5 +1,8 @@ # Java Example Catalog +## Prerequisites +- JDK >= 17 + ## Basics - **[Services - Durable Execution](basics)**: Making code resilient to failures via automatic retries and recovery of previously finished actions. [](basics/src/main/java/durable_execution/SubscriptionService.java) diff --git a/kotlin/README.md b/kotlin/README.md index 094f414d..92121a31 100644 --- a/kotlin/README.md +++ b/kotlin/README.md @@ -1,5 +1,8 @@ # Kotlin Example Catalog +## Prerequisites +- JDK >= 17 + ## Basics - **[Services - Durable Execution](basics)**: Making code resilient to failures via automatic retries and recovery of previously finished actions. [](basics/src/main/kotlin/durable_execution/SubscriptionService.kt) diff --git a/python/README.md b/python/README.md index b97f7642..903c4841 100644 --- a/python/README.md +++ b/python/README.md @@ -1,5 +1,8 @@ # Python Example Catalog +## Prerequisites +- Python >= v3.11 + ## Basics Learn the key concepts of Restate: diff --git a/rust/README.md b/rust/README.md index 19caa048..84995d28 100644 --- a/rust/README.md +++ b/rust/README.md @@ -1,5 +1,8 @@ # Rust Example Catalog +## Prerequisites +- [Rust](https://rustup.rs/) + ## Basics - **[Services - Durable Execution](basics)**: Making code resilient to failures via automatic retries and recovery of previously finished actions. [](basics/src/p0_durable_execution.rs) diff --git a/typescript/README.md b/typescript/README.md index 6daf2282..977b13c9 100644 --- a/typescript/README.md +++ b/typescript/README.md @@ -1,5 +1,9 @@ # TypeScript Example Catalog +## Prerequisites +- NodeJS >= v18.17.1 +- npm CLI >= 9.6.7 + ## Basics Learn the key concepts of Restate: From c6375412ef3738116d8d984f0f28ab1a3218d9d4 Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Wed, 5 Mar 2025 11:30:09 +0100 Subject: [PATCH 8/9] Remove unused state key from java basics workflow --- java/basics/src/main/java/workflows/SignupWorkflow.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/java/basics/src/main/java/workflows/SignupWorkflow.java b/java/basics/src/main/java/workflows/SignupWorkflow.java index c7239e79..ba0442e5 100644 --- a/java/basics/src/main/java/workflows/SignupWorkflow.java +++ b/java/basics/src/main/java/workflows/SignupWorkflow.java @@ -39,8 +39,6 @@ public class SignupWorkflow { // References to K/V state and promises stored in Restate private static final DurablePromiseKey EMAIL_CLICKED = DurablePromiseKey.of("email_clicked", JsonSerdes.STRING); - private static final StateKey ONBOARDING_STATUS = - StateKey.of("status", JsonSerdes.STRING); // --- The workflow logic --- @Workflow From 6f1b382e54bd3c43f97ddba89ca81bbea474498a Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Thu, 6 Mar 2025 10:01:59 +0100 Subject: [PATCH 9/9] Small fixes for readability --- go/basics/part3/workflows.go | 4 ++-- java/basics/src/main/java/workflows/SignupWorkflow.java | 9 ++++----- .../basics/src/main/kotlin/workflows/SignupWorkflow.kt | 7 +++---- python/basics/app/3_workflows.py | 4 ++-- typescript/basics/src/3_workflows.ts | 4 ++-- 5 files changed, 13 insertions(+), 15 deletions(-) diff --git a/go/basics/part3/workflows.go b/go/basics/part3/workflows.go index 37dc3023..86eefbec 100644 --- a/go/basics/part3/workflows.go +++ b/go/basics/part3/workflows.go @@ -49,7 +49,7 @@ func (SignupWorkflow) Run(ctx restate.WorkflowContext, user User) (bool, error) // Wait until user clicked email verification link // Promise gets resolved or rejected by the other handlers - clickSecret, err := restate.Promise[string](ctx, "email-link").Result() + clickSecret, err := restate.Promise[string](ctx, "link-clicked").Result() if err != nil { return false, err } @@ -61,7 +61,7 @@ func (SignupWorkflow) Run(ctx restate.WorkflowContext, user User) (bool, error) func (SignupWorkflow) Click(ctx restate.WorkflowSharedContext, secret string) error { // Send data to the workflow via a durable promise - return restate.Promise[string](ctx, "email-link").Resolve(secret) + return restate.Promise[string](ctx, "link-clicked").Resolve(secret) } func main() { diff --git a/java/basics/src/main/java/workflows/SignupWorkflow.java b/java/basics/src/main/java/workflows/SignupWorkflow.java index ba0442e5..24041ff6 100644 --- a/java/basics/src/main/java/workflows/SignupWorkflow.java +++ b/java/basics/src/main/java/workflows/SignupWorkflow.java @@ -16,7 +16,6 @@ import dev.restate.sdk.annotation.Shared; import dev.restate.sdk.annotation.Workflow; import dev.restate.sdk.common.DurablePromiseKey; -import dev.restate.sdk.common.StateKey; import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; import utils.User; @@ -37,8 +36,8 @@ public class SignupWorkflow { // References to K/V state and promises stored in Restate - private static final DurablePromiseKey EMAIL_CLICKED = - DurablePromiseKey.of("email_clicked", JsonSerdes.STRING); + private static final DurablePromiseKey LINK_CLICKED = + DurablePromiseKey.of("link_clicked", JsonSerdes.STRING); // --- The workflow logic --- @Workflow @@ -56,7 +55,7 @@ public boolean run(WorkflowContext ctx, User user) { // Wait until user clicked email verification link // Promise gets resolved or rejected by the other handlers String clickSecret = - ctx.promise(EMAIL_CLICKED) + ctx.promise(LINK_CLICKED) .awaitable() .await(); @@ -68,7 +67,7 @@ public boolean run(WorkflowContext ctx, User user) { @Shared public void click(SharedWorkflowContext ctx, String secret) { // Send data to the workflow via a durable promise - ctx.promiseHandle(EMAIL_CLICKED).resolve(secret); + ctx.promiseHandle(LINK_CLICKED).resolve(secret); } public static void main(String[] args) { diff --git a/kotlin/basics/src/main/kotlin/workflows/SignupWorkflow.kt b/kotlin/basics/src/main/kotlin/workflows/SignupWorkflow.kt index e64a40f5..ac90e304 100644 --- a/kotlin/basics/src/main/kotlin/workflows/SignupWorkflow.kt +++ b/kotlin/basics/src/main/kotlin/workflows/SignupWorkflow.kt @@ -21,8 +21,7 @@ class SignupWorkflow { companion object { // References to K/V state and promises stored in Restate - private val EMAIL_CLICKED = KtDurablePromiseKey.json("email_clicked") - private val ONBOARDING_STATUS = KtStateKey.json("status") + private val LINK_CLICKED = KtDurablePromiseKey.json("email_clicked") } @Workflow @@ -40,7 +39,7 @@ class SignupWorkflow { // Wait until user clicked email verification link // Promise gets resolved or rejected by the other handlers val clickSecret: String = - ctx.promise(EMAIL_CLICKED) + ctx.promise(LINK_CLICKED) .awaitable() .await() @@ -51,7 +50,7 @@ class SignupWorkflow { @Shared suspend fun click(ctx: SharedWorkflowContext, secret: String) { // Send data to the workflow via a durable promise - ctx.promiseHandle(EMAIL_CLICKED).resolve(secret) + ctx.promiseHandle(LINK_CLICKED).resolve(secret) } } diff --git a/python/basics/app/3_workflows.py b/python/basics/app/3_workflows.py index eb98088c..6d18311f 100644 --- a/python/basics/app/3_workflows.py +++ b/python/basics/app/3_workflows.py @@ -40,7 +40,7 @@ async def create_user(): # Wait until user clicked email verification link # Promise gets resolved or rejected by the other handlers - click_secret = await ctx.promise("email_link").value() + click_secret = await ctx.promise("link_clicked").value() return click_secret == secret @@ -48,7 +48,7 @@ async def create_user(): @user_signup.handler() async def click(ctx: WorkflowSharedContext, secret: str): # Send data to the workflow via a durable promise - await ctx.promise("email_link").resolve(secret) + await ctx.promise("link_clicked").resolve(secret) app = restate.app(services=[user_signup]) diff --git a/typescript/basics/src/3_workflows.ts b/typescript/basics/src/3_workflows.ts index 4c43f80a..7399b1a0 100644 --- a/typescript/basics/src/3_workflows.ts +++ b/typescript/basics/src/3_workflows.ts @@ -32,7 +32,7 @@ const signupWorkflow = restate.workflow({ // Wait until user clicked email verification link // Promise gets resolved or rejected by the other handlers - const clickSecret = await ctx.promise("email-link"); + const clickSecret = await ctx.promise("link-clicked"); return clickSecret === secret; }, @@ -42,7 +42,7 @@ const signupWorkflow = restate.workflow({ request: { secret: string }, ) => { // Send data to the workflow via a durable promise - await ctx.promise("email-link").resolve(request.secret); + await ctx.promise("link-clicked").resolve(request.secret); }, }, });