Skip to content

Commit

Permalink
Only rebalance shards to worker nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Mar 17, 2016
1 parent c7bf63a commit b8da841
Showing 1 changed file with 11 additions and 7 deletions.
Expand Up @@ -13,6 +13,7 @@
*/ */
package com.facebook.presto.raptor.storage; package com.facebook.presto.raptor.storage;


import com.facebook.presto.raptor.NodeSupplier;
import com.facebook.presto.raptor.RaptorConnectorId; import com.facebook.presto.raptor.RaptorConnectorId;
import com.facebook.presto.raptor.backup.BackupStore; import com.facebook.presto.raptor.backup.BackupStore;
import com.facebook.presto.raptor.metadata.ShardManager; import com.facebook.presto.raptor.metadata.ShardManager;
Expand Down Expand Up @@ -45,7 +46,6 @@
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


import static com.facebook.presto.spi.NodeState.ACTIVE;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Maps.filterKeys; import static com.google.common.collect.Maps.filterKeys;
Expand All @@ -64,7 +64,8 @@ public class ShardEjector
{ {
private static final Logger log = Logger.get(ShardEjector.class); private static final Logger log = Logger.get(ShardEjector.class);


private final NodeManager nodeManager; private final String currentNode;
private final NodeSupplier nodeSupplier;
private final ShardManager shardManager; private final ShardManager shardManager;
private final StorageService storageService; private final StorageService storageService;
private final Duration interval; private final Duration interval;
Expand All @@ -79,13 +80,15 @@ public class ShardEjector
@Inject @Inject
public ShardEjector( public ShardEjector(
NodeManager nodeManager, NodeManager nodeManager,
NodeSupplier nodeSupplier,
ShardManager shardManager, ShardManager shardManager,
StorageService storageService, StorageService storageService,
StorageManagerConfig config, StorageManagerConfig config,
Optional<BackupStore> backupStore, Optional<BackupStore> backupStore,
RaptorConnectorId connectorId) RaptorConnectorId connectorId)
{ {
this(nodeManager, this(nodeManager.getCurrentNode().getNodeIdentifier(),
nodeSupplier,
shardManager, shardManager,
storageService, storageService,
config.getShardEjectorInterval(), config.getShardEjectorInterval(),
Expand All @@ -94,14 +97,16 @@ public ShardEjector(
} }


public ShardEjector( public ShardEjector(
NodeManager nodeManager, String currentNode,
NodeSupplier nodeSupplier,
ShardManager shardManager, ShardManager shardManager,
StorageService storageService, StorageService storageService,
Duration interval, Duration interval,
Optional<BackupStore> backupStore, Optional<BackupStore> backupStore,
String connectorId) String connectorId)
{ {
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.currentNode = requireNonNull(currentNode, "currentNode is null");
this.nodeSupplier = requireNonNull(nodeSupplier, "nodeSupplier is null");
this.shardManager = requireNonNull(shardManager, "shardManager is null"); this.shardManager = requireNonNull(shardManager, "shardManager is null");
this.storageService = requireNonNull(storageService, "storageService is null"); this.storageService = requireNonNull(storageService, "storageService is null");
this.interval = requireNonNull(interval, "interval is null"); this.interval = requireNonNull(interval, "interval is null");
Expand Down Expand Up @@ -167,7 +172,7 @@ void process()
// get the size of assigned shards for each node // get the size of assigned shards for each node
Map<String, Long> nodes = shardManager.getNodeBytes(); Map<String, Long> nodes = shardManager.getNodeBytes();


Set<String> activeNodes = nodeManager.getNodes(ACTIVE).stream() Set<String> activeNodes = nodeSupplier.getWorkerNodes().stream()
.map(Node::getNodeIdentifier) .map(Node::getNodeIdentifier)
.collect(toSet()); .collect(toSet());


Expand All @@ -179,7 +184,6 @@ void process()
} }


// get current node size // get current node size
String currentNode = nodeManager.getCurrentNode().getNodeIdentifier();
if (!nodes.containsKey(currentNode)) { if (!nodes.containsKey(currentNode)) {
return; return;
} }
Expand Down

0 comments on commit b8da841

Please sign in to comment.