From eaf21b2434d0eb997cc436ef2b5636893cadaca4 Mon Sep 17 00:00:00 2001
From: Moritz Meister <8422705+moritzmeister@users.noreply.github.com>
Date: Wed, 24 Nov 2021 09:22:51 +0100
Subject: [PATCH] [HOPSWORKS-2826] OnlineFS replays data on cluster restart
(#709)
---
LICENSE_OF_DEPENDENCIES.md | 1 -
hopsworks-common/pom.xml | 4 -
.../hopsworks/common/kafka/KafkaBrokers.java | 17 ++-
.../kafka/ZookeeperTopicCleanerTimer.java | 119 ++++++------------
pom.xml | 6 -
5 files changed, 48 insertions(+), 99 deletions(-)
diff --git a/LICENSE_OF_DEPENDENCIES.md b/LICENSE_OF_DEPENDENCIES.md
index 6712240530..88fb276f7e 100644
--- a/LICENSE_OF_DEPENDENCIES.md
+++ b/LICENSE_OF_DEPENDENCIES.md
@@ -84,7 +84,6 @@ Frontend dependencies
Backend Dependencies
-* com.101tec:zkclient:jar:0.9 [ApacheV2](http://www.apache.org/licenses/LICENSE-2.0)
* com.google.code.gson:gson:jar:2.2 [ApacheV2](https://github.com/google/gson/blob/master/LICENSE)
* com.googlecode.json-simple:json-simple:jar:1.1.1 [ApacheV2](https://github.com/fangyidong/json-simple/blob/master/LICENSE.txt)
* com.google.guava:guava:jar:18.0 [ApacheV2](https://github.com/google/guava/blob/master/COPYING)
diff --git a/hopsworks-common/pom.xml b/hopsworks-common/pom.xml
index 61fc173ff8..e5a67e6dc9 100644
--- a/hopsworks-common/pom.xml
+++ b/hopsworks-common/pom.xml
@@ -104,10 +104,6 @@
org.apache.kafka
kafka_2.11
-
- com.101tec
- zkclient
-
org.apache.zookeeper
zookeeper
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaBrokers.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaBrokers.java
index 3614b9b8d1..4825d41d50 100644
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaBrokers.java
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaBrokers.java
@@ -17,7 +17,8 @@
package io.hops.hopsworks.common.kafka;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
-import com.logicalclocks.servicediscoverclient.service.Service;
+import com.logicalclocks.servicediscoverclient.resolvers.Type;
+import com.logicalclocks.servicediscoverclient.service.ServiceQuery;
import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.util.Settings;
@@ -36,6 +37,7 @@
import javax.ejb.TransactionAttributeType;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
@@ -96,9 +98,7 @@ public Optional getAnyKafkaBroker() {
@Lock(LockType.READ)
public Set getBrokerEndpoints() throws IOException, KeeperException, InterruptedException {
try {
- Service zkService = serviceDiscoveryController
- .getAnyAddressOfServiceWithDNS(ServiceDiscoveryController.HopsworksService.ZOOKEEPER_CLIENT);
- String zkConnectionString = zkService.getAddress() + ":" + zkService.getPort();
+ String zkConnectionString = getZookeeperConnectionString();
final ZooKeeper zk = new ZooKeeper(zkConnectionString, Settings.ZOOKEEPER_SESSION_TIMEOUT_MS,
new Watcher() {
@Override
@@ -130,6 +130,15 @@ public void process(WatchedEvent watchedEvent) {
}
}
+  public String getZookeeperConnectionString() throws ServiceDiscoveryException {
+    // Resolve the ZooKeeper client service via DNS (empty tag set) and join the results as comma-separated address:port.
+    ServiceQuery zkQuery = ServiceQuery.of(
+        serviceDiscoveryController.constructServiceFQDN(ServiceDiscoveryController.HopsworksService.ZOOKEEPER_CLIENT),
+        Collections.emptySet());
+    return serviceDiscoveryController.getService(Type.DNS, zkQuery)
+        .map(endpoint -> endpoint.getAddress() + ":" + endpoint.getPort()).collect(Collectors.joining(","));
+  }
+
private String getBrokerInfo(ZooKeeper zk, String brokerId) {
try {
return new String(zk.getData("/brokers/ids/" + brokerId, false, null));
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/ZookeeperTopicCleanerTimer.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/ZookeeperTopicCleanerTimer.java
index e789528a95..4514f52904 100644
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/ZookeeperTopicCleanerTimer.java
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/ZookeeperTopicCleanerTimer.java
@@ -40,18 +40,10 @@
package io.hops.hopsworks.common.kafka;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
-import com.logicalclocks.servicediscoverclient.service.Service;
-import io.hops.hopsworks.common.dao.kafka.KafkaConst;
+import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
+import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
-import io.hops.hopsworks.exceptions.ServiceException;
-import io.hops.hopsworks.restutils.RESTCodes;
-import kafka.admin.AdminUtils;
-import kafka.common.TopicAlreadyMarkedForDeletionException;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -64,11 +56,10 @@
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -82,6 +73,8 @@ public class ZookeeperTopicCleanerTimer {
private final static Logger LOGGER = Logger.getLogger(
ZookeeperTopicCleanerTimer.class.getName());
+ private final static String offsetTopic = "__consumer_offsets";
+
@PersistenceContext(unitName = "kthfsPU")
private EntityManager em;
@@ -89,9 +82,9 @@ public class ZookeeperTopicCleanerTimer {
private ServiceDiscoveryController serviceDiscoveryController;
@EJB
private KafkaBrokers kafkaBrokers;
+ @EJB
+ private HopsKafkaAdminClient hopsKafkaAdminClient;
- private ZkClient zkClient = null;
- private ZkConnection zkConnection = null;
private ZooKeeper zk = null;
// Run once per hour
@@ -102,34 +95,32 @@ public void execute(Timer timer) {
LOGGER.log(Level.INFO, "Running ZookeeperTopicCleanerTimer.");
try {
- String zkConnectionString = getZookeeperConnectionString();
+ String zkConnectionString = kafkaBrokers.getZookeeperConnectionString();
Set zkTopics = new HashSet<>();
- //30 seconds
- int sessionTimeoutMs = 30 * 1000;
try {
- if (zk == null || !zk.getState().isConnected()) {
- if (zk != null) {
- zk.close();
- }
- zk = new ZooKeeper(zkConnectionString, sessionTimeoutMs, new ZookeeperWatcher());
- }
+ zk = new ZooKeeper(zkConnectionString, Settings.ZOOKEEPER_SESSION_TIMEOUT_MS, new ZookeeperWatcher());
List topics = zk.getChildren("/brokers/topics", false);
zkTopics.addAll(topics);
} catch (IOException ex) {
LOGGER.log(Level.SEVERE, "Unable to find the zookeeper server: ", ex.toString());
} catch (KeeperException | InterruptedException ex) {
LOGGER.log(Level.SEVERE, "Cannot retrieve topic list from Zookeeper", ex);
+ } finally {
+ if (zk != null) {
+ try {
+ zk.close();
+ } catch (InterruptedException ex) {
+ LOGGER.log(Level.SEVERE, "Unable to close zookeeper connection", ex);
+ }
+ zk = null;
+ }
}
List dbProjectTopics = em.createNamedQuery("ProjectTopics.findAll").getResultList();
Set dbTopics = new HashSet<>();
for (ProjectTopics pt : dbProjectTopics) {
- try {
- dbTopics.add(pt.getTopicName());
- } catch (UnsupportedOperationException e) {
- LOGGER.log(Level.SEVERE, e.toString());
- }
+ dbTopics.add(pt.getTopicName());
}
/*
@@ -144,60 +135,24 @@ public void execute(Timer timer) {
* zkTopics.removeAll(dbTopics);
* 3. remove those topics
*/
- try {
- if (zkClient == null) {
- // 30 seconds
- int connectionTimeout = 90 * 1000;
- zkClient = new ZkClient(getIp(zkConnectionString).getHostName(),
- sessionTimeoutMs, connectionTimeout,
- ZKStringSerializer$.MODULE$);
- }
- if (!zkTopics.isEmpty()) {
- zkTopics.removeAll(dbTopics);
- for (String topicName : zkTopics) {
- if (zkConnection == null) {
- zkConnection = new ZkConnection(zkConnectionString);
- }
- ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
-
- try {
- AdminUtils.deleteTopic(zkUtils, topicName);
- LOGGER.log(Level.INFO, "{0} is removed from Zookeeper",
- new Object[]{topicName});
- } catch (TopicAlreadyMarkedForDeletionException ex) {
- LOGGER.log(Level.INFO, "{0} is already marked for deletion",
- new Object[]{topicName});
- }
- }
- }
+ zkTopics.removeAll(dbTopics);
- } catch (ServiceException ex) {
- LOGGER.log(Level.SEVERE, "Unable to get zookeeper ip address ", ex);
- } finally {
- if (zkClient != null) {
- zkClient.close();
- }
+ // DON'T remove offset topic
+ zkTopics.remove(offsetTopic);
+
+ if (!zkTopics.isEmpty()) {
+ // blocks until all are deleted
try {
- if (zkConnection != null) {
- zkConnection.close();
- }
- } catch (InterruptedException ex) {
- LOGGER.log(Level.SEVERE, null, ex);
+ hopsKafkaAdminClient.deleteTopics(zkTopics).all().get();
+ LOGGER.log(Level.INFO, "Removed topics {0} from Kafka", new Object[]{zkTopics});
+ } catch (ExecutionException | InterruptedException ex) {
+ LOGGER.log(Level.SEVERE, "Error dropping topics from Kafka", ex);
}
}
- } catch(Exception e) {
- LOGGER.log(Level.SEVERE, "Got an exception while cleaning up topics", e);
- }
- }
-
- private InetAddress getIp(String zkIp) throws ServiceException {
-
- String ip = zkIp.split(KafkaConst.COLON_SEPARATOR)[0];
- try {
- return InetAddress.getByName(ip);
- } catch (UnknownHostException ex) {
- throw new ServiceException(RESTCodes.ServiceErrorCode.ZOOKEEPER_SERVICE_UNAVAILABLE, Level.SEVERE,
- ex.getMessage());
+ } catch (ServiceDiscoveryException ex) {
+ LOGGER.log(Level.SEVERE, "Could not discover Zookeeper server addresses", ex);
+ } catch (Exception ex) {
+ LOGGER.log(Level.SEVERE, "Got an exception while cleaning up kafka topics", ex);
}
}
@@ -209,6 +164,8 @@ private InetAddress getIp(String zkIp) throws ServiceException {
minute = "*/1",
hour = "*")
public void getBrokers() {
+ // TODO: This should be removed by HOPSWORKS-2798 and usages of this method should simply call
+ // kafkaBrokers.getBrokerEndpoints() directly
try {
kafkaBrokers.setKafkaBrokers(kafkaBrokers.getBrokerEndpoints());
} catch (Exception ex) {
@@ -222,10 +179,4 @@ private class ZookeeperWatcher implements Watcher {
public void process(WatchedEvent we) {
}
}
-
- private String getZookeeperConnectionString() throws ServiceDiscoveryException {
- Service zk = serviceDiscoveryController
- .getAnyAddressOfServiceWithDNS(ServiceDiscoveryController.HopsworksService.ZOOKEEPER_CLIENT);
- return zk.getAddress() + ":" + zk.getPort();
- }
}
diff --git a/pom.xml b/pom.xml
index a6f27d7ef2..91a359df0d 100755
--- a/pom.xml
+++ b/pom.xml
@@ -161,17 +161,11 @@
7.0
1.6.2
1.6.2
- 0.11
3.4.14
-
- com.101tec
- zkclient
- ${zkclient.version}
-
com.auth0
java-jwt