Skip to content

Commit

Permalink
Move network topology location segment names to config
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Sep 9, 2019
1 parent 6ef45e5 commit bedd0d8
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 38 deletions.
Expand Up @@ -13,11 +13,8 @@
*/
package io.prestosql.execution.scheduler;

import com.google.common.collect.ImmutableList;
import io.prestosql.spi.HostAddress;

import java.util.List;

public class FlatNetworkTopology
implements NetworkTopology
{
Expand All @@ -26,10 +23,4 @@ public NetworkLocation locate(HostAddress address)
{
return new NetworkLocation(address.getHostText());
}

@Override
public List<String> getLocationSegmentNames()
{
return ImmutableList.of("machine");
}
}
Expand Up @@ -17,19 +17,11 @@

import javax.annotation.concurrent.ThreadSafe;

import java.util.List;

/**
* Implementations of this interface must be thread safe.
*/
@ThreadSafe
public interface NetworkTopology
{
NetworkLocation locate(HostAddress address);

/**
* Strings describing the meaning of each segment of a NetworkLocation returned from locate().
* This method must return a constant.
*/
List<String> getLocationSegmentNames();
}
@@ -0,0 +1,51 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.execution.scheduler;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;

import javax.validation.constraints.NotNull;

import java.util.List;

import static java.util.Objects.requireNonNull;

public class TopologyAwareNodeSelectorConfig
{
private List<String> locationSegmentNames = ImmutableList.of("machine");

@NotNull
public List<String> getLocationSegmentNames()
{
return locationSegmentNames;
}

public TopologyAwareNodeSelectorConfig setLocationSegmentNames(List<String> locationSegmentNames)
{
this.locationSegmentNames = requireNonNull(locationSegmentNames, "locationSegmentNames is null");
return this;
}

@Config("node-scheduler.network-topology-segments")
public TopologyAwareNodeSelectorConfig setLocationSegmentNames(String locationSegmentNames)
{
this.locationSegmentNames = Splitter.on(",")
.trimResults()
.omitEmptyStrings()
.splitToList(locationSegmentNames);
return this;
}
}
Expand Up @@ -68,20 +68,22 @@ public class TopologyAwareNodeSelectorFactory
public TopologyAwareNodeSelectorFactory(
NetworkTopology networkTopology,
InternalNodeManager nodeManager,
NodeSchedulerConfig config,
NodeTaskMap nodeTaskMap)
NodeSchedulerConfig schedulerConfig,
NodeTaskMap nodeTaskMap,
TopologyAwareNodeSelectorConfig topologyConfig)
{
requireNonNull(networkTopology, "networkTopology is null");
requireNonNull(nodeManager, "nodeManager is null");
requireNonNull(config, "config is null");
requireNonNull(schedulerConfig, "schedulerConfig is null");
requireNonNull(nodeTaskMap, "nodeTaskMap is null");
requireNonNull(topologyConfig, "topologyConfig is null");

this.networkTopology = networkTopology;
this.nodeManager = nodeManager;
this.minCandidates = config.getMinCandidates();
this.includeCoordinator = config.isIncludeCoordinator();
this.maxSplitsPerNode = config.getMaxSplitsPerNode();
this.maxPendingSplitsPerTask = config.getMaxPendingSplitsPerTask();
this.minCandidates = schedulerConfig.getMinCandidates();
this.includeCoordinator = schedulerConfig.isIncludeCoordinator();
this.maxSplitsPerNode = schedulerConfig.getMaxSplitsPerNode();
this.maxPendingSplitsPerTask = schedulerConfig.getMaxPendingSplitsPerTask();
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
checkArgument(maxSplitsPerNode >= maxPendingSplitsPerTask, "maxSplitsPerNode must be > maxPendingSplitsPerTask");

Expand All @@ -93,7 +95,7 @@ public TopologyAwareNodeSelectorFactory(
placementCounters.add(allCounter);
placementCountersByName.put("all", allCounter);

for (String segmentName : ImmutableList.copyOf(networkTopology.getLocationSegmentNames())) {
for (String segmentName : ImmutableList.copyOf(topologyConfig.getLocationSegmentNames())) {
CounterStat segmentCounter = new CounterStat();
placementCounters.add(segmentCounter);
placementCountersByName.put(segmentName, segmentCounter);
Expand Down
Expand Up @@ -17,12 +17,15 @@
import com.google.inject.Module;
import com.google.inject.Scopes;

import static io.airlift.configuration.ConfigBinder.configBinder;

public class TopologyAwareNodeSelectorModule
implements Module
{
@Override
public void configure(Binder binder)
{
configBinder(binder).bindConfig(TopologyAwareNodeSelectorConfig.class);
binder.bind(NetworkTopology.class).to(FlatNetworkTopology.class).in(Scopes.SINGLETON);
binder.bind(TopologyAwareNodeSelectorFactory.class).in(Scopes.SINGLETON);
binder.bind(NodeSelectorFactory.class).to(TopologyAwareNodeSelectorFactory.class).in(Scopes.SINGLETON);
Expand Down
Expand Up @@ -27,6 +27,7 @@
import io.prestosql.execution.scheduler.NodeSchedulerConfig;
import io.prestosql.execution.scheduler.NodeSelector;
import io.prestosql.execution.scheduler.NodeSelectorFactory;
import io.prestosql.execution.scheduler.TopologyAwareNodeSelectorConfig;
import io.prestosql.execution.scheduler.TopologyAwareNodeSelectorFactory;
import io.prestosql.execution.scheduler.UniformNodeSelectorFactory;
import io.prestosql.metadata.InMemoryNodeManager;
Expand Down Expand Up @@ -196,9 +197,9 @@ private NodeSelectorFactory getNodeSelectorFactory(InMemoryNodeManager nodeManag
case "uniform":
return new UniformNodeSelectorFactory(nodeManager, nodeSchedulerConfig, nodeTaskMap);
case "topology":
return new TopologyAwareNodeSelectorFactory(new FlatNetworkTopology(), nodeManager, nodeSchedulerConfig, nodeTaskMap);
return new TopologyAwareNodeSelectorFactory(new FlatNetworkTopology(), nodeManager, nodeSchedulerConfig, nodeTaskMap, new TopologyAwareNodeSelectorConfig());
case "benchmark":
return new TopologyAwareNodeSelectorFactory(new BenchmarkNetworkTopology(), nodeManager, nodeSchedulerConfig, nodeTaskMap);
return new TopologyAwareNodeSelectorFactory(new BenchmarkNetworkTopology(), nodeManager, nodeSchedulerConfig, nodeTaskMap, getBenchmarkNetworkTopologyConfig());
default:
throw new IllegalStateException();
}
Expand Down Expand Up @@ -240,12 +241,12 @@ public NetworkLocation locate(HostAddress address)
Collections.reverse(parts);
return NetworkLocation.create(parts);
}
}

@Override
public List<String> getLocationSegmentNames()
{
return ImmutableList.of("rack", "machine");
}
private static TopologyAwareNodeSelectorConfig getBenchmarkNetworkTopologyConfig()
{
return new TopologyAwareNodeSelectorConfig()
.setLocationSegmentNames(ImmutableList.of("rack", "machine"));
}

private static class TestSplitRemote
Expand Down
Expand Up @@ -30,6 +30,7 @@
import io.prestosql.execution.scheduler.NodeSchedulerConfig;
import io.prestosql.execution.scheduler.NodeSelector;
import io.prestosql.execution.scheduler.NodeSelectorFactory;
import io.prestosql.execution.scheduler.TopologyAwareNodeSelectorConfig;
import io.prestosql.execution.scheduler.TopologyAwareNodeSelectorFactory;
import io.prestosql.execution.scheduler.UniformNodeSelector;
import io.prestosql.execution.scheduler.UniformNodeSelectorFactory;
Expand Down Expand Up @@ -171,7 +172,7 @@ public void testTopologyAwareScheduling()
.setMaxPendingSplitsPerTask(20);

TestNetworkTopology topology = new TestNetworkTopology();
NodeSelectorFactory nodeSelectorFactory = new TopologyAwareNodeSelectorFactory(topology, nodeManager, nodeSchedulerConfig, nodeTaskMap);
NodeSelectorFactory nodeSelectorFactory = new TopologyAwareNodeSelectorFactory(topology, nodeManager, nodeSchedulerConfig, nodeTaskMap, getNetworkTopologyConfig());
NodeScheduler nodeScheduler = new NodeScheduler(nodeSelectorFactory);
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(Optional.of(CONNECTOR_ID));

Expand Down Expand Up @@ -799,11 +800,11 @@ public NetworkLocation locate(HostAddress address)
Collections.reverse(parts);
return NetworkLocation.create(parts);
}
}

@Override
public List<String> getLocationSegmentNames()
{
return ImmutableList.of("rack", "machine");
}
private static TopologyAwareNodeSelectorConfig getNetworkTopologyConfig()
{
return new TopologyAwareNodeSelectorConfig()
.setLocationSegmentNames(ImmutableList.of("rack", "machine"));
}
}
@@ -0,0 +1,44 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.execution.scheduler;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.configuration.testing.ConfigAssertions;
import org.testng.annotations.Test;

import java.util.Map;

public class TestTopologyAwareNodeSelectorConfig
{
@Test
public void testDefaults()
{
ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(TopologyAwareNodeSelectorConfig.class)
.setLocationSegmentNames("machine"));
}

@Test
public void testExplicitPropertyMappings()
{
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("node-scheduler.network-topology-segments", "rack,machine")
.build();

TopologyAwareNodeSelectorConfig expected = new TopologyAwareNodeSelectorConfig()
.setLocationSegmentNames(ImmutableList.of("rack", "machine"));

ConfigAssertions.assertFullMapping(properties, expected);
}
}

0 comments on commit bedd0d8

Please sign in to comment.