Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
PracheerAgarwal committed Jan 6, 2017
2 parents 066c8e2 + 8608665 commit e3728d5
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,12 @@ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOExc
result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.suspendExtensionJob(jobName, doAsUser).getMessage();
colo = getColo(colo);
result = client.suspendExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.RESUME_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.resumeExtensionJob(jobName, doAsUser).getMessage();
colo = getColo(colo);
result = client.resumeExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.DELETE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.deleteExtensionJob(jobName, doAsUser).getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,25 @@ public abstract APIResult submitAndScheduleExtensionJob(String extensionName, St
* @return APIResult status of the deletion query.
*/
public abstract APIResult deleteExtensionJob(final String jobName, final String doAsUser);

/**
*
* @param jobName name of the extension that has to be suspended.
* @param coloExpr comma separated list of colos where the operation has to be performed.
* @param doAsUser proxy user
* @return result status of the suspend operation.
*/
public abstract APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser);

/**
*
* @param jobName name of the extension that has to be resumed.
* @param coloExpr comma separated list of colos where the operation has to be performed.
* @param doAsUser proxy user.
* @return result status of the resume operation.
*/
public abstract APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser);

/**
* Prepares set of entities the extension has implemented to validate the extension job.
* @param jobName job name of the extension job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1209,17 +1209,19 @@ public APIResult scheduleExtensionJob(String jobName, final String coloExpr, fin
return getResponse(APIResult.class, clientResponse);
}

public APIResult suspendExtensionJob(final String jobName, final String doAsUser) {
public APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SUSPEND.path, jobName)
.addQueryParam(COLO, coloExpr)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SUSPEND);
return getResponse(APIResult.class, clientResponse);
}

public APIResult resumeExtensionJob(final String jobName, final String doAsUser) {
public APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.RESUME.path, jobName)
.addQueryParam(COLO, coloExpr)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.RESUME);
return getResponse(APIResult.class, clientResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
Expand All @@ -37,8 +34,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
Expand All @@ -59,9 +54,9 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";

public static final String NAME = "name";
protected static final String EXTENSION_TYPE = "type";
protected static final String EXTENSION_DESC = "description";
protected static final String EXTENSION_LOCATION = "location";
private static final String EXTENSION_TYPE = "type";
private static final String EXTENSION_DESC = "description";
private static final String EXTENSION_LOCATION = "location";

protected static void validateExtensionName(final String extensionName) {
if (StringUtils.isBlank(extensionName)) {
Expand Down Expand Up @@ -114,28 +109,14 @@ public APIResult deleteExtensionMetadata(String extensionName){
}
}

protected SortedMap<EntityType, List<Entity>> getJobEntities(ExtensionJobsBean extensionJobsBean)
throws FalconException, IOException {
TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>();
List<String> processes = extensionJobsBean.getProcesses();
List<String> feeds = extensionJobsBean.getFeeds();
entityMap.put(EntityType.PROCESS, getEntities(processes, EntityType.PROCESS));
entityMap.put(EntityType.FEED, getEntities(feeds, EntityType.FEED));
protected SortedMap<EntityType, List<String>> getJobEntities(ExtensionJobsBean extensionJobsBean)
throws FalconException {
TreeMap<EntityType, List<String>> entityMap = new TreeMap<>();
entityMap.put(EntityType.PROCESS, extensionJobsBean.getProcesses());
entityMap.put(EntityType.FEED, extensionJobsBean.getFeeds());
return entityMap;
}

private List<Entity> getEntities(List<String> entityNames, EntityType entityType) throws FalconException {
List<Entity> entities = new ArrayList<>();
for (String entityName : entityNames) {
try {
entities.add(EntityUtil.getEntity(entityType, entityName));
} catch (EntityNotRegisteredException e) {
LOG.error("Entity {} not found during deletion nothing to do", entityName);
}
}
return entities;
}

private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName);
Expand Down Expand Up @@ -174,7 +155,7 @@ public static String getJobNameFromTag(String tags) {
return tags.substring(nameStart, nameEnd);
}

public String disableExtension(String extensionName, String currentUser) {
protected String disableExtension(String extensionName, String currentUser) {
validateExtensionName(extensionName);
try {
return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.DISABLED);
Expand All @@ -183,7 +164,7 @@ public String disableExtension(String extensionName, String currentUser) {
}
}

public String enableExtension(String extensionName, String currentUser) {
protected String enableExtension(String extensionName, String currentUser) {
validateExtensionName(extensionName);
try {
return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.ENABLED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,55 @@ protected APIResult doExecute(String colo) throws FalconException {
return results;
}

APIResult proxySchedule(final String type, final String entity, final String coloExpr,
final Boolean skipDryRun, final String properties,
final HttpServletRequest bufferedRequest) {
return new EntityProxy(type, entity) {
@Override
protected Set<String> getColosToApply() {
return getColosFromExpression(coloExpr, type, entity);
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity,
colo, skipDryRun, properties);
}
}.execute();
}

APIResult proxySuspend(final String type, final String entity, final String coloExpr,
final HttpServletRequest bufferedRequest) {
return new EntityProxy(type, entity) {
@Override
protected Set<String> getColosToApply() {
return getColosFromExpression(coloExpr, type, entity);
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity,
colo);
}
}.execute();
}

APIResult proxyResume(final String type, final String entity, final String coloExpr,
final HttpServletRequest bufferedRequest) {
return new EntityProxy(type, entity) {
@Override
protected Set<String> getColosToApply() {
return getColosFromExpression(coloExpr, type, entity);
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getEntityManager(colo).invoke("resume", bufferedRequest, type, entity,
colo);
}
}.execute();
}

Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun,
final HttpServletRequest bufferedRequest, Entity newEntity) {
final Set<String> oldColos = getApplicableColos(type, entityName);
Expand Down
Loading

0 comments on commit e3728d5

Please sign in to comment.