Skip to content

Commit

Permalink
Convert Split to record
Browse files Browse the repository at this point in the history
Since the records have generates equals and hashCode methods, we need to switch away from using Set<Split> in the scheduler
implementation which seems incorrect anyway as it suggests that we want to remove duplicates but hashCode and equals are not
part of the ConnectorSplit contract.
  • Loading branch information
wendigo committed Apr 18, 2024
1 parent 03b7e6c commit f6ad259
Show file tree
Hide file tree
Showing 32 changed files with 113 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ public void close()

private static String formatSplitInfo(Split split)
{
return split.getConnectorSplit().getClass().getSimpleName() + "{" + JOINER.join(split.getInfo()) + "}";
return split.connectorSplit().getClass().getSimpleName() + "{" + JOINER.join(split.getInfo()) + "}";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.trino.metadata.Split;

import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;
Expand All @@ -36,7 +35,7 @@ public DynamicSplitPlacementPolicy(NodeSelector nodeSelector, Supplier<? extends
}

@Override
public SplitPlacementResult computeAssignments(Set<Split> splits)
public SplitPlacementResult computeAssignments(List<Split> splits)
{
return nodeSelector.computeAssignments(splits, remoteTasks.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -187,7 +186,7 @@ public BucketedSplitPlacementPolicy(
}

@Override
public SplitPlacementResult computeAssignments(Set<Split> splits)
public SplitPlacementResult computeAssignments(List<Split> splits)
{
return nodeSelector.computeAssignments(splits, remoteTasks.get(), bucketNodeMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public static SplitPlacementResult selectDistributionNodes(
long maxSplitsWeightPerNode,
long minPendingSplitsWeightPerTask,
int maxUnacknowledgedSplitsPerTask,
Set<Split> splits,
List<Split> splits,
List<RemoteTask> existingTasks,
BucketNodeMap bucketNodeMap)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ default List<InternalNode> selectRandomNodes(int limit)
* If we cannot find an assignment for a split, it is not included in the map. Also returns a future indicating when
* to reattempt scheduling of this batch of splits, if some of them could not be scheduled.
*/
SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks);
SplitPlacementResult computeAssignments(List<Split> splits, List<RemoteTask> existingTasks);

/**
* Identifies the nodes for running the specified splits based on a precomputed fixed partitioning.
Expand All @@ -54,5 +54,5 @@ default List<InternalNode> selectRandomNodes(int limit)
* If we cannot find an assignment for a split, it is not included in the map. Also returns a future indicating when
* to reattempt scheduling of this batch of splits, if some of them could not be scheduled.
*/
SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks, BucketNodeMap bucketNodeMap);
SplitPlacementResult computeAssignments(List<Split> splits, List<RemoteTask> existingTasks, BucketNodeMap bucketNodeMap);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import io.trino.split.SplitSource.SplitBatch;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -92,7 +92,7 @@ private enum State
private final BooleanSupplier anySourceTaskBlocked;
private final PartitionIdAllocator partitionIdAllocator;
private final Map<InternalNode, RemoteTask> scheduledTasks;
private final Set<Split> pendingSplits = new HashSet<>();
private final List<Split> pendingSplits = new ArrayList<>();

private ListenableFuture<SplitBatch> nextSplitBatchFuture;
private ListenableFuture<Void> placementFuture = immediateVoidFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
import io.trino.metadata.Split;

import java.util.List;
import java.util.Set;

public interface SplitPlacementPolicy
{
SplitPlacementResult computeAssignments(Set<Split> splits);
SplitPlacementResult computeAssignments(List<Split> splits);

void lockDownNodes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public List<InternalNode> selectRandomNodes(int limit, Set<InternalNode> exclude
}

@Override
public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
public SplitPlacementResult computeAssignments(List<Split> splits, List<RemoteTask> existingTasks)
{
NodeMap nodeMap = this.nodeMap.get().get();
Multimap<InternalNode, Split> assignment = HashMultimap.create();
Expand Down Expand Up @@ -222,7 +222,7 @@ private long calculateMinPendingSplitsWeightPerTask(int splitAffinity, int total
}

@Override
public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks, BucketNodeMap bucketNodeMap)
public SplitPlacementResult computeAssignments(List<Split> splits, List<RemoteTask> existingTasks, BucketNodeMap bucketNodeMap)
{
return selectDistributionNodes(nodeMap.get().get(), nodeTaskMap, maxSplitsWeightPerNode, maxPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, splits, existingTasks, bucketNodeMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -160,7 +161,7 @@ public List<InternalNode> selectRandomNodes(int limit, Set<InternalNode> exclude
}

@Override
public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
public SplitPlacementResult computeAssignments(List<Split> splits, List<RemoteTask> existingTasks)
{
Multimap<InternalNode, Split> assignment = HashMultimap.create();
NodeMap nodeMap = this.nodeMap.get().get();
Expand All @@ -170,7 +171,7 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
boolean splitWaitingForAnyNode = false;
// splitsToBeRedistributed becomes true only when splits go through locality-based assignment
boolean splitsToBeRedistributed = false;
Set<Split> remainingSplits = new HashSet<>(splits.size());
List<Split> remainingSplits = new ArrayList<>(splits.size());

List<InternalNode> filteredNodes = filterNodes(nodeMap, includeCoordinator, ImmutableSet.of());
ResettableRandomizedIterator<InternalNode> randomCandidates = new ResettableRandomizedIterator<>(filteredNodes);
Expand Down Expand Up @@ -268,7 +269,7 @@ else if (!splitWaitingForAnyNode) {
}

@Override
public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks, BucketNodeMap bucketNodeMap)
public SplitPlacementResult computeAssignments(List<Split> splits, List<RemoteTask> existingTasks, BucketNodeMap bucketNodeMap)
{
// TODO: Implement split assignment adjustment based on how quick node is able to process splits. More information https://github.com/trinodb/trino/pull/15168
return selectDistributionNodes(nodeMap.get().get(), nodeTaskMap, maxSplitsWeightPerNode, minPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, splits, existingTasks, bucketNodeMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class ArbitraryDistributionSplitAssigner
public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap<Integer, Split> splits, boolean noMoreSplits)
{
for (Split split : splits.values()) {
Optional<CatalogHandle> splitCatalogRequirement = Optional.of(split.getCatalogHandle())
Optional<CatalogHandle> splitCatalogRequirement = Optional.of(split.catalogHandle())
.filter(catalog -> !catalog.getType().isInternal() && !catalog.equals(REMOTE_CATALOG_HANDLE));
checkArgument(
catalogRequirement.isEmpty() || catalogRequirement.equals(splitCatalogRequirement),
Expand Down Expand Up @@ -343,7 +343,7 @@ public String toString()

private Optional<HostAddress> getHostRequirement(Split split)
{
if (split.getConnectorSplit().isRemotelyAccessible()) {
if (split.connectorSplit().isRemotelyAccessible()) {
return Optional.empty();
}
List<HostAddress> addresses = split.getAddresses();
Expand All @@ -369,8 +369,8 @@ private Optional<HostAddress> getHostRequirement(Split split)

private long getSplitSizeInBytes(Split split)
{
if (split.getCatalogHandle().equals(REMOTE_CATALOG_HANDLE)) {
RemoteSplit remoteSplit = (RemoteSplit) split.getConnectorSplit();
if (split.catalogHandle().equals(REMOTE_CATALOG_HANDLE)) {
RemoteSplit remoteSplit = (RemoteSplit) split.connectorSplit();
SpoolingExchangeInput exchangeInput = (SpoolingExchangeInput) remoteSplit.getExchangeInput();
long size = 0;
for (ExchangeSourceHandle handle : exchangeInput.getExchangeSourceHandles()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ private synchronized AssignmentResult process(IdempotentSplitSource.SplitBatchRe

private int getSplitPartition(Split split)
{
if (split.getConnectorSplit() instanceof RemoteSplit remoteSplit) {
if (split.connectorSplit() instanceof RemoteSplit remoteSplit) {
SpoolingExchangeInput exchangeInput = (SpoolingExchangeInput) remoteSplit.getExchangeInput();
List<ExchangeSourceHandle> handles = exchangeInput.getExchangeSourceHandles();
return handles.get(0).getPartitionId();
Expand Down
41 changes: 7 additions & 34 deletions core/trino-main/src/main/java/io/trino/metadata/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
*/
package io.trino.metadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
Expand All @@ -26,36 +24,17 @@
import java.util.Map;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.MoreObjects.toStringHelper;
import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

public final class Split
public record Split(CatalogHandle catalogHandle, ConnectorSplit connectorSplit)
{
private static final int INSTANCE_SIZE = instanceSize(Split.class);

private final CatalogHandle catalogHandle;
private final ConnectorSplit connectorSplit;

@JsonCreator
public Split(
@JsonProperty("catalogHandle") CatalogHandle catalogHandle,
@JsonProperty("connectorSplit") ConnectorSplit connectorSplit)
{
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.connectorSplit = requireNonNull(connectorSplit, "connectorSplit is null");
}

@JsonProperty
public CatalogHandle getCatalogHandle()
{
return catalogHandle;
}

@JsonProperty
public ConnectorSplit getConnectorSplit()
public Split
{
return connectorSplit;
requireNonNull(catalogHandle, "catalogHandle is null");
requireNonNull(connectorSplit, "connectorSplit is null");
}

@JsonIgnore
Expand All @@ -64,30 +43,24 @@ public Map<String, String> getInfo()
return firstNonNull(connectorSplit.getSplitInfo(), ImmutableMap.of());
}

@JsonIgnore
public List<HostAddress> getAddresses()
{
return connectorSplit.getAddresses();
}

@JsonIgnore
public boolean isRemotelyAccessible()
{
return connectorSplit.isRemotelyAccessible();
}

@JsonIgnore
public SplitWeight getSplitWeight()
{
return connectorSplit.getSplitWeight();
}

@Override
public String toString()
{
return toStringHelper(this)
.add("catalogHandle", catalogHandle)
.add("connectorSplit", connectorSplit)
.toString();
}

public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ public PlanNodeId getSourceId()
public void addSplit(Split split)
{
requireNonNull(split, "split is null");
checkArgument(split.getCatalogHandle().equals(REMOTE_CATALOG_HANDLE), "split is not a remote split");
checkArgument(split.catalogHandle().equals(REMOTE_CATALOG_HANDLE), "split is not a remote split");

RemoteSplit remoteSplit = (RemoteSplit) split.getConnectorSplit();
RemoteSplit remoteSplit = (RemoteSplit) split.connectorSplit();
exchangeDataSource.addInput(remoteSplit.getExchangeInput());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void addInput(Page page)
public void addSplit(Split split)
{
checkState(!noMoreSplits, "no more splits expected");
pendingSplits.add(split.getConnectorSplit());
pendingSplits.add(split.connectorSplit());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public PlanNodeId getSourceId()
public void addSplit(Split split)
{
requireNonNull(split, "split is null");
checkArgument(split.getConnectorSplit() instanceof RemoteSplit, "split is not a remote split");
checkArgument(split.connectorSplit() instanceof RemoteSplit, "split is not a remote split");
checkState(!blockedOnSplits.isDone(), "noMoreSplits has been called already");

TaskContext taskContext = operatorContext.getDriverContext().getPipelineContext().getTaskContext();
Expand All @@ -173,7 +173,7 @@ public void addSplit(Split split)
operatorContext.localUserMemoryContext(),
taskContext::sourceTaskFailed,
RetryPolicy.NONE));
RemoteSplit remoteSplit = (RemoteSplit) split.getConnectorSplit();
RemoteSplit remoteSplit = (RemoteSplit) split.connectorSplit();
// Only fault tolerant execution mode is expected to execute external exchanges.
// MergeOperator is used for distributed sort only and it is not compatible (and disabled) with fault tolerant execution mode.
DirectExchangeInput exchangeInput = (DirectExchangeInput) remoteSplit.getExchangeInput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public TransformationState<WorkProcessor<Page>> process(Split split)
}

ConnectorPageSource source;
if (split.getConnectorSplit() instanceof EmptySplit) {
if (split.connectorSplit() instanceof EmptySplit) {
source = new EmptyPageSource();
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ public void addSplit(Split split)

Map<String, String> splitInfo = split.getInfo();
if (!splitInfo.isEmpty()) {
operatorContext.setInfoSupplier(Suppliers.ofInstance(new SplitOperatorInfo(split.getCatalogHandle(), splitInfo)));
operatorContext.setInfoSupplier(Suppliers.ofInstance(new SplitOperatorInfo(split.catalogHandle(), splitInfo)));
}

blocked.set(null);

if (split.getConnectorSplit() instanceof EmptySplit) {
if (split.connectorSplit() instanceof EmptySplit) {
source = new EmptyPageSource();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void addSplit(Split split)

Map<String, String> splitInfo = split.getInfo();
if (!splitInfo.isEmpty()) {
operatorContext.setInfoSupplier(Suppliers.ofInstance(new SplitOperatorInfo(split.getCatalogHandle(), splitInfo)));
operatorContext.setInfoSupplier(Suppliers.ofInstance(new SplitOperatorInfo(split.catalogHandle(), splitInfo)));
}

splitBuffer.add(split);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void addSplit(Split split)
requireNonNull(split, "split is null");
checkState(source == null, "Index source split already set");

IndexSplit indexSplit = (IndexSplit) split.getConnectorSplit();
IndexSplit indexSplit = (IndexSplit) split.connectorSplit();

// Normalize the incoming RecordSet to something that can be consumed by the index
RecordSet normalizedRecordSet = probeKeyNormalizer.apply(indexSplit.getKeyRecordSet());
Expand All @@ -130,7 +130,7 @@ public void addSplit(Split split)

Map<String, String> splitInfo = split.getInfo();
if (!splitInfo.isEmpty()) {
operatorContext.setInfoSupplier(Suppliers.ofInstance(new SplitOperatorInfo(split.getCatalogHandle(), splitInfo)));
operatorContext.setInfoSupplier(Suppliers.ofInstance(new SplitOperatorInfo(split.catalogHandle(), splitInfo)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public PageSourceManager(CatalogServiceProvider<ConnectorPageSourceProvider> pag
public ConnectorPageSource createPageSource(Session session, Split split, TableHandle table, List<ColumnHandle> columns, DynamicFilter dynamicFilter)
{
requireNonNull(columns, "columns is null");
checkArgument(split.getCatalogHandle().equals(table.catalogHandle()), "mismatched split and table");
CatalogHandle catalogHandle = split.getCatalogHandle();
checkArgument(split.catalogHandle().equals(table.catalogHandle()), "mismatched split and table");
CatalogHandle catalogHandle = split.catalogHandle();

ConnectorPageSourceProvider provider = pageSourceProvider.getService(catalogHandle);
TupleDomain<ColumnHandle> constraint = dynamicFilter.getCurrentPredicate();
Expand All @@ -61,7 +61,7 @@ public ConnectorPageSource createPageSource(Session session, Split split, TableH
return provider.createPageSource(
table.transaction(),
session.toConnectorSession(catalogHandle),
split.getConnectorSplit(),
split.connectorSplit(),
table.connectorHandle(),
columns,
dynamicFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,11 @@ public ToIntFunction<Split> getSplitToBucket(Session session, PartitioningHandle

return split -> {
int bucket;
if (split.getConnectorSplit() instanceof EmptySplit) {
if (split.connectorSplit() instanceof EmptySplit) {
bucket = 0;
}
else {
bucket = splitBucketFunction.applyAsInt(split.getConnectorSplit());
bucket = splitBucketFunction.applyAsInt(split.connectorSplit());
}
return bucket;
};
Expand Down
Loading

0 comments on commit f6ad259

Please sign in to comment.