diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt index 039354c9..1ffc0cdf 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt @@ -275,11 +275,7 @@ class BackwardCompatibilityTest { @InjectLocalEndpointURI localEndpointURI: URI ) { // Create Admin API client with the provided admin URI - val adminApi = - DeploymentApi( - ApiClient() - .setHost(adminURI.host) - .setPort(adminURI.port)) + val adminApi = DeploymentApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) // List all deployments val deployments = adminApi.listDeployments() diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt index 49466714..348c3a51 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt @@ -263,10 +263,7 @@ class ForwardCompatibilityTest { @InjectLocalEndpointURI localEndpointURI: URI ) { // Create Admin API client with the provided admin URI - val adminClient = - ApiClient() - .setHost(adminURI.host) - .setPort(adminURI.port) + val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port) val adminApi = DeploymentApi(adminClient) // List all deployments diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt index 8d45b34a..c481fc29 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt @@ -8,6 +8,7 @@ // https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE package dev.restate.sdktesting.tests +import dev.restate.admin.api.InvocationApi import dev.restate.admin.api.ServiceApi import dev.restate.admin.client.ApiClient import dev.restate.admin.model.ModifyServiceStateRequest @@ -39,10 +40,12 @@ import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.runBlocking import kotlinx.serialization.json.Json import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias +import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test @@ -160,6 +163,37 @@ class InvokerMemoryTest { } } + // Tests in this class share a single deployment (companion-scoped @RegisterExtension), so an + // invocation left over from a failed/timed-out test would otherwise hold memory leases across + // the global pool and cascade into the next test. Kill everything still in flight and wait for + // the invoker memory pool to drain before yielding control to the next @Test. + @AfterEach + fun cleanupInvocations( + @InjectAdminURI adminURI: URI, + @InjectContainerPort(hostName = RESTATE_RUNTIME, port = RUNTIME_NODE_PORT) metricsPort: Int, + ) = runBlocking { + val nonTerminalFilter = "status != 'completed'" + val invocationApi = InvocationApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) + + // Each poll iteration re-lists pending invocations and re-issues kill for everything + // still alive, then asserts the list is empty. Awaitility's ignoreExceptions() (set in + // the untilAsserted wrapper in utils.kt) absorbs transient failures of either the list + // query or individual kill calls (e.g. 503s during leadership changes, or 404s for + // invocations that finished between query and kill), so the loop converges on its own. + await withAlias + "all invocations killed and terminated" untilAsserted + { + val pending = getAllInvocations(adminURI, nonTerminalFilter) + pending.forEach { invocationApi.killInvocation(it.id) } + assertThat(pending).isEmpty() + } + await withAlias + "invoker memory pool drains" untilAsserted + { + assertThat(getInvokerMemoryPoolUsage(metricsPort)).isEqualTo(0.0) + } + } + @Test @Timeout(120) @DisplayName("All invocations complete under memory pressure with invoker yield")