Skip to content

Commit

Permalink
FALCON-2204 Change mode for falcon_merge_pr.py to executable
Browse files Browse the repository at this point in the history
This pull request changes the permissions of falcon_merge_pr.py to be executable (mode 100644 → 100755).

Author: Ajay Yadava <ajayyadava@apache.org>

Reviewers: @pallavi-rao, @sandeepSamudrala

Closes apache#310 from ajayyadava/2204
  • Loading branch information
ajayyadava authored and Pallavi Rao committed Dec 1, 2016
1 parent 1f28bde commit 37cb056
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 88 deletions.
Expand Up @@ -41,7 +41,7 @@ public enum WorkflowExecutionArgs {
DATASOURCE_NAME("datasource", "name of the datasource", false),

// who
WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"),
WORKFLOW_USER("workflowUser", "user who ran the instance"),

// what
// workflow details
Expand Down
Empty file modified falcon_merge_pr.py 100644 → 100755
Empty file.
Expand Up @@ -22,6 +22,7 @@
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -91,6 +92,12 @@ public int run(WorkflowExecutionContext context) {
context.getWorkflowId(), context.getWorkflowStatus());
return 0;
}
String instanceOwner = context.getWorkflowUser();
if (StringUtils.isNotBlank(instanceOwner)) {
CurrentUser.authenticate(instanceOwner);
} else {
CurrentUser.authenticate(System.getProperty("user.name"));
}
OozieClient client = new OozieClient(engineUrl);
WorkflowJob jobInfo;
try {
Expand Down
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.falcon.jdbc;

import org.apache.commons.collections.CollectionUtils;

import org.apache.falcon.FalconException;
import org.apache.falcon.persistence.MonitoredEntityBean;
import org.apache.falcon.persistence.PendingInstanceBean;
Expand Down Expand Up @@ -188,14 +186,7 @@ public List<PendingInstanceBean> getAllPendingInstances(){
EntityManager entityManager = getEntityManager();
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);
List result = q.getResultList();

try {
if (CollectionUtils.isEmpty(result)) {
return null;
}
} finally{
entityManager.close();
}
entityManager.close();
return result;
}

Expand Down
Expand Up @@ -164,27 +164,32 @@ public static void validateSlaParams(String entityType, String entityName, Strin
* @param endStr
*/
public SchedulableEntityInstanceResult getEntitySLAMissPendingAlerts(String entityName, String entityType,
String startStr, String endStr, String colo) {

String startStr, String endStr,
String colo) {
Set<SchedulableEntityInstance> instances = new HashSet<>();
String resultMessage = "Success!";
try {
checkColo(colo);
Date start = EntityUtil.parseDateUTC(startStr);
Date end = (endStr == null) ? new Date() : EntityUtil.parseDateUTC(endStr);

if (StringUtils.isBlank(entityName)) {
instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end));
instances = EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end);
} else {
for (String clusterName : DeploymentUtil.getCurrentClusters()) {
instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(entityName,
clusterName, start, end, entityType));
String status = getStatusString(EntityUtil.getEntity(entityType, entityName));
if (status.equals(EntityStatus.RUNNING.name())) {
for (String clusterName : DeploymentUtil.getCurrentClusters()) {
instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(entityName,
clusterName, start, end, entityType));
}
} else {
resultMessage = entityName + " is " + status;
}
}
} catch (FalconException e) {
throw FalconWebException.newAPIException(e);
}
SchedulableEntityInstanceResult result = new SchedulableEntityInstanceResult(APIResult.Status.SUCCEEDED,
"Success!");
resultMessage);
result.setCollection(instances.toArray());
return result;
}
Expand Down
Expand Up @@ -42,6 +42,7 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -80,6 +81,9 @@ public final class BacklogMetricEmitterService implements FalconService,
private static MetricNotificationService metricNotificationService =
Services.get().getService(MetricNotificationService.SERVICE_NAME);

private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));

public static BacklogMetricEmitterService get() {
return SERVICE;
}
Expand Down Expand Up @@ -149,7 +153,7 @@ public void onChange(Entity oldEntity, Entity newEntity) throws FalconException{
for(Cluster cluster : process.getClusters().getClusters()){
dropMetric(cluster.getName(), process);
}
}else{
} else {
addToBacklog(newEntity);
}
}
Expand Down Expand Up @@ -412,7 +416,7 @@ public void run() {
continue;
}
InstancesResult status = wfEngine.getStatus(entity, nominalTime,
nominalTime, null, null);
new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false);
if (status.getInstances().length > 0
&& status.getInstances()[0].status == InstancesResult.
WorkflowStatus.SUCCEEDED) {
Expand Down

0 comments on commit 37cb056

Please sign in to comment.