Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix java context handle #1389

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 43 additions & 18 deletions src/clients/java/src/main/java/com/tigerbeetle/NativeClient.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.tigerbeetle;

import static com.tigerbeetle.AssertionError.assertTrue;
import java.util.concurrent.atomic.AtomicLong;

final class NativeClient {
static {
JNILoader.loadFromJar();
}

private volatile long contextHandle;
// Keeping the contextHandle and a reference counter guarded by
// atomics in order to prevent the client from using a disposed
// context during `close()`.
private final AtomicLong contextHandle;
private final AtomicLong contextHandleReferences;

public static NativeClient init(final byte[] clusterID, final String addresses,
final int concurrencyMax) {
Expand All @@ -31,31 +36,51 @@ private static void assertArgs(final byte[] clusterID, final String addresses,
}

private NativeClient(final long contextHandle) {
this.contextHandle = contextHandle;
this.contextHandle = new AtomicLong(contextHandle);
this.contextHandleReferences = new AtomicLong(0L);
}

public void submit(final Request<?> request) throws ConcurrencyExceededException {
if (contextHandle == 0L)
throw new IllegalStateException("Client is closed");

final var packet_acquire_status = submit(contextHandle, request);
if (packet_acquire_status == PacketAcquireStatus.ConcurrencyMaxExceeded.value) {
throw new ConcurrencyExceededException();
} else if (packet_acquire_status == PacketAcquireStatus.Shutdown.value) {
throw new IllegalStateException("Client is closing");
} else {
assertTrue(packet_acquire_status == PacketAcquireStatus.Ok.value,
"PacketAcquireStatus=%d is not implemented", packet_acquire_status);
try {
contextHandleReferences.incrementAndGet();
final var handle = contextHandle.getAcquire();

if (handle == 0L)
throw new IllegalStateException("Client is closed");

final var packet_acquire_status = submit(handle, request);
if (packet_acquire_status == PacketAcquireStatus.ConcurrencyMaxExceeded.value) {
throw new ConcurrencyExceededException();
} else if (packet_acquire_status == PacketAcquireStatus.Shutdown.value) {
throw new IllegalStateException("Client is closing");
} else {
assertTrue(packet_acquire_status == PacketAcquireStatus.Ok.value,
"PacketAcquireStatus=%d is not implemented", packet_acquire_status);
}
} finally {
contextHandleReferences.decrementAndGet();
}
}

public void close() {
if (contextHandle != 0L) {
if (contextHandle.getAcquire() != 0L) {
synchronized (this) {
if (contextHandle != 0L) {
// Deinit and signalize that this client is closed by setting the handles to 0.
clientDeinit(contextHandle);
this.contextHandle = 0L;
final var handle = contextHandle.getAcquire();
if (handle != 0L) {
// Signalize that this client is closed by setting the handler to 0,
// and spin wait until all references that might be using the old handle could
// be released.
contextHandle.setRelease(0L);
while (contextHandleReferences.getAcquire() > 0L) {
// Thread::onSpinWait method to give JVM a hint that the following code is
// in a spin loop. This has no side-effect and only provides a hint to
// optimize spin loops in a processor specific manner.
Thread.onSpinWait();
}

// This function waits until all submited requests are completed, and no more
// packets can be acquired after that.
clientDeinit(handle);
Comment on lines +66 to +83
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh, overall, I can't say I am too happy about this:

  • abstractly, nesting synchronized and a spinloop (well, and spinloop by itself https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html :D) doesn't feel right. Ideally, I'd love to see something more principled here
  • concretely, I think a flaw in this implementation is that, with many concurrent calls to close, only one would actually wait for the stuff to get close. That is, it can be the case that a .close() call returns, but there's still some outstanding work.

However, if I got this on the sync correctly, we are going to nix this code anyway soon, so I guess that's fine! Certainly better than an outright race we have at the moment!

Copy link
Contributor Author

@batiati batiati Dec 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my first thought was to encapsulate the context on a thread-safe class controlling all aspects more elegantly, but as you said, we are going to nuke this entire logic very soon.

About the close call, I don't think it can have side effects; the semantics are the same: only the first call to close will wait, no matter if concurrent or sequential calls. If we want to solve that, we can just remove the first verification before the lock.

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ public void testConcurrencyExceeded() throws Throwable {
* threads trying to submit a request after the client was closed will fail with
* IllegalStateException.
*/
// @Test
@Test
public void testCloseWithConcurrentTasks() throws Throwable {

try (var server = new Server()) {
Expand Down
Loading