Skip to content

Commit

Permalink
Merge pull request #1389 from tigerbeetle/batiati-java-close-fix
Browse files Browse the repository at this point in the history
Fix java context handle
  • Loading branch information
batiati committed Dec 21, 2023
2 parents 21ce500 + 59e1861 commit 614e7d0
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 19 deletions.
61 changes: 43 additions & 18 deletions src/clients/java/src/main/java/com/tigerbeetle/NativeClient.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.tigerbeetle;

import static com.tigerbeetle.AssertionError.assertTrue;
import java.util.concurrent.atomic.AtomicLong;

final class NativeClient {
static {
JNILoader.loadFromJar();
}

private volatile long contextHandle;
// Keeping the contextHandle and a reference counter guarded by
// atomics in order to prevent the client from using a disposed
// context during `close()`.
private final AtomicLong contextHandle;
private final AtomicLong contextHandleReferences;

public static NativeClient init(final byte[] clusterID, final String addresses,
final int concurrencyMax) {
Expand All @@ -31,31 +36,51 @@ private static void assertArgs(final byte[] clusterID, final String addresses,
}

private NativeClient(final long contextHandle) {
this.contextHandle = contextHandle;
this.contextHandle = new AtomicLong(contextHandle);
this.contextHandleReferences = new AtomicLong(0L);
}

public void submit(final Request<?> request) throws ConcurrencyExceededException {
if (contextHandle == 0L)
throw new IllegalStateException("Client is closed");

final var packet_acquire_status = submit(contextHandle, request);
if (packet_acquire_status == PacketAcquireStatus.ConcurrencyMaxExceeded.value) {
throw new ConcurrencyExceededException();
} else if (packet_acquire_status == PacketAcquireStatus.Shutdown.value) {
throw new IllegalStateException("Client is closing");
} else {
assertTrue(packet_acquire_status == PacketAcquireStatus.Ok.value,
"PacketAcquireStatus=%d is not implemented", packet_acquire_status);
try {
contextHandleReferences.incrementAndGet();
final var handle = contextHandle.getAcquire();

if (handle == 0L)
throw new IllegalStateException("Client is closed");

final var packet_acquire_status = submit(handle, request);
if (packet_acquire_status == PacketAcquireStatus.ConcurrencyMaxExceeded.value) {
throw new ConcurrencyExceededException();
} else if (packet_acquire_status == PacketAcquireStatus.Shutdown.value) {
throw new IllegalStateException("Client is closing");
} else {
assertTrue(packet_acquire_status == PacketAcquireStatus.Ok.value,
"PacketAcquireStatus=%d is not implemented", packet_acquire_status);
}
} finally {
contextHandleReferences.decrementAndGet();
}
}

public void close() {
if (contextHandle != 0L) {
if (contextHandle.getAcquire() != 0L) {
synchronized (this) {
if (contextHandle != 0L) {
// Deinit and signalize that this client is closed by setting the handles to 0.
clientDeinit(contextHandle);
this.contextHandle = 0L;
final var handle = contextHandle.getAcquire();
if (handle != 0L) {
// Signalize that this client is closed by setting the handler to 0,
// and spin wait until all references that might be using the old handle could
// be released.
contextHandle.setRelease(0L);
while (contextHandleReferences.getAcquire() > 0L) {
// Thread::onSpinWait method to give JVM a hint that the following code is
// in a spin loop. This has no side-effect and only provides a hint to
// optimize spin loops in a processor specific manner.
Thread.onSpinWait();
}

// This function waits until all submited requests are completed, and no more
// packets can be acquired after that.
clientDeinit(handle);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ public void testConcurrencyExceeded() throws Throwable {
* threads trying to submit a request after the client was closed will fail with
* IllegalStateException.
*/
// @Test
@Test
public void testCloseWithConcurrentTasks() throws Throwable {

try (var server = new Server()) {
Expand Down

0 comments on commit 614e7d0

Please sign in to comment.