Notify routing table updates to replication manager and query the latest leader replica set from Routing table snapshot (#1518)

Notify the Replication Manager of routing table updates from the Helix dynamic cluster change handler. On receiving this notification, the Replication Manager will:

1. Re-query the latest set of remote leader replicas for the leader partitions on the local node from the routing table snapshot and update its internal map.
2. Assign the new remote leader replicas to replica threads (will be handled in future PRs).
3. Remove old leader replicas from replica threads (will be handled in future PRs).

(A distilled sketch of this notification flow follows the change summary below.)
Arun-LinkedIn committed May 27, 2020
1 parent 0b646cd commit 56f0385
Showing 7 changed files with 281 additions and 40 deletions.
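
Before the diff itself, here is a distilled, self-contained sketch of the notification flow described in the commit message: a per-DC cluster change handler fans a routing-table-change event out to registered listeners, and a replication-manager-like listener re-queries leaders in response. All names in this sketch are illustrative stand-ins, not Ambry's actual classes.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class RoutingChangeFlowSketch {
  interface ChangeListener {
    void onRoutingTableChange();
  }

  // Stand-in for the Helix dynamic cluster change handler: fans each routing
  // table update out to all registered listeners.
  static class ClusterChangeHandler {
    private final List<ChangeListener> listeners = new CopyOnWriteArrayList<>();

    void register(ChangeListener listener) {
      listeners.add(listener);
    }

    void onHelixRoutingTableUpdate() {
      for (ChangeListener listener : listeners) {
        listener.onRoutingTableChange();
      }
    }
  }

  public static void main(String[] args) {
    ClusterChangeHandler handler = new ClusterChangeHandler();
    // Stand-in for the replication manager's listener: re-query leaders on change.
    handler.register(() -> System.out.println("re-query remote leader replicas from routing table snapshot"));
    handler.onHelixRoutingTableUpdate(); // simulates a Helix routing table callback
  }
}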
@@ -26,4 +26,12 @@ public interface ClusterMapChangeListener {
* @param removedReplicas {@link ReplicaId}(s) that have been removed.
*/
void onReplicaAddedOrRemoved(List<ReplicaId> addedReplicas, List<ReplicaId> removedReplicas);

/**
* Take actions when there is a routing table update. This is triggered whenever there is any change to the state of
* a replica in the cluster. On this trigger, listeners can look up the latest states of all replicas from the routing
* table snapshot {@link org.apache.helix.spectator.RoutingTableSnapshot} via the APIs it provides.
*/
default void onRoutingTableChange() {
}
}
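
As a concrete illustration of this new callback, here is a minimal listener sketch that re-queries the LEADER replicas of a single partition whenever the routing table changes. The types and the getReplicaIdsByState(state, dcName) call are taken from this diff; watching a single partition is an assumption made for brevity.

import com.github.ambry.clustermap.ClusterMapChangeListener;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.clustermap.ReplicaState;
import java.util.List;

class LeaderSetLoggingListener implements ClusterMapChangeListener {
  private final PartitionId watchedPartition;

  LeaderSetLoggingListener(PartitionId watchedPartition) {
    this.watchedPartition = watchedPartition;
  }

  @Override
  public void onReplicaAddedOrRemoved(List<ReplicaId> addedReplicas, List<ReplicaId> removedReplicas) {
    // Not needed for this example.
  }

  @Override
  public void onRoutingTableChange() {
    // dcName == null queries LEADER replicas across all data centers, mirroring
    // the queries made by PartitionLeaderInfo in this PR.
    List<? extends ReplicaId> leaders = watchedPartition.getReplicaIdsByState(ReplicaState.LEADER, null);
    System.out.println("Latest leader replicas: " + leaders);
  }
}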
@@ -208,6 +208,13 @@ public void onRoutingTableChange(RoutingTableSnapshot routingTableSnapshot, Obje
} else {
logger.info("Routing table change triggered from {}", dcName);
}

// We should notify all cluster map change listeners (i.e. replication manager, partition selection helper, etc.) of the routing table change. However, only the replication manager handles this callback for now.
// On receiving this notification, listeners can query the latest replica state information via the APIs provided in RoutingTableSnapshot.
for (ClusterMapChangeListener listener : clusterMapChangeListeners) {
listener.onRoutingTableChange();
}

helixClusterManagerMetrics.routingTableChangeTriggerCount.inc();
}

@@ -0,0 +1,153 @@
/**
* Copyright 2020 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.github.ambry.replication;

import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.clustermap.ReplicaState;
import com.github.ambry.server.StoreManager;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.github.ambry.clustermap.ClusterMapUtils.*;


/**
* Maintains the list of leader partitions on the local node and their corresponding peer leaders in remote data centers.
*/
class PartitionLeaderInfo {

private final Map<String, Set<ReplicaId>> peerLeaderReplicasByPartition = new ConcurrentHashMap<>();
private final StoreManager storeManager;
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private static final Logger logger = LoggerFactory.getLogger(PartitionLeaderInfo.class);

public PartitionLeaderInfo(StoreManager storeManager) {
this.storeManager = storeManager;

// We can't initialize peerLeaderReplicasByPartition here because the leader partitions on the local node (server) aren't known until it has finished participating with Helix.
// The map is updated after the server participates with Helix and replicas transition to the LEADER state (via onPartitionBecomeLeaderFromStandby()).
}

/**
* Get a map of partitions to their sets of peer leader replicas (this method is currently only used by ReplicationTest)
* @return an unmodifiable map of peer leader replicas stored by partition {@link PartitionLeaderInfo#peerLeaderReplicasByPartition}
*/
public Map<String, Set<ReplicaId>> getPeerLeaderReplicasByPartition() {
return Collections.unmodifiableMap(peerLeaderReplicasByPartition);
}

/**
* Add a leader partition and its set of peer leader replicas. This method is thread safe.
* @param partitionName name of the partition to be added
*/
public void addPartition(String partitionName) {

// 1. Get the local replica from the store manager
ReplicaId localReplica = storeManager.getReplica(partitionName);

// The write lock guards against races with threads removing leader partitions (removePartition()) and threads refreshing existing leader partitions (refreshPeerLeadersForAllPartitions())
rwLock.writeLock().lock();
try {
// 2. Get the peer leader replicas from all data centers for this partition
List<? extends ReplicaId> leaderReplicas =
localReplica.getPartitionId().getReplicaIdsByState(ReplicaState.LEADER, null);

// 3. Collect the peer leader replicas (excluding the local replica) and log them; these will be used later for leadership-based replication
Set<ReplicaId> peerLeaderReplicas = new HashSet<>();
for (ReplicaId leaderReplica : leaderReplicas) {
if (leaderReplica.getDataNodeId() != localReplica.getDataNodeId()) {
peerLeaderReplicas.add(leaderReplica);
logger.info("Partition {} on node instance {} is leader in remote dc {}", partitionName,
getInstanceName(leaderReplica.getDataNodeId().getHostname(), leaderReplica.getDataNodeId().getPort()),
leaderReplica.getDataNodeId().getDatacenterName());
}
}

peerLeaderReplicasByPartition.put(partitionName, peerLeaderReplicas);
} finally {
rwLock.writeLock().unlock();
}
}

/**
* Remove a partition from the map of leader partitions. This method is thread safe.
* @param partitionName name of the partition to be removed
*/
public void removePartition(String partitionName) {
// The write lock guards against races with threads adding new leader partitions (addPartition()) and threads refreshing existing leader partitions (refreshPeerLeadersForAllPartitions())
rwLock.writeLock().lock();
try {
peerLeaderReplicasByPartition.remove(partitionName);
} finally {
rwLock.writeLock().unlock();
}
}

/**
* Refreshes the list of remote leaders for all leader partitions by querying the latest information from RoutingTableSnapshots of all data centers.
* This method is thread safe.
*/
public void refreshPeerLeadersForAllPartitions() {

// Read-write lock usage: the write lock serializes the following concurrent activities:
// 1. Adding new leader partitions (in addPartition())
// 2. Removing old leader partitions (in removePartition())
// 3. Refreshing the remote leader set for existing leader partitions (this method).
// Explanation for point 3: multiple threads from different cluster change handlers (one per DC) can trigger
// onRoutingTableChange() in parallel, which calls this method. The sequence of gathering remote leaders (from each
// DC's RoutingTableSnapshot) and updating the map must be atomic. (A distilled sketch of this locking pattern
// follows the class.)

rwLock.writeLock().lock();
try {
for (Map.Entry<String, Set<ReplicaId>> entry : peerLeaderReplicasByPartition.entrySet()) {
String partitionName = entry.getKey();
ReplicaId localLeaderReplica = storeManager.getReplica(partitionName);
PartitionId partition = localLeaderReplica.getPartitionId();
Set<ReplicaId> previousRemoteLeaderReplicas = entry.getValue();
Set<ReplicaId> currentRemoteLeaderReplicas =
new HashSet<>(partition.getReplicaIdsByState(ReplicaState.LEADER, null));
currentRemoteLeaderReplicas.remove(localLeaderReplica);
if (!previousRemoteLeaderReplicas.equals(currentRemoteLeaderReplicas)) {
peerLeaderReplicasByPartition.put(partitionName, currentRemoteLeaderReplicas);
}
}
} finally {
rwLock.writeLock().unlock();
}
}

/**
* Checks if a remote replica is a leader for the given partition (pre-requisite: the partition's local replica must
* itself be a leader).
* @param partitionName name of the local leader partition
* @param replicaId remote replica to be checked
* @return true if the remote replica is a leader for the given partition.
*/
public boolean isPeerReplicaLeaderForPartition(String partitionName, ReplicaId replicaId) {
rwLock.readLock().lock();
try {
return peerLeaderReplicasByPartition.getOrDefault(partitionName, Collections.emptySet()).contains(replicaId);
} finally {
rwLock.readLock().unlock();
}
}
}
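
As referenced in the comments above, here is a distilled, self-contained sketch of the locking pattern PartitionLeaderInfo uses: the write lock makes the query-and-replace refresh atomic with respect to concurrent add/remove calls, while lookups take the cheaper read lock. Strings stand in for Ambry's replica types; all names are illustrative.

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

class LeaderSetRegistrySketch {
  private final Map<String, Set<String>> leadersByPartition = new ConcurrentHashMap<>();
  private final ReadWriteLock rwLock = new ReentrantReadWriteLock();

  void addPartition(String partition, Set<String> leaders) {
    rwLock.writeLock().lock();
    try {
      leadersByPartition.put(partition, new HashSet<>(leaders));
    } finally {
      rwLock.writeLock().unlock();
    }
  }

  void removePartition(String partition) {
    rwLock.writeLock().lock();
    try {
      leadersByPartition.remove(partition);
    } finally {
      rwLock.writeLock().unlock();
    }
  }

  // latestLeaders stands in for querying each DC's RoutingTableSnapshot; the write
  // lock makes "query latest leaders + replace map entries" a single atomic step.
  void refreshAll(Function<String, Set<String>> latestLeaders) {
    rwLock.writeLock().lock();
    try {
      for (Map.Entry<String, Set<String>> entry : leadersByPartition.entrySet()) {
        Set<String> current = latestLeaders.apply(entry.getKey());
        if (!entry.getValue().equals(current)) {
          leadersByPartition.put(entry.getKey(), current);
        }
      }
    } finally {
      rwLock.writeLock().unlock();
    }
  }

  boolean isLeader(String partition, String replica) {
    rwLock.readLock().lock();
    try {
      return leadersByPartition.getOrDefault(partition, Collections.emptySet()).contains(replica);
    } finally {
      rwLock.readLock().unlock();
    }
  }
}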
@@ -80,10 +80,10 @@ public abstract class ReplicationEngine implements ReplicationAPI {
protected final Logger logger = LoggerFactory.getLogger(getClass());
protected final Map<PartitionId, PartitionInfo> partitionToPartitionInfo;
protected final Map<String, Set<PartitionInfo>> mountPathToPartitionInfos;
protected final Map<String, List<ReplicaId>> peerLeaderReplicasByPartition;
protected final ReplicaSyncUpManager replicaSyncUpManager;
protected final StoreManager storeManager;
protected ReplicaTokenPersistor persistor = null;
protected final PartitionLeaderInfo partitionLeaderInfo;

protected static final short Replication_Delay_Multiplier = 5;
protected static final String replicaTokenFileName = "replicaTokens";
@@ -120,7 +120,7 @@ public ReplicationEngine(ReplicationConfig replicationConfig, ClusterMapConfig c
this.transformerClassName = transformerClassName;
this.storeManager = storeManager;
replicaSyncUpManager = clusterParticipant == null ? null : clusterParticipant.getReplicaSyncUpManager();
peerLeaderReplicasByPartition = new ConcurrentHashMap<>();
partitionLeaderInfo = new PartitionLeaderInfo(storeManager);
}

/**
@@ -37,9 +37,7 @@
import com.github.ambry.utils.Utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -49,7 +47,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import static com.github.ambry.clustermap.ClusterMapUtils.*;
import static com.github.ambry.clustermap.StateTransitionException.TransitionErrorCode.*;


@@ -246,14 +243,6 @@ private void updatePartitionInfoMaps(List<RemoteReplicaInfo> remoteReplicaInfos,
.add(partitionInfo);
}

/**
* Get a map of partition to list of peer leader replicas
* @return an unmodifiable map of peer leader replicas stored by partition {@link ReplicationEngine#peerLeaderReplicasByPartition}
*/
Map<String, List<ReplicaId>> getPeerLeaderReplicasByPartition() {
return Collections.unmodifiableMap(peerLeaderReplicasByPartition);
}

/**
* Implementation of {@link ClusterMapChangeListener} that helps replication manager react to cluster map changes.
*/
@@ -331,6 +320,28 @@ public void onReplicaAddedOrRemoved(List<ReplicaId> addedReplicas, List<ReplicaI
}
}
}

/**
* {@inheritDoc}
* Note that this method must be thread-safe because multiple threads (from different cluster change handlers) may
* call it concurrently.
*/
@Override
public void onRoutingTableChange() {

// wait for start() to complete
try {
startupLatch.await();
} catch (InterruptedException e) {
logger.warn("Waiting for startup is interrupted.");
throw new IllegalStateException(
"Replication manager startup is interrupted while handling routing table change");
}

// Refresh the remote leader information for all local leader partitions maintained in an in-memory structure in PartitionLeaderInfo.
// Thread safety is ensured inside PartitionLeaderInfo.refreshPeerLeadersForAllPartitions().
partitionLeaderInfo.refreshPeerLeadersForAllPartitions();
}
}
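
The startupLatch guard above is a pattern worth isolating: callbacks arriving from other threads block until start() finishes, so they never act on a half-initialized replication manager. Below is a minimal, self-contained sketch of that guard; it additionally restores the interrupt flag before throwing, a detail worth considering. Names are illustrative, not Ambry's.

import java.util.concurrent.CountDownLatch;

class StartupGuardedComponentSketch {
  private final CountDownLatch startupLatch = new CountDownLatch(1);

  void start() {
    // ... initialize internal state (replica threads, token persistor, etc.) ...
    startupLatch.countDown(); // release any callbacks waiting below
  }

  void onRoutingTableChange() {
    try {
      startupLatch.await(); // wait for start() to complete
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      throw new IllegalStateException("Interrupted while waiting for startup", e);
    }
    // ... safe to refresh leader information here ...
  }
}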

/**
@@ -386,34 +397,18 @@ public void onPartitionBecomeStandbyFromBootstrap(String partitionName) {
public void onPartitionBecomeLeaderFromStandby(String partitionName) {
logger.info("Partition state change notification from Standby to Leader received for partition {}",
partitionName);
//Changes for leader based replication - for now, we just log the list of peer leader replicas
// 1. get replica ID of current node from store manager
ReplicaId localReplica = storeManager.getReplica(partitionName);

// 2. Get the peer leader replicas from all data centers for this partition
List<? extends ReplicaId> leaderReplicas =
localReplica.getPartitionId().getReplicaIdsByState(ReplicaState.LEADER, null);

// 3. Log the list of leader replicas associated with this partition (will be used later for leadership based replication)
List<ReplicaId> peerLeaderReplicas = new ArrayList<>();
for (ReplicaId leaderReplica : leaderReplicas) {
if (leaderReplica.getDataNodeId() != localReplica.getDataNodeId()) {
peerLeaderReplicas.add(leaderReplica);
logger.info("Partition {} on node instance {} is leader in remote dc {}", partitionName,
getInstanceName(leaderReplica.getDataNodeId().getHostname(), leaderReplica.getDataNodeId().getPort()),
leaderReplica.getDataNodeId().getDatacenterName());
}
}
peerLeaderReplicasByPartition.put(partitionName, peerLeaderReplicas);
// Add the leader partition (and its remote leaders) to the in-memory structure maintained in PartitionLeaderInfo.
// PartitionLeaderInfo::addPartition is thread safe.
partitionLeaderInfo.addPartition(partitionName);
}

@Override
public void onPartitionBecomeStandbyFromLeader(String partitionName) {
logger.info("Partition state change notification from Leader to Standby received for partition {}",
partitionName);
if (peerLeaderReplicasByPartition.containsKey(partitionName)) {
peerLeaderReplicasByPartition.remove((partitionName));
}
// Remove the leader partition from the in-memory structure maintained in PartitionLeaderInfo.
// PartitionLeaderInfo::removePartition is thread safe.
partitionLeaderInfo.removePartition(partitionName);
}

@Override
