From 37cb056b93b6e414172e12605c33c27223d2800e Mon Sep 17 00:00:00 2001 From: Ajay Yadava Date: Thu, 1 Dec 2016 09:33:13 +0530 Subject: [PATCH] FALCON-2204 Change mode for falcon_merge_pr.py to executable This pull request changes the permissions to be executable. Author: Ajay Yadava Reviewers: @pallavi-rao, @sandeepSamudrala Closes #310 from ajayyadava/2204 --- .../workflow/WorkflowExecutionArgs.java | 2 +- falcon_merge_pr.py | 0 .../apache/falcon/logging/JobLogMover.java | 7 + .../falcon/jdbc/MonitoringJdbcStateStore.java | 11 +- .../AbstractSchedulableEntityManager.java | 21 ++- .../service/BacklogMetricEmitterService.java | 8 +- .../service/EntitySLAMonitoringService.java | 160 ++++++++++-------- 7 files changed, 121 insertions(+), 88 deletions(-) mode change 100644 => 100755 falcon_merge_pr.py diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java index 2171092fe..682b14eb6 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java @@ -41,7 +41,7 @@ public enum WorkflowExecutionArgs { DATASOURCE_NAME("datasource", "name of the datasource", false), // who - WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"), + WORKFLOW_USER("workflowUser", "user who ran the instance"), // what // workflow details diff --git a/falcon_merge_pr.py b/falcon_merge_pr.py old mode 100644 new mode 100755 diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java index 535f62a59..5023db327 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java +++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java @@ -22,6 +22,7 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.EngineType; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.security.CurrentUser; import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.falcon.workflow.util.OozieActionConfigurationHelper; import org.apache.hadoop.conf.Configuration; @@ -91,6 +92,12 @@ public int run(WorkflowExecutionContext context) { context.getWorkflowId(), context.getWorkflowStatus()); return 0; } + String instanceOwner = context.getWorkflowUser(); + if (StringUtils.isNotBlank(instanceOwner)) { + CurrentUser.authenticate(instanceOwner); + } else { + CurrentUser.authenticate(System.getProperty("user.name")); + } OozieClient client = new OozieClient(engineUrl); WorkflowJob jobInfo; try { diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index 669e18d9e..7f5776e16 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -17,8 +17,6 @@ */ package org.apache.falcon.jdbc; -import org.apache.commons.collections.CollectionUtils; - import org.apache.falcon.FalconException; import org.apache.falcon.persistence.MonitoredEntityBean; import org.apache.falcon.persistence.PendingInstanceBean; @@ -188,14 +186,7 @@ public List getAllPendingInstances(){ EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES); List result = q.getResultList(); - - try { 
- if (CollectionUtils.isEmpty(result)) { - return null; - } - } finally{ - entityManager.close(); - } + entityManager.close(); return result; } diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java index ef9cf69e2..c5b3ecc83 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java @@ -164,27 +164,32 @@ public static void validateSlaParams(String entityType, String entityName, Strin * @param endStr */ public SchedulableEntityInstanceResult getEntitySLAMissPendingAlerts(String entityName, String entityType, - String startStr, String endStr, String colo) { - + String startStr, String endStr, + String colo) { Set instances = new HashSet<>(); + String resultMessage = "Success!"; try { checkColo(colo); Date start = EntityUtil.parseDateUTC(startStr); Date end = (endStr == null) ? new Date() : EntityUtil.parseDateUTC(endStr); - if (StringUtils.isBlank(entityName)) { - instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end)); + instances = EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end); } else { - for (String clusterName : DeploymentUtil.getCurrentClusters()) { - instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(entityName, - clusterName, start, end, entityType)); + String status = getStatusString(EntityUtil.getEntity(entityType, entityName)); + if (status.equals(EntityStatus.RUNNING.name())) { + for (String clusterName : DeploymentUtil.getCurrentClusters()) { + instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(entityName, + clusterName, start, end, entityType)); + } + } else { + resultMessage = entityName + " is " + status; } } } catch (FalconException e) { throw FalconWebException.newAPIException(e); } SchedulableEntityInstanceResult result = new SchedulableEntityInstanceResult(APIResult.Status.SUCCEEDED, - "Success!"); + resultMessage); result.setCollection(instances.toArray()); return result; } diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java index b01b181c9..768861906 100644 --- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java +++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java @@ -42,6 +42,7 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -80,6 +81,9 @@ public final class BacklogMetricEmitterService implements FalconService, private static MetricNotificationService metricNotificationService = Services.get().getService(MetricNotificationService.SERVICE_NAME); + private static final List PROCESS_LIFE_CYCLE = + Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name())); + public static BacklogMetricEmitterService get() { return SERVICE; } @@ -149,7 +153,7 @@ public void onChange(Entity oldEntity, Entity newEntity) throws FalconException{ for(Cluster cluster : process.getClusters().getClusters()){ dropMetric(cluster.getName(), process); } - }else{ + } else { addToBacklog(newEntity); } } @@ -412,7 +416,7 @@ public void run() { continue; } InstancesResult status = 
wfEngine.getStatus(entity, nominalTime, - nominalTime, null, null); + new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false); if (status.getInstances().length > 0 && status.getInstances()[0].status == InstancesResult. WorkflowStatus.SUCCEEDED) { diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index 00e116be3..451fb95b0 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import java.text.ParseException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashSet; import java.util.List; @@ -27,7 +28,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; import org.apache.falcon.Pair; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; @@ -47,7 +50,6 @@ import org.apache.falcon.jdbc.MonitoringJdbcStateStore; import org.apache.falcon.persistence.MonitoredEntityBean; import org.apache.falcon.persistence.PendingInstanceBean; -import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.SchedulableEntityInstance; import org.apache.falcon.security.CurrentUser; @@ -82,6 +84,9 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList public static final String TAG_WARN = "Missed-SLA-Low"; private static final long MINUTE_DELAY = 60000L; + private static final List PROCESS_LIFE_CYCLE = + Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name())); + private EntitySLAMonitoringService() { } @@ -127,22 +132,27 @@ public static EntitySLAMonitoringService get() { @Override public void onAdd(Entity entity) throws FalconException { + startEntityMonitoring(entity, false); + } + + private void startEntityMonitoring(Entity entity, boolean isEntityUpdated) throws FalconException{ Set currentClusters = DeploymentUtil.getCurrentClusters(); Set clustersDefined = EntityUtil.getClustersDefined(entity); if (entity.getEntityType() == EntityType.FEED) { Feed feed = (Feed) entity; - // currently sla service is enabled only for fileSystemStorage + // currently sla service for feed is enabled only for fileSystemStorage if (feed.getLocations() != null || feed.getSla() != null || checkFeedClusterSLA(feed)) { for (String cluster : clustersDefined) { if (currentClusters.contains(cluster)) { if (FeedHelper.getSLA(cluster, feed) != null) { LOG.debug("Adding feed:{} for monitoring", feed.getName()); - MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), EntityType.FEED.toString(), - new Date(now().getTime() + MINUTE_DELAY)); - List instances = EntityUtil.getEntityInstanceTimesInBetween(entity, cluster, - getStartTime(entity, cluster), now()); - addPendingInstances(entity.getEntityType().name().toLowerCase(), entity, cluster, - instances); + if (isEntityUpdated) { + MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), + EntityType.FEED.toString(), now()); + } else { + MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), + EntityType.FEED.toString(), getStartTime(entity, cluster)); 
+ } } } } @@ -153,11 +163,13 @@ public void onAdd(Entity entity) throws FalconException { for (String cluster : clustersDefined) { if (currentClusters.contains(cluster)) { LOG.debug("Adding process:{} for monitoring", process.getName()); - MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(), - EntityType.PROCESS.toString(), new Date(now().getTime() + MINUTE_DELAY)); - List instances = EntityUtil.getEntityInstanceTimesInBetween(entity, cluster, - getStartTime(entity, cluster), now()); - addPendingInstances(entity.getEntityType().name().toLowerCase(), entity, cluster, instances); + if (isEntityUpdated) { + MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(), + EntityType.PROCESS.toString(), now()); + } else { + MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(), + EntityType.PROCESS.toString(), getStartTime(entity, cluster)); + } } } } @@ -298,20 +310,14 @@ public String getName() { @Override public void init() throws FalconException { - String uri = StartupProperties.get().getProperty("entity.sla.service.store.uri"); - storePath = new Path(uri); - filePath = new Path(storePath, "entitySLAMonitoringService"); - fileSystem = initializeFileSystem(); String freq = StartupProperties.get().getProperty("entity.sla.statusCheck.frequency.seconds", "600"); statusCheckFrequencySeconds = Integer.parseInt(freq); freq = StartupProperties.get().getProperty("entity.sla.lookAheadWindow.millis", "900000"); lookAheadWindowMillis = Integer.parseInt(freq); - LOG.info("Initializing EntitySLAMonitoringService from ", filePath.toString()); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - addPendingEntityInstances(EntityType.FEED.name(), null, now()); - addPendingEntityInstances(EntityType.PROCESS.name(), null, now()); + addPendingEntityInstances(now()); executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS); } @@ -353,13 +359,11 @@ private class Monitor implements Runnable { public void run() { try { if (MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities().size() > 0) { - checkPendingInstanceAvailability(EntityType.FEED.toString()); - checkPendingInstanceAvailability(EntityType.PROCESS.toString()); + checkPendingInstanceAvailability(); // add Instances from last checked time to 10 minutes from now(some buffer for status check) Date newCheckPointTime = new Date(now().getTime() + lookAheadWindowMillis); - addPendingEntityInstances(EntityType.FEED.toString(), null, newCheckPointTime); - addPendingEntityInstances(EntityType.PROCESS.toString(), null, newCheckPointTime); + addPendingEntityInstances(newCheckPointTime); } } catch (Throwable e) { LOG.error("Feed SLA monitoring failed: ", e); @@ -380,29 +384,35 @@ private void addPendingInstances(String entityType, Entity entity, } } - void addPendingEntityInstances(String entityType, Date startTime, Date endTime) throws FalconException { + void addPendingEntityInstances(Date checkPointTime) throws FalconException { Set currentClusters = DeploymentUtil.getCurrentClusters(); - List entityBeanList = MONITORING_JDBC_STATE_STORE. - getAllMonitoredEntities(entityType); + List entityBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities(); for(MonitoredEntityBean monitoredEntityBean : entityBeanList) { String entityName = monitoredEntityBean.getEntityName(); - Date lastMonitoredInstanceTime = (startTime != null) ? startTime - : monitoredEntityBean.getLastMonitoredTime(); - Date newCheckPointTime = endTime != null ? 
endTime : now(); - Entity entity = EntityUtil.getEntity(entityType, entityName); - Set clustersDefined = EntityUtil.getClustersDefined(entity); - List clusters = new ArrayList(); - for(String cluster : clustersDefined){ - clusters.add(ClusterHelper.getCluster(cluster)); - } - for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : clusters) { - if (currentClusters.contains(entityCluster.getName())) { - List instances = EntityUtil.getEntityInstanceTimesInBetween(entity, entityCluster.getName(), - lastMonitoredInstanceTime, newCheckPointTime); - addPendingInstances(entityType, entity, entityCluster.getName(), instances); - // update last monitored time with the new checkpoint time - MONITORING_JDBC_STATE_STORE.updateLastMonitoredTime(entityName, entityType, - new Date(newCheckPointTime.getTime() + MINUTE_DELAY)); + String entityType = monitoredEntityBean.getEntityType(); + if (EntityType.FEED.name().equalsIgnoreCase(entityType) + || isEntityRunning(EntityUtil.getEntity(entityType, entityName))) { + Date lastMonitoredInstanceTime = monitoredEntityBean.getLastMonitoredTime(); + Date newCheckPointTime = checkPointTime; + Entity entity = EntityUtil.getEntity(entityType, entityName); + Set clustersDefined = EntityUtil.getClustersDefined(entity); + List clusters = new ArrayList(); + for (String cluster : clustersDefined) { + clusters.add(ClusterHelper.getCluster(cluster)); + } + for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : clusters) { + if (currentClusters.contains(entityCluster.getName())) { + Date endTime = EntityUtil.getEndTime(entity, entityCluster.getName()); + if (endTime.before(now())) { + newCheckPointTime = endTime; + } + List instances = EntityUtil.getEntityInstanceTimesInBetween(entity, + entityCluster.getName(), lastMonitoredInstanceTime, newCheckPointTime); + addPendingInstances(entityType, entity, entityCluster.getName(), instances); + // update last monitored time with the new checkpoint time + MONITORING_JDBC_STATE_STORE.updateLastMonitoredTime(entityName, entityType, + new Date(newCheckPointTime.getTime() + MINUTE_DELAY)); + } } } } @@ -412,20 +422,20 @@ void addPendingEntityInstances(String entityType, Date startTime, Date endTime) /** * Checks the availability of all the pendingInstances and removes the ones which have become available. 
*/ - private void checkPendingInstanceAvailability(String entityType) throws FalconException { - if (MONITORING_JDBC_STATE_STORE.getAllPendingInstances() == null){ + private void checkPendingInstanceAvailability() throws FalconException { + List pendingInstanceBeans = MONITORING_JDBC_STATE_STORE.getAllPendingInstances(); + if (pendingInstanceBeans.isEmpty()){ LOG.info("No pending instances to be checked"); return; } - for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){ - for (Date instanceTime : MONITORING_JDBC_STATE_STORE.getNominalInstances( - pendingInstanceBean.getEntityName(), pendingInstanceBean.getClusterName(), entityType)) { - boolean status = checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(), - pendingInstanceBean.getClusterName(), instanceTime, entityType); - if (status) { - MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getEntityName(), - pendingInstanceBean.getClusterName(), instanceTime, EntityType.FEED.toString()); - } + for(PendingInstanceBean pendingInstanceBean : pendingInstanceBeans){ + boolean status = checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(), + pendingInstanceBean.getClusterName(), pendingInstanceBean.getNominalTime(), + pendingInstanceBean.getEntityType()); + if (status) { + MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getEntityName(), + pendingInstanceBean.getClusterName(), pendingInstanceBean.getNominalTime(), + pendingInstanceBean.getEntityType()); } } } @@ -434,21 +444,25 @@ private void checkPendingInstanceAvailability(String entityType) throws FalconEx private boolean checkEntityInstanceAvailability(String entityName, String clusterName, Date nominalTime, String entityType) throws FalconException { Entity entity = EntityUtil.getEntity(entityType, entityName); - authenticateUser(); + authenticateUser(entity); try { - if (entityType.equals(EntityType.PROCESS.toString())){ + if (entityType.equalsIgnoreCase(EntityType.PROCESS.toString())){ LOG.trace("Checking instance availability status for entity:{}, cluster:{}, " + "instanceTime:{}", entity.getName(), clusterName, nominalTime, entityType); AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine(); - InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime, nominalTime, null, null); - if (instancesResult.getStatus().equals(APIResult.Status.SUCCEEDED)){ - LOG.trace("Entity instance(Process:{}, cluster:{}, instanceTime:{}) is available.", - entity.getName(), clusterName, nominalTime); - return true; + + InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime, + new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false); + if (instancesResult.getInstances().length > 0) { + if (instancesResult.getInstances()[0].status.equals(InstancesResult.WorkflowStatus.SUCCEEDED)){ + LOG.trace("Entity instance(Process:{}, cluster:{}, instanceTime:{}) is available.", + entity.getName(), clusterName, nominalTime); + return true; + } } return false; } - if (entityType.equals(EntityType.FEED.toString())){ + if (entityType.equalsIgnoreCase(EntityType.FEED.toString())){ LOG.trace("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}", entity.getName(), clusterName, nominalTime); @@ -466,7 +480,7 @@ private boolean checkEntityInstanceAvailability(String entityName, String cluste clusterName, entityType, nominalTime, e); } LOG.debug("Entity instance(entity:{}, cluster:{}, instanceTime:{}) is not available.", 
entity.getName(), - clusterName, nominalTime); + clusterName, nominalTime); return false; } @@ -509,7 +523,7 @@ public Set getEntitySLAMissPendingAlerts(Date start, org.apache.falcon.entity.v0.process.Cluster cluster = ProcessHelper.getCluster(process, entityClusterPair.second); org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(cluster, process); - if (sla != null){ + if (sla != null && isEntityRunning(process)){ Set> slaStatus = getProcessSLAStatus(sla, start, end, MONITORING_JDBC_STATE_STORE.getNominalInstances(entityClusterPair.first, entityClusterPair.second, entityType)); @@ -659,9 +673,21 @@ public void makeProcessInstanceAvailable(String clusterName, String entityName, } // Authenticate user only if not already authenticated. - private void authenticateUser(){ + private void authenticateUser(Entity entity){ if (!CurrentUser.isAuthenticated()) { - CurrentUser.authenticate(System.getProperty("user.name")); + if (StringUtils.isNotBlank(entity.getACL().getOwner())) { + CurrentUser.authenticate(entity.getACL().getOwner()); + } else { + CurrentUser.authenticate(System.getProperty("user.name")); + } } } + + + private boolean isEntityRunning(Entity entity) throws FalconException { + authenticateUser(entity); + AbstractWorkflowEngine workflowEngine = WorkflowEngineFactory.getWorkflowEngine(); + return workflowEngine.isActive(entity) && !workflowEngine.isSuspended(entity) + && !workflowEngine.isCompleted(entity); + } }
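
Note for reviewers: the behavioural core of the Java changes above is (a) switching from the old point-in-time call getStatus(entity, nominalTime, nominalTime, null, null) to a short-window query filtered to the EXECUTION lifecycle, and (b) authenticating as the entity's ACL owner (falling back to the JVM user) before talking to the workflow engine. The sketch below pulls those two pieces out of EntitySLAMonitoringService into a standalone helper so they can be exercised in isolation. It is a minimal illustration based only on the calls visible in this diff; the class name InstanceStatusCheckSketch and the import paths for WorkflowEngineFactory and AbstractWorkflowEngine are assumptions, not part of the patch.

import java.util.Arrays;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;

/** Standalone sketch (not part of this patch) of the instance-availability check added for processes. */
public final class InstanceStatusCheckSketch {

    // Same lifecycle filter the patch introduces: EXECUTION only.
    private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
            Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));

    private InstanceStatusCheckSketch() {
    }

    /** Returns true if the process instance at nominalTime has SUCCEEDED. */
    public static boolean isInstanceSucceeded(Entity entity, Date nominalTime) throws FalconException {
        // Authenticate as the entity's ACL owner, falling back to the JVM user,
        // mirroring the new authenticateUser(Entity) in EntitySLAMonitoringService.
        if (!CurrentUser.isAuthenticated()) {
            String owner = entity.getACL() == null ? null : entity.getACL().getOwner();
            if (StringUtils.isNotBlank(owner)) {
                CurrentUser.authenticate(owner);
            } else {
                CurrentUser.authenticate(System.getProperty("user.name"));
            }
        }

        AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
        // Query a 200 ms window starting at the nominal time so at most one instance is returned,
        // instead of the old point-in-time getStatus(entity, nominalTime, nominalTime, null, null).
        InstancesResult result = wfEngine.getStatus(entity, nominalTime,
                new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false);
        return result.getInstances().length > 0
                && result.getInstances()[0].status == InstancesResult.WorkflowStatus.SUCCEEDED;
    }
}

With a helper like this, the per-instance check done by the Monitor thread reduces to a single boolean call; the same window-plus-lifecycle query is what BacklogMetricEmitterService now uses when deciding whether an instance can be dropped from the backlog.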