Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Dec 7, 2016
2 parents f96a084 + c79e5e4 commit e0ad358
Show file tree
Hide file tree
Showing 27 changed files with 500 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
}else if (optionsList.contains(DETAIL_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDetail(extensionName);
result = prettyPrintJson(result);
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(REGISTER_OPT)) {
Expand All @@ -103,7 +105,7 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.submitAndScheduleExtensionJob(extensionName, filePath, doAsUser).getMessage();
result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
Expand Down
5 changes: 5 additions & 0 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@
<artifactId>jersey-json</artifactId>
</dependency>

<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-multipart</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ private static void stageEntities(List<Entity> entities, String stagePath) {
type = entity.getEntityType();
OutputStream out;
try {
entityFile = new File(stagePath + File.separator + entity.getEntityType().toString() + "_"
+ URLEncoder.encode(entity.getName(), UTF_8));
entityFile = new File(new Path(stagePath + File.separator + entity.getEntityType().toString() + "_"
+ URLEncoder.encode(entity.getName(), UTF_8)).toUri().toURL().getPath());
if (!entityFile.createNewFile()) {
LOG.debug("Not able to stage the entities in the tmp path");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.falcon.resource.TriageResult;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
Expand Down Expand Up @@ -178,6 +180,22 @@ public abstract APIResult getStatus(EntityType entityType, String entityName, St
public abstract APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun, String doAsUser,
String properties);

/**
* Registers an extension.
* @param extensionName extensionName of the extension.
* @param packagePath Package location for the extension.
* @param description description of the extension.
* @return Result of the registerExtension command.
*/
public abstract String registerExtension(String extensionName, String packagePath, String description);

/**
*
* @param extensionName extensionName that needs to be unregistered
* @return Result of the unregisterExtension operation
*/
public abstract String unregisterExtension(String extensionName);

/**
* Prepare set of entities the extension has implemented and stage them to a local directory and submit them too.
* @param extensionName extension which is available in the store.
Expand All @@ -190,6 +208,19 @@ public abstract APIResult submitAndSchedule(String entityType, String filePath,
public abstract APIResult submitExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);

/**
* Prepare set of entities the extension has implemented and stage them to a local directory and submits and
* schedules them.
* @param extensionName extension which is available in the store.
* @param jobName name to be used in all the extension entities' tagging that are built as part of
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
* @throws FalconCLIException
*/
public abstract APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);

/**
*
* Get list of the entities.
Expand Down Expand Up @@ -468,6 +499,26 @@ protected InputStream getServletInputStream(String clusters, String sourceCluste
return (buffer.length() == 0) ? null : stream;
}

/**
* Converts a InputStream into ServletInputStream.
*
* @param filePath - Path of file to stream
* @return ServletInputStream
*/
protected InputStream getServletInputStream(String filePath) {

if (filePath == null) {
return null;
}
InputStream stream;
try {
stream = new FileInputStream(filePath);
} catch (FileNotFoundException e) {
throw new FalconCLIException("File not found:", e);
}
return stream;
}

public abstract SchedulableEntityInstanceResult getFeedSlaMissPendingAlerts(String entityType, String entityName,
String start, String end, String colo);

Expand Down
137 changes: 80 additions & 57 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HTTPSProperties;
import com.sun.jersey.core.header.FormDataContentDisposition;
import com.sun.jersey.multipart.FormDataBodyPart;
import com.sun.jersey.multipart.FormDataMultiPart;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.util.TrustManagerUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.ExtensionHandler;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.DateValidator;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
Expand Down Expand Up @@ -64,8 +65,6 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -124,6 +123,7 @@ public class FalconClient extends AbstractFalconClient {


public static final String DO_AS_OPT = "doAs";
public static final String JOB_NAME_OPT = "jobName";
public static final String ENTITIES_OPT = "entities";
/**
* Name of the HTTP cookie used for the authentication token between the client and the server.
Expand Down Expand Up @@ -747,26 +747,6 @@ public String getDimensionRelations(String dimensionType, String dimensionName,
return sendMetadataDiscoveryRequest(MetadataOperations.RELATIONS, dimensionType, dimensionName, null, doAsUser);
}

/**
* Converts a InputStream into ServletInputStream.
*
* @param filePath - Path of file to stream
* @return ServletInputStream
*/
private InputStream getServletInputStream(String filePath) {

if (filePath == null) {
return null;
}
InputStream stream;
try {
stream = new FileInputStream(filePath);
} catch (FileNotFoundException e) {
throw new FalconCLIException("File not found:", e);
}
return stream;
}

private <T> T getResponse(Class<T> clazz, ClientResponse clientResponse) {
printClientResponse(clientResponse);
checkIfSuccessful(clientResponse);
Expand Down Expand Up @@ -872,6 +852,12 @@ public ClientResponse call(ExtensionOperations operation, InputStream entityStre
.accept(operation.mimeType).type(MediaType.TEXT_XML)
.method(operation.method, ClientResponse.class, entityStream);
}

public ClientResponse call(ExtensionOperations submit, FormDataMultiPart formDataMultiPart) {
return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(submit.mimeType).type(MediaType.MULTIPART_FORM_DATA)
.method(submit.method, ClientResponse.class, formDataMultiPart);
}
}

public FeedLookupResult reverseLookUp(String type, String path, String doAsUser) {
Expand Down Expand Up @@ -1040,14 +1026,17 @@ public String unregisterExtension(final String extensionName) {
}

public String getExtensionDetail(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder().path(ExtensionOperations.DETAIL.path, extensionName)
return getResponse(String.class, getExtensionDetailResponse(extensionName));
}

public ClientResponse getExtensionDetailResponse(final String extensionName) {
return new ResourceBuilder().path(ExtensionOperations.DETAIL.path, extensionName)
.call(ExtensionOperations.DETAIL);
return getResponse(String.class, clientResponse);
}

public String registerExtension(final String extensionName, final String path, final String description) {
public String registerExtension(final String extensionName, final String packagePath, final String description) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.REGISTER.path, extensionName).addQueryParam(PATH, path)
.path(ExtensionOperations.REGISTER.path, extensionName).addQueryParam(PATH, packagePath)
.addQueryParam(FalconCLIConstants.DESCRIPTION, description)
.call(ExtensionOperations.REGISTER);
return getResponse(String.class, clientResponse);
Expand All @@ -1070,55 +1059,89 @@ public String getExtensionDescription(final String extensionName) {
@Override
public APIResult submitExtensionJob(final String extensionName, final String jobName, final String configPath,
final String doAsUser) {
FormDataMultiPart entitiesForm = getEntitiesForm(extensionName, jobName, configPath);
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.DETAIL.path)
.call(ExtensionOperations.DETAIL);
JSONObject responseJson = clientResponse.getEntity(JSONObject.class);
ExtensionType extensionType;
.path(ExtensionOperations.SUBMIT.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.addQueryParam(JOB_NAME_OPT, jobName)
.call(ExtensionOperations.SUBMIT, entitiesForm);
return getResponse(APIResult.class, clientResponse);
}

private FormDataMultiPart getEntitiesForm(String extensionName, String jobName, String configPath) {
InputStream configStream = getServletInputStream(configPath);
List<Entity> entities = validateExtensionAndGetEntities(extensionName, jobName, configStream);
FormDataMultiPart formDataMultiPart = new FormDataMultiPart();

for (Entity entity : entities) {
if (EntityType.FEED.equals(entity.getEntityType())) {
formDataMultiPart.field("feeds", entity, MediaType.APPLICATION_XML_TYPE);
} else if (EntityType.PROCESS.equals(entity.getEntityType())) {
formDataMultiPart.field("processes", entity, MediaType.APPLICATION_XML_TYPE);
}
}

formDataMultiPart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("config").build(), configStream,
MediaType.APPLICATION_OCTET_STREAM_TYPE));
try {
formDataMultiPart.close();
} catch (IOException e) {
OUT.get().print("Submit failed. Failed to submit entities");
throw new FalconCLIException("Submit failed. Failed to submit entities", e);
}
return formDataMultiPart;
}

private List<Entity> validateExtensionAndGetEntities(String extensionName, String jobName,
InputStream configStream) {
ClientResponse clientResponse = getExtensionDetailResponse(extensionName);
List<Entity> entities = getEntities(extensionName, jobName, configStream, clientResponse);
return entities;
}

private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream,
ClientResponse clientResponse) {
JSONObject responseJson;
try {
responseJson = new JSONObject(clientResponse.getEntity(String.class));
} catch (JSONException e) {
OUT.get().print("Submit failed. Failed to get details for the given extension");
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
}
String extensionType;
String extensionBuildLocation;
try {
JSONObject extensionDetailsJson = new JSONObject(responseJson.get("detail").toString());
extensionType = ExtensionType.valueOf(extensionDetailsJson.get("type").toString().toUpperCase());
extensionBuildLocation = extensionDetailsJson.get("location").toString();
extensionType = responseJson.get("type").toString();
extensionBuildLocation = responseJson.get("location").toString();
} catch (JSONException e) {
OUT.get().print("Error. " + extensionName + " not found ");
return null;
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
}
InputStream configStream = getServletInputStream(configPath);

List<Entity> entities;
if (extensionType.equals(ExtensionType.CUSTOM)) {
List<Entity> entities = null;
if (!extensionType.equals(ExtensionType.CUSTOM.name())) {
try {
entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream, extensionBuildLocation);
entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream,
extensionBuildLocation);
} catch (Exception e) {
OUT.get().println("Error in building the extension");
return null;
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
}
if (entities == null || entities.isEmpty()) {
OUT.get().println("No entities got built");
return null;
}
try {
EntityUtil.applyTags(extensionName, jobName, entities);
} catch (FalconException e) {
OUT.get().println("Error in applying tags to generated entities");
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
}
}

clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SUBMIT.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SUBMIT, configStream);
return getResponse(APIResult.class, clientResponse);
return entities;
}

public APIResult submitAndScheduleExtensionJob(final String extensionName, final String filePath,
final String doAsUser) {
InputStream entityStream = getServletInputStream(filePath);
public APIResult submitAndScheduleExtensionJob(final String extensionName, final String jobName,
final String configPath, final String doAsUser) {
FormDataMultiPart entitiesForm = getEntitiesForm(extensionName, jobName, configPath);
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SUBMIT_AND_SCHEDULE.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SUBMIT_AND_SCHEDULE, entityStream);
.call(ExtensionOperations.SUBMIT_AND_SCHEDULE, entitiesForm);
return getResponse(APIResult.class, clientResponse);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public enum ExtensionType {

private final String text;

private ExtensionType(final String text) {
ExtensionType(final String text) {
this.text = text;
}
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ public Boolean checkIfExtensionExists(String extensionName) {
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION);
q.setParameter(EXTENSION_NAME, extensionName);
if (q.getResultList().size() > 0){
int resultSize = 0;
try {
resultSize = q.getResultList().size();
} finally {
commitAndCloseTransaction(entityManager);
}
if (resultSize > 0){
return true;
}
return false;
Expand Down
Loading

0 comments on commit e0ad358

Please sign in to comment.