Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Dec 1, 2016
2 parents 9cf36e9 + 37cb056 commit f96a084
Show file tree
Hide file tree
Showing 46 changed files with 1,571 additions and 301 deletions.
32 changes: 14 additions & 18 deletions cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
Expand Up @@ -54,7 +54,7 @@ public class FalconExtensionCLI {
public static final String REGISTER_OPT = "register";

// Input parameters
public static final String ENTENSION_NAME_OPT = "extensionName";
public static final String EXTENSION_NAME_OPT = "extensionName";
public static final String JOB_NAME_OPT = "jobName";
public static final String DESCRIPTION = "description";
public static final String PATH = "path";
Expand All @@ -69,7 +69,7 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
}

String result;
String extensionName = commandLine.getOptionValue(ENTENSION_NAME_OPT);
String extensionName = commandLine.getOptionValue(EXTENSION_NAME_OPT);
String jobName = commandLine.getOptionValue(JOB_NAME_OPT);
String filePath = commandLine.getOptionValue(FalconCLIConstants.FILE_PATH_OPT);
String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT);
Expand All @@ -80,40 +80,36 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
result = client.enumerateExtensions();
result = prettyPrintJson(result);
} else if (optionsList.contains(DEFINITION_OPT)) {
validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDefinition(extensionName);
result = prettyPrintJson(result);
} else if (optionsList.contains(DESCRIBE_OPT)) {
validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDescription(extensionName);
} else if (optionsList.contains(UNREGISTER_OPT)) {
validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.unregisterExtension(extensionName);
}else if (optionsList.contains(DETAIL_OPT)) {
validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDetail(extensionName);
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) {
validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.submitExtensionJob(extensionName, filePath, doAsUser).getMessage();
result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(REGISTER_OPT)) {
validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(path, PATH);
result = client.registerExtension(extensionName, path, description);
}else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) {
validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.submitExtensionJob(extensionName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.submitAndScheduleExtensionJob(extensionName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.updateExtensionJob(extensionName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.validateExtensionJob(extensionName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
Expand All @@ -129,7 +125,7 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.deleteExtensionJob(jobName, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.LIST_OPT)) {
validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
ExtensionJobList jobs = client.listExtensionJob(extensionName, doAsUser,
commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT),
commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT),
Expand Down Expand Up @@ -203,7 +199,7 @@ public Options createExtensionOptions() {
Option doAs = new Option(FalconCLIConstants.DO_AS_OPT, true, "doAs user");
Option debug = new Option(FalconCLIConstants.DEBUG_OPTION, false,
"Use debug mode to see debugging statements on stdout");
Option extensionName = new Option(ENTENSION_NAME_OPT, true, "Extension name");
Option extensionName = new Option(EXTENSION_NAME_OPT, true, "Extension name");
Option jobName = new Option(JOB_NAME_OPT, true, "Extension job name");
Option instanceStatus = new Option(FalconCLIConstants.INSTANCE_STATUS_OPT, true, "Instance status");
Option sortOrder = new Option(FalconCLIConstants.SORT_ORDER_OPT, true, "asc or desc order for results");
Expand Down
4 changes: 4 additions & 0 deletions client/pom.xml
Expand Up @@ -112,6 +112,10 @@
<artifactId>hive-webhcat-java-client</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-extensions</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
61 changes: 61 additions & 0 deletions client/src/main/java/org/apache/falcon/ExtensionClassLoader.java
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;

/**
* Helper class loader that fetches jars from local disk and loads into JVM.
*/

public class ExtensionClassLoader extends URLClassLoader{

public static final Logger LOG = LoggerFactory.getLogger(ExtensionClassLoader.class);

public ExtensionClassLoader(URL[] urls, ClassLoader parent) {
super(urls, parent);
}

public static ClassLoader load(final List<URL> urls) throws IOException {
final ClassLoader parentClassLoader = ExtensionClassLoader.class.getClassLoader();
ClassLoader extensionClassLoader = java.security.AccessController.doPrivileged(
new java.security.PrivilegedAction<ExtensionClassLoader>() {
@Override
public ExtensionClassLoader run() {
return new ExtensionClassLoader(urls.toArray(new URL[urls.size()]), parentClassLoader);
}
}
);
LOG.info("Created a new ExtensionClassLoader using classpath = {}", Arrays.toString(urls.toArray()));
return extensionClassLoader;
}

@Override
protected void addURL(URL url) {
super.addURL(url);
}

}
189 changes: 189 additions & 0 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
@@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon;

import org.apache.commons.codec.CharEncoding;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.client.FalconExtensionConstants;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.ExtensionBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/**
* Handler class that is responsible for preparing Extension entities.
*/
public final class ExtensionHandler {

public static final Logger LOG = LoggerFactory.getLogger(ExtensionHandler.class);
private static final String UTF_8 = CharEncoding.UTF_8;
private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));

public List<Entity> getEntities(ClassLoader extensionClassloader, String extensionName, String jobName,
InputStream configStream) throws IOException, FalconException {
Thread.currentThread().setContextClassLoader(extensionClassloader);

ServiceLoader<ExtensionBuilder> extensionBuilders = ServiceLoader.load(ExtensionBuilder.class);

List<Class<? extends ExtensionBuilder>> result = new ArrayList<>();

for (ExtensionBuilder extensionBuilder : extensionBuilders) {
result.add(extensionBuilder.getClass());
}

if (result.isEmpty()) {
throw new FalconException("Extension Implementation not found in the package of : " + extensionName);
} else if (result.size() > 1) {
throw new FalconException("Found more than one extension Implementation in the package of : "
+ extensionName);
}

ExtensionBuilder extensionBuilder = null;
try {
Class<ExtensionBuilder> clazz = (Class<ExtensionBuilder>) extensionClassloader
.loadClass(result.get(0).getCanonicalName());
extensionBuilder = clazz.newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new FalconCLIException("Failed to instantiate extension implementation " + extensionName, e);
}

extensionBuilder.validateExtensionConfig(extensionName, configStream);
List<Entity> entities = extensionBuilder.getEntities(jobName, configStream);

return entities;
}

public static List<Entity> loadAndPrepare(String extensionName, String jobName, InputStream configStream,
String extensionBuildLocation) throws IOException, FalconException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
String stagePath = createStagePath(extensionName, jobName);
List<URL> urls = ExtensionHandler.copyExtensionPackage(extensionBuildLocation, fs, stagePath);

List<Entity> entities = prepare(extensionName, jobName, configStream, urls);
ExtensionHandler.stageEntities(entities, stagePath);
return entities;
}

public static List<Entity> prepare(String extensionName, String jobName, InputStream configStream, List<URL> urls)
throws IOException, FalconException {
ClassLoader extensionClassLoader = ExtensionClassLoader.load(urls);
ExtensionHandler extensionHandler = new ExtensionHandler();

return extensionHandler.getEntities(extensionClassLoader, extensionName, jobName, configStream);
}

// This method is only for debugging, the staged entities can be found in /tmp path.
private static void stageEntities(List<Entity> entities, String stagePath) {
File entityFile;
EntityType type;
for (Entity entity : entities) {
type = entity.getEntityType();
OutputStream out;
try {
entityFile = new File(stagePath + File.separator + entity.getEntityType().toString() + "_"
+ URLEncoder.encode(entity.getName(), UTF_8));
if (!entityFile.createNewFile()) {
LOG.debug("Not able to stage the entities in the tmp path");
return;
}
out = new FileOutputStream(entityFile);
type.getMarshaller().marshal(entity, out);
LOG.debug("Staged configuration {}/{}", type, entity.getName());
out.close();
} catch (Exception e) {
LOG.error("Unable to serialize the entity object {}/{}", type, entity.getName(), e);
}
}
}

private static String createStagePath(String extensionName, String jobName) {
String stagePath = TMP_BASE_DIR + File.separator + extensionName + File.separator + jobName
+ File.separator + System.currentTimeMillis()/1000;
File tmpPath = new File(stagePath);
if (tmpPath.mkdir()) {
throw new FalconCLIException("Failed to create stage directory" + tmpPath.toString());
}
return stagePath;
}

public static List<URL> copyExtensionPackage(String extensionBuildUrl, FileSystem fs, String stagePath)
throws IOException {

Path libsPath = new Path(extensionBuildUrl, FalconExtensionConstants.LIBS);
Path buildLibsPath = new Path(libsPath, FalconExtensionConstants.BUILD);
Path localStagePath = new Path(stagePath);
Path localBuildLibsPath = new Path(localStagePath, FalconExtensionConstants.LIBS);
LOG.info("Copying build time libs from {} to {}", buildLibsPath, localBuildLibsPath);
fs.copyToLocalFile(buildLibsPath, localBuildLibsPath);

Path resourcesPath = new Path(extensionBuildUrl, FalconExtensionConstants.RESOURCES);
Path buildResourcesPath = new Path(resourcesPath, FalconExtensionConstants.BUILD);
Path localBuildResourcesPath = new Path(localStagePath, FalconExtensionConstants.RESOURCES);
LOG.info("Copying build time resources from {} to {}", buildLibsPath, localBuildResourcesPath);
fs.copyToLocalFile(buildResourcesPath, localBuildResourcesPath);

List<URL> urls = new ArrayList<>();
urls.addAll(getFilesInPath(localBuildLibsPath.toUri().toURL()));
urls.add(localBuildResourcesPath.toUri().toURL());
return urls;
}

public static List<URL> getFilesInPath(URL fileURL) throws MalformedURLException {
List<URL> urls = new ArrayList<>();

File file = new File(fileURL.getPath());
if (file.isDirectory()) {
File[] files = file.listFiles();

if (files != null) {
for (File innerFile : files) {
if (innerFile.isFile()) {
urls.add(innerFile.toURI().toURL());
} else {
urls.addAll(getFilesInPath(file.toURI().toURL()));
}
}
}

if (!fileURL.toString().endsWith("/")) {
fileURL = new URL(fileURL.toString() + "/");
}
}

urls.add(fileURL);
return urls;
}
}
Expand Up @@ -178,6 +178,18 @@ public abstract APIResult getStatus(EntityType entityType, String entityName, St
public abstract APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun, String doAsUser,
String properties);

/**
* Prepare set of entities the extension has implemented and stage them to a local directory and submit them too.
* @param extensionName extension which is available in the store.
* @param jobName name to be used in all the extension entities' tagging that are built as part of
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
* @throws FalconCLIException
*/
public abstract APIResult submitExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);

/**
*
* Get list of the entities.
Expand Down

0 comments on commit f96a084

Please sign in to comment.