[HOPSWORKS-2826] OnlineFS replays data on cluster restart (#709)
moritzmeister authored and SirOibaf committed Nov 24, 2021
1 parent 5b92555 commit eaf21b2
Showing 5 changed files with 48 additions and 99 deletions.
1 change: 0 additions & 1 deletion LICENSE_OF_DEPENDENCIES.md
@@ -84,7 +84,6 @@ Frontend dependencies

Backend Dependencies

* com.101tec:zkclient:jar:0.9 [ApacheV2](http://www.apache.org/licenses/LICENSE-2.0)
* com.google.code.gson:gson:jar:2.2 [ApacheV2](https://github.com/google/gson/blob/master/LICENSE)
* com.googlecode.json-simple:json-simple:jar:1.1.1 [ApacheV2](https://github.com/fangyidong/json-simple/blob/master/LICENSE.txt)
* com.google.guava:guava:jar:18.0 [ApacheV2](https://github.com/google/guava/blob/master/COPYING)
4 changes: 0 additions & 4 deletions hopsworks-common/pom.xml
@@ -104,10 +104,6 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaBrokers.java
@@ -17,7 +17,8 @@
package io.hops.hopsworks.common.kafka;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import com.logicalclocks.servicediscoverclient.service.Service;
import com.logicalclocks.servicediscoverclient.resolvers.Type;
import com.logicalclocks.servicediscoverclient.service.ServiceQuery;
import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.util.Settings;
@@ -36,6 +37,7 @@
import javax.ejb.TransactionAttributeType;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
@@ -96,9 +98,7 @@ public Optional<String> getAnyKafkaBroker() {
@Lock(LockType.READ)
public Set<String> getBrokerEndpoints() throws IOException, KeeperException, InterruptedException {
try {
Service zkService = serviceDiscoveryController
.getAnyAddressOfServiceWithDNS(ServiceDiscoveryController.HopsworksService.ZOOKEEPER_CLIENT);
String zkConnectionString = zkService.getAddress() + ":" + zkService.getPort();
String zkConnectionString = getZookeeperConnectionString();
final ZooKeeper zk = new ZooKeeper(zkConnectionString, Settings.ZOOKEEPER_SESSION_TIMEOUT_MS,
new Watcher() {
@Override
@@ -130,6 +130,15 @@ public void process(WatchedEvent watchedEvent) {
}
}

public String getZookeeperConnectionString() throws ServiceDiscoveryException {
return serviceDiscoveryController.getService(
Type.DNS, ServiceQuery.of(
serviceDiscoveryController.constructServiceFQDN(ServiceDiscoveryController.HopsworksService.ZOOKEEPER_CLIENT),
Collections.emptySet()))
.map(zkServer -> zkServer.getAddress() + ":" + zkServer.getPort())
.collect(Collectors.joining(","));
}

private String getBrokerInfo(ZooKeeper zk, String brokerId) {
try {
return new String(zk.getData("/brokers/ids/" + brokerId, false, null));
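The new getZookeeperConnectionString() above joins every discovered ZooKeeper client endpoint into a single host:port,host:port ensemble string, where the old code resolved just one address. A minimal sketch of how such an ensemble string is consumed by the standard org.apache.zookeeper.ZooKeeper client, assuming hypothetical hostnames and the same /brokers/topics read that the cleaner timer below performs:

```java
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.List;

public class ZkEnsembleExample {
  public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
    // Hypothetical ensemble string, in the same form that getZookeeperConnectionString() produces.
    String zkConnectionString = "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181";

    // The ZooKeeper client accepts the whole comma-separated ensemble and picks a reachable server.
    ZooKeeper zk = new ZooKeeper(zkConnectionString, 30_000, watchedEvent -> { });
    try {
      // The same znode the broker/topic code reads.
      List<String> topics = zk.getChildren("/brokers/topics", false);
      topics.forEach(System.out::println);
    } finally {
      zk.close();
    }
  }
}
```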
hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/ZookeeperTopicCleanerTimer.java
@@ -40,18 +40,10 @@
package io.hops.hopsworks.common.kafka;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import com.logicalclocks.servicediscoverclient.service.Service;
import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.persistence.entity.kafka.ProjectTopics;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.restutils.RESTCodes;
import kafka.admin.AdminUtils;
import kafka.common.TopicAlreadyMarkedForDeletionException;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -64,11 +56,10 @@
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

@@ -82,16 +73,18 @@ public class ZookeeperTopicCleanerTimer {
private final static Logger LOGGER = Logger.getLogger(
ZookeeperTopicCleanerTimer.class.getName());

private final static String offsetTopic = "__consumer_offsets";

@PersistenceContext(unitName = "kthfsPU")
private EntityManager em;

@EJB
private ServiceDiscoveryController serviceDiscoveryController;
@EJB
private KafkaBrokers kafkaBrokers;
@EJB
private HopsKafkaAdminClient hopsKafkaAdminClient;

private ZkClient zkClient = null;
private ZkConnection zkConnection = null;
private ZooKeeper zk = null;

// Run once per hour
@@ -102,34 +95,32 @@ public void execute(Timer timer) {
LOGGER.log(Level.INFO, "Running ZookeeperTopicCleanerTimer.");

try {
String zkConnectionString = getZookeeperConnectionString();
String zkConnectionString = kafkaBrokers.getZookeeperConnectionString();
Set<String> zkTopics = new HashSet<>();
//30 seconds
int sessionTimeoutMs = 30 * 1000;
try {
if (zk == null || !zk.getState().isConnected()) {
if (zk != null) {
zk.close();
}
zk = new ZooKeeper(zkConnectionString, sessionTimeoutMs, new ZookeeperWatcher());
}
zk = new ZooKeeper(zkConnectionString, Settings.ZOOKEEPER_SESSION_TIMEOUT_MS, new ZookeeperWatcher());
List<String> topics = zk.getChildren("/brokers/topics", false);
zkTopics.addAll(topics);
} catch (IOException ex) {
LOGGER.log(Level.SEVERE, "Unable to find the zookeeper server: ", ex.toString());
} catch (KeeperException | InterruptedException ex) {
LOGGER.log(Level.SEVERE, "Cannot retrieve topic list from Zookeeper", ex);
} finally {
if (zk != null) {
try {
zk.close();
} catch (InterruptedException ex) {
LOGGER.log(Level.SEVERE, "Unable to close zookeeper connection", ex);
}
zk = null;
}
}

List<ProjectTopics> dbProjectTopics = em.createNamedQuery("ProjectTopics.findAll").getResultList();
Set<String> dbTopics = new HashSet<>();

for (ProjectTopics pt : dbProjectTopics) {
try {
dbTopics.add(pt.getTopicName());
} catch (UnsupportedOperationException e) {
LOGGER.log(Level.SEVERE, e.toString());
}
dbTopics.add(pt.getTopicName());
}

/*
@@ -144,60 +135,24 @@ public void execute(Timer timer) {
* zkTopics.removeAll(dbTopics);
* 3. remove those topics
*/
try {
if (zkClient == null) {
// 30 seconds
int connectionTimeout = 90 * 1000;
zkClient = new ZkClient(getIp(zkConnectionString).getHostName(),
sessionTimeoutMs, connectionTimeout,
ZKStringSerializer$.MODULE$);
}
if (!zkTopics.isEmpty()) {
zkTopics.removeAll(dbTopics);
for (String topicName : zkTopics) {
if (zkConnection == null) {
zkConnection = new ZkConnection(zkConnectionString);
}
ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);

try {
AdminUtils.deleteTopic(zkUtils, topicName);
LOGGER.log(Level.INFO, "{0} is removed from Zookeeper",
new Object[]{topicName});
} catch (TopicAlreadyMarkedForDeletionException ex) {
LOGGER.log(Level.INFO, "{0} is already marked for deletion",
new Object[]{topicName});
}
}
}
zkTopics.removeAll(dbTopics);

} catch (ServiceException ex) {
LOGGER.log(Level.SEVERE, "Unable to get zookeeper ip address ", ex);
} finally {
if (zkClient != null) {
zkClient.close();
}
// DON'T remove offset topic
zkTopics.remove(offsetTopic);

if (!zkTopics.isEmpty()) {
// blocks until all are deleted
try {
if (zkConnection != null) {
zkConnection.close();
}
} catch (InterruptedException ex) {
LOGGER.log(Level.SEVERE, null, ex);
hopsKafkaAdminClient.deleteTopics(zkTopics).all().get();
LOGGER.log(Level.INFO, "Removed topics {0} from Kafka", new Object[]{zkTopics});
} catch (ExecutionException | InterruptedException ex) {
LOGGER.log(Level.SEVERE, "Error dropping topics from Kafka", ex);
}
}
} catch(Exception e) {
LOGGER.log(Level.SEVERE, "Got an exception while cleaning up topics", e);
}
}

private InetAddress getIp(String zkIp) throws ServiceException {

String ip = zkIp.split(KafkaConst.COLON_SEPARATOR)[0];
try {
return InetAddress.getByName(ip);
} catch (UnknownHostException ex) {
throw new ServiceException(RESTCodes.ServiceErrorCode.ZOOKEEPER_SERVICE_UNAVAILABLE, Level.SEVERE,
ex.getMessage());
} catch (ServiceDiscoveryException ex) {
LOGGER.log(Level.SEVERE, "Could not discover Zookeeper server addresses", ex);
} catch (Exception ex) {
LOGGER.log(Level.SEVERE, "Got an exception while cleaning up kafka topics", ex);
}
}

@@ -209,6 +164,8 @@ private InetAddress getIp(String zkIp) throws ServiceException {
minute = "*/1",
hour = "*")
public void getBrokers() {
// TODO: This should be removed by HOPSWORKS-2798 and usages of this method should simply call
// kafkaBrokers.getBrokerEndpoints() directly
try {
kafkaBrokers.setKafkaBrokers(kafkaBrokers.getBrokerEndpoints());
} catch (Exception ex) {
@@ -222,10 +179,4 @@ private class ZookeeperWatcher implements Watcher {
public void process(WatchedEvent we) {
}
}

private String getZookeeperConnectionString() throws ServiceDiscoveryException {
Service zk = serviceDiscoveryController
.getAnyAddressOfServiceWithDNS(ServiceDiscoveryController.HopsworksService.ZOOKEEPER_CLIENT);
return zk.getAddress() + ":" + zk.getPort();
}
}
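The cleaner now deletes orphaned topics through hopsKafkaAdminClient.deleteTopics(zkTopics).all().get() instead of the removed AdminUtils.deleteTopic/ZkUtils path, so no direct ZkClient connection is needed anymore. A minimal sketch of the equivalent call against the plain Kafka AdminClient, assuming HopsKafkaAdminClient ultimately delegates to it and using a hypothetical plaintext broker endpoint:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class DeleteOrphanedTopicsExample {
  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    // Hypothetical broker address; in Hopsworks the endpoints come from kafkaBrokers.getBrokerEndpoints().
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1.example.com:9092");

    // Topics found in ZooKeeper but missing from the ProjectTopics table.
    Set<String> orphanedTopics = Collections.singleton("hypothetical_orphaned_topic");

    try (AdminClient adminClient = AdminClient.create(props)) {
      // deleteTopics() is asynchronous; all().get() blocks until every deletion has completed,
      // mirroring the call in the timer above.
      adminClient.deleteTopics(orphanedTopics).all().get();
    } catch (ExecutionException ex) {
      // Failures for individual topics surface through the returned futures.
      System.err.println("Error dropping topics from Kafka: " + ex.getCause());
    }
  }
}
```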
6 changes: 0 additions & 6 deletions pom.xml
@@ -161,17 +161,11 @@
<spnego.version>7.0</spnego.version>
<swagger-annotations.version>1.6.2</swagger-annotations.version>
<swagger.jersey2.jaxrs.version>1.6.2</swagger.jersey2.jaxrs.version>
<zkclient.version>0.11</zkclient.version>
<zookeeper.version>3.4.14</zookeeper.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
</dependency>
<dependency>
<groupId>com.auth0</groupId>
<artifactId>java-jwt</artifactId>
