Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup uses of node apis #578

Merged
merged 3 commits into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ else if (insertExistingPartitionsBehavior == InsertExistingPartitionsBehavior.ER
outputStorageFormat.getOutputFormat(),
writerImplementation,
nodeManager.getCurrentNode().getVersion(),
nodeManager.getCurrentNode().getHttpUri().getHost(),
nodeManager.getCurrentNode().getHost(),
session.getIdentity().getPrincipal().map(Principal::getName).orElse(null),
nodeManager.getEnvironment(),
sessionProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.google.common.collect.ImmutableSet;
import io.airlift.units.Duration;
import io.prestosql.client.NodeVersion;
import io.prestosql.metadata.PrestoNode;
import io.prestosql.metadata.InternalNode;
import io.prestosql.spi.HostAddress;
import io.prestosql.spi.Node;
import io.prestosql.spi.NodeManager;
Expand Down Expand Up @@ -218,6 +218,6 @@ private static List<ConnectorSplit> getAllSplits(ConnectorSplitSource splitSourc

private static Node createTestingNode(String hostname)
{
return new PrestoNode(hostname, URI.create(format("http://%s:8080", hostname)), NodeVersion.UNKNOWN, false);
return new InternalNode(hostname, URI.create(format("http://%s:8080", hostname)), NodeVersion.UNKNOWN, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public Set<Node> getAllNodes()
@Override
public Set<Node> getWorkerNodes()
{
return nodeManager.getActiveConnectorNodes(catalogName);
return ImmutableSet.copyOf(nodeManager.getActiveConnectorNodes(catalogName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
package io.prestosql.connector.system;

import io.prestosql.metadata.AllNodes;
import io.prestosql.metadata.InternalNode;
import io.prestosql.metadata.InternalNodeManager;
import io.prestosql.metadata.NodeState;
import io.prestosql.metadata.PrestoNode;
import io.prestosql.spi.Node;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
Expand Down Expand Up @@ -86,22 +85,19 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
return table.build().cursor();
}

private void addRows(Builder table, Set<Node> nodes, NodeState state)
private void addRows(Builder table, Set<InternalNode> nodes, NodeState state)
{
for (Node node : nodes) {
table.addRow(node.getNodeIdentifier(), node.getHttpUri().toString(), getNodeVersion(node), isCoordinator(node), state.toString().toLowerCase(Locale.ENGLISH));
for (InternalNode node : nodes) {
table.addRow(node.getNodeIdentifier(), node.getInternalUri().toString(), getNodeVersion(node), isCoordinator(node), state.toString().toLowerCase(Locale.ENGLISH));
}
}

private static String getNodeVersion(Node node)
private static String getNodeVersion(InternalNode node)
{
if (node instanceof PrestoNode) {
return ((PrestoNode) node).getNodeVersion().toString();
}
return "";
return node.getNodeVersion().toString();
}

private boolean isCoordinator(Node node)
private boolean isCoordinator(InternalNode node)
{
return nodeManager.getCoordinators().contains(node);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.prestosql.metadata.InternalNode;
import io.prestosql.metadata.InternalNodeManager;
import io.prestosql.spi.HostAddress;
import io.prestosql.spi.Node;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -72,15 +72,15 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand
}

ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
ImmutableSet.Builder<Node> nodes = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> nodes = ImmutableSet.builder();
if (tableDistributionMode == ALL_COORDINATORS) {
nodes.addAll(nodeManager.getCoordinators());
}
else if (tableDistributionMode == ALL_NODES) {
nodes.addAll(nodeManager.getNodes(ACTIVE));
}
Set<Node> nodeSet = nodes.build();
for (Node node : nodeSet) {
Set<InternalNode> nodeSet = nodes.build();
for (InternalNode node : nodeSet) {
splits.add(new SystemSplit(node.getHostAndPort(), constraint));
}
return new FixedSplitSource(splits.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

import io.prestosql.Session;
import io.prestosql.execution.scheduler.NodeSchedulerConfig;
import io.prestosql.metadata.InternalNode;
import io.prestosql.metadata.InternalNodeManager;
import io.prestosql.spi.Node;

import javax.inject.Inject;

Expand All @@ -38,7 +38,7 @@ public TaskCountEstimator(NodeSchedulerConfig nodeSchedulerConfig, InternalNodeM
requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null");
requireNonNull(nodeManager, "nodeManager is null");
this.numberOfNodes = () -> {
Set<Node> activeNodes = nodeManager.getAllNodes().getActiveNodes();
Set<InternalNode> activeNodes = nodeManager.getAllNodes().getActiveNodes();
if (nodeSchedulerConfig.isIncludeCoordinator()) {
return activeNodes.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package io.prestosql.execution;

import io.prestosql.spi.Node;
import io.prestosql.metadata.InternalNode;
import io.prestosql.spi.QueryId;

import java.net.URI;
Expand All @@ -26,7 +26,7 @@ public interface LocationFactory

URI createLocalTaskLocation(TaskId taskId);

URI createTaskLocation(Node node, TaskId taskId);
URI createTaskLocation(InternalNode node, TaskId taskId);

URI createMemoryInfoLocation(Node node);
URI createMemoryInfoLocation(InternalNode node);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import io.prestosql.execution.NodeTaskMap.PartitionedSplitCountTracker;
import io.prestosql.execution.StateMachine.StateChangeListener;
import io.prestosql.execution.buffer.OutputBuffers;
import io.prestosql.metadata.InternalNode;
import io.prestosql.metadata.Split;
import io.prestosql.spi.Node;
import io.prestosql.sql.planner.PlanFragment;
import io.prestosql.sql.planner.plan.PlanNodeId;

Expand All @@ -42,7 +42,7 @@ public MemoryTrackingRemoteTaskFactory(RemoteTaskFactory remoteTaskFactory, Quer
@Override
public RemoteTask createRemoteTask(Session session,
TaskId taskId,
Node node,
InternalNode node,
PlanFragment fragment,
Multimap<PlanNodeId, Split> initialSplits,
OptionalInt totalPartitions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import com.google.common.collect.Sets;
import io.airlift.log.Logger;
import io.prestosql.spi.Node;
import io.prestosql.metadata.InternalNode;
import io.prestosql.util.FinalizerService;

import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -33,7 +33,7 @@
public class NodeTaskMap
{
private static final Logger log = Logger.get(NodeTaskMap.class);
private final ConcurrentHashMap<Node, NodeTasks> nodeTasksMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<InternalNode, NodeTasks> nodeTasksMap = new ConcurrentHashMap<>();
private final FinalizerService finalizerService;

@Inject
Expand All @@ -42,22 +42,22 @@ public NodeTaskMap(FinalizerService finalizerService)
this.finalizerService = requireNonNull(finalizerService, "finalizerService is null");
}

public void addTask(Node node, RemoteTask task)
public void addTask(InternalNode node, RemoteTask task)
{
createOrGetNodeTasks(node).addTask(task);
}

public int getPartitionedSplitsOnNode(Node node)
public int getPartitionedSplitsOnNode(InternalNode node)
{
return createOrGetNodeTasks(node).getPartitionedSplitCount();
}

public PartitionedSplitCountTracker createPartitionedSplitCountTracker(Node node, TaskId taskId)
public PartitionedSplitCountTracker createPartitionedSplitCountTracker(InternalNode node, TaskId taskId)
{
return createOrGetNodeTasks(node).createPartitionedSplitCountTracker(taskId);
}

private NodeTasks createOrGetNodeTasks(Node node)
private NodeTasks createOrGetNodeTasks(InternalNode node)
{
NodeTasks nodeTasks = nodeTasksMap.get(node);
if (nodeTasks == null) {
Expand All @@ -66,7 +66,7 @@ private NodeTasks createOrGetNodeTasks(Node node)
return nodeTasks;
}

private NodeTasks addNodeTask(Node node)
private NodeTasks addNodeTask(InternalNode node)
{
NodeTasks newNodeTasks = new NodeTasks(finalizerService);
NodeTasks nodeTasks = nodeTasksMap.putIfAbsent(node, newNodeTasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import io.prestosql.Session;
import io.prestosql.execution.NodeTaskMap.PartitionedSplitCountTracker;
import io.prestosql.execution.buffer.OutputBuffers;
import io.prestosql.metadata.InternalNode;
import io.prestosql.metadata.Split;
import io.prestosql.spi.Node;
import io.prestosql.sql.planner.PlanFragment;
import io.prestosql.sql.planner.plan.PlanNodeId;

Expand All @@ -28,7 +28,7 @@ public interface RemoteTaskFactory
{
RemoteTask createRemoteTask(Session session,
TaskId taskId,
Node node,
InternalNode node,
PlanFragment fragment,
Multimap<PlanNodeId, Split> initialSplits,
OptionalInt totalPartitions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import io.prestosql.execution.buffer.OutputBuffers;
import io.prestosql.execution.scheduler.SplitSchedulerStats;
import io.prestosql.failuredetector.FailureDetector;
import io.prestosql.metadata.InternalNode;
import io.prestosql.metadata.Split;
import io.prestosql.spi.Node;
import io.prestosql.spi.PrestoException;
import io.prestosql.split.RemoteSplit;
import io.prestosql.sql.planner.PlanFragment;
Expand Down Expand Up @@ -79,7 +79,7 @@ public final class SqlStageExecution

private final Map<PlanFragmentId, RemoteSourceNode> exchangeSources;

private final Map<Node, Set<RemoteTask>> tasks = new ConcurrentHashMap<>();
private final Map<InternalNode, Set<RemoteTask>> tasks = new ConcurrentHashMap<>();

@GuardedBy("this")
private final AtomicInteger nextTaskId = new AtomicInteger();
Expand Down Expand Up @@ -358,7 +358,7 @@ public List<RemoteTask> getAllTasks()
.collect(toImmutableList());
}

public synchronized Optional<RemoteTask> scheduleTask(Node node, int partition, OptionalInt totalPartitions)
public synchronized Optional<RemoteTask> scheduleTask(InternalNode node, int partition, OptionalInt totalPartitions)
{
requireNonNull(node, "node is null");

Expand All @@ -369,7 +369,7 @@ public synchronized Optional<RemoteTask> scheduleTask(Node node, int partition,
return Optional.of(scheduleTask(node, new TaskId(stateMachine.getStageId(), partition), ImmutableMultimap.of(), totalPartitions));
}

public synchronized Set<RemoteTask> scheduleSplits(Node node, Multimap<PlanNodeId, Split> splits, Multimap<PlanNodeId, Lifespan> noMoreSplitsNotification)
public synchronized Set<RemoteTask> scheduleSplits(InternalNode node, Multimap<PlanNodeId, Split> splits, Multimap<PlanNodeId, Lifespan> noMoreSplitsNotification)
{
requireNonNull(node, "node is null");
requireNonNull(splits, "splits is null");
Expand Down Expand Up @@ -407,7 +407,7 @@ public synchronized Set<RemoteTask> scheduleSplits(Node node, Multimap<PlanNodeI
return newTasks.build();
}

private synchronized RemoteTask scheduleTask(Node node, TaskId taskId, Multimap<PlanNodeId, Split> sourceSplits, OptionalInt totalPartitions)
private synchronized RemoteTask scheduleTask(InternalNode node, TaskId taskId, Multimap<PlanNodeId, Split> sourceSplits, OptionalInt totalPartitions)
{
checkArgument(!allTasks.contains(taskId), "A task with id %s already exists", taskId);

Expand Down Expand Up @@ -455,7 +455,7 @@ private synchronized RemoteTask scheduleTask(Node node, TaskId taskId, Multimap<
return task;
}

public Set<Node> getScheduledNodes()
public Set<InternalNode> getScheduledNodes()
{
return ImmutableSet.copyOf(tasks.keySet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*/
package io.prestosql.execution.scheduler;

import io.prestosql.metadata.InternalNode;
import io.prestosql.metadata.Split;
import io.prestosql.spi.Node;

import java.util.Optional;
import java.util.function.ToIntFunction;
Expand All @@ -32,13 +32,13 @@ public BucketNodeMap(ToIntFunction<Split> splitToBucket)

public abstract int getBucketCount();

public abstract Optional<Node> getAssignedNode(int bucketedId);
public abstract Optional<InternalNode> getAssignedNode(int bucketedId);

public abstract void assignBucketToNode(int bucketedId, Node node);
public abstract void assignBucketToNode(int bucketedId, InternalNode node);

public abstract boolean isDynamic();

public final Optional<Node> getAssignedNode(Split split)
public final Optional<InternalNode> getAssignedNode(Split split)
{
return getAssignedNode(splitToBucket.applyAsInt(split));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
package io.prestosql.execution.scheduler;

import io.prestosql.execution.RemoteTask;
import io.prestosql.metadata.InternalNode;
import io.prestosql.metadata.Split;
import io.prestosql.spi.Node;

import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -48,7 +48,7 @@ public void lockDownNodes()
}

@Override
public List<Node> allNodes()
public List<InternalNode> allNodes()
{
return nodeSelector.allNodes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
package io.prestosql.execution.scheduler;

import com.google.common.collect.ImmutableList;
import io.prestosql.metadata.InternalNode;
import io.prestosql.metadata.Split;
import io.prestosql.spi.Node;

import java.util.List;
import java.util.Optional;
Expand All @@ -27,16 +27,16 @@
public class FixedBucketNodeMap
extends BucketNodeMap
{
private final List<Node> bucketToNode;
private final List<InternalNode> bucketToNode;

public FixedBucketNodeMap(ToIntFunction<Split> splitToBucket, List<Node> bucketToNode)
public FixedBucketNodeMap(ToIntFunction<Split> splitToBucket, List<InternalNode> bucketToNode)
{
super(splitToBucket);
this.bucketToNode = ImmutableList.copyOf(requireNonNull(bucketToNode, "bucketToNode is null"));
}

@Override
public Optional<Node> getAssignedNode(int bucketedId)
public Optional<InternalNode> getAssignedNode(int bucketedId)
{
return Optional.of(bucketToNode.get(bucketedId));
}
Expand All @@ -48,7 +48,7 @@ public int getBucketCount()
}

@Override
public void assignBucketToNode(int bucketedId, Node node)
public void assignBucketToNode(int bucketedId, InternalNode node)
{
throw new UnsupportedOperationException();
}
Expand Down
Loading