Alternative scalable MemoryScope #142

Closed
wants to merge 3 commits into from

@@ -187,7 +187,7 @@ public MemorySegment withOwnerThread(Thread newOwner) {
         try {
             return dup(0L, length, mask, newOwner, scope.dup());
         } finally {
-            //flush read/writes to memory before returning the new segment
+            //flush read/writes to segment memory before returning the new segment
             VarHandle.fullFence();
         }
     }

@plevart

plevart May 5, 2020
Author Contributor

Maybe you're asking why I removed this fullFence() call? I don't think it is needed. I checked all implementations of MemorySegment, and they contain only final fields. It is therefore safe to publish such instances via data races, and no explicit fences are needed.
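To make the final-field argument concrete, here is a minimal sketch. `SegmentView` and `FinalFieldDemo` are hypothetical stand-ins, not classes from this PR. Because every field is final, the JMM's final-field semantics let a reader that obtains the reference, even through a data race, see the values assigned in the constructor. The sketch says nothing about the contents of the memory the object points at.

```java
// Hypothetical stand-in for a MemorySegment implementation; not the real class.
final class SegmentView {
    // Every field is final, so the values are frozen when the constructor finishes.
    private final long base;
    private final long length;
    private final Thread owner;

    SegmentView(long base, long length, Thread owner) {
        this.base = base;
        this.length = length;
        this.owner = owner;
    }

    long base()    { return base; }
    long length()  { return length; }
    Thread owner() { return owner; }

    // Same shape as withOwnerThread(): a hand-off creates a new immutable instance.
    SegmentView withOwner(Thread newOwner) {
        return new SegmentView(base, length, newOwner);
    }
}

final class FinalFieldDemo {
    public static void main(String[] args) {
        SegmentView original = new SegmentView(0L, 64L, Thread.currentThread());
        Thread worker = new Thread(() -> { });
        SegmentView handedOff = original.withOwner(worker);
        System.out.println(handedOff.length() + " bytes, owner changed: "
                + (handedOff.owner() != original.owner()));
    }
}
```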

@mcimadamore

mcimadamore May 5, 2020
Collaborator

I think we had a discussion on this; we knew publishing segments was safe (all fields are final), but there were some doubts about whether the fences added at the end of the constructor were enough to guarantee flushing of writes to the off-heap memory backing the segment (on all platforms). This effect is not covered (afaik) by the JMM.
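A minimal sketch of the pattern in question, assuming a direct `ByteBuffer` as a stand-in for the off-heap memory behind a segment (`FencedHandoff` and `fillAndFence` are hypothetical names). It only shows where the fence sits relative to the writes; whether that is sufficient for off-heap memory on every platform is exactly the open question here.

```java
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;

final class FencedHandoff {
    // Writes to off-heap memory, then issues a full fence before the buffer
    // is returned to a caller that may pass it to another thread.
    static ByteBuffer fillAndFence(int size, byte value) {
        ByteBuffer buf = ByteBuffer.allocateDirect(size);
        try {
            for (int i = 0; i < size; i++) {
                buf.put(i, value); // absolute put, does not move the position
            }
            return buf;
        } finally {
            // Same placement as in withOwnerThread(): fence before handing off.
            VarHandle.fullFence();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ByteBuffer buf = fillAndFence(16, (byte) 7);
        // Thread.start() already gives happens-before for heap state;
        // the fence above is the extra precaution for the off-heap bytes.
        Thread reader = new Thread(() -> System.out.println("first byte: " + buf.get(0)));
        reader.start();
        reader.join();
    }
}
```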

@plevart

plevart May 5, 2020
Author Contributor

Ah, I see. I will revert that then. But why does AbstractMemorySegmentImpl.withOwnerThread() do this differently from AbstractMemorySegmentImpl.acquire()? acquire() also hands access to a different thread, and there is no explicit fence in acquire().

@mcimadamore

mcimadamore May 6, 2020
Collaborator

> But why does AbstractMemorySegmentImpl.withOwnerThread() do this differently from AbstractMemorySegmentImpl.acquire()? acquire() also hands access to a different thread, and there is no explicit fence in acquire().

This is a valid point. I guess that, historically, acquire() meant "racy", whereas for handoff patterns (e.g. withOwnerThread) we wanted to be extra careful that only one thread at a time has access.

Note that, currently, while all acquired segment slices can be operated on concurrently in a race-free fashion, unfortunately the main segment can still be written/read concurrently by the original thread. So adding fences in this situation just kicks the can down the road.

If we had a way to make the original segment inaccessible while it is split by a spliterator, then I'd agree with you that a fence would be ideal (since this is another case of handoff).

But it is very hard to invalidate the original segment; here are a few options I've considered:

  • make the 'valid state' check more complex - e.g. in your implementation we could check whether the two counters yield the same value before allowing access to the original thread - but I think this would be slow, and I think it could still allow races (this is a case of check & act, where we have no guarantee that, by the time memory is read/written, the condition we checked still holds)

  • as we do in other places of the API, just kill the original segment after you split (e.g. make it not alive). This will make it inaccessible, which is consistent with what we want. But now there's a problem: how do you get back the original segment after you're done with the spliterator? Joining slices is messy. And adding synchronization to allow spliterators to be 'closed' atomically will just add more contention.

So, it seems like the cost of fixing this particular issue is higher than the problem warrants; there's an access mode called ACQUIRE, and if a segment owner is not OK with having races with different threads, that mode can be disabled.
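The check & act hazard mentioned in the first option can be sketched as follows. `CheckThenAct` is a hypothetical class, and the `betweenCheckAndAct` hook only simulates, deterministically, what another thread could do in the window between the liveness check and the memory access.

```java
final class CheckThenAct {
    private volatile boolean alive = true;
    private final int[] memory = {42}; // stands in for the segment's backing memory

    boolean isAlive() { return alive; }

    // Closing invalidates the memory; -1 stands in for "freed".
    void close() {
        alive = false;
        memory[0] = -1;
    }

    // The liveness check and the read are two separate steps, so the condition
    // that was checked may no longer hold when the read happens.
    int read(Runnable betweenCheckAndAct) {
        if (!alive) {
            throw new IllegalStateException("not alive"); // check
        }
        betweenCheckAndAct.run();                          // window for a concurrent close()
        return memory[0];                                  // act
    }

    public static void main(String[] args) {
        CheckThenAct segment = new CheckThenAct();
        // The check passes, then close() runs in the window, then the read proceeds anyway.
        System.out.println("value read after close: " + segment.read(segment::close));
    }
}
```

A more complex validity check does not close this window by itself; it only changes what is being checked.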

@plevart

plevart May 6, 2020
Author Contributor

Yeah, I was just thinking of the situation where a segment is passed to a parallel stream and the threads of the parallel stream operate on slices (acquired children) in a read-only way, since the segment is the source of information and should not be mutated. Now, if the segment was created and written to in one thread before being handed to the parallel stream, then workers from the FJPool may not see the content of their slices (acquired children) fully initialized, since normal synchronization, like dispatch of control to another thread, presumably doesn't have the same effect on off-heap memory (on all platforms)?

@mcimadamore

mcimadamore May 6, 2020
Collaborator

> Yeah, I was just thinking of the situation where a segment is passed to a parallel stream and the threads of the parallel stream operate on slices (acquired children) in a read-only way, since the segment is the source of information and should not be mutated. Now, if the segment was created and written to in one thread before being handed to the parallel stream, then workers from the FJPool may not see the content of their slices (acquired children) fully initialized, since normal synchronization, like dispatch of control to another thread, presumably doesn't have the same effect on off-heap memory (on all platforms)?

I'm fine with adding the fence in principle. If it adds too much overhead (which I haven't measured), I think we should leave it out though, since the benefits are limited.
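For reference, the scenario under discussion looks roughly like the sketch below. It assumes a direct `ByteBuffer` in place of a native segment and index ranges in place of acquired slices; `ParallelSliceSum` is a hypothetical name. One thread writes, issues the fence being debated, and then parallel-stream workers only read.

```java
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.util.stream.IntStream;

final class ParallelSliceSum {
    // Sums all bytes, with each parallel task reading one disjoint slice.
    static long sum(ByteBuffer source, int sliceCount) {
        ByteBuffer readOnly = source.asReadOnlyBuffer();
        int capacity = readOnly.capacity();
        int sliceSize = capacity / sliceCount;
        return IntStream.range(0, sliceCount).parallel().mapToLong(i -> {
            int start = i * sliceSize;
            // The last slice also takes any remainder.
            int end = (i == sliceCount - 1) ? capacity : start + sliceSize;
            long total = 0;
            for (int p = start; p < end; p++) {
                total += readOnly.get(p); // absolute get: no shared position is mutated
            }
            return total;
        }).sum();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocateDirect(1024);
        for (int i = 0; i < buf.capacity(); i++) {
            buf.put(i, (byte) 1);
        }
        // The fence under discussion: issued by the writer before the hand-off.
        VarHandle.fullFence();
        System.out.println("sum = " + sum(buf, 8));
    }
}
```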

@@ -203,7 +203,7 @@ public final void close() {
     }

     private final void closeNoCheck() {
-        scope.close(true);
+        scope.close();
     }

     final AbstractMemorySegmentImpl acquire() {
@@ -421,7 +421,7 @@ public static AbstractMemorySegmentImpl ofBuffer(ByteBuffer bb) {
             modes = bufferSegment.mask;
             owner = bufferSegment.owner;
         } else {
-            bufferScope = new MemoryScope(bb, null);
+            bufferScope = MemoryScope.create(bb, null);
             modes = defaultAccessModes(size);
             owner = Thread.currentThread();
         }
@@ -121,7 +121,7 @@ public static MemorySegment makeArraySegment(double[] arr) {

     static <Z> HeapMemorySegmentImpl<Z> makeHeapSegment(Supplier<Z> obj, int length, int base, int scale) {
         int byteSize = length * scale;
-        MemoryScope scope = new MemoryScope(null, null);
+        MemoryScope scope = MemoryScope.create(null, null);
         return new HeapMemorySegmentImpl<>(base, obj, byteSize, defaultAccessModes(byteSize), Thread.currentThread(), scope);
     }
 }
@@ -103,7 +103,7 @@ public static MappedMemorySegment makeMappedSegment(Path path, long bytesSize, F
         if (bytesSize <= 0) throw new IllegalArgumentException("Requested bytes size must be > 0.");
         try (FileChannelImpl channelImpl = (FileChannelImpl)FileChannel.open(path, openOptions(mapMode))) {
             UnmapperProxy unmapperProxy = channelImpl.mapInternal(mapMode, 0L, bytesSize);
-            MemoryScope scope = new MemoryScope(null, unmapperProxy::unmap);
+            MemoryScope scope = MemoryScope.create(null, unmapperProxy::unmap);
             return new MappedMemorySegmentImpl(unmapperProxy.address(), unmapperProxy, bytesSize,
                     defaultAccessModes(bytesSize), Thread.currentThread(), scope);
         }
@@ -28,99 +28,205 @@

 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
+import java.util.concurrent.atomic.LongAdder;

 /**
  * This class manages the temporal bounds associated with a memory segment. A scope has a liveness bit, which is updated
- * when the scope is closed (this operation is triggered by {@link AbstractMemorySegmentImpl#close()}). Furthermore, a scope is
- * associated with an <em>atomic</em> counter which can be incremented (upon calling the {@link #acquire()} method),
- * and is decremented (when a previously acquired segment is later closed).
+ * when the scope is closed (this operation is triggered by {@link AbstractMemorySegmentImpl#close()}). Furthermore,
+ * a scope is either root scope ({@link #create(Object, Runnable) created} when memory segment is allocated) or child scope
+ * ({@link #acquire() acquired} from root scope). When a child scope is acquired from another child scope, it is actually
+ * acquired from the root scope. There is only a single level of children. All child scopes are peers.
+ * A child scope can be {@link #close() closed} at any time, but root scope can only be closed after all its children
+ * have been closed, at which time any associated cleanup action is executed (the associated memory segment is freed).
  */
-public final class MemoryScope {
+abstract class MemoryScope {
+    public static final MemoryScope GLOBAL = new Root(null, null);

-    //reference to keep hold onto
-    final Object ref;
+    /**
+     * Creates a root MemoryScope with given ref and cleanupAction.

@mcimadamore

mcimadamore May 6, 2020
Collaborator

While this is true, note that it is possible to unsafely create unconfined segments, in which case the ownership restriction would not apply. Perhaps the doc should be tweaked a bit to reflect this.

+     * The returned instance may be published unsafely to and used in any thread, but methods that explicitly state that
+     * they may only be called in "owner" thread, must strictly be called in single thread that has been selected to be the
+     * "owner" thread.
+     *
+     * @param ref an optional reference to an instance that needs to be kept reachable
+     * @param cleanupAction an optional cleanup action to be executed when returned scope is closed
+     * @return a root MemoryScope
+     */
+    static MemoryScope create(Object ref, Runnable cleanupAction) {
+        return new Root(ref, cleanupAction);
+    }

-    int activeCount = UNACQUIRED;
+    private static final int STATE_OPEN = 0;
+    private static final int STATE_CLOSING = 1;
+    private static final int STATE_CLOSED = 2;

-    final static VarHandle COUNT_HANDLE;
+    int state; // = STATE_OPEN
+    private static final VarHandle STATE;

     static {
         try {
-            COUNT_HANDLE = MethodHandles.lookup().findVarHandle(MemoryScope.class, "activeCount", int.class);
+            STATE = MethodHandles.lookup().findVarHandle(MemoryScope.class, "state", int.class);
         } catch (Throwable ex) {
             throw new ExceptionInInitializerError(ex);
         }
     }

-    final static int UNACQUIRED = 0;
-    final static int CLOSED = -1;
-    final static int MAX_ACQUIRE = Integer.MAX_VALUE;
+    private MemoryScope() {
+    }

-    final Runnable cleanupAction;
+    /**
+     * Acquires a child scope (or peer scope if this is a child).
+     * This method may be called in any thread.
+     * The returned instance may be published unsafely to and used in any thread, but methods that explicitly state that
+     * they may only be called in "owner" thread, must strictly be called in single thread that has been selected to be the
+     * "owner" thread.
+     *
+     * @return a child (or peer) scope
+     * @throws IllegalStateException if root scope is already closed
+     */
+    abstract MemoryScope acquire();

-    final static MemoryScope GLOBAL = new MemoryScope(null, null);
+    /**
+     * Closes this scope, executing any cleanup action if this is the root scope.
+     * This method may only be called in "owner" thread.

@mcimadamore

mcimadamore May 6, 2020
Collaborator

Again - for unsafe segments this is not true. Not that we need to provide guarantees in that case - but I think it's better to make things clear.

+     *
+     * @throws IllegalStateException if this scope is already closed or if this is
+     * a root scope and there is/are still active child
+     * scope(s).
+     */
+    abstract void close();

-    public MemoryScope(Object ref, Runnable cleanupAction) {
-        this.ref = ref;
-        this.cleanupAction = cleanupAction;
-    }
+    /**
+     * Duplicates this scope and {@link #close() closes} it. If this is a root scope,
+     * new root scope is returned. If this is a child scope, new child scope is returned.
+     * This method may only be called in "owner" thread.
+     * The returned instance may be published unsafely to and used in any thread, but methods that explicitly state that
+     * they may only be called in "owner" thread, must strictly be called in single thread that has been selected to be the
+     * "owner" thread.
+     *
+     * @return a duplicate of this scope
+     * @throws IllegalStateException if this scope is already closed or if this is
+     * a root scope and there is/are still active child
+     * scope(s).
+     */
+    abstract MemoryScope dup();

     /**
      * This method performs a full, thread-safe liveness check; can be used outside confinement thread.
+     * This method may be called in any thread.
+     *
+     * @return {@code true} if this scope is not closed yet.
      */
     final boolean isAliveThreadSafe() {
-        return ((int)COUNT_HANDLE.getVolatile(this)) != CLOSED;
+        return ((int) STATE.getVolatile(this)) < STATE_CLOSED;
     }

     /**
-     * This method performs a quick liveness check; must be called from the confinement thread.
+     * Checks that this scope is still alive.
+     * This method may only be called in "owner" thread.
+     *
+     * @throws IllegalStateException if this scope is already closed
      */
     final void checkAliveConfined() {
-        if (activeCount == CLOSED) {
-            throw new IllegalStateException("Segment is not alive");
+        if (state == STATE_CLOSED) {
+            throw new IllegalStateException("This scope is already closed");
         }
     }

-    MemoryScope acquire() {
-        int value;
-        do {
-            value = (int)COUNT_HANDLE.getVolatile(this);
-            if (value == CLOSED) {
-                //segment is not alive!
-                throw new IllegalStateException("Segment is not alive");
-            } else if (value == MAX_ACQUIRE) {
-                //overflow
-                throw new IllegalStateException("Segment acquire limit exceeded");
-            }
-        } while (!COUNT_HANDLE.compareAndSet(this, value, value + 1));
-        return new MemoryScope(ref, this::release);
-    }
+    private static final class Root extends MemoryScope {
+        private final LongAdder acquires = new LongAdder();
+        private final LongAdder releases = new LongAdder();
+        private final Object ref;
+        private final Runnable cleanupAction;

-    private void release() {
-        int value;
-        do {
-            value = (int)COUNT_HANDLE.getVolatile(this);
-            if (value <= UNACQUIRED) {
-                //cannot get here - we can't close segment twice
-                throw new IllegalStateException();
+        private Root(Object ref, Runnable cleanupAction) {
+            this.ref = ref;
+            this.cleanupAction = cleanupAction;
+        }
+
+        @Override
+        MemoryScope acquire() {
+            // increment acquires 1st
+            acquires.increment();
+            // check state 2nd
+            int state;
+            while ((state = (int) STATE.getVolatile(this)) > STATE_OPEN) {
+                if (state == STATE_CLOSED) {
+                    releases.increment();
+                    throw new IllegalStateException("This scope is already closed");
+                }
+                Thread.onSpinWait();
             }
-        } while (!COUNT_HANDLE.compareAndSet(this, value, value - 1));
-    }
+            return new Child();
+        }

-    void close(boolean doCleanup) {
-        if (!COUNT_HANDLE.compareAndSet(this, UNACQUIRED, CLOSED)) {
-            //first check if already closed...
-            checkAliveConfined();
-            //...if not, then we have acquired views that are still active
-            throw new IllegalStateException("Cannot close a segment that has active acquired views");
+        @Override
+        MemoryScope dup() { // always called in owner thread
+            return closeOrDup(false);
         }
-        if (doCleanup && cleanupAction != null) {
-            cleanupAction.run();
+
+        @Override
+        void close() { // always called in owner thread
+            closeOrDup(true);
         }
-    }

-    MemoryScope dup() {
-        close(false);
-        return new MemoryScope(ref, cleanupAction);
+        private MemoryScope closeOrDup(boolean close) {
+            if (state == STATE_CLOSED) {
+                throw new IllegalStateException("This scope is already closed");
+            }
+            // pre-allocate duped scope so we don't get OOME later and be left with this scope closed
+            var duped = close ? null : new Root(ref, cleanupAction);
+            // modify state to STATE_CLOSING 1st
+            STATE.setVolatile(this, STATE_CLOSING);
+            // check for absence of active acquired children 2nd
+            // IMPORTANT: 1st sum releases, then sum acquires !!!
+            if (releases.sum() != acquires.sum()) {
+                STATE.setVolatile(this, STATE_OPEN); // revert back to STATE_OPEN
+                throw new IllegalStateException("Cannot close this scope as it has active acquired children");
+            }
+            // now that we made sure there's no active acquired children, we modify to STATE_CLOSED
+            STATE.setVolatile(this, STATE_CLOSED);
+            // do close or dup
+            if (close) {
+                if (cleanupAction != null) {
+                    cleanupAction.run();
+                }
+                return null;
+            } else {
+                return duped;
+            }
+        }
+
+        private final class Child extends MemoryScope {
+
+            private Child() {
+            }
+
+            @Override
+            MemoryScope acquire() {
+                return Root.this.acquire();
+            }
+
+            @Override
+            MemoryScope dup() { // always called in owner thread
+                if (state == STATE_CLOSED) {
+                    throw new IllegalStateException("This scope is already closed");
+                }
+                // pre-allocate duped scope so we don't get OOME later and be left with this scope closed
+                var duped = new Child();
+                STATE.setVolatile(this, STATE_CLOSED);
+                return duped;
+            }
+
+            @Override
+            void close() { // always called in owner thread
+                if (state == STATE_CLOSED) {
+                    throw new IllegalStateException("This scope is already closed");
+                }
+                state = STATE_CLOSED;
+                // following acts as a volatile write after plain write above so
+                // plain write gets flushed too (which is important for isAliveThreadSafe())
+                Root.this.releases.increment();
+            }
+        }
}
}
}
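The core of the new root-scope protocol can be reduced to a self-contained sketch. `StripedScope` is a hypothetical, simplified class: it uses a `volatile` field instead of the `VarHandle`, returns a `Runnable` release handle instead of a `Child` scope, and omits `dup()` and the cleanup action. The sketch keeps the order the diff stresses (sum `releases` first, then `acquires`). One way to see why that order is safe: a release is only ever counted after its matching acquire, so `releases` never exceeds `acquires`, and a `releases` sum read earlier can equal an `acquires` sum read later only if no child was outstanding.

```java
import java.util.concurrent.atomic.LongAdder;

final class StripedScope {
    private static final int OPEN = 0, CLOSING = 1, CLOSED = 2;

    private final LongAdder acquires = new LongAdder();
    private final LongAdder releases = new LongAdder();
    private volatile int state = OPEN;

    // May be called from any thread. The returned handle must be run exactly once.
    Runnable acquire() {
        acquires.increment();                 // 1st: announce the acquire
        int s;
        while ((s = state) != OPEN) {         // 2nd: check the state
            if (s == CLOSED) {
                releases.increment();         // undo the announcement
                throw new IllegalStateException("This scope is already closed");
            }
            Thread.onSpinWait();              // a close attempt is in flight; wait for its outcome
        }
        return releases::increment;
    }

    // Owner thread only.
    void close() {
        if (state == CLOSED) {
            throw new IllegalStateException("This scope is already closed");
        }
        state = CLOSING;                      // new acquirers now spin instead of succeeding
        if (releases.sum() != acquires.sum()) {   // releases first, then acquires
            state = OPEN;                     // children are still active: revert
            throw new IllegalStateException("Cannot close this scope as it has active acquired children");
        }
        state = CLOSED;
    }

    boolean isAlive() {
        return state != CLOSED;
    }

    public static void main(String[] args) {
        StripedScope scope = new StripedScope();
        Runnable release = scope.acquire();
        try {
            scope.close();
        } catch (IllegalStateException e) {
            System.out.println("close refused: " + e.getMessage());
        }
        release.run();
        scope.close();
        System.out.println("alive after close: " + scope.isAlive());
    }
}
```

Compared with the single CAS-updated counter it replaces, the two `LongAdder`s let concurrent acquirers and releasers update separate cells, at the price of a more expensive `close()`.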
@@ -93,7 +93,7 @@ public static MemorySegment makeNativeSegment(long bytesSize, long alignmentByte
             unsafe.setMemory(buf, alignedSize, (byte)0);
         }
         long alignedBuf = Utils.alignUp(buf, alignmentBytes);
-        MemoryScope scope = new MemoryScope(null, () -> unsafe.freeMemory(buf));
+        MemoryScope scope = MemoryScope.create(null, () -> unsafe.freeMemory(buf));
         MemorySegment segment = new NativeMemorySegmentImpl(buf, alignedSize, defaultAccessModes(alignedSize),
                 Thread.currentThread(), scope);
         if (alignedSize != bytesSize) {
@@ -104,7 +104,7 @@ public static MemorySegment makeNativeSegment(long bytesSize, long alignmentByte
     }

     public static MemorySegment makeNativeSegmentUnchecked(MemoryAddress min, long bytesSize, Thread owner, Runnable cleanup, Object attachment) {
-        MemoryScope scope = new MemoryScope(attachment, cleanup);
+        MemoryScope scope = MemoryScope.create(attachment, cleanup);
         return new NativeMemorySegmentImpl(min.toRawLongValue(), bytesSize, defaultAccessModes(bytesSize), owner, scope);
     }
 }