Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
PracheerAgarwal-zz committed Jan 7, 2017
2 parents b20f044 + 8608665 commit fda3b28
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,12 @@ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOExc
result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.suspendExtensionJob(jobName, doAsUser).getMessage();
colo = getColo(colo);
result = client.suspendExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.RESUME_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.resumeExtensionJob(jobName, doAsUser).getMessage();
colo = getColo(colo);
result = client.resumeExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.DELETE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.deleteExtensionJob(jobName, doAsUser).getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,25 @@ public abstract APIResult submitAndScheduleExtensionJob(String extensionName, St
* @return APIResult status of the deletion query.
*/
public abstract APIResult deleteExtensionJob(final String jobName, final String doAsUser);

/**
*
* @param jobName name of the extension that has to be suspended.
* @param coloExpr comma separated list of colos where the operation has to be performed.
* @param doAsUser proxy user
* @return result status of the suspend operation.
*/
public abstract APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser);

/**
*
* @param jobName name of the extension that has to be resumed.
* @param coloExpr comma separated list of colos where the operation has to be performed.
* @param doAsUser proxy user.
* @return result status of the resume operation.
*/
public abstract APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser);

/**
* Prepares set of entities the extension has implemented to validate the extension job.
* @param jobName job name of the extension job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1209,17 +1209,19 @@ public APIResult scheduleExtensionJob(String jobName, final String coloExpr, fin
return getResponse(APIResult.class, clientResponse);
}

public APIResult suspendExtensionJob(final String jobName, final String doAsUser) {
public APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SUSPEND.path, jobName)
.addQueryParam(COLO, coloExpr)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SUSPEND);
return getResponse(APIResult.class, clientResponse);
}

public APIResult resumeExtensionJob(final String jobName, final String doAsUser) {
public APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.RESUME.path, jobName)
.addQueryParam(COLO, coloExpr)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.RESUME);
return getResponse(APIResult.class, clientResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
@Entity
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_ALL_BACKLOG_INSTANCES, query = "select OBJECT(a) from BacklogMetricBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
@NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.entityType = :entityType")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.Map;

/**
* Backlog Metric Store for entitties.
* Backlog Metric Store for entities' backlog instances.
*/
public class BacklogMetricStore {

Expand Down Expand Up @@ -70,18 +70,19 @@ public synchronized void deleteMetricInstance(String entityName, String cluster,
q.setParameter("clusterName", cluster);
q.setParameter("nominalTime", nominalTime);
q.setParameter("entityType", entityType.name());
try{
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
}
}

public void deleteEntityInstance(String entityName){
public void deleteEntityBackLogInstances(String entityName, String entityType) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES);
q.setParameter("entityName", entityName);
q.setParameter("entityType", entityType);
try {
q.executeUpdate();
} finally {
Expand Down Expand Up @@ -110,7 +111,7 @@ public Map<Entity, List<MetricInfo>> getAllInstances() throws FalconException {
if (CollectionUtils.isEmpty(result)) {
return null;
}
} finally{
} finally {
entityManager.close();
}

Expand All @@ -121,7 +122,7 @@ public Map<Entity, List<MetricInfo>> getAllInstances() throws FalconException {
if (!backlogMetrics.containsKey(entity)) {
backlogMetrics.put(entity, new ArrayList<MetricInfo>());
}
List<MetricInfo> metrics = backlogMetrics.get(entity);
List<MetricInfo> metrics = backlogMetrics.get(entity);
MetricInfo metricInfo = new MetricInfo(BacklogMetricEmitterService.DATE_FORMAT.get()
.format(backlogMetricBean.getNominalTime()),
backlogMetricBean.getClusterName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
Expand All @@ -37,8 +34,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
Expand All @@ -59,9 +54,9 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";

public static final String NAME = "name";
protected static final String EXTENSION_TYPE = "type";
protected static final String EXTENSION_DESC = "description";
protected static final String EXTENSION_LOCATION = "location";
private static final String EXTENSION_TYPE = "type";
private static final String EXTENSION_DESC = "description";
private static final String EXTENSION_LOCATION = "location";

protected static void validateExtensionName(final String extensionName) {
if (StringUtils.isBlank(extensionName)) {
Expand Down Expand Up @@ -114,28 +109,14 @@ public APIResult deleteExtensionMetadata(String extensionName){
}
}

protected SortedMap<EntityType, List<Entity>> getJobEntities(ExtensionJobsBean extensionJobsBean)
throws FalconException, IOException {
TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>();
List<String> processes = extensionJobsBean.getProcesses();
List<String> feeds = extensionJobsBean.getFeeds();
entityMap.put(EntityType.PROCESS, getEntities(processes, EntityType.PROCESS));
entityMap.put(EntityType.FEED, getEntities(feeds, EntityType.FEED));
protected SortedMap<EntityType, List<String>> getJobEntities(ExtensionJobsBean extensionJobsBean)
throws FalconException {
TreeMap<EntityType, List<String>> entityMap = new TreeMap<>();
entityMap.put(EntityType.PROCESS, extensionJobsBean.getProcesses());
entityMap.put(EntityType.FEED, extensionJobsBean.getFeeds());
return entityMap;
}

private List<Entity> getEntities(List<String> entityNames, EntityType entityType) throws FalconException {
List<Entity> entities = new ArrayList<>();
for (String entityName : entityNames) {
try {
entities.add(EntityUtil.getEntity(entityType, entityName));
} catch (EntityNotRegisteredException e) {
LOG.error("Entity {} not found during deletion nothing to do", entityName);
}
}
return entities;
}

private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName);
Expand Down Expand Up @@ -174,7 +155,7 @@ public static String getJobNameFromTag(String tags) {
return tags.substring(nameStart, nameEnd);
}

public String disableExtension(String extensionName, String currentUser) {
protected String disableExtension(String extensionName, String currentUser) {
validateExtensionName(extensionName);
try {
return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.DISABLED);
Expand All @@ -183,7 +164,7 @@ public String disableExtension(String extensionName, String currentUser) {
}
}

public String enableExtension(String extensionName, String currentUser) {
protected String enableExtension(String extensionName, String currentUser) {
validateExtensionName(extensionName);
try {
return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.ENABLED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,55 @@ protected APIResult doExecute(String colo) throws FalconException {
return results;
}

APIResult proxySchedule(final String type, final String entity, final String coloExpr,
final Boolean skipDryRun, final String properties,
final HttpServletRequest bufferedRequest) {
return new EntityProxy(type, entity) {
@Override
protected Set<String> getColosToApply() {
return getColosFromExpression(coloExpr, type, entity);
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity,
colo, skipDryRun, properties);
}
}.execute();
}

APIResult proxySuspend(final String type, final String entity, final String coloExpr,
final HttpServletRequest bufferedRequest) {
return new EntityProxy(type, entity) {
@Override
protected Set<String> getColosToApply() {
return getColosFromExpression(coloExpr, type, entity);
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity,
colo);
}
}.execute();
}

APIResult proxyResume(final String type, final String entity, final String coloExpr,
final HttpServletRequest bufferedRequest) {
return new EntityProxy(type, entity) {
@Override
protected Set<String> getColosToApply() {
return getColosFromExpression(coloExpr, type, entity);
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getEntityManager(colo).invoke("resume", bufferedRequest, type, entity,
colo);
}
}.execute();
}

Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun,
final HttpServletRequest bufferedRequest, Entity newEntity) {
final Set<String> oldColos = getApplicableColos(type, entityName);
Expand Down
Loading

0 comments on commit fda3b28

Please sign in to comment.