Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unspill scheduling in join spill #9275

Merged
merged 1 commit into from Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -596,6 +596,7 @@ private void disposeUnspilledLookupSourceIfRequested()
localUserMemoryContext.setBytes(index.getEstimatedSize().toBytes());

close();
spilledLookupSourceHandle.setDisposeCompleted();
}

private LookupSourceSupplier buildLookupSource()
Expand Down
Expand Up @@ -43,7 +43,10 @@ default ListenableFuture<PartitionedConsumption<Supplier<LookupSource>>> finishP
i -> {
throw new UnsupportedOperationException();
},
i -> {}));
i -> {},
i -> {
throw new UnsupportedOperationException();
}));
}

/**
Expand Down
Expand Up @@ -46,28 +46,35 @@ public final class PartitionedConsumption<T>
@Nullable
private List<Partition<T>> partitions;

PartitionedConsumption(int consumersCount, Iterable<Integer> partitionNumbers, IntFunction<ListenableFuture<T>> loader, IntConsumer disposer)
PartitionedConsumption(
int consumersCount,
Iterable<Integer> partitionNumbers,
IntFunction<ListenableFuture<T>> loader,
IntConsumer disposer,
IntFunction<ListenableFuture<Void>> disposed)
{
this(consumersCount, immediateVoidFuture(), partitionNumbers, loader, disposer);
this(consumersCount, immediateVoidFuture(), partitionNumbers, loader, disposer, disposed);
}

private PartitionedConsumption(
int consumersCount,
ListenableFuture<Void> activator,
Iterable<Integer> partitionNumbers,
IntFunction<ListenableFuture<T>> loader,
IntConsumer disposer)
IntConsumer disposer,
IntFunction<ListenableFuture<Void>> disposed)
{
checkArgument(consumersCount > 0, "consumersCount must be positive");
this.consumersCount = consumersCount;
this.partitions = createPartitions(activator, partitionNumbers, loader, disposer);
this.partitions = createPartitions(activator, partitionNumbers, loader, disposer, disposed);
}

private List<Partition<T>> createPartitions(
ListenableFuture<Void> activator,
Iterable<Integer> partitionNumbers,
IntFunction<ListenableFuture<T>> loader,
IntConsumer disposer)
IntConsumer disposer,
IntFunction<ListenableFuture<Void>> disposed)
{
requireNonNull(partitionNumbers, "partitionNumbers is null");
requireNonNull(loader, "loader is null");
Expand All @@ -78,7 +85,7 @@ private List<Partition<T>> createPartitions(
for (Integer partitionNumber : partitionNumbers) {
Partition<T> partition = new Partition<>(consumersCount, partitionNumber, loader, partitionActivator, disposer);
partitions.add(partition);
partitionActivator = partition.released;
JunhyungSong marked this conversation as resolved.
Show resolved Hide resolved
partitionActivator = disposed.apply(partitionNumber);
}
return partitions.build();
}
Expand Down Expand Up @@ -111,7 +118,7 @@ public static class Partition<T>
private final int partitionNumber;
private final SettableFuture<Void> requested;
private final ListenableFuture<T> loaded;
private final SettableFuture<Void> released;
private final IntConsumer disposer;

@GuardedBy("this")
private int pendingReleases;
Expand All @@ -129,8 +136,7 @@ public Partition(
allAsList(requested, previousReleased),
ignored -> loader.apply(partitionNumber),
directExecutor());
this.released = SettableFuture.create();
released.addListener(() -> disposer.accept(partitionNumber), directExecutor());
this.disposer = disposer;
this.pendingReleases = consumersCount;
}

Expand All @@ -151,7 +157,7 @@ public synchronized void release()
pendingReleases--;
checkState(pendingReleases >= 0);
if (pendingReleases == 0) {
released.set(null);
disposer.accept(partitionNumber);
}
}
}
Expand Down
Expand Up @@ -317,7 +317,10 @@ public ListenableFuture<PartitionedConsumption<Supplier<LookupSource>>> finishPr
i -> {
throw new UnsupportedOperationException();
},
i -> {}));
i -> {},
i -> {
throw new UnsupportedOperationException();
}));
}

int operatorsCount = lookupJoinsCount
Expand All @@ -338,7 +341,8 @@ public ListenableFuture<PartitionedConsumption<Supplier<LookupSource>>> finishPr
partitionedConsumptionParticipants.getAsInt(),
spilledPartitions.keySet(),
this::loadSpilledLookupSource,
this::disposeSpilledLookupSource));
this::disposeSpilledLookupSource,
this::spilledLookupSourceDisposed));
}

return partitionedConsumption;
Expand All @@ -358,6 +362,11 @@ private void disposeSpilledLookupSource(int partitionNumber)
getSpilledLookupSourceHandle(partitionNumber).dispose();
}

private ListenableFuture<Void> spilledLookupSourceDisposed(int partitionNumber)
{
return getSpilledLookupSourceHandle(partitionNumber).getDisposeCompleted();
}

private SpilledLookupSourceHandle getSpilledLookupSourceHandle(int partitionNumber)
{
lock.readLock().lock();
Expand Down
Expand Up @@ -35,7 +35,7 @@ private enum State
SPILLED,
UNSPILLING,
PRODUCED,
DISPOSED
DISPOSE_REQUESTED
}

@GuardedBy("this")
Expand All @@ -48,6 +48,7 @@ private enum State
private SettableFuture<Supplier<LookupSource>> unspilledLookupSource;

private final SettableFuture<Void> disposeRequested = SettableFuture.create();
private final SettableFuture<Void> disposeCompleted = SettableFuture.create();

private final ListenableFuture<Void> unspillingOrDisposeRequested = whenAnyComplete(ImmutableList.of(unspillingRequested, disposeRequested));

Expand All @@ -70,7 +71,7 @@ public synchronized void setLookupSource(Supplier<LookupSource> lookupSource)
{
requireNonNull(lookupSource, "lookupSource is null");

if (state == State.DISPOSED) {
if (state == State.DISPOSE_REQUESTED) {
return;
}

Expand All @@ -85,14 +86,24 @@ public synchronized void dispose()
{
disposeRequested.set(null);
unspilledLookupSource = null; // let the memory go
setState(State.DISPOSED);
setState(State.DISPOSE_REQUESTED);
}

public SettableFuture<Void> getDisposeRequested()
{
return disposeRequested;
}

public synchronized void setDisposeCompleted()
{
disposeCompleted.set(null);
}

public SettableFuture<Void> getDisposeCompleted()
{
return disposeCompleted;
}

public ListenableFuture<Void> getUnspillingOrDisposeRequested()
{
return unspillingOrDisposeRequested;
Expand Down