Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
Pracheer Agarwal committed Jan 5, 2017
2 parents 46042fd + 3f6b690 commit 066c8e2
Show file tree
Hide file tree
Showing 47 changed files with 1,694 additions and 752 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ activemq-data
#log files
logs
*.log
*.patch

#Falcon UI NPM files
falcon-ui/dist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public void entityCommand(CommandLine commandLine, FalconClient client) throws I
OUT.get().println(result);
}

private void validateColo(Set<String> optionsList) {
static void validateColo(Set<String> optionsList) {
if (optionsList.contains(FalconCLIConstants.COLO_OPT)) {
throw new FalconCLIException("Invalid argument : " + FalconCLIConstants.COLO_OPT);
}
Expand Down
73 changes: 48 additions & 25 deletions cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,44 @@
import org.apache.falcon.resource.ExtensionInstanceList;
import org.apache.falcon.resource.ExtensionJobList;

import java.io.IOException;
import java.io.PrintStream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.falcon.cli.FalconEntityCLI.validateColo;
import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT;
import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT_DESCRIPTION;

/**
* Falcon extensions Command Line Interface - wraps the RESTful API for extensions.
*/
public class FalconExtensionCLI {
public class FalconExtensionCLI extends FalconCLI{
public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out);

// Extension commands
public static final String ENUMERATE_OPT = "enumerate";
public static final String DEFINITION_OPT = "definition";
public static final String DESCRIBE_OPT = "describe";
public static final String INSTANCES_OPT = "instances";
public static final String UNREGISTER_OPT = "unregister";
public static final String DETAIL_OPT = "detail";
public static final String REGISTER_OPT = "register";
private static final String ENUMERATE_OPT = "enumerate";
private static final String DEFINITION_OPT = "definition";
private static final String DESCRIBE_OPT = "describe";
private static final String INSTANCES_OPT = "instances";
private static final String UNREGISTER_OPT = "unregister";
private static final String DETAIL_OPT = "detail";
private static final String REGISTER_OPT = "register";
private static final String ENABLE_OPT = "enable";
private static final String DISABLE_OPT = "disable";

// Input parameters
public static final String EXTENSION_NAME_OPT = "extensionName";
public static final String JOB_NAME_OPT = "jobName";
private static final String EXTENSION_NAME_OPT = "extensionName";
private static final String JOB_NAME_OPT = "jobName";
public static final String DESCRIPTION = "description";
public static final String PATH = "path";
private static final String PATH = "path";

public FalconExtensionCLI() {
FalconExtensionCLI() throws Exception {
super();
}

public void extensionCommand(CommandLine commandLine, FalconClient client) {
void extensionCommand(CommandLine commandLine, FalconClient client) throws IOException {
Set<String> optionsList = new HashSet<>();
for (Option option : commandLine.getOptions()) {
optionsList.add(option.getOpt());
Expand All @@ -75,54 +83,60 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT);
String path = commandLine.getOptionValue(FalconCLIConstants.PATH);
String description = commandLine.getOptionValue(FalconCLIConstants.DESCRIPTION);
String colo = commandLine.getOptionValue(FalconCLIConstants.COLO_OPT);
colo = getColo(colo);

if (optionsList.contains(ENUMERATE_OPT)) {
result = client.enumerateExtensions();
result = client.enumerateExtensions().getMessage();
result = prettyPrintJson(result);
} else if (optionsList.contains(DEFINITION_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDefinition(extensionName);
result = client.getExtensionDefinition(extensionName).getMessage();
result = prettyPrintJson(result);
} else if (optionsList.contains(DESCRIBE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDescription(extensionName);
result = client.getExtensionDescription(extensionName).getMessage();
} else if (optionsList.contains(UNREGISTER_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.unregisterExtension(extensionName);
result = client.unregisterExtension(extensionName).getMessage();
} else if (optionsList.contains(DETAIL_OPT)) {
if (optionsList.contains(JOB_NAME_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.getExtensionJobDetails(jobName);
result = client.getExtensionJobDetails(jobName).getMessage();
result = prettyPrintJson(result);
} else {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDetail(extensionName);
result = client.getExtensionDetail(extensionName).getMessage();
result = prettyPrintJson(result);
}
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
validateColo(optionsList);
result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(REGISTER_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(path, PATH);
result = client.registerExtension(extensionName, path, description);
result = client.registerExtension(extensionName, path, description).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
validateColo(optionsList);
result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.updateExtensionJob(extensionName, filePath, doAsUser).getMessage();
result = client.updateExtensionJob(jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.scheduleExtensionJob(jobName, doAsUser).getMessage();
colo = getColo(colo);
result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.suspendExtensionJob(jobName, doAsUser).getMessage();
Expand Down Expand Up @@ -152,16 +166,22 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT),
commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT));
result = instances != null ? instances.toString() : "No instance (" + jobName + ") found.";
} else if (optionsList.contains(ENABLE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.enableExtension(extensionName).getMessage();
} else if (optionsList.contains(DISABLE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.disableExtension(extensionName).getMessage();
} else {
throw new FalconCLIException("Invalid/missing extension command. Supported commands include "
+ "enumerate, definition, describe, list, instances, submit, submitAndSchedule, "
+ "schedule, suspend, resume, delete, update, validate. "
+ "schedule, suspend, resume, delete, update, validate, enable, disable. "
+ "Please refer to Falcon CLI twiki for more details.");
}
OUT.get().println(result);
}

public Options createExtensionOptions() {
Options createExtensionOptions() {
Options extensionOptions = new Options();

Option enumerate = new Option(ENUMERATE_OPT, false, "Enumerate all extensions");
Expand All @@ -183,6 +203,8 @@ public Options createExtensionOptions() {
Option detail = new Option(FalconCLIConstants.DETAIL, false, "Show details of a given extension");
Option register = new Option(FalconCLIConstants.REGISTER, false, "Register an extension with Falcon. This will "
+ "make the extension available for instantiation for all users.");
Option colo = new Option(COLO_OPT, true, COLO_OPT_DESCRIPTION);
colo.setRequired(false);

OptionGroup group = new OptionGroup();
group.addOption(enumerate);
Expand Down Expand Up @@ -240,6 +262,7 @@ public Options createExtensionOptions() {
extensionOptions.addOption(filePath);
extensionOptions.addOption(path);
extensionOptions.addOption(description);
extensionOptions.addOption(colo);

return extensionOptions;
}
Expand Down
42 changes: 28 additions & 14 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
import java.util.List;
import java.util.ServiceLoader;

import static org.apache.falcon.client.FalconClient.OUT;

/**
* Handler class that is responsible for preparing Extension entities.
*/
Expand All @@ -56,9 +54,14 @@ public final class ExtensionHandler {
private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
private static final String LOCATION = "location";
private static final String TYPE = "type";
private static final String NAME = "extensionName";
private static final String EXTENSION_BUILDER_INTERFACE_SERVICE_FILE =
"META-INF/services/org.apache.falcon.extensions.ExtensionBuilder";

private ExtensionHandler(){}

public List<Entity> getEntities(ClassLoader extensionClassloader, String extensionName, String jobName,
InputStream configStream) throws IOException, FalconException {
private List<Entity> getEntities(ClassLoader extensionClassloader, String extensionName, String jobName,
InputStream configStream) throws IOException, FalconException {
Thread.currentThread().setContextClassLoader(extensionClassloader);

ServiceLoader<ExtensionBuilder> extensionBuilders = ServiceLoader.load(ExtensionBuilder.class);
Expand All @@ -78,6 +81,7 @@ public List<Entity> getEntities(ClassLoader extensionClassloader, String extensi

ExtensionBuilder extensionBuilder = null;
try {
@SuppressWarnings("unchecked")
Class<ExtensionBuilder> clazz = (Class<ExtensionBuilder>) extensionClassloader
.loadClass(result.get(0).getCanonicalName());
extensionBuilder = clazz.newInstance();
Expand All @@ -86,9 +90,8 @@ public List<Entity> getEntities(ClassLoader extensionClassloader, String extensi
}

extensionBuilder.validateExtensionConfig(extensionName, configStream);
List<Entity> entities = extensionBuilder.getEntities(jobName, configStream);

return entities;
return extensionBuilder.getEntities(jobName, configStream);
}

public static List<Entity> loadAndPrepare(String extensionName, String jobName, InputStream configStream,
Expand All @@ -106,6 +109,10 @@ public static List<Entity> loadAndPrepare(String extensionName, String jobName,
public static List<Entity> prepare(String extensionName, String jobName, InputStream configStream, List<URL> urls)
throws IOException, FalconException {
ClassLoader extensionClassLoader = ExtensionClassLoader.load(urls);
if (extensionClassLoader.getResourceAsStream(EXTENSION_BUILDER_INTERFACE_SERVICE_FILE) == null) {
throw new FalconCLIException("The extension build time jars do not contain "
+ EXTENSION_BUILDER_INTERFACE_SERVICE_FILE);
}
ExtensionHandler extensionHandler = new ExtensionHandler();

return extensionHandler.getEntities(extensionClassLoader, extensionName, jobName, configStream);
Expand Down Expand Up @@ -145,7 +152,7 @@ private static String createStagePath(String extensionName, String jobName) {
return stagePath;
}

public static List<URL> copyExtensionPackage(String extensionBuildUrl, FileSystem fs, String stagePath)
private static List<URL> copyExtensionPackage(String extensionBuildUrl, FileSystem fs, String stagePath)
throws IOException {

Path libsPath = new Path(extensionBuildUrl, FalconExtensionConstants.LIBS);
Expand All @@ -167,7 +174,7 @@ public static List<URL> copyExtensionPackage(String extensionBuildUrl, FileSyste
return urls;
}

public static List<URL> getFilesInPath(URL fileURL) throws MalformedURLException {
static List<URL> getFilesInPath(URL fileURL) throws MalformedURLException {
List<URL> urls = new ArrayList<>();

File file = new File(fileURL.getPath());
Expand All @@ -178,8 +185,6 @@ public static List<URL> getFilesInPath(URL fileURL) throws MalformedURLException
for (File innerFile : files) {
if (innerFile.isFile()) {
urls.add(innerFile.toURI().toURL());
} else {
urls.addAll(getFilesInPath(file.toURI().toURL()));
}
}
}
Expand All @@ -199,8 +204,8 @@ public static String getExtensionLocation(String extensionName, JSONObject exten
try {
extensionBuildPath = extensionDetailJson.get(LOCATION).toString();
} catch (JSONException e) {
OUT.get().print("Error. " + extensionName + " not found ");
throw new FalconCLIException("Failed to get extension type for the given extension");
throw new FalconCLIException("Failed to get extension location for the given extension:" + extensionName,
e);
}
return extensionBuildPath;
}
Expand All @@ -210,8 +215,17 @@ public static String getExtensionType(String extensionName, JSONObject extensio
try {
extensionType = extensionDetailJson.get(TYPE).toString();
} catch (JSONException e) {
OUT.get().print("Error. " + extensionName + " not found ");
throw new FalconCLIException("Failed to get extension type for the given extension");
throw new FalconCLIException("Failed to get extension type for the given extension:" + extensionName, e);
}
return extensionType;
}

public static String getExtensionName(String jobName, JSONObject extensionJobDetailJson) {
String extensionType;
try {
extensionType = extensionJobDetailJson.get(NAME).toString();
} catch (JSONException e) {
throw new FalconCLIException("Failed to get extension name for the given extension job:" + jobName, e);
}
return extensionType;
}
Expand Down
Loading

0 comments on commit 066c8e2

Please sign in to comment.