diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 4566e10304..0f17793435 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -2114,4 +2114,10 @@ private ConfigKeys() { */ public static final String SERVER_RECORD_LEVEL_METRICS_WHEN_BOOTSTRAPPING_CURRENT_VERSION_ENABLED = "server.record.level.metrics.when.bootstrapping.current.version.enabled"; + + /** + * Time interval for checking dangling topics between 2 different types of pub sub backends. + */ + public static final String CONTROLLER_DANGLING_TOPIC_CLEAN_UP_INTERVAL_SECOND = + "controller.dangling.topic.clean.up.interval.second"; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java index b9c1edb966..0308d8fda1 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java @@ -190,7 +190,8 @@ private void createServices() { admin, multiClusterConfigs, pubSubTopicRepository, - new TopicCleanupServiceStats(metricsRepository)); + new TopicCleanupServiceStats(metricsRepository), + pubSubClientsFactory); if (!(admin instanceof VeniceParentHelixAdmin)) { throw new VeniceException( "'VeniceParentHelixAdmin' is expected of the returned 'Admin' from 'VeniceControllerService#getVeniceHelixAdmin' in parent mode"); @@ -210,7 +211,8 @@ private void createServices() { admin, multiClusterConfigs, pubSubTopicRepository, - new TopicCleanupServiceStats(metricsRepository)); + new TopicCleanupServiceStats(metricsRepository), + pubSubClientsFactory); if (!(admin instanceof VeniceHelixAdmin)) { throw new 
VeniceException( "'VeniceHelixAdmin' is expected of the returned 'Admin' from 'VeniceControllerService#getVeniceHelixAdmin' in child mode"); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java index 4411975511..6ee4bb3e7a 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java @@ -32,6 +32,7 @@ import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER_LEADER_HAAS; import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER_REPLICA; import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER_ZK_ADDRESSS; +import static com.linkedin.venice.ConfigKeys.CONTROLLER_DANGLING_TOPIC_CLEAN_UP_INTERVAL_SECOND; import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLED_REPLICA_ENABLER_INTERVAL_MS; import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLED_ROUTES; import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLE_PARENT_TOPIC_TRUNCATION_UPON_COMPLETION; @@ -322,6 +323,8 @@ public class VeniceControllerConfig extends VeniceControllerClusterConfig { private final boolean useDaVinciSpecificExecutionStatusForError; private final PubSubClientsFactory pubSubClientsFactory; + private final long danglingTopicCleanupIntervalSeconds; + public VeniceControllerConfig(VeniceProperties props) { super(props); this.adminPort = props.getInt(ADMIN_PORT); @@ -560,6 +563,7 @@ public VeniceControllerConfig(VeniceProperties props) { this.useDaVinciSpecificExecutionStatusForError = props.getBoolean(USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR, false); this.pubSubClientsFactory = new PubSubClientsFactory(props); + this.danglingTopicCleanupIntervalSeconds = props.getLong(CONTROLLER_DANGLING_TOPIC_CLEAN_UP_INTERVAL_SECOND, 3600); } private 
void validateActiveActiveConfigs() { @@ -1133,6 +1137,10 @@ private static Map parseChildDataCenterToValue( return outputMap; } + public long getDanglingTopicCleanupIntervalSeconds() { + return danglingTopicCleanupIntervalSeconds; + } + /** * A function that would put a k/v pair into a map with some processing works. */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java index 7e7c033ae6..d8714f2859 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java @@ -270,8 +270,11 @@ public int getGraveyardCleanupSleepIntervalBetweenListFetchMinutes() { return getCommonConfig().getStoreGraveyardCleanupSleepIntervalBetweenListFetchMinutes(); } - // TODO: Remove this method once we fully support cluster-level pub sub adapter configuration. 
public PubSubClientsFactory getPubSubClientsFactory() { return getCommonConfig().getPubSubClientsFactory(); } + + public long getDanglingTopicCleanupIntervalSeconds() { + return getCommonConfig().getDanglingTopicCleanupIntervalSeconds(); + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java index 5ece17447e..ee4ff5e474 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java @@ -5,13 +5,21 @@ import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; import com.linkedin.venice.controller.stats.TopicCleanupServiceStats; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.exceptions.VeniceNoStoreException; +import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter; +import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapterFactory; +import com.linkedin.venice.pubsub.api.PubSubAdminAdapter; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.manager.TopicManager; import com.linkedin.venice.service.AbstractVeniceService; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.VeniceProperties; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -74,13 +82,17 @@ public class TopicCleanupService extends AbstractVeniceService { private boolean isRTTopicDeletionBlocked = false; private boolean 
isLeaderControllerOfControllerCluster = false; private long refreshQueueCycle = Time.MS_PER_MINUTE; + private Optional apacheKafkaAdminAdapter; + private long recentDanglingTopicCleanupTime = -1L; + private final long danglingTopicCleanupIntervalMs; protected final VeniceControllerMultiClusterConfig multiClusterConfigs; public TopicCleanupService( Admin admin, VeniceControllerMultiClusterConfig multiClusterConfigs, PubSubTopicRepository pubSubTopicRepository, - TopicCleanupServiceStats topicCleanupServiceStats) { + TopicCleanupServiceStats topicCleanupServiceStats, + PubSubClientsFactory pubSubClientsFactory) { this.admin = admin; this.sleepIntervalBetweenTopicListFetchMs = multiClusterConfigs.getTopicCleanupSleepIntervalBetweenTopicListFetchMs(); @@ -101,6 +113,28 @@ public TopicCleanupService( } else { this.multiDataCenterStoreToVersionTopicCount = Collections.emptyMap(); } + + this.danglingTopicCleanupIntervalMs = + Time.MS_PER_SECOND * multiClusterConfigs.getDanglingTopicCleanupIntervalSeconds(); + if (!pubSubClientsFactory.getAdminAdapterFactory().getClass().equals(ApacheKafkaAdminAdapterFactory.class) + && danglingTopicCleanupIntervalMs > 0) { + this.apacheKafkaAdminAdapter = Optional.of(constructApacheKafkaAdminAdapter()); + } else { + this.apacheKafkaAdminAdapter = Optional.empty(); + } + } + + private PubSubAdminAdapter constructApacheKafkaAdminAdapter() { + VeniceProperties veniceProperties = admin.getPubSubSSLProperties(getTopicManager().getPubSubClusterAddress()); + ApacheKafkaAdminAdapterFactory apacheKafkaAdminAdapterFactory = new ApacheKafkaAdminAdapterFactory(); + PubSubAdminAdapter apacheKafkaAdminAdapter = + apacheKafkaAdminAdapterFactory.create(veniceProperties, pubSubTopicRepository); + return apacheKafkaAdminAdapter; + } + + // For test purpose + void setApacheKafkaAdminAdapter(ApacheKafkaAdminAdapter apacheKafkaAdminAdapter) { + this.apacheKafkaAdminAdapter = Optional.of(apacheKafkaAdminAdapter); } @Override @@ -248,6 +282,17 @@ private void 
populateDeprecatedTopicQueue(PriorityQueue topics) { if (realTimeTopicDeletionNeeded.get() && !multiDataCenterStoreToVersionTopicCount.isEmpty()) { refreshMultiDataCenterStoreToVersionTopicCountMap(topicsWithRetention.keySet()); } + + // Check if there are dangling topics to be deleted. + if (apacheKafkaAdminAdapter.isPresent() + && System.currentTimeMillis() - danglingTopicCleanupIntervalMs > recentDanglingTopicCleanupTime) { + List pubSubTopics = collectDanglingTopics(topicsWithRetention); + if (!pubSubTopics.isEmpty()) { + LOGGER.info("Find topic discrepancy: topics not in kafka but in xinfra {}", pubSubTopics); + } + recentDanglingTopicCleanupTime = System.currentTimeMillis(); + topics.addAll(pubSubTopics); + } } private void refreshMultiDataCenterStoreToVersionTopicCountMap(Set localTopics) { @@ -401,4 +446,40 @@ public static List extractVersionTopicsToCleanup( }) .collect(Collectors.toList()); } + + private List collectDanglingTopics(Map pubSubTopicsRetentions) { + List topicsTobeCleanup = new ArrayList<>(); + Set kafkaTopics = apacheKafkaAdminAdapter.get().listAllTopics(); + for (Map.Entry entry: pubSubTopicsRetentions.entrySet()) { + PubSubTopic pubSubTopic = entry.getKey(); + if (!kafkaTopics.contains(pubSubTopic)) { + try { + String storeName = pubSubTopic.getStoreName(); + String clusterDiscovered = admin.discoverCluster(storeName).getFirst(); + Store store = admin.getStore(clusterDiscovered, storeName); + LOGGER.info("Find topic discrepancy case: {}", pubSubTopic); + if (pubSubTopic.isRealTime()) { + if (!store.isHybrid()) { + LOGGER.info("Will remove real-time dangling topic {}", pubSubTopic); + topicsTobeCleanup.add(pubSubTopic); + } + } else if (pubSubTopic.isVersionTopicOrStreamReprocessingTopic() || pubSubTopic.isViewTopic()) { + int versionNum = Version.parseVersionFromKafkaTopicName(pubSubTopic.getName()); + if (!store.containsVersion(versionNum)) { + LOGGER.info("Will remove dangling version topic {}", pubSubTopic); + 
topicsTobeCleanup.add(pubSubTopic); + } + } + } catch (Exception e) { + if (e instanceof VeniceNoStoreException) { + LOGGER.info("No store is found for topic: {}", pubSubTopic); + topicsTobeCleanup.add(pubSubTopic); + } else { + LOGGER.error("Error happened during checking dangling topic: {}", pubSubTopic, e); + } + } + } + } + return topicsTobeCleanup; + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupServiceForParentController.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupServiceForParentController.java index da029874aa..10f4d12022 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupServiceForParentController.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupServiceForParentController.java @@ -4,6 +4,7 @@ import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; import com.linkedin.venice.controller.stats.TopicCleanupServiceStats; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.manager.TopicManager; @@ -26,8 +27,9 @@ public TopicCleanupServiceForParentController( Admin admin, VeniceControllerMultiClusterConfig multiClusterConfigs, PubSubTopicRepository pubSubTopicRepository, - TopicCleanupServiceStats topicCleanupServiceStats) { - super(admin, multiClusterConfigs, pubSubTopicRepository, topicCleanupServiceStats); + TopicCleanupServiceStats topicCleanupServiceStats, + PubSubClientsFactory pubSubClientsFactory) { + super(admin, multiClusterConfigs, pubSubTopicRepository, topicCleanupServiceStats, pubSubClientsFactory); } @Override diff --git 
a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java index 3a83b5474d..e7b678f787 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java @@ -1,9 +1,11 @@ package com.linkedin.venice.controller.kafka; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -17,19 +19,26 @@ import com.linkedin.venice.controller.stats.TopicCleanupServiceStats; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository; +import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter; +import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapterFactory; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.manager.TopicManager; +import com.linkedin.venice.utils.Pair; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -48,6 +57,7 @@ public class TestTopicCleanupService { private VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig; private TopicCleanupServiceStats topicCleanupServiceStats; private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + private final PubSubClientsFactory pubSubClientsFactory = mock(PubSubClientsFactory.class); @BeforeMethod public void setUp() { @@ -76,11 +86,13 @@ public void setUp() { doReturn(Collections.emptyMap()).when(remoteTopicManager).getAllTopicRetentions(); topicCleanupServiceStats = mock(TopicCleanupServiceStats.class); + doReturn(new ApacheKafkaAdminAdapterFactory()).when(pubSubClientsFactory).getAdminAdapterFactory(); topicCleanupService = new TopicCleanupService( admin, veniceControllerMultiClusterConfig, pubSubTopicRepository, - topicCleanupServiceStats); + topicCleanupServiceStats, + pubSubClientsFactory); } @AfterMethod @@ -183,6 +195,7 @@ public void testCleanupVeniceTopics() throws ExecutionException { storeTopics.put(getPubSubTopic(storeName2, "_rt"), 1000L); storeTopics.put(getPubSubTopic(storeName2, "_v1"), 1000L); storeTopics.put(getPubSubTopic(storeName3, "_rt"), 1000L); + storeTopics.put(getPubSubTopic(storeName3, "_v100"), Long.MAX_VALUE); Map storeTopics2 = new HashMap<>(); storeTopics2.put(getPubSubTopic(storeName1, "_v3"), Long.MAX_VALUE); @@ -205,6 +218,24 @@ public void testCleanupVeniceTopics() throws ExecutionException { doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(1000L); doReturn(Optional.of(new StoreConfig(storeName1))).when(storeConfigRepository).getStoreConfig(storeName1); + Set pubSubTopicSet = new HashSet<>(); + pubSubTopicSet.addAll(storeTopics.keySet()); + pubSubTopicSet.remove(getPubSubTopic(storeName3, "_v100")); + + ApacheKafkaAdminAdapterFactory apacheKafkaAdminAdapterFactory = mock(ApacheKafkaAdminAdapterFactory.class); + 
ApacheKafkaAdminAdapter apacheKafkaAdminAdapter = mock(ApacheKafkaAdminAdapter.class); + doReturn(apacheKafkaAdminAdapter).when(apacheKafkaAdminAdapterFactory).create(any(), eq(pubSubTopicRepository)); + + topicCleanupService.setApacheKafkaAdminAdapter(apacheKafkaAdminAdapter); + String clusterName = "clusterName"; + Pair pair = new Pair<>(clusterName, ""); + doReturn(pair).when(admin).discoverCluster(storeName3); + + Store store3 = mock(Store.class); + doReturn(false).when(store3).containsVersion(100); + doReturn(store3).when(admin).getStore(clusterName, storeName3); + doReturn(pubSubTopicSet).when(apacheKafkaAdminAdapter).listAllTopics(); + topicCleanupService.cleanupVeniceTopics(); verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_rt")); @@ -217,10 +248,12 @@ public void testCleanupVeniceTopics() throws ExecutionException { verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName2, "_rt")); // Delete should be blocked by remote VT verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName3, "_rt")); - verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(5); + verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(6); verify(topicCleanupServiceStats, never()).recordTopicDeletionError(); verify(topicCleanupServiceStats, atLeastOnce()).recordTopicDeleted(); + verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName3, "_v100")); + topicCleanupService.cleanupVeniceTopics(); verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_v3")); @@ -384,7 +417,8 @@ public void testCleanVeniceTopicsBlockRTTopicDeletionWhenMisconfigured() { admin, veniceControllerMultiClusterConfig, pubSubTopicRepository, - topicCleanupServiceStats); + topicCleanupServiceStats, + pubSubClientsFactory); String storeName = Utils.getUniqueString("testStore"); 
Map storeTopics = new HashMap<>(); storeTopics.put(getPubSubTopic(storeName, "_rt"), 1000L); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java index 1fe3889f68..71db908e7a 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java @@ -9,6 +9,7 @@ import com.linkedin.venice.controller.VeniceControllerConfig; import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; import com.linkedin.venice.controller.stats.TopicCleanupServiceStats; +import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.manager.TopicManager; @@ -28,6 +29,7 @@ public class TestTopicCleanupServiceForMultiKafkaClusters { private TopicCleanupServiceForParentController topicCleanupService; private PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + private final PubSubClientsFactory pubSubClientsFactory = mock(PubSubClientsFactory.class); @BeforeTest public void setUp() { @@ -62,8 +64,12 @@ public void setUp() { doReturn(topicManager2).when(admin).getTopicManager(kafkaClusterServerUrl2); TopicCleanupServiceStats topicCleanupServiceStats = mock(TopicCleanupServiceStats.class); - topicCleanupService = - new TopicCleanupServiceForParentController(admin, config, pubSubTopicRepository, topicCleanupServiceStats); + topicCleanupService = new TopicCleanupServiceForParentController( + admin, + config, + pubSubTopicRepository, + topicCleanupServiceStats, + pubSubClientsFactory); } @Test diff --git 
a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java index 2aa66cb180..4ea6251965 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java @@ -9,6 +9,7 @@ import com.linkedin.venice.controller.VeniceControllerConfig; import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; import com.linkedin.venice.controller.stats.TopicCleanupServiceStats; +import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.manager.TopicManager; @@ -24,6 +25,7 @@ public class TestTopicCleanupServiceForParentController { private TopicManager topicManager; private TopicCleanupService topicCleanupService; private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + private final PubSubClientsFactory pubSubClientsFactory = mock(PubSubClientsFactory.class); @BeforeTest public void setUp() { @@ -37,8 +39,12 @@ public void setUp() { doReturn(veniceControllerConfig).when(config).getCommonConfig(); doReturn("dc1").when(veniceControllerConfig).getChildDatacenters(); TopicCleanupServiceStats topicCleanupServiceStats = mock(TopicCleanupServiceStats.class); - topicCleanupService = - new TopicCleanupServiceForParentController(admin, config, pubSubTopicRepository, topicCleanupServiceStats); + topicCleanupService = new TopicCleanupServiceForParentController( + admin, + config, + pubSubTopicRepository, + topicCleanupServiceStats, + pubSubClientsFactory); } @Test