diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 2a105dc5b..60578d018 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -139,10 +139,12 @@ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOExc result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) { validateRequiredParameter(jobName, JOB_NAME_OPT); - result = client.suspendExtensionJob(jobName, doAsUser).getMessage(); + colo = getColo(colo); + result = client.suspendExtensionJob(jobName, colo, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.RESUME_OPT)) { validateRequiredParameter(jobName, JOB_NAME_OPT); - result = client.resumeExtensionJob(jobName, doAsUser).getMessage(); + colo = getColo(colo); + result = client.resumeExtensionJob(jobName, colo, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.DELETE_OPT)) { validateRequiredParameter(jobName, JOB_NAME_OPT); result = client.deleteExtensionJob(jobName, doAsUser).getMessage(); diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 7b8a606e7..49392c257 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -255,6 +255,25 @@ public abstract APIResult submitAndScheduleExtensionJob(String extensionName, St * @return APIResult status of the deletion query. */ public abstract APIResult deleteExtensionJob(final String jobName, final String doAsUser); + + /** + * + * @param jobName name of the extension that has to be suspended. + * @param coloExpr comma separated list of colos where the operation has to be performed. + * @param doAsUser proxy user + * @return result status of the suspend operation. + */ + public abstract APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser); + + /** + * + * @param jobName name of the extension that has to be resumed. + * @param coloExpr comma separated list of colos where the operation has to be performed. + * @param doAsUser proxy user. + * @return result status of the resume operation. + */ + public abstract APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser); + /** * Prepares set of entities the extension has implemented to validate the extension job. * @param jobName job name of the extension job. diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 277208560..cf457ea77 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -1209,17 +1209,19 @@ public APIResult scheduleExtensionJob(String jobName, final String coloExpr, fin return getResponse(APIResult.class, clientResponse); } - public APIResult suspendExtensionJob(final String jobName, final String doAsUser) { + public APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser) { ClientResponse clientResponse = new ResourceBuilder() .path(ExtensionOperations.SUSPEND.path, jobName) + .addQueryParam(COLO, coloExpr) .addQueryParam(DO_AS_OPT, doAsUser) .call(ExtensionOperations.SUSPEND); return getResponse(APIResult.class, clientResponse); } - public APIResult resumeExtensionJob(final String jobName, final String doAsUser) { + public APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser) { ClientResponse clientResponse = new ResourceBuilder() .path(ExtensionOperations.RESUME.path, jobName) + .addQueryParam(COLO, coloExpr) .addQueryParam(DO_AS_OPT, doAsUser) .call(ExtensionOperations.RESUME); return getResponse(APIResult.class, clientResponse); diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java index 63bf1b655..ff8968262 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java @@ -20,11 +20,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; -import org.apache.falcon.entity.EntityNotRegisteredException; -import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.extensions.ExtensionStatus; -import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.store.ExtensionStore; @@ -37,8 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.Response; -import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; @@ -59,9 +54,9 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { private static final String LAST_UPDATE_TIME = "lastUpdatedTime"; public static final String NAME = "name"; - protected static final String EXTENSION_TYPE = "type"; - protected static final String EXTENSION_DESC = "description"; - protected static final String EXTENSION_LOCATION = "location"; + private static final String EXTENSION_TYPE = "type"; + private static final String EXTENSION_DESC = "description"; + private static final String EXTENSION_LOCATION = "location"; protected static void validateExtensionName(final String extensionName) { if (StringUtils.isBlank(extensionName)) { @@ -114,28 +109,14 @@ public APIResult deleteExtensionMetadata(String extensionName){ } } - protected SortedMap> getJobEntities(ExtensionJobsBean extensionJobsBean) - throws FalconException, IOException { - TreeMap> entityMap = new TreeMap<>(); - List processes = extensionJobsBean.getProcesses(); - List feeds = extensionJobsBean.getFeeds(); - entityMap.put(EntityType.PROCESS, getEntities(processes, EntityType.PROCESS)); - entityMap.put(EntityType.FEED, getEntities(feeds, EntityType.FEED)); + protected SortedMap> getJobEntities(ExtensionJobsBean extensionJobsBean) + throws FalconException { + TreeMap> entityMap = new TreeMap<>(); + entityMap.put(EntityType.PROCESS, extensionJobsBean.getProcesses()); + entityMap.put(EntityType.FEED, extensionJobsBean.getFeeds()); return entityMap; } - private List getEntities(List entityNames, EntityType entityType) throws FalconException { - List entities = new ArrayList<>(); - for (String entityName : entityNames) { - try { - entities.add(EntityUtil.getEntity(entityType, entityName)); - } catch (EntityNotRegisteredException e) { - LOG.error("Entity {} not found during deletion nothing to do", entityName); - } - } - return entities; - } - private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException { ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName); @@ -174,7 +155,7 @@ public static String getJobNameFromTag(String tags) { return tags.substring(nameStart, nameEnd); } - public String disableExtension(String extensionName, String currentUser) { + protected String disableExtension(String extensionName, String currentUser) { validateExtensionName(extensionName); try { return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.DISABLED); @@ -183,7 +164,7 @@ public String disableExtension(String extensionName, String currentUser) { } } - public String enableExtension(String extensionName, String currentUser) { + protected String enableExtension(String extensionName, String currentUser) { validateExtensionName(extensionName); try { return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.ENABLED); diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java index ae0a61ae7..7d00442ef 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java @@ -120,6 +120,55 @@ protected APIResult doExecute(String colo) throws FalconException { return results; } + APIResult proxySchedule(final String type, final String entity, final String coloExpr, + final Boolean skipDryRun, final String properties, + final HttpServletRequest bufferedRequest) { + return new EntityProxy(type, entity) { + @Override + protected Set getColosToApply() { + return getColosFromExpression(coloExpr, type, entity); + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, + colo, skipDryRun, properties); + } + }.execute(); + } + + APIResult proxySuspend(final String type, final String entity, final String coloExpr, + final HttpServletRequest bufferedRequest) { + return new EntityProxy(type, entity) { + @Override + protected Set getColosToApply() { + return getColosFromExpression(coloExpr, type, entity); + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity, + colo); + } + }.execute(); + } + + APIResult proxyResume(final String type, final String entity, final String coloExpr, + final HttpServletRequest bufferedRequest) { + return new EntityProxy(type, entity) { + @Override + protected Set getColosToApply() { + return getColosFromExpression(coloExpr, type, entity); + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getEntityManager(colo).invoke("resume", bufferedRequest, type, entity, + colo); + } + }.execute(); + } + Map proxyUpdate(final String type, final String entityName, final Boolean skipDryRun, final HttpServletRequest bufferedRequest, Entity newEntity) { final Set oldColos = getApplicableColos(type, entityName); diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java index 6f75dc776..8733170a9 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java @@ -205,11 +205,11 @@ public APIResult schedule(@PathParam("job-name") String jobName, Response.Status.NOT_FOUND); } - SortedMap> entityMap; + SortedMap> entityMap; try { entityMap = getJobEntities(extensionJobsBean); scheduleEntities(entityMap, request, coloExpr); - } catch (FalconException | IOException | JAXBException e) { + } catch (FalconException e) { LOG.error("Error while scheduling entities of the extension: " + jobName + ": ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } @@ -221,27 +221,47 @@ public APIResult schedule(@PathParam("job-name") String jobName, @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) public APIResult suspend(@PathParam("job-name") String jobName, - @DefaultValue("") @QueryParam("doAs") String doAsUser) { + @Context HttpServletRequest request, + @DefaultValue("") @QueryParam("doAs") String doAsUser, + @QueryParam("colo") final String coloExpr) { checkIfExtensionServiceIsEnabled(); + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + if (extensionJobsBean == null) { + // return failure if the extension job doesn't exist + LOG.error("Extension Job not found:" + jobName); + throw FalconWebException.newAPIException("ExtensionJob not found:" + jobName, + Response.Status.NOT_FOUND); + } + try { - List entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); - if (entities.isEmpty()) { - // return failure if the extension job doesn't exist - return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist."); + SortedMap> entityNameMap = getJobEntities(extensionJobsBean); + suspendEntities(entityNameMap, coloExpr, request); + } catch (FalconException e) { + LOG.error("Error while suspending entities of the extension: " + jobName + ": ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully"); + } + + private void suspendEntities(SortedMap> entityNameMap, String coloExpr, + final HttpServletRequest request) throws FalconException { + HttpServletRequest bufferedRequest = new BufferedRequest(request); + for (Map.Entry> entityTypeEntry : entityNameMap.entrySet()) { + for (final String entityName : entityTypeEntry.getValue()) { + entityProxyUtil.proxySuspend(entityTypeEntry.getKey().name(), entityName, coloExpr, bufferedRequest); } + } + } - for (Entity entity : entities) { - if (entity.getEntityType().isSchedulable()) { - if (getWorkflowEngine(entity).isActive(entity)) { - getWorkflowEngine(entity).suspend(entity); - } - } + private void resumeEntities(SortedMap> entityNameMap, String coloExpr, + final HttpServletRequest request) throws FalconException { + HttpServletRequest bufferedRequest = new BufferedRequest(request); + for (Map.Entry> entityTypeEntry : entityNameMap.entrySet()) { + for (final String entityName : entityTypeEntry.getValue()) { + entityProxyUtil.proxyResume(entityTypeEntry.getKey().name(), entityName, coloExpr, bufferedRequest); } - } catch (FalconException | IOException e) { - LOG.error("Error when scheduling extension job: " + jobName + ": ", e); - throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } - return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully"); } @POST @@ -249,24 +269,23 @@ public APIResult suspend(@PathParam("job-name") String jobName, @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) public APIResult resume(@PathParam("job-name") String jobName, + @Context HttpServletRequest request, + @QueryParam("colo") final String coloExpr, @DefaultValue("") @QueryParam("doAs") String doAsUser) { checkIfExtensionServiceIsEnabled(); + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + if (extensionJobsBean == null) { + // return failure if the extension job doesn't exist + LOG.error("Extension Job not found:" + jobName); + throw FalconWebException.newAPIException("ExtensionJob not found:" + jobName, + Response.Status.NOT_FOUND); + } try { - List entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); - if (entities.isEmpty()) { - // return failure if the extension job doesn't exist - return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist."); - } - - for (Entity entity : entities) { - if (entity.getEntityType().isSchedulable()) { - if (getWorkflowEngine(entity).isSuspended(entity)) { - getWorkflowEngine(entity).resume(entity); - } - } - } - } catch (FalconException | IOException e) { - LOG.error("Error when resuming extension job " + jobName + ": ", e); + SortedMap> entityNameMap = getJobEntities(extensionJobsBean); + resumeEntities(entityNameMap, coloExpr, request); + } catch (FalconException e) { + LOG.error("Error while resuming entities of the extension: " + jobName + ": ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " resumed successfully"); @@ -288,11 +307,11 @@ public APIResult delete(@PathParam("job-name") String jobName, "Extension job " + jobName + " doesn't exist. Nothing to delete."); } - SortedMap> entityMap; + SortedMap> entityMap; try { entityMap = getJobEntities(extensionJobsBean); deleteEntities(entityMap, request); - } catch (FalconException | IOException | JAXBException e) { + } catch (FalconException e) { LOG.error("Error when deleting extension job: " + jobName + ": ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } @@ -388,10 +407,13 @@ public APIResult submitAndSchedule( checkIfExtensionIsEnabled(extensionName); checkIfExtensionJobExists(jobName, extensionName); SortedMap> entityMap; + SortedMap> entityNameMap; + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); try { entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); submitEntities(extensionName, jobName, entityMap, config, request); - scheduleEntities(entityMap, request, coloExpr); + entityNameMap = getJobEntities(metaStore.getExtensionJobDetails(jobName)); + scheduleEntities(entityNameMap, request, coloExpr); } catch (FalconException | IOException | JAXBException e) { LOG.error("Error while submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); @@ -399,27 +421,13 @@ public APIResult submitAndSchedule( return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully"); } - private void scheduleEntities(Map> entityMap, HttpServletRequest request, String coloExpr) - throws FalconException, JAXBException, IOException { - for (Map.Entry> entry : entityMap.entrySet()) { - for (final Entity entity : entry.getValue()) { - final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request); - final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest); - final Set colos = getColosFromExpression(coloExpr, entity.getEntityType().name(), entity); - - new EntityProxy(entity.getEntityType().toString(), entity.getName()) { - @Override - protected Set getColosToApply() { - return colos; - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return new EntityProxyUtil().getEntityManager(colo).invoke("schedule", bufferedRequest, - entity.getEntityType().toString(), - entity.getName(), colo, Boolean.FALSE, ""); - } - }.execute(); + private void scheduleEntities(SortedMap> entityMap, HttpServletRequest request, + String coloExpr) throws FalconException { + HttpServletRequest bufferedRequest = new BufferedRequest(request); + for (Map.Entry> entityTypeEntry : entityMap.entrySet()) { + for (final String entityName : entityTypeEntry.getValue()) { + entityProxyUtil.proxySchedule(entityTypeEntry.getKey().name(), entityName, coloExpr, + Boolean.FALSE, "", bufferedRequest); } } } @@ -431,16 +439,14 @@ private BufferedRequest getBufferedRequest(HttpServletRequest request) { return new BufferedRequest(request); } - private void deleteEntities(SortedMap> entityMap, HttpServletRequest request) - throws IOException, JAXBException { - for (Map.Entry> entry : entityMap.entrySet()) { - for (final Entity entity : entry.getValue()) { - final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request); - final String entityType = entity.getEntityType().toString(); - final String entityName = entity.getName(); - entityProxyUtil.proxyDelete(entityType, entityName, bufferedRequest); + private void deleteEntities(SortedMap> entityMap, HttpServletRequest request) + throws FalconException { + for (Map.Entry> entityTypeEntry : entityMap.entrySet()) { + for (final String entityName : entityTypeEntry.getValue()) { + HttpServletRequest bufferedRequest = new BufferedRequest(request); + entityProxyUtil.proxyDelete(entityTypeEntry.getKey().name(), entityName, bufferedRequest); if (!embeddedMode) { - super.delete(bufferedRequest, entityType, entityName, currentColo); + super.delete(bufferedRequest, entityTypeEntry.getKey().name(), entityName, currentColo); } } } diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 8f41c483f..5b5d690fb 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -469,18 +469,7 @@ public APIResult schedule(@Context final HttpServletRequest request, @QueryParam("properties") final String properties) { final HttpServletRequest bufferedRequest = getBufferedRequest(request); - return new EntityProxy(type, entity) { - @Override - protected Set getColosToApply() { - return getColosFromExpression(coloExpr, type, entity); - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return entityProxyUtil.getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, - colo, skipDryRun, properties); - } - }.execute(); + return entityProxyUtil.proxySchedule(type, entity, coloExpr, skipDryRun, properties, bufferedRequest); } /** @@ -531,22 +520,11 @@ public APIResult suspend(@Context final HttpServletRequest request, @Dimension("colo") @QueryParam("colo") final String coloExpr) { final HttpServletRequest bufferedRequest = new BufferedRequest(request); - return new EntityProxy(type, entity) { - @Override - protected Set getColosToApply() { - return getColosFromExpression(coloExpr, type, entity); - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return entityProxyUtil.getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity, - colo); - } - }.execute(); + return entityProxyUtil.proxySuspend(type, entity, coloExpr, bufferedRequest); } /** - * Resume a supended entity. + * Resume a suspended entity. * @param request Servlet Request * @param type Valid options are feed or process. * @param entity Name of the entity. @@ -564,18 +542,7 @@ public APIResult resume( @Dimension("colo") @QueryParam("colo") final String coloExpr) { final HttpServletRequest bufferedRequest = new BufferedRequest(request); - return new EntityProxy(type, entity) { - @Override - protected Set getColosToApply() { - return getColosFromExpression(coloExpr, type, entity); - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return entityProxyUtil.getEntityManager(colo).invoke("resume", bufferedRequest, type, entity, - colo); - } - }.execute(); + return entityProxyUtil.proxyResume(type, entity, coloExpr, bufferedRequest); } //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 6a65d2c28..d76dbcacd 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -77,14 +77,14 @@ public class FalconUnitClient extends AbstractFalconClient { private static final String DEFAULT_ORDER_BY = "status"; private static final String DEFAULT_SORTED_ORDER = "asc"; - protected ConfigurationStore configStore; + private ConfigurationStore configStore; private AbstractWorkflowEngine workflowEngine; private LocalSchedulableEntityManager localSchedulableEntityManager; private LocalInstanceManager localInstanceManager; private LocalExtensionManager localExtensionManager; - public FalconUnitClient() throws FalconException { + FalconUnitClient() throws FalconException { configStore = ConfigurationStore.get(); workflowEngine = WorkflowEngineFactory.getWorkflowEngine(); localSchedulableEntityManager = new LocalSchedulableEntityManager(); @@ -123,7 +123,6 @@ public APIResult submit(String type, String filePath, String doAsUser) { * @param entityName entity name * @param cluster cluster on which it has to be scheduled * @return - * @throws FalconException */ @Override public APIResult schedule(EntityType entityType, String entityName, String cluster, @@ -377,6 +376,23 @@ public APIResult deleteExtensionJob(String jobName, String doAsUser) { } } + @Override + public APIResult suspendExtensionJob(String jobName, String coloExpr, String doAsUser) { + try { + return localExtensionManager.suspendExtensionJob(jobName, coloExpr, doAsUser); + } catch (FalconException e) { + throw new FalconCLIException("Failed in suspending the extension job:" + jobName); + } + } + + @Override + public APIResult resumeExtensionJob(String jobName, String coloExpr, String doAsUser) { + try { + return localExtensionManager.resumeExtensionJob(jobName, coloExpr, doAsUser); + } catch (FalconException e) { + throw new FalconCLIException("Failed in resuming the extension job:" + jobName); + } + } @Override public APIResult getExtensionJobDetails(final String jobName) { diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index 20ccfcaab..a32dbfa53 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -39,7 +39,7 @@ /** * A proxy implementation of the extension operations in local mode. */ -public class LocalExtensionManager extends AbstractExtensionManager { +class LocalExtensionManager extends AbstractExtensionManager { LocalExtensionManager() {} APIResult submitExtensionJob(String extensionName, String jobName, InputStream configStream, @@ -99,10 +99,10 @@ APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser) throws FalconException, IOException{ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); - SortedMap> entityMap = getJobEntities(extensionJobsBean); - for (Map.Entry> entry : entityMap.entrySet()) { - for (Entity entity : entry.getValue()) { - scheduleInternal(entity.getEntityType().name(), entity.getName(), true, null); + SortedMap> entityMap = getJobEntities(extensionJobsBean); + for (Map.Entry> entry : entityMap.entrySet()) { + for (String entityName : entry.getValue()) { + scheduleInternal(entry.getKey().name(), entityName, true, null); } } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled successfully"); @@ -111,10 +111,10 @@ APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser) APIResult deleteExtensionJob(String jobName) throws FalconException, IOException { ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); - SortedMap> entityMap = getJobEntities(extensionJobsBean); - for (Map.Entry> entry : entityMap.entrySet()) { - for (Entity entity : entry.getValue()) { - delete(entity.getEntityType().name(), entity.getName(), null); + SortedMap> entityMap = getJobEntities(extensionJobsBean); + for (Map.Entry> entry : entityMap.entrySet()) { + for (String entityName : entry.getValue()) { + delete(entry.getKey().name(), entityName, null); } } ExtensionStore.getMetaStore().deleteExtensionJob(jobName); @@ -148,6 +148,30 @@ APIResult updateExtensionJob(String extensionName, String jobName, InputStream c return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully"); } + APIResult suspendExtensionJob(String jobName, String coloExpr, String doAsUser) throws FalconException { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + SortedMap> entityMap = getJobEntities(extensionJobsBean); + for (Map.Entry> entityTypeEntry : entityMap.entrySet()) { + for (String entityName : entityTypeEntry.getValue()) { + super.suspend(null, entityTypeEntry.getKey().name(), entityName, coloExpr); + } + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully"); + } + + APIResult resumeExtensionJob(String jobName, String coloExpr, String doAsUser) throws FalconException { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + SortedMap> entityMap = getJobEntities(extensionJobsBean); + for (Map.Entry> entityTypeEntry : entityMap.entrySet()) { + for (String entityName : entityTypeEntry.getValue()) { + super.resume(null, entityTypeEntry.getKey().name(), entityName, coloExpr); + } + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully"); + } + APIResult registerExtensionMetadata(String extensionName, String packagePath, String description) { return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser()); } diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 508a7bb02..5717fc2a7 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -438,6 +438,8 @@ public void testExtensionJobOperations() throws Exception { clearDB(); submitCluster(); createExtensionPackage(); + createDir(PROCESS_APP_PATH); + fs.copyFromLocalFile(new Path(getAbsolutePath(WORKFLOW)), new Path(PROCESS_APP_PATH, "workflow.xml")); String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString(); String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION); Assert.assertEquals(result, "Extension :testExtension registered successfully."); @@ -454,6 +456,14 @@ public void testExtensionJobOperations() throws Exception { apiResult = getClient().scheduleExtensionJob(TEST_JOB, null, null); assertStatus(apiResult); + + apiResult = getClient().suspendExtensionJob(TEST_JOB, null, null); + assertStatus(apiResult); + apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); + Assert.assertEquals(apiResult.getMessage(), "SUSPENDED"); + + apiResult = getClient().resumeExtensionJob(TEST_JOB, null, null); + assertStatus(apiResult); apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); assertStatus(apiResult); Assert.assertEquals(apiResult.getMessage(), "RUNNING"); @@ -487,8 +497,13 @@ public void testExtensionJobOperations() throws Exception { } } + @Test + public void testExtensionJobSuspendAndResume() throws Exception { + + } + - void copyExtensionJar(String destDirPath) throws IOException { + private void copyExtensionJar(String destDirPath) throws IOException { File dir = new File(new Path(JARS_DIR).toUri().toURL().getPath()); for (File file : dir.listFiles()) { if (file.toString().endsWith(".jar")) { diff --git a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java index 3a6c9c003..ac05b0f86 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java @@ -130,6 +130,28 @@ public APIResult deleteExtensionMetadata( + "on Prism."); } + @POST + @Path("suspend/{job-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult suspend(@PathParam("job-name") String jobName, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + LOG.error("Suspend of an extension job is not supported on Server.Please run your operation on Prism "); + throw FalconWebException.newAPIException("Suspend of an extension job is not supported on Server." + + "Please run your operation on Prism."); + } + + @POST + @Path("resume/{job-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult resume(@PathParam("job-name") String jobName, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + LOG.error("Resume of an extension job is not supported on Server.Please run your operation on Prism "); + throw FalconWebException.newAPIException("Resume of an extension job is not supported on Server." + + "Please run your operation on Prism."); + } + @GET @Path("definition/{extension-name}") @Produces({MediaType.APPLICATION_JSON})