Skip to content

Commit

Permalink
feat(exporters): load, configure, and validate exporters
Browse files Browse the repository at this point in the history
- adds isolated JAR class loader
- adds repository to load and cache class loaders from 3rd party JARs
- adds exporter descriptor which knows how to create a configured
exporter instance
- adds exporter repository which can load and cache exporters from
configuration
- adds exporter config to broker configuration

related to: #1170
  • Loading branch information
npepinpe committed Aug 24, 2018
1 parent 1736a14 commit 07bff10
Show file tree
Hide file tree
Showing 21 changed files with 1,288 additions and 4 deletions.
9 changes: 7 additions & 2 deletions broker-core/pom.xml
Expand Up @@ -19,7 +19,7 @@
</properties>

<dependencies>
<dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
Expand Down Expand Up @@ -73,12 +73,17 @@
<groupId>io.zeebe</groupId>
<artifactId>zeebe-gateway</artifactId>
</dependency>

<dependency>
<groupId>io.zeebe</groupId>
<artifactId>zb-msgpack-json-el</artifactId>
</dependency>

<dependency>
<groupId>io.zeebe</groupId>
<artifactId>zb-exporter</artifactId>
</dependency>

<dependency>
<groupId>com.moandjiezana.toml</groupId>
<artifactId>toml4j</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion broker-core/src/main/java/io/zeebe/broker/Loggers.java
Expand Up @@ -21,12 +21,12 @@
import org.slf4j.Logger;

public class Loggers {

public static final Logger CLUSTERING_LOGGER = new ZbLogger("io.zeebe.broker.clustering");
public static final Logger SERVICES_LOGGER = new ZbLogger("io.zeebe.broker.services");
public static final Logger SYSTEM_LOGGER = new ZbLogger("io.zeebe.broker.system");
public static final Logger TRANSPORT_LOGGER = new ZbLogger("io.zeebe.broker.transport");
public static final Logger STREAM_PROCESSING = new ZbLogger("io.zeebe.broker.streamProcessing");
public static final Logger WORKFLOW_REPOSITORY_LOGGER =
new ZbLogger("io.zeebe.broker.workflow.repository");
public static final Logger EXPORTER_LOGGER = new ZbLogger("io.zeebe.broker.exporter");
}
@@ -0,0 +1,61 @@
/*
* Zeebe Broker Core
* Copyright © 2017 camunda services GmbH (info@camunda.com)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.zeebe.broker.exporter.context;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import io.zeebe.exporter.context.Configuration;
import java.util.Map;

public class ExporterConfiguration implements Configuration {
private static final Gson CONFIG_INSTANTIATOR = new GsonBuilder().create();

private final String id;
private final Map<String, Object> arguments;

private JsonElement intermediateConfiguration;

public ExporterConfiguration(final String id, final Map<String, Object> arguments) {
this.id = id;
this.arguments = arguments;
}

@Override
public String getId() {
return id;
}

@Override
public Map<String, Object> getArguments() {
return arguments;
}

@Override
public <T> T instantiate(Class<T> configClass) {
return CONFIG_INSTANTIATOR.fromJson(getIntermediateConfiguration(), configClass);
}

private JsonElement getIntermediateConfiguration() {
if (intermediateConfiguration == null) {
intermediateConfiguration = CONFIG_INSTANTIATOR.toJsonTree(arguments);
}

return intermediateConfiguration;
}
}
@@ -0,0 +1,42 @@
/*
* Zeebe Broker Core
* Copyright © 2017 camunda services GmbH (info@camunda.com)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.zeebe.broker.exporter.context;

import io.zeebe.exporter.context.Configuration;
import io.zeebe.exporter.context.Context;
import org.slf4j.Logger;

public class ExporterContext implements Context {
private final Logger logger;
private final Configuration configuration;

public ExporterContext(final Logger logger, final Configuration configuration) {
this.logger = logger;
this.configuration = configuration;
}

@Override
public Logger getLogger() {
return logger;
}

@Override
public Configuration getConfiguration() {
return configuration;
}
}
@@ -0,0 +1,87 @@
/*
* Zeebe Broker Core
* Copyright © 2017 camunda services GmbH (info@camunda.com)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.zeebe.broker.exporter.jar;

import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;

/**
* Provides a class loader which isolates external exporters from other exporters, while exposing
* our own code to ensure versions match at runtime.
*/
public class ExporterJarClassLoader extends URLClassLoader {
public static final String JAVA_PACKAGE_PREFIX = "java.";
public static final String JAR_URL_FORMAT = "jar:%s!/";

/** lists of packages from broker base that are exposed at runtime to the external exporters */
public static final String[] EXPOSED_PACKAGE_PREFIXES =
new String[] {"io.zeebe.exporter.", "org.slf4j.", "org.apache.logging.log4j."};

public ExporterJarClassLoader(URL[] urls) {
super(urls);
}

public static ExporterJarClassLoader ofPath(final Path jarPath) throws ExporterJarLoadException {
final URL jarUrl;

try {
final String expandedPath = jarPath.toUri().toURL().toString();
jarUrl = new URL(String.format(JAR_URL_FORMAT, expandedPath));
} catch (final MalformedURLException e) {
throw new ExporterJarLoadException(jarPath, "bad JAR url", e);
}

return new ExporterJarClassLoader(new URL[] {jarUrl});
}

@Override
public Class<?> loadClass(String name) throws ClassNotFoundException {
synchronized (getClassLoadingLock(name)) {
if (name.startsWith(JAVA_PACKAGE_PREFIX)) {
return findSystemClass(name);
}

if (isProvidedByBroker(name)) {
return getSystemClassLoader().loadClass(name);
}

Class<?> clazz = findLoadedClass(name);
if (clazz == null) {
try {
clazz = findClass(name);
} catch (final ClassNotFoundException ex) {
clazz = super.loadClass(name);
}
}

return clazz;
}
}

private boolean isProvidedByBroker(final String name) {
for (final String prefix : EXPOSED_PACKAGE_PREFIXES) {
if (name.startsWith(prefix)) {
return true;
}
}

return false;
}
}
@@ -0,0 +1,34 @@
/*
* Zeebe Broker Core
* Copyright © 2017 camunda services GmbH (info@camunda.com)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.zeebe.broker.exporter.jar;

import java.io.IOException;
import java.nio.file.Path;

public class ExporterJarLoadException extends IOException {
private static final String MESSAGE_FORMAT = "Cannot load JAR at [%s]: %s";
private static final long serialVersionUID = 1655276726721040696L;

public ExporterJarLoadException(final Path jarPath, final String reason) {
super(String.format(MESSAGE_FORMAT, jarPath, reason));
}

public ExporterJarLoadException(final Path jarPath, final String reason, final Throwable cause) {
super(String.format(MESSAGE_FORMAT, jarPath, reason), cause);
}
}
@@ -0,0 +1,91 @@
/*
* Zeebe Broker Core
* Copyright © 2017 camunda services GmbH (info@camunda.com)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.zeebe.broker.exporter.jar;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Maintains a map of all loaded exporter JARs and their corresponding class loaders for quick
* reuse.
*/
public class ExporterJarRepository {
public static final String JAR_EXTENSION = ".jar";

private final Map<Path, ExporterJarClassLoader> loadedJars;

public ExporterJarRepository() {
this(new HashMap<>());
}

public ExporterJarRepository(final Map<Path, ExporterJarClassLoader> loadedJars) {
this.loadedJars = loadedJars;
}

public Map<Path, ExporterJarClassLoader> getJars() {
return Collections.unmodifiableMap(loadedJars);
}

public ExporterJarClassLoader remove(final String jarPath) {
return remove(Paths.get(jarPath));
}

public ExporterJarClassLoader remove(final Path jarPath) {
return loadedJars.remove(jarPath);
}

public ExporterJarClassLoader load(final String jarPath) throws ExporterJarLoadException {
return load(Paths.get(jarPath));
}

public ExporterJarClassLoader load(final Path jarPath) throws ExporterJarLoadException {
ExporterJarClassLoader classLoader = loadedJars.get(jarPath);

if (classLoader == null) {
verifyJarPath(jarPath);

classLoader = ExporterJarClassLoader.ofPath(jarPath);
loadedJars.put(jarPath, classLoader);
}

return classLoader;
}

/**
* Verifies that the given path points to an existing, readable JAR file. Does not perform more
* complex validation such as checking it is a valid JAR, verifying its signature, etc.
*
* @param path path to verify
* @throws ExporterJarLoadException if it is not a JAR, not readable, or does not exist
*/
private void verifyJarPath(final Path path) throws ExporterJarLoadException {
final File jarFile = path.toFile();

if (!jarFile.getName().endsWith(JAR_EXTENSION)) {
throw new ExporterJarLoadException(path, "is not a JAR");
}

if (!jarFile.canRead()) {
throw new ExporterJarLoadException(path, "is not readable");
}
}
}

0 comments on commit 07bff10

Please sign in to comment.