Support dangling topic check and deletion in topic cleanup service.
Hao Xu committed May 1, 2024
1 parent b2301ac commit ae70258
Showing 9 changed files with 161 additions and 13 deletions.
ConfigKeys.java
@@ -2114,4 +2114,10 @@ private ConfigKeys() {
*/
public static final String SERVER_RECORD_LEVEL_METRICS_WHEN_BOOTSTRAPPING_CURRENT_VERSION_ENABLED =
"server.record.level.metrics.when.bootstrapping.current.version.enabled";

/**
* Interval, in seconds, between checks for dangling topics across two different types of pub-sub backends.
*/
public static final String CONTROLLER_DANGLING_TOPIC_CLEAN_UP_INTERVAL_SECOND =
"controller.dangling.topic.clean.up.interval.second";
}
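
For illustration, here is a minimal sketch of wiring the new key into controller properties. The 3600-second value mirrors the default added to VeniceControllerConfig below; the class and setup around it are hypothetical, not part of this commit.

import com.linkedin.venice.ConfigKeys;
import java.util.Properties;

public class DanglingTopicConfigExample {
  public static void main(String[] args) {
    // Hypothetical controller setup: check for dangling topics at most once per hour
    // (3600 seconds is also the code default added below).
    Properties controllerProps = new Properties();
    controllerProps.setProperty(ConfigKeys.CONTROLLER_DANGLING_TOPIC_CLEAN_UP_INTERVAL_SECOND, "3600");
  }
}

Note that, per the TopicCleanupService change below, the check only activates when the configured pub-sub admin factory is not the Apache Kafka one and this interval is positive.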
VeniceController.java
@@ -190,7 +190,8 @@ private void createServices() {
admin,
multiClusterConfigs,
pubSubTopicRepository,
- new TopicCleanupServiceStats(metricsRepository));
+ new TopicCleanupServiceStats(metricsRepository),
+ pubSubClientsFactory);
if (!(admin instanceof VeniceParentHelixAdmin)) {
throw new VeniceException(
"'VeniceParentHelixAdmin' is expected of the returned 'Admin' from 'VeniceControllerService#getVeniceHelixAdmin' in parent mode");
@@ -210,7 +211,8 @@ private void createServices() {
admin,
multiClusterConfigs,
pubSubTopicRepository,
- new TopicCleanupServiceStats(metricsRepository));
+ new TopicCleanupServiceStats(metricsRepository),
+ pubSubClientsFactory);
if (!(admin instanceof VeniceHelixAdmin)) {
throw new VeniceException(
"'VeniceHelixAdmin' is expected of the returned 'Admin' from 'VeniceControllerService#getVeniceHelixAdmin' in child mode");
VeniceControllerConfig.java
@@ -32,6 +32,7 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER_LEADER_HAAS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER_REPLICA;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER_ZK_ADDRESSS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DANGLING_TOPIC_CLEAN_UP_INTERVAL_SECOND;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLED_REPLICA_ENABLER_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLED_ROUTES;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLE_PARENT_TOPIC_TRUNCATION_UPON_COMPLETION;
@@ -322,6 +323,8 @@ public class VeniceControllerConfig extends VeniceControllerClusterConfig {
private final boolean useDaVinciSpecificExecutionStatusForError;
private final PubSubClientsFactory pubSubClientsFactory;

private final long danglingTopicCleanupIntervalSeconds;

public VeniceControllerConfig(VeniceProperties props) {
super(props);
this.adminPort = props.getInt(ADMIN_PORT);
@@ -560,6 +563,7 @@ public VeniceControllerConfig(VeniceProperties props) {
this.useDaVinciSpecificExecutionStatusForError =
props.getBoolean(USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR, false);
this.pubSubClientsFactory = new PubSubClientsFactory(props);
this.danglingTopicCleanupIntervalSeconds = props.getLong(CONTROLLER_DANGLING_TOPIC_CLEAN_UP_INTERVAL_SECOND, 3600);
}

private void validateActiveActiveConfigs() {
@@ -1133,6 +1137,10 @@ private static Map<String, String> parseChildDataCenterToValue(
return outputMap;
}

public long getDanglingTopicCleanupIntervalSeconds() {
return danglingTopicCleanupIntervalSeconds;
}

/**
* A function that would put a k/v pair into a map with some processing works.
*/
VeniceControllerMultiClusterConfig.java
@@ -270,8 +270,11 @@ public int getGraveyardCleanupSleepIntervalBetweenListFetchMinutes() {
return getCommonConfig().getStoreGraveyardCleanupSleepIntervalBetweenListFetchMinutes();
}

// TODO: Remove this method once we fully support cluster-level pub sub adapter configuration.
public PubSubClientsFactory getPubSubClientsFactory() {
return getCommonConfig().getPubSubClientsFactory();
}

public long getDanglingTopicCleanupIntervalSeconds() {
return getCommonConfig().getDanglingTopicCleanupIntervalSeconds();
}
}
TopicCleanupService.java
@@ -5,13 +5,21 @@
import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig;
import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubAdminAdapter;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -74,13 +82,17 @@ public class TopicCleanupService extends AbstractVeniceService {
private boolean isRTTopicDeletionBlocked = false;
private boolean isLeaderControllerOfControllerCluster = false;
private long refreshQueueCycle = Time.MS_PER_MINUTE;
private Optional<PubSubAdminAdapter> apacheKafkaAdminAdapter;
private long recentDanglingTopicCleanupTime = -1L;
private final long danglingTopicCleanupIntervalMs;
protected final VeniceControllerMultiClusterConfig multiClusterConfigs;

public TopicCleanupService(
Admin admin,
VeniceControllerMultiClusterConfig multiClusterConfigs,
PubSubTopicRepository pubSubTopicRepository,
- TopicCleanupServiceStats topicCleanupServiceStats) {
+ TopicCleanupServiceStats topicCleanupServiceStats,
+ PubSubClientsFactory pubSubClientsFactory) {
this.admin = admin;
this.sleepIntervalBetweenTopicListFetchMs =
multiClusterConfigs.getTopicCleanupSleepIntervalBetweenTopicListFetchMs();
@@ -101,6 +113,28 @@
} else {
this.multiDataCenterStoreToVersionTopicCount = Collections.emptyMap();
}

this.danglingTopicCleanupIntervalMs =
Time.MS_PER_SECOND * multiClusterConfigs.getDanglingTopicCleanupIntervalSeconds();
// Only construct the extra Kafka admin client when the primary pub-sub backend is not
// Apache Kafka and the cleanup interval is positive; otherwise the check stays disabled.
if (!pubSubClientsFactory.getAdminAdapterFactory().getClass().equals(ApacheKafkaAdminAdapterFactory.class)
&& danglingTopicCleanupIntervalMs > 0) {
this.apacheKafkaAdminAdapter = Optional.of(constructApacheKafkaAdminAdapter());
} else {
this.apacheKafkaAdminAdapter = Optional.empty();
}
}

private PubSubAdminAdapter constructApacheKafkaAdminAdapter() {
VeniceProperties veniceProperties = admin.getPubSubSSLProperties(getTopicManager().getPubSubClusterAddress());
return new ApacheKafkaAdminAdapterFactory().create(veniceProperties, pubSubTopicRepository);
}

// For test purposes only.
void setApacheKafkaAdminAdapter(ApacheKafkaAdminAdapter apacheKafkaAdminAdapter) {
this.apacheKafkaAdminAdapter = Optional.of(apacheKafkaAdminAdapter);
}

@Override
@@ -248,6 +282,17 @@ private void populateDeprecatedTopicQueue(PriorityQueue<PubSubTopic> topics) {
if (realTimeTopicDeletionNeeded.get() && !multiDataCenterStoreToVersionTopicCount.isEmpty()) {
refreshMultiDataCenterStoreToVersionTopicCountMap(topicsWithRetention.keySet());
}

// Check whether there are dangling topics to be deleted; this runs at most once per
// danglingTopicCleanupIntervalMs.
if (apacheKafkaAdminAdapter.isPresent()
&& System.currentTimeMillis() - danglingTopicCleanupIntervalMs > recentDanglingTopicCleanupTime) {
List<PubSubTopic> pubSubTopics = collectDanglingTopics(topicsWithRetention);
if (!pubSubTopics.isEmpty()) {
LOGGER.info("Find topic discrepancy: topics not in kafka but in xinfra {}", pubSubTopics);
}
recentDanglingTopicCleanupTime = System.currentTimeMillis();
topics.addAll(pubSubTopics);
}
}

private void refreshMultiDataCenterStoreToVersionTopicCountMap(Set<PubSubTopic> localTopics) {
@@ -401,4 +446,40 @@ public static List<PubSubTopic> extractVersionTopicsToCleanup(
})
.collect(Collectors.toList());
}

private List<PubSubTopic> collectDanglingTopics(Map<PubSubTopic, Long> pubSubTopicsRetentions) {
List<PubSubTopic> danglingTopics = new ArrayList<>();
Set<PubSubTopic> kafkaTopics = apacheKafkaAdminAdapter.get().listAllTopics();
for (Map.Entry<PubSubTopic, Long> entry: pubSubTopicsRetentions.entrySet()) {
PubSubTopic pubSubTopic = entry.getKey();
if (kafkaTopics.contains(pubSubTopic)) {
continue;
}
try {
String storeName = pubSubTopic.getStoreName();
String clusterDiscovered = admin.discoverCluster(storeName).getFirst();
Store store = admin.getStore(clusterDiscovered, storeName);
LOGGER.info("Found topic discrepancy for topic: {}", pubSubTopic);
if (pubSubTopic.isRealTime()) {
// A real-time topic is dangling if its store no longer has a hybrid config.
if (!store.isHybrid()) {
LOGGER.info("Will remove dangling real-time topic {}", pubSubTopic);
danglingTopics.add(pubSubTopic);
}
} else if (pubSubTopic.isVersionTopicOrStreamReprocessingTopic() || pubSubTopic.isViewTopic()) {
// A version-derived topic is dangling if its version no longer exists on the store.
int versionNum = Version.parseVersionFromKafkaTopicName(pubSubTopic.getName());
if (!store.containsVersion(versionNum)) {
LOGGER.info("Will remove dangling version topic {}", pubSubTopic);
danglingTopics.add(pubSubTopic);
}
}
} catch (VeniceNoStoreException e) {
LOGGER.info("No store is found for topic: {}", pubSubTopic);
danglingTopics.add(pubSubTopic);
} catch (Exception e) {
LOGGER.error("Error happened while checking dangling topic: {}", pubSubTopic, e);
}
}
return danglingTopics;
}
}
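
To restate the classification rules in one place: the sketch below is not part of the commit, and assumes topic was reported by the primary pub-sub backend but is absent from Kafka, with store resolved via discoverCluster/getStore as in collectDanglingTopics above.

// A topic is considered dangling when either branch holds; a VeniceNoStoreException
// during store resolution also marks the topic dangling.
boolean dangling =
    // Real-time topic whose store no longer has a hybrid config.
    (topic.isRealTime() && !store.isHybrid())
    // Version, stream-reprocessing, or view topic whose version is gone from the store.
    || ((topic.isVersionTopicOrStreamReprocessingTopic() || topic.isViewTopic())
        && !store.containsVersion(Version.parseVersionFromKafkaTopicName(topic.getName())));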
TopicCleanupServiceForParentController.java
@@ -4,6 +4,7 @@
import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig;
import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.manager.TopicManager;
@@ -26,8 +27,9 @@ public TopicCleanupServiceForParentController(
Admin admin,
VeniceControllerMultiClusterConfig multiClusterConfigs,
PubSubTopicRepository pubSubTopicRepository,
- TopicCleanupServiceStats topicCleanupServiceStats) {
- super(admin, multiClusterConfigs, pubSubTopicRepository, topicCleanupServiceStats);
+ TopicCleanupServiceStats topicCleanupServiceStats,
+ PubSubClientsFactory pubSubClientsFactory) {
+ super(admin, multiClusterConfigs, pubSubTopicRepository, topicCleanupServiceStats, pubSubClientsFactory);
}

@Override
TestTopicCleanupService.java
@@ -1,9 +1,11 @@
package com.linkedin.venice.controller.kafka;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -17,19 +19,26 @@
import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -48,6 +57,7 @@ public class TestTopicCleanupService {
private VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig;
private TopicCleanupServiceStats topicCleanupServiceStats;
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
private final PubSubClientsFactory pubSubClientsFactory = mock(PubSubClientsFactory.class);

@BeforeMethod
public void setUp() {
@@ -76,11 +86,13 @@ public void setUp() {
doReturn(Collections.emptyMap()).when(remoteTopicManager).getAllTopicRetentions();
topicCleanupServiceStats = mock(TopicCleanupServiceStats.class);

doReturn(new ApacheKafkaAdminAdapterFactory()).when(pubSubClientsFactory).getAdminAdapterFactory();
topicCleanupService = new TopicCleanupService(
admin,
veniceControllerMultiClusterConfig,
pubSubTopicRepository,
- topicCleanupServiceStats);
+ topicCleanupServiceStats,
+ pubSubClientsFactory);
}

@AfterMethod
@@ -183,6 +195,7 @@ public void testCleanupVeniceTopics() throws ExecutionException {
storeTopics.put(getPubSubTopic(storeName2, "_rt"), 1000L);
storeTopics.put(getPubSubTopic(storeName2, "_v1"), 1000L);
storeTopics.put(getPubSubTopic(storeName3, "_rt"), 1000L);
storeTopics.put(getPubSubTopic(storeName3, "_v100"), Long.MAX_VALUE);

Map<PubSubTopic, Long> storeTopics2 = new HashMap<>();
storeTopics2.put(getPubSubTopic(storeName1, "_v3"), Long.MAX_VALUE);
@@ -205,6 +218,24 @@
doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(1000L);
doReturn(Optional.of(new StoreConfig(storeName1))).when(storeConfigRepository).getStoreConfig(storeName1);

Set<PubSubTopic> pubSubTopicSet = new HashSet<>();
pubSubTopicSet.addAll(storeTopics.keySet());
pubSubTopicSet.remove(getPubSubTopic(storeName3, "_v100"));

ApacheKafkaAdminAdapterFactory apacheKafkaAdminAdapterFactory = mock(ApacheKafkaAdminAdapterFactory.class);
ApacheKafkaAdminAdapter apacheKafkaAdminAdapter = mock(ApacheKafkaAdminAdapter.class);
doReturn(apacheKafkaAdminAdapter).when(apacheKafkaAdminAdapterFactory).create(any(), eq(pubSubTopicRepository));

topicCleanupService.setApacheKafkaAdminAdapter(apacheKafkaAdminAdapter);
String clusterName = "clusterName";
Pair<String, String> pair = new Pair<>(clusterName, "");
doReturn(pair).when(admin).discoverCluster(storeName3);

Store store3 = mock(Store.class);
doReturn(false).when(store3).containsVersion(100);
doReturn(store3).when(admin).getStore(clusterName, storeName3);
doReturn(pubSubTopicSet).when(apacheKafkaAdminAdapter).listAllTopics();

topicCleanupService.cleanupVeniceTopics();

verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_rt"));
Expand All @@ -217,10 +248,12 @@ public void testCleanupVeniceTopics() throws ExecutionException {
verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName2, "_rt"));
// Delete should be blocked by remote VT
verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName3, "_rt"));
- verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(5);
+ verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(6);
verify(topicCleanupServiceStats, never()).recordTopicDeletionError();
verify(topicCleanupServiceStats, atLeastOnce()).recordTopicDeleted();

verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName3, "_v100"));

topicCleanupService.cleanupVeniceTopics();

verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_v3"));
@@ -384,7 +417,8 @@ public void testCleanVeniceTopicsBlockRTTopicDeletionWhenMisconfigured() {
admin,
veniceControllerMultiClusterConfig,
pubSubTopicRepository,
- topicCleanupServiceStats);
+ topicCleanupServiceStats,
+ pubSubClientsFactory);
String storeName = Utils.getUniqueString("testStore");
Map<PubSubTopic, Long> storeTopics = new HashMap<>();
storeTopics.put(getPubSubTopic(storeName, "_rt"), 1000L);
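
As an aside, the once-per-interval gate could be exercised with a stub along the following lines. This is hypothetical test code, not part of this commit; the interval stub must be applied before the service is constructed, because the constructor converts and captures the value.

// Hypothetical: with a one-hour interval, only the first sweep consults Kafka.
doReturn(3600L).when(veniceControllerMultiClusterConfig).getDanglingTopicCleanupIntervalSeconds();
// ... construct the service and inject the mocked adapter as in testCleanupVeniceTopics ...
topicCleanupService.cleanupVeniceTopics();
topicCleanupService.cleanupVeniceTopics();
// The second sweep skips the dangling-topic check via recentDanglingTopicCleanupTime.
verify(apacheKafkaAdminAdapter, times(1)).listAllTopics(); // times() is from org.mockito.Mockito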
TestTopicCleanupServiceForMultiKafkaClusters.java
@@ -9,6 +9,7 @@
import com.linkedin.venice.controller.VeniceControllerConfig;
import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig;
import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.manager.TopicManager;
@@ -28,6 +29,7 @@ public class TestTopicCleanupServiceForMultiKafkaClusters {
private TopicCleanupServiceForParentController topicCleanupService;

private PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
private final PubSubClientsFactory pubSubClientsFactory = mock(PubSubClientsFactory.class);

@BeforeTest
public void setUp() {
@@ -62,8 +64,12 @@ public void setUp() {
doReturn(topicManager2).when(admin).getTopicManager(kafkaClusterServerUrl2);
TopicCleanupServiceStats topicCleanupServiceStats = mock(TopicCleanupServiceStats.class);

- topicCleanupService =
- new TopicCleanupServiceForParentController(admin, config, pubSubTopicRepository, topicCleanupServiceStats);
+ topicCleanupService = new TopicCleanupServiceForParentController(
+ admin,
+ config,
+ pubSubTopicRepository,
+ topicCleanupServiceStats,
+ pubSubClientsFactory);
}

@Test
