Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Jan 17, 2017
2 parents a234d94 + ae510da commit 7e16263
Show file tree
Hide file tree
Showing 13 changed files with 377 additions and 196 deletions.
11 changes: 9 additions & 2 deletions cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOExc
result = client.updateExtensionJob(jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
Expand All @@ -139,10 +140,12 @@ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOExc
result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.suspendExtensionJob(jobName, doAsUser).getMessage();
colo = getColo(colo);
result = client.suspendExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.RESUME_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.resumeExtensionJob(jobName, doAsUser).getMessage();
colo = getColo(colo);
result = client.resumeExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.DELETE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.deleteExtensionJob(jobName, doAsUser).getMessage();
Expand Down Expand Up @@ -198,6 +201,8 @@ Options createExtensionOptions() {
Option suspend = new Option(FalconCLIConstants.SUSPEND_OPT, false, "Suspend an extension job");
Option resume = new Option(FalconCLIConstants.RESUME_OPT, false, "Resume an extension job");
Option delete = new Option(FalconCLIConstants.DELETE_OPT, false, "Delete an extension job");
Option enable = new Option(FalconCLIConstants.ENABLE_OPT, false, "Enable an extension");
Option disable = new Option(FalconCLIConstants.DISABLE_OPT, false, "Disable an extension");
Option unregister = new Option(FalconCLIConstants.UREGISTER, false, "Un-register an extension. This will make"
+ " the extension unavailable for instantiation");
Option detail = new Option(FalconCLIConstants.DETAIL, false, "Show details of a given extension");
Expand All @@ -223,6 +228,8 @@ Options createExtensionOptions() {
group.addOption(unregister);
group.addOption(detail);
group.addOption(register);
group.addOption(enable);
group.addOption(disable);
extensionOptions.addOptionGroup(group);

Option url = new Option(FalconCLIConstants.URL_OPTION, true, "Falcon URL");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,25 @@ public abstract APIResult submitAndScheduleExtensionJob(String extensionName, St
* @return APIResult status of the deletion query.
*/
public abstract APIResult deleteExtensionJob(final String jobName, final String doAsUser);

/**
*
* @param jobName name of the extension that has to be suspended.
* @param coloExpr comma separated list of colos where the operation has to be performed.
* @param doAsUser proxy user
* @return result status of the suspend operation.
*/
public abstract APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser);

/**
*
* @param jobName name of the extension that has to be resumed.
* @param coloExpr comma separated list of colos where the operation has to be performed.
* @param doAsUser proxy user.
* @return result status of the resume operation.
*/
public abstract APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser);

/**
* Prepares set of entities the extension has implemented to validate the extension job.
* @param jobName job name of the extension job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ private FalconCLIConstants(){
public static final String UPDATE_OPT = "update";
public static final String UPDATE_CLUSTER_DEPENDENTS_OPT = "updateClusterDependents";
public static final String DELETE_OPT = "delete";
public static final String ENABLE_OPT = "enable";
public static final String DISABLE_OPT = "disable";
public static final String SCHEDULE_OPT = "schedule";
public static final String CURRENT_COLO = "current.colo";
public static final String CLIENT_PROPERTIES = "/client.properties";
Expand Down
33 changes: 23 additions & 10 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.sun.jersey.client.urlconnection.HTTPSProperties;
import com.sun.jersey.multipart.FormDataMultiPart;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.util.TrustManagerUtils;
import org.apache.falcon.LifeCycle;
Expand Down Expand Up @@ -52,6 +53,8 @@
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
Expand Down Expand Up @@ -81,6 +84,7 @@
public class FalconClient extends AbstractFalconClient {

public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out);
public static final Logger LOG = LoggerFactory.getLogger(FalconClient.class);

public static final String WS_HEADER_PREFIX = "header:";
public static final String USER = System.getProperty("user.name");
Expand Down Expand Up @@ -187,6 +191,7 @@ public FalconClient(String falconUrl, Properties properties) {
client.resource(UriBuilder.fromUri(baseUrl).build());
authenticationToken = getToken(baseUrl);
} catch (Exception e) {
LOG.error("Unable to initialize Falcon Client object. Cause : ", e);
throw new FalconCLIException("Unable to initialize Falcon Client object. Cause : " + e.getMessage(), e);
}
}
Expand Down Expand Up @@ -341,8 +346,8 @@ protected static enum AdminOperations {
protected static enum ExtensionOperations {

ENUMERATE("api/extension/enumerate/", HttpMethod.GET, MediaType.TEXT_XML),
DESCRIBE("api/extension/describe/", HttpMethod.GET, MediaType.TEXT_PLAIN),
DEFINITION("api/extension/definition", HttpMethod.GET, MediaType.APPLICATION_JSON),
DESCRIBE("api/extension/describe/", HttpMethod.GET, MediaType.TEXT_XML),
DEFINITION("api/extension/definition", HttpMethod.GET, MediaType.TEXT_XML),
LIST("api/extension/list", HttpMethod.GET, MediaType.APPLICATION_JSON),
INSTANCES("api/extension/instances", HttpMethod.GET, MediaType.APPLICATION_JSON),
SUBMIT("api/extension/submit", HttpMethod.POST, MediaType.TEXT_XML),
Expand All @@ -354,8 +359,8 @@ protected static enum ExtensionOperations {
RESUME("api/extension/resume", HttpMethod.POST, MediaType.TEXT_XML),
DELETE("api/extension/delete", HttpMethod.POST, MediaType.TEXT_XML),
UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_XML),
DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON),
JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON),
DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.TEXT_XML),
JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.TEXT_XML),
REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML),
ENABLE("api/extension/enable", HttpMethod.POST, MediaType.TEXT_XML),
DISABLE("api/extension/disable", HttpMethod.POST, MediaType.TEXT_XML);
Expand Down Expand Up @@ -1107,7 +1112,8 @@ private FormDataMultiPart getEntitiesForm(String extensionName, String jobName,
try {
formDataMultiPart.close();
} catch (IOException e) {
throw new FalconCLIException("Submit failed. Failed to submit entities", e);
LOG.error("Submit failed. Failed to submit entities. Cause: ", e);
throw new FalconCLIException("Submit failed. Failed to submit entities:" + e.getMessage(), e);
}
return formDataMultiPart;
}
Expand All @@ -1132,17 +1138,20 @@ private JSONObject getExtensionDetailJson(String extensionName) {
try {
extensionDetailJson = new JSONObject(getResponse(APIResult.class, clientResponse).getMessage());
} catch (JSONException e) {
throw new FalconCLIException("Failed to get details for the given extension", e);
LOG.error("Failed to get details for the given extension. Cause: ", e);
throw new FalconCLIException("Failed to get details for the given extension:" + e.getMessage(), e);
}
return extensionDetailJson;
}

private JSONObject getExtensionJobDetailJson(String jobName) {
ClientResponse clientResponse = getExtensionJobDetailsResponse(jobName);
JSONObject extensionJobDetailJson;
try {
extensionJobDetailJson = new JSONObject(getResponse(APIResult.class, clientResponse).getMessage());
} catch (JSONException e) {
throw new FalconCLIException("Failed to get details for the given extension", e);
LOG.error("Failed to get details for the given extension. Cause: ", e);
throw new FalconCLIException("Failed to get details for the given extension:" + e.getMessage(), e);
}
return extensionJobDetailJson;
}
Expand All @@ -1155,7 +1164,9 @@ private List<Entity> getEntities(String extensionName, String jobName, InputStre
entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream,
extensionBuildLocation);
} catch (Exception e) {
throw new FalconCLIException("Error in building the extension", e);
LOG.error("Error in building the extension. Cause: ", e);
OUT.get().println("Error in building the extension:" + ExceptionUtils.getFullStackTrace(e));
throw new FalconCLIException("Error in building the extension:" + e.getMessage(), e);
}
if (entities == null || entities.isEmpty()) {
throw new FalconCLIException("No entities got built for the given extension");
Expand Down Expand Up @@ -1209,17 +1220,19 @@ public APIResult scheduleExtensionJob(String jobName, final String coloExpr, fin
return getResponse(APIResult.class, clientResponse);
}

public APIResult suspendExtensionJob(final String jobName, final String doAsUser) {
public APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SUSPEND.path, jobName)
.addQueryParam(COLO, coloExpr)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SUSPEND);
return getResponse(APIResult.class, clientResponse);
}

public APIResult resumeExtensionJob(final String jobName, final String doAsUser) {
public APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.RESUME.path, jobName)
.addQueryParam(COLO, coloExpr)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.RESUME);
return getResponse(APIResult.class, clientResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
Expand All @@ -37,8 +34,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
Expand All @@ -59,9 +54,9 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";

public static final String NAME = "name";
protected static final String EXTENSION_TYPE = "type";
protected static final String EXTENSION_DESC = "description";
protected static final String EXTENSION_LOCATION = "location";
private static final String EXTENSION_TYPE = "type";
private static final String EXTENSION_DESC = "description";
private static final String EXTENSION_LOCATION = "location";

protected static void validateExtensionName(final String extensionName) {
if (StringUtils.isBlank(extensionName)) {
Expand Down Expand Up @@ -114,28 +109,14 @@ public APIResult deleteExtensionMetadata(String extensionName){
}
}

protected SortedMap<EntityType, List<Entity>> getJobEntities(ExtensionJobsBean extensionJobsBean)
throws FalconException, IOException {
TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>();
List<String> processes = extensionJobsBean.getProcesses();
List<String> feeds = extensionJobsBean.getFeeds();
entityMap.put(EntityType.PROCESS, getEntities(processes, EntityType.PROCESS));
entityMap.put(EntityType.FEED, getEntities(feeds, EntityType.FEED));
protected SortedMap<EntityType, List<String>> getJobEntities(ExtensionJobsBean extensionJobsBean)
throws FalconException {
TreeMap<EntityType, List<String>> entityMap = new TreeMap<>();
entityMap.put(EntityType.PROCESS, extensionJobsBean.getProcesses());
entityMap.put(EntityType.FEED, extensionJobsBean.getFeeds());
return entityMap;
}

private List<Entity> getEntities(List<String> entityNames, EntityType entityType) throws FalconException {
List<Entity> entities = new ArrayList<>();
for (String entityName : entityNames) {
try {
entities.add(EntityUtil.getEntity(entityType, entityName));
} catch (EntityNotRegisteredException e) {
LOG.error("Entity {} not found during deletion nothing to do", entityName);
}
}
return entities;
}

private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName);
Expand Down Expand Up @@ -174,7 +155,7 @@ public static String getJobNameFromTag(String tags) {
return tags.substring(nameStart, nameEnd);
}

public String disableExtension(String extensionName, String currentUser) {
protected String disableExtension(String extensionName, String currentUser) {
validateExtensionName(extensionName);
try {
return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.DISABLED);
Expand All @@ -183,7 +164,7 @@ public String disableExtension(String extensionName, String currentUser) {
}
}

public String enableExtension(String extensionName, String currentUser) {
protected String enableExtension(String extensionName, String currentUser) {
validateExtensionName(extensionName);
try {
return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.ENABLED);
Expand Down Expand Up @@ -232,4 +213,23 @@ private static JSONArray buildEnumerateResult() throws FalconException {
}
return results;
}

protected static void checkIfExtensionIsEnabled(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
if (!metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
LOG.error("Extension: " + extensionName + " is in disabled state.");
throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
Response.Status.INTERNAL_SERVER_ERROR);
}
}

protected static void checkIfExtensionJobNameExists(String jobName, String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
if (extensionJobsBean != null && !extensionJobsBean.getExtensionName().equals(extensionName)) {
LOG.error("Extension job with name: " + extensionName + " already exists.");
throw FalconWebException.newAPIException("Extension job with name: " + extensionName + " already exists.",
Response.Status.INTERNAL_SERVER_ERROR);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,55 @@ protected APIResult doExecute(String colo) throws FalconException {
return results;
}

APIResult proxySchedule(final String type, final String entity, final String coloExpr,
final Boolean skipDryRun, final String properties,
final HttpServletRequest bufferedRequest) {
return new EntityProxy(type, entity) {
@Override
protected Set<String> getColosToApply() {
return getColosFromExpression(coloExpr, type, entity);
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity,
colo, skipDryRun, properties);
}
}.execute();
}

APIResult proxySuspend(final String type, final String entity, final String coloExpr,
final HttpServletRequest bufferedRequest) {
return new EntityProxy(type, entity) {
@Override
protected Set<String> getColosToApply() {
return getColosFromExpression(coloExpr, type, entity);
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity,
colo);
}
}.execute();
}

APIResult proxyResume(final String type, final String entity, final String coloExpr,
final HttpServletRequest bufferedRequest) {
return new EntityProxy(type, entity) {
@Override
protected Set<String> getColosToApply() {
return getColosFromExpression(coloExpr, type, entity);
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getEntityManager(colo).invoke("resume", bufferedRequest, type, entity,
colo);
}
}.execute();
}

Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun,
final HttpServletRequest bufferedRequest, Entity newEntity) {
final Set<String> oldColos = getApplicableColos(type, entityName);
Expand Down
Loading

0 comments on commit 7e16263

Please sign in to comment.