Skip to content

Commit

Permalink
Reduce lock contention in OutputBufferMemoryManager
Browse files Browse the repository at this point in the history
Addresses lock contention bottlenecks in OutputBufferMemoryManager
by:
- Reducing the scope the critical section in updateMemoryUsage
- Removing synchronization from isBufferFull() and isOverutilized(),
  these methods are already atomic reads
- Avoiding redundant notifications when the blockedOnMemory future does
  not change or is already completed (aka: not blocked)
- Avoiding spurious notifications from setNoBlockOnFull() when still
  blocked on memory
- Adds an unsynchronized fast path in onMemoryAvailable() to skip the
  lock acquisition entirely when the buffer is full
  • Loading branch information
pettyjamesm authored and rschlussel committed Feb 3, 2021
1 parent dff92fe commit 641347b
Showing 1 changed file with 91 additions and 48 deletions.
Expand Up @@ -16,20 +16,20 @@
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -40,16 +40,19 @@
@ThreadSafe
public class OutputBufferMemoryManager
{
private static final ListenableFuture<?> NOT_BLOCKED = immediateFuture(null);

private final long maxBufferedBytes;
private final AtomicLong bufferedBytes = new AtomicLong();
private final AtomicLong peakMemoryUsage = new AtomicLong();

@GuardedBy("this")
private boolean closed;
@Nullable
@GuardedBy("this")
private SettableFuture<?> bufferBlockedFuture;
private SettableFuture<?> bufferBlockedFuture; // null indicates "no listener registered"
@GuardedBy("this")
private ListenableFuture<?> blockedOnMemory = Futures.immediateFuture(null);
private ListenableFuture<?> blockedOnMemory = NOT_BLOCKED;

private final AtomicBoolean blockOnFull = new AtomicBoolean(true);

Expand All @@ -63,53 +66,79 @@ public OutputBufferMemoryManager(long maxBufferedBytes, Supplier<LocalMemoryCont
this.maxBufferedBytes = maxBufferedBytes;
this.systemMemoryContextSupplier = Suppliers.memoize(systemMemoryContextSupplier::get);
this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");

bufferBlockedFuture = SettableFuture.create();
bufferBlockedFuture.set(null);
}

public synchronized void updateMemoryUsage(long bytesAdded)
public void updateMemoryUsage(long bytesAdded)
{
Optional<LocalMemoryContext> systemMemoryContext = getSystemMemoryContext();

// If closed is true, that means the task is completed. In that state,
// the output buffers already ignore the newly added pages, and therefore
// we can also safely ignore any calls after OutputBufferMemoryManager is closed.
// If the systemMemoryContext doesn't exist, the task is probably already
// aborted, so we can just return (see the comment in getSystemMemoryContext()).
if (closed || !systemMemoryContext.isPresent()) {
LocalMemoryContext systemMemoryContext = getSystemMemoryContextOrNull();
if (systemMemoryContext == null) {
// If the systemMemoryContext doesn't exist, the task is probably already
// aborted, so we can just return (see the comment in getSystemMemoryContextOrNull()).
return;
}

long currentBufferedBytes = bufferedBytes.addAndGet(bytesAdded);
ListenableFuture<?> waitForMemory = null;
SettableFuture<?> notifyUnblocked = null;
long currentBufferedBytes;
synchronized (this) {
// If closed is true, that means the task is completed. In that state,
// the output buffers already ignore the newly added pages, and therefore
// we can also safely ignore any calls after OutputBufferMemoryManager is closed.
if (closed) {
return;
}

currentBufferedBytes = bufferedBytes.addAndGet(bytesAdded);
ListenableFuture<?> blockedOnMemory = systemMemoryContext.setBytes(currentBufferedBytes);
if (!blockedOnMemory.isDone()) {
if (this.blockedOnMemory != blockedOnMemory) {
this.blockedOnMemory = blockedOnMemory;
waitForMemory = blockedOnMemory; // only register a callback when blocked and the future is different
}
}
else {
this.blockedOnMemory = NOT_BLOCKED;
if (currentBufferedBytes <= maxBufferedBytes || !blockOnFull.get()) {
// Complete future in a new thread to avoid making a callback on the caller thread.
// This make is easier for callers to use this class since they can update the memory
// usage while holding locks.
notifyUnblocked = this.bufferBlockedFuture;
this.bufferBlockedFuture = null;
}
}
}
peakMemoryUsage.accumulateAndGet(currentBufferedBytes, Math::max);
this.blockedOnMemory = systemMemoryContext.get().setBytes(currentBufferedBytes);
if (!isBufferFull() && !isBlockedOnMemory() && !bufferBlockedFuture.isDone()) {
// Complete future in a new thread to avoid making a callback on the caller thread.
// This make is easier for callers to use this class since they can update the memory
// usage while holding locks.
SettableFuture<?> future = this.bufferBlockedFuture;
notificationExecutor.execute(() -> future.set(null));
return;
// Notify listeners outside of the critical section
notifyListener(notifyUnblocked);
if (waitForMemory != null) {
waitForMemory.addListener(this::onMemoryAvailable, notificationExecutor);
}
this.blockedOnMemory.addListener(this::onMemoryAvailable, notificationExecutor);
}

public synchronized ListenableFuture<?> getBufferBlockedFuture()
{
if ((isBufferFull() || isBlockedOnMemory()) && bufferBlockedFuture.isDone()) {
if (bufferBlockedFuture == null) {
if (blockedOnMemory.isDone() && !isBufferFull()) {
return NOT_BLOCKED;
}
bufferBlockedFuture = SettableFuture.create();
}
return bufferBlockedFuture;
}

public synchronized void setNoBlockOnFull()
public void setNoBlockOnFull()
{
blockOnFull.set(false);

SettableFuture<?> future = null;
synchronized (this) {
blockOnFull.set(false);

if (blockedOnMemory.isDone()) {
future = this.bufferBlockedFuture;
this.bufferBlockedFuture = null;
}
}
// Complete future in a new thread to avoid making a callback on the caller thread.
SettableFuture<?> future = this.bufferBlockedFuture;
notificationExecutor.execute(() -> future.set(null));
notifyListener(future);
}

public long getBufferedBytes()
Expand All @@ -122,32 +151,35 @@ public double getUtilization()
return bufferedBytes.get() / (double) maxBufferedBytes;
}

public synchronized boolean isOverutilized()
public boolean isOverutilized()
{
return isBufferFull();
}

private synchronized boolean isBufferFull()
private boolean isBufferFull()
{
return bufferedBytes.get() > maxBufferedBytes && blockOnFull.get();
}

private synchronized boolean isBlockedOnMemory()
{
return !blockedOnMemory.isDone();
}

@VisibleForTesting
synchronized void onMemoryAvailable()
void onMemoryAvailable()
{
// Do not notify the listeners if the buffer is full
if (bufferedBytes.get() > maxBufferedBytes) {
// Check if the buffer is full before synchronizing and skip notifying listeners
if (isBufferFull()) {
return;
}

SettableFuture<?> future;
synchronized (this) {
// re-check after synchronizing and ensure the current memory future is completed
if (isBufferFull() || !blockedOnMemory.isDone()) {
return;
}
future = this.bufferBlockedFuture;
this.bufferBlockedFuture = null;
}
// notify listeners if the buffer is not full
SettableFuture<?> future = this.bufferBlockedFuture;
notificationExecutor.execute(() -> future.set(null));
notifyListener(future);
}

public long getPeakMemoryUsage()
Expand All @@ -158,19 +190,30 @@ public long getPeakMemoryUsage()
public synchronized void close()
{
updateMemoryUsage(-bufferedBytes.get());
getSystemMemoryContext().ifPresent(LocalMemoryContext::close);
LocalMemoryContext memoryContext = getSystemMemoryContextOrNull();
if (memoryContext != null) {
memoryContext.close();
}
closed = true;
}

private Optional<LocalMemoryContext> getSystemMemoryContext()
private void notifyListener(@Nullable SettableFuture<?> future)
{
if (future != null) {
notificationExecutor.execute(() -> future.set(null));
}
}

@Nullable
private LocalMemoryContext getSystemMemoryContextOrNull()
{
try {
return Optional.of(systemMemoryContextSupplier.get());
return systemMemoryContextSupplier.get();
}
catch (RuntimeException ignored) {
// This is possible with races, e.g., a task is created and then immediately aborted,
// so that the task context hasn't been created yet (as a result there's no memory context available).
return null;
}
return Optional.empty();
}
}

0 comments on commit 641347b

Please sign in to comment.