Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Dec 26, 2016
2 parents 4a2e23e + 9946758 commit 0cf9af6
Show file tree
Hide file tree
Showing 10 changed files with 377 additions and 243 deletions.
29 changes: 17 additions & 12 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
Expand Up @@ -44,8 +44,6 @@
import java.util.List;
import java.util.ServiceLoader;

import static org.apache.falcon.client.FalconClient.OUT;

/**
* Handler class that is responsible for preparing Extension entities.
*/
Expand All @@ -56,9 +54,13 @@ public final class ExtensionHandler {
private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
private static final String LOCATION = "location";
private static final String TYPE = "type";
private static final String EXTENSION_BUILDER_INTERFACE_SERVICE_FILE =
"META-INF/services/org.apache.falcon.extensions.ExtensionBuilder";

private ExtensionHandler(){}

public List<Entity> getEntities(ClassLoader extensionClassloader, String extensionName, String jobName,
InputStream configStream) throws IOException, FalconException {
private List<Entity> getEntities(ClassLoader extensionClassloader, String extensionName, String jobName,
InputStream configStream) throws IOException, FalconException {
Thread.currentThread().setContextClassLoader(extensionClassloader);

ServiceLoader<ExtensionBuilder> extensionBuilders = ServiceLoader.load(ExtensionBuilder.class);
Expand All @@ -78,6 +80,7 @@ public List<Entity> getEntities(ClassLoader extensionClassloader, String extensi

ExtensionBuilder extensionBuilder = null;
try {
@SuppressWarnings("unchecked")
Class<ExtensionBuilder> clazz = (Class<ExtensionBuilder>) extensionClassloader
.loadClass(result.get(0).getCanonicalName());
extensionBuilder = clazz.newInstance();
Expand All @@ -86,9 +89,8 @@ public List<Entity> getEntities(ClassLoader extensionClassloader, String extensi
}

extensionBuilder.validateExtensionConfig(extensionName, configStream);
List<Entity> entities = extensionBuilder.getEntities(jobName, configStream);

return entities;
return extensionBuilder.getEntities(jobName, configStream);
}

public static List<Entity> loadAndPrepare(String extensionName, String jobName, InputStream configStream,
Expand All @@ -106,6 +108,10 @@ public static List<Entity> loadAndPrepare(String extensionName, String jobName,
public static List<Entity> prepare(String extensionName, String jobName, InputStream configStream, List<URL> urls)
throws IOException, FalconException {
ClassLoader extensionClassLoader = ExtensionClassLoader.load(urls);
if (extensionClassLoader.getResourceAsStream(EXTENSION_BUILDER_INTERFACE_SERVICE_FILE) == null) {
throw new FalconCLIException("The extension build time jars do not contain "
+ EXTENSION_BUILDER_INTERFACE_SERVICE_FILE);
}
ExtensionHandler extensionHandler = new ExtensionHandler();

return extensionHandler.getEntities(extensionClassLoader, extensionName, jobName, configStream);
Expand Down Expand Up @@ -145,7 +151,7 @@ private static String createStagePath(String extensionName, String jobName) {
return stagePath;
}

public static List<URL> copyExtensionPackage(String extensionBuildUrl, FileSystem fs, String stagePath)
private static List<URL> copyExtensionPackage(String extensionBuildUrl, FileSystem fs, String stagePath)
throws IOException {

Path libsPath = new Path(extensionBuildUrl, FalconExtensionConstants.LIBS);
Expand All @@ -167,7 +173,7 @@ public static List<URL> copyExtensionPackage(String extensionBuildUrl, FileSyste
return urls;
}

public static List<URL> getFilesInPath(URL fileURL) throws MalformedURLException {
static List<URL> getFilesInPath(URL fileURL) throws MalformedURLException {
List<URL> urls = new ArrayList<>();

File file = new File(fileURL.getPath());
Expand Down Expand Up @@ -199,8 +205,8 @@ public static String getExtensionLocation(String extensionName, JSONObject exten
try {
extensionBuildPath = extensionDetailJson.get(LOCATION).toString();
} catch (JSONException e) {
OUT.get().print("Error. " + extensionName + " not found ");
throw new FalconCLIException("Failed to get extension type for the given extension");
throw new FalconCLIException("Failed to get extension location for the given extension:" + extensionName,
e);
}
return extensionBuildPath;
}
Expand All @@ -210,8 +216,7 @@ public static String getExtensionType(String extensionName, JSONObject extensio
try {
extensionType = extensionDetailJson.get(TYPE).toString();
} catch (JSONException e) {
OUT.get().print("Error. " + extensionName + " not found ");
throw new FalconCLIException("Failed to get extension type for the given extension");
throw new FalconCLIException("Failed to get extension type for the given extension:" + extensionName, e);
}
return extensionType;
}
Expand Down
93 changes: 43 additions & 50 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Expand Up @@ -84,57 +84,55 @@ public class FalconClient extends AbstractFalconClient {

public static final String WS_HEADER_PREFIX = "header:";
public static final String USER = System.getProperty("user.name");
public static final String AUTH_URL = "api/options?" + PseudoAuthenticator.USER_NAME + "=" + USER;
private static final String AUTH_URL = "api/options?" + PseudoAuthenticator.USER_NAME + "=" + USER;



public static final String PATH = "path";
public static final String COLO = "colo";
private static final String PATH = "path";
private static final String COLO = "colo";
private static final String KEY = "key";
private static final String VALUE = "value";
public static final String CLUSTER = "cluster";
public static final String RUN_ID = "runid";
public static final String FORCE = "force";
public static final String SHOW_SCHEDULER = "showScheduler";
public static final String ENTITY_NAME = "name";
public static final String ENTITY_TYPE = "type";
public static final String SKIP_DRYRUN = "skipDryRun";
public static final String FILTER_BY = "filterBy";
public static final String ORDER_BY = "orderBy";
public static final String SORT_ORDER = "sortOrder";
public static final String OFFSET = "offset";
public static final String NUM_RESULTS = "numResults";
public static final String START = "start";
public static final String END = "end";
public static final String INSTANCE_TIME = "instanceTime";
public static final String INSTANCE_STATUS = "instanceStatus";
public static final String PROPERTIES = "properties";
private static final String RUN_ID = "runid";
private static final String FORCE = "force";
private static final String SHOW_SCHEDULER = "showScheduler";
private static final String ENTITY_NAME = "name";
private static final String ENTITY_TYPE = "type";
private static final String SKIP_DRYRUN = "skipDryRun";
private static final String FILTER_BY = "filterBy";
private static final String ORDER_BY = "orderBy";
private static final String SORT_ORDER = "sortOrder";
private static final String OFFSET = "offset";
private static final String NUM_RESULTS = "numResults";
private static final String START = "start";
private static final String END = "end";
private static final String INSTANCE_TIME = "instanceTime";
private static final String INSTANCE_STATUS = "instanceStatus";
private static final String PROPERTIES = "properties";
private static final String FIELDS = "fields";
private static final String NAME_SUBSEQUENCE = "nameseq";
private static final String FILTER_TAGS = "tags";
private static final String TAG_KEYWORDS = "tagkeys";
private static final String LIFECYCLE = "lifecycle";
private static final String NUM_INSTANCES = "numInstances";
public static final String ALL_ATTEMPTS = "allAttempts";
private static final String ALL_ATTEMPTS = "allAttempts";




public static final String DO_AS_OPT = "doAs";
public static final String JOB_NAME_OPT = "jobName";
private static final String DO_AS_OPT = "doAs";
private static final String JOB_NAME_OPT = "jobName";
public static final String ENTITIES_OPT = "entities";
/**
* Name of the HTTP cookie used for the authentication token between the client and the server.
*/
public static final String AUTH_COOKIE = "hadoop.auth";
private static final String AUTH_COOKIE = "hadoop.auth";
private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";

private static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator();
private static final String TEMPLATE_SUFFIX = "-template.xml";


private static final String PROPERTIES_SUFFIX = ".properties";
public static final HostnameVerifier ALL_TRUSTING_HOSTNAME_VERIFIER = new HostnameVerifier() {
private static final HostnameVerifier ALL_TRUSTING_HOSTNAME_VERIFIER = new HostnameVerifier() {
public boolean verify(String hostname, SSLSession sslSession) {
return true;
}
Expand Down Expand Up @@ -371,9 +369,8 @@ protected static enum ExtensionOperations {
}
}

public String notEmpty(String str, String name) {
private String notEmpty(String str, String name) {
if (str == null) {

throw new IllegalArgumentException(name + " cannot be null");
}
if (str.length() == 0) {
Expand Down Expand Up @@ -774,28 +771,28 @@ private ResourceBuilder path(String... paths) {
return this;
}

public ResourceBuilder addQueryParam(String paramName, Integer value) {
private ResourceBuilder addQueryParam(String paramName, Integer value) {
if (value != null) {
resource = resource.queryParam(paramName, value.toString());
}
return this;
}

public ResourceBuilder addQueryParam(String paramName, Boolean paramValue) {
private ResourceBuilder addQueryParam(String paramName, Boolean paramValue) {
if (paramValue != null) {
resource = resource.queryParam(paramName, String.valueOf(paramValue));
}
return this;
}

public ResourceBuilder addQueryParam(String paramName, String paramValue) {
private ResourceBuilder addQueryParam(String paramName, String paramValue) {
if (StringUtils.isNotBlank(paramValue)) {
resource = resource.queryParam(paramName, paramValue);
}
return this;
}

public ResourceBuilder addQueryParam(String paramName, List<LifeCycle> lifeCycles,
private ResourceBuilder addQueryParam(String paramName, List<LifeCycle> lifeCycles,
String type) {
if (lifeCycles != null) {
checkLifeCycleOption(lifeCycles, type);
Expand All @@ -812,7 +809,7 @@ private ClientResponse call(Entities entities) {
.method(entities.method, ClientResponse.class);
}

public ClientResponse call(AdminOperations operation) {
private ClientResponse call(AdminOperations operation) {
return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(operation.mimeType).type(MediaType.TEXT_XML)
.method(operation.method, ClientResponse.class);
Expand All @@ -824,37 +821,37 @@ private ClientResponse call(MetadataOperations metadataOperations) {
.method(metadataOperations.method, ClientResponse.class);
}

public ClientResponse call(Instances operation) {
private ClientResponse call(Instances operation) {
return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(operation.mimeType).type(MediaType.TEXT_XML)
.method(operation.method, ClientResponse.class);
}

public ClientResponse call(ExtensionOperations operation) {
private ClientResponse call(ExtensionOperations operation) {
return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(operation.mimeType).type(MediaType.TEXT_XML)
.method(operation.method, ClientResponse.class);
}

public ClientResponse call(Entities operation, InputStream entityStream) {
private ClientResponse call(Entities operation, InputStream entityStream) {
return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(operation.mimeType).type(MediaType.TEXT_XML)
.method(operation.method, ClientResponse.class, entityStream);
}

public ClientResponse call(Instances operation, InputStream entityStream) {
private ClientResponse call(Instances operation, InputStream entityStream) {
return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(operation.mimeType).type(MediaType.TEXT_XML)
.method(operation.method, ClientResponse.class, entityStream);
}

public ClientResponse call(ExtensionOperations operation, InputStream entityStream) {
private ClientResponse call(ExtensionOperations operation, InputStream entityStream) {
return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(operation.mimeType).type(MediaType.TEXT_XML)
.method(operation.method, ClientResponse.class, entityStream);
}

public ClientResponse call(ExtensionOperations submit, FormDataMultiPart formDataMultiPart) {
private ClientResponse call(ExtensionOperations submit, FormDataMultiPart formDataMultiPart) {
return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(submit.mimeType).type(MediaType.MULTIPART_FORM_DATA)
.method(submit.method, ClientResponse.class, formDataMultiPart);
Expand Down Expand Up @@ -902,7 +899,7 @@ private void checkLifeCycleOption(List<LifeCycle> lifeCycles, String type) {
}
}

protected void checkType(String type) {
private void checkType(String type) {
if (type == null || type.isEmpty()) {
throw new FalconCLIException("entity type is empty");
} else {
Expand Down Expand Up @@ -1036,7 +1033,7 @@ public String getExtensionJobDetails(final String jobName) {
return getResponse(String.class, clientResponse);
}

public ClientResponse getExtensionDetailResponse(final String extensionName) {
private ClientResponse getExtensionDetailResponse(final String extensionName) {
return new ResourceBuilder().path(ExtensionOperations.DETAIL.path, extensionName)
.call(ExtensionOperations.DETAIL);
}
Expand Down Expand Up @@ -1093,7 +1090,6 @@ private FormDataMultiPart getEntitiesForm(String extensionName, String jobName,
try {
formDataMultiPart.close();
} catch (IOException e) {
OUT.get().print("Submit failed. Failed to submit entities");
throw new FalconCLIException("Submit failed. Failed to submit entities", e);
}
return formDataMultiPart;
Expand All @@ -1104,9 +1100,8 @@ private List<Entity> validateExtensionAndGetEntities(String extensionName, Strin
JSONObject extensionDetailJson = getExtensionDetailJson(extensionName);
String extensionType = ExtensionHandler.getExtensionType(extensionName, extensionDetailJson);
String extensionBuildLocation = ExtensionHandler.getExtensionLocation(extensionName, extensionDetailJson);
List<Entity> entities = getEntities(extensionName, jobName, configStream, extensionType,
return getEntities(extensionName, jobName, configStream, extensionType,
extensionBuildLocation);
return entities;
}

private JSONObject getExtensionDetailJson(String extensionName) {
Expand All @@ -1115,8 +1110,7 @@ private JSONObject getExtensionDetailJson(String extensionName) {
try {
extensionDetailJson = new JSONObject(clientResponse.getEntity(String.class));
} catch (JSONException e) {
OUT.get().print("Failed to get details for the given extension");
throw new FalconCLIException("Failed to get details for the given extension");
throw new FalconCLIException("Failed to get details for the given extension", e);
}
return extensionDetailJson;
}
Expand All @@ -1129,12 +1123,10 @@ private List<Entity> getEntities(String extensionName, String jobName, InputStre
entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream,
extensionBuildLocation);
} catch (Exception e) {
OUT.get().println("Error in building the extension");
throw new FalconCLIException("Failed to prepare entities for the given extension");
throw new FalconCLIException("Error in building the extension", e);
}
if (entities == null || entities.isEmpty()) {
OUT.get().println("No entities got built");
throw new FalconCLIException("Failed to prepare entities for the given extension");
throw new FalconCLIException("No entities got built for the given extension");
}
}
return entities;
Expand All @@ -1145,6 +1137,7 @@ public APIResult submitAndScheduleExtensionJob(final String extensionName, final
FormDataMultiPart entitiesForm = getEntitiesForm(extensionName, jobName, configPath);
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SUBMIT_AND_SCHEDULE.path, extensionName)
.addQueryParam(JOB_NAME_OPT, jobName)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SUBMIT_AND_SCHEDULE, entitiesForm);
return getResponse(APIResult.class, clientResponse);
Expand Down

0 comments on commit 0cf9af6

Please sign in to comment.