Skip to content

Commit

Permalink
FALCON-2229 BacklogEmitterMetricService fix for deleting entities' instances in case of removal of SLA or deletion of entity
Browse files Browse the repository at this point in the history

Author: sandeep <sandysmdl@gmail.com>

Reviewers: @PracheerAgarwal, @pallavi-rao

Closes apache#335 from sandeepSamudrala/FALCON-2229 and squashes the following commits:

280a079 [sandeep] FALCON-2229 Removed cluster from the named query that deletes all the instances irrespective of the cluster
22a80b6 [sandeep] FALCON-2229 Incorporated review comments. Renamed the method
5cb25e8 [sandeep] FALCON-2229 BacklogEmitterMetricService fix for deleting entities' instances in case of removal of SLA or deletion of entity
3558af3 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2229
cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
  • Loading branch information
sandeepSamudrala authored and Pallavi Nagesha Rao committed Feb 16, 2018
1 parent 091ce28 commit 13b2303
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
@Entity
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_ALL_BACKLOG_INSTANCES, query = "select OBJECT(a) from BacklogMetricBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
@NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.entityType = :entityType")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.Map;

/**
* Backlog Metric Store for entitties.
* Backlog Metric Store for entities' backlog instances.
*/
public class BacklogMetricStore {

Expand Down Expand Up @@ -70,18 +70,19 @@ public synchronized void deleteMetricInstance(String entityName, String cluster,
q.setParameter("clusterName", cluster);
q.setParameter("nominalTime", nominalTime);
q.setParameter("entityType", entityType.name());
try{
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
}
}

public void deleteEntityInstance(String entityName){
public void deleteEntityBackLogInstances(String entityName, String entityType) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES);
q.setParameter("entityName", entityName);
q.setParameter("entityType", entityType);
try {
q.executeUpdate();
} finally {
Expand Down Expand Up @@ -110,7 +111,7 @@ public Map<Entity, List<MetricInfo>> getAllInstances() throws FalconException {
if (CollectionUtils.isEmpty(result)) {
return null;
}
} finally{
} finally {
entityManager.close();
}

Expand All @@ -121,7 +122,7 @@ public Map<Entity, List<MetricInfo>> getAllInstances() throws FalconException {
if (!backlogMetrics.containsKey(entity)) {
backlogMetrics.put(entity, new ArrayList<MetricInfo>());
}
List<MetricInfo> metrics = backlogMetrics.get(entity);
List<MetricInfo> metrics = backlogMetrics.get(entity);
MetricInfo metricInfo = new MetricInfo(BacklogMetricEmitterService.DATE_FORMAT.get()
.format(backlogMetricBean.getNominalTime()),
backlogMetricBean.getClusterName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -82,7 +81,7 @@ public final class BacklogMetricEmitterService implements FalconService,
Services.get().getService(MetricNotificationService.SERVICE_NAME);

private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
Collections.singletonList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));

public static BacklogMetricEmitterService get() {
return SERVICE;
Expand All @@ -107,18 +106,18 @@ protected SimpleDateFormat initialValue() {
private static ConcurrentHashMap<Entity, List<MetricInfo>> entityBacklogs = new ConcurrentHashMap<>();

@Override
public void onAdd(Entity entity) throws FalconException{
public void onAdd(Entity entity) throws FalconException {
addToBacklog(entity);
}

@Override
public void onRemove(Entity entity) throws FalconException{
if (entity.getEntityType() != EntityType.PROCESS){
public void onRemove(Entity entity) throws FalconException {
if (entity.getEntityType() != EntityType.PROCESS) {
return;
}
Process process = (Process) entity;
if (process.getSla() != null) {
backlogMetricStore.deleteEntityInstance(entity.getName());
backlogMetricStore.deleteEntityBackLogInstances(entity.getName(), entity.getEntityType().name());
entityBacklogs.remove(entity);
process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
for (Cluster cluster : process.getClusters().getClusters()) {
Expand All @@ -127,7 +126,7 @@ public void onRemove(Entity entity) throws FalconException{
}
}

public void dropMetric(String clusterName, Process process){
private void dropMetric(String clusterName, Process process) {
String pipelinesStr = process.getPipelines();
String metricName;

Expand All @@ -144,15 +143,15 @@ public void dropMetric(String clusterName, Process process){
}

@Override
public void onChange(Entity oldEntity, Entity newEntity) throws FalconException{
if (oldEntity.getEntityType() != EntityType.PROCESS){
public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
if (oldEntity.getEntityType() != EntityType.PROCESS) {
return;
}
Process newProcess = (Process) newEntity;
Process oldProcess = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName());
if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null){
if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null) {
if (oldProcess.getSla() != null) {
backlogMetricStore.deleteEntityInstance(newProcess.getName());
backlogMetricStore.deleteEntityBackLogInstances(newProcess.getName(), newEntity.getEntityType().name());
entityBacklogs.remove(newProcess);
for (Cluster cluster : oldProcess.getClusters().getClusters()) {
dropMetric(cluster.getName(), oldProcess);
Expand All @@ -164,16 +163,16 @@ public void onChange(Entity oldEntity, Entity newEntity) throws FalconException{
}

@Override
public void onReload(Entity entity) throws FalconException{
public void onReload(Entity entity) throws FalconException {
addToBacklog(entity);
}

public void addToBacklog(Entity entity) {
private void addToBacklog(Entity entity) {
if (entity.getEntityType() != EntityType.PROCESS) {
return;
}
Process process = (Process) entity;
if (process.getSla() == null){
if (process.getSla() == null) {
return;
}
entityBacklogs.putIfAbsent(entity, Collections.synchronizedList(new ArrayList<MetricInfo>()));
Expand Down Expand Up @@ -277,9 +276,9 @@ public void onWait(WorkflowExecutionContext context) throws FalconException {
}

/**
* Service which executes backlog evaluation and publishing metrics to Graphite parallel for entities.
* Service that executes backlog evaluation and publishes metrics to Graphite for entities in parallel.
*/
public static class BacklogMetricEmitter implements Runnable {
private static class BacklogMetricEmitter implements Runnable {
private ThreadPoolExecutor executor;

@Override
Expand Down Expand Up @@ -311,9 +310,9 @@ private void waitForFuturesToComplete(List<Future> futures) {
}

/**
* Service which calculates backlog for given entity and publish to graphite.
* Service that calculates backlog for given entity and publishes them to graphite.
*/
public static class BacklogCalcService implements Runnable {
private static class BacklogCalcService implements Runnable {

private Entity entityObj;
private List<MetricInfo> metrics;
Expand All @@ -329,18 +328,17 @@ public void run() {
MetricInfo metricInfo = null;
HashMap<String, Long> backLogsCluster = new HashMap<>();
synchronized (metrics) {
if (metrics.isEmpty()){
Process process = (Process)entityObj;
if (metrics.isEmpty()) {
Process process = (Process) entityObj;
Clusters clusters = process.getClusters();
for (Cluster cluster : clusters.getClusters()){
for (Cluster cluster : clusters.getClusters()) {
publishBacklog(process, cluster.getName(), 0L);
}
}else{
} else {
long currentTime = System.currentTimeMillis();
Iterator iter = metrics.iterator();
while (iter.hasNext()) {
for (MetricInfo metric : metrics) {
try {
metricInfo = (MetricInfo) iter.next();
metricInfo = metric;
long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime();
long backlog = backLogsCluster.containsKey(metricInfo.getCluster())
? backLogsCluster.get(metricInfo.getCluster()) : 0;
Expand All @@ -366,7 +364,7 @@ public void run() {
}


public static void publishBacklog(Process process, String clusterName, Long backlog){
private static void publishBacklog(Process process, String clusterName, Long backlog) {
String pipelinesStr = process.getPipelines();
String metricName;

Expand All @@ -382,19 +380,17 @@ public static void publishBacklog(Process process, String clusterName, Long back
}
}

public static String getMetricName(String clusterName, String processName, String pipeline){
String metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
private static String getMetricName(String clusterName, String processName, String pipeline) {
return METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+ pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
+ METRIC_SEPARATOR + processName + METRIC_SEPARATOR
+ "backlogInMins";
return metricName;
}

/**
* Service runs periodically and removes succeeded instances from backlog list.
*/
public static class BacklogCheckService implements Runnable {

private static class BacklogCheckService implements Runnable {
@Override
public void run() {
LOG.trace("BacklogCheckService running for entities");
Expand All @@ -414,7 +410,7 @@ public void run() {
authenticateUser(entity);
if (wfEngine.isMissing(entity)) {
LOG.info("Entity of name {} was deleted so removing instance of "
+ "nominaltime {} ", entity.getName(), nominalTimeStr);
+ "nominal time {} ", entity.getName(), nominalTimeStr);
backlogMetricStore.deleteMetricInstance(entity.getName(),
metricInfo.getCluster(), nominalTime, entity.getEntityType());
iterator.remove();
Expand Down Expand Up @@ -444,7 +440,7 @@ public void run() {
}
}

private static void authenticateUser(Entity entity){
private static void authenticateUser(Entity entity) {
if (!CurrentUser.isAuthenticated()) {
if (StringUtils.isNotBlank(entity.getACL().getOwner())) {
CurrentUser.authenticate(entity.getACL().getOwner());
Expand All @@ -453,5 +449,4 @@ private static void authenticateUser(Entity entity){
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -80,12 +80,12 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList

private static final EntitySLAMonitoringService SERVICE = new EntitySLAMonitoringService();

public static final String TAG_CRITICAL = "Missed-SLA-High";
public static final String TAG_WARN = "Missed-SLA-Low";
static final String TAG_CRITICAL = "Missed-SLA-High";
static final String TAG_WARN = "Missed-SLA-Low";
private static final long MINUTE_DELAY = 60000L;

private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
Collections.singletonList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));

private EntitySLAMonitoringService() {

Expand Down Expand Up @@ -176,7 +176,7 @@ private void startEntityMonitoring(Entity entity, boolean isEntityUpdated) throw
}
}

public Boolean checkFeedClusterSLA(Feed feed){
private Boolean checkFeedClusterSLA(Feed feed){
for(Cluster cluster : feed.getClusters().getClusters()){
Sla sla = FeedHelper.getSLA(cluster, feed);
if (sla != null){
Expand All @@ -187,7 +187,7 @@ public Boolean checkFeedClusterSLA(Feed feed){
}


public Boolean checkProcessClusterSLA(Process process){
private Boolean checkProcessClusterSLA(Process process){
Clusters clusters = process.getClusters();
for(org.apache.falcon.entity.v0.process.Cluster cluster : clusters.getClusters()){
org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(cluster, process);
Expand Down Expand Up @@ -292,7 +292,7 @@ public void onChange(Entity oldEntity, Entity newEntity) throws FalconException
}
}

void updatePendingInstances(String entityName, List<String> slaRemovedClusters , String entityType){
private void updatePendingInstances(String entityName, List<String> slaRemovedClusters, String entityType){
for(String clusterName :slaRemovedClusters){
MONITORING_JDBC_STATE_STORE.deletePendingInstances(entityName, clusterName,
entityType);
Expand Down Expand Up @@ -384,7 +384,7 @@ private void addPendingInstances(String entityType, Entity entity,
}
}

void addPendingEntityInstances(Date checkPointTime) throws FalconException {
private void addPendingEntityInstances(Date checkPointTime) throws FalconException {
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities();
for(MonitoredEntityBean monitoredEntityBean : entityBeanList) {
Expand Down Expand Up @@ -611,8 +611,8 @@ Set<Pair<Date, String>> getFeedSLAStatus(Sla sla, Date start, Date end, List<Dat
return result;
}

Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
Date end, List<Date> missingInstances) throws FalconException {
private Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
Date end, List<Date> missingInstances) throws FalconException {
Date now = new Date();
Frequency slaHigh = sla.getShouldEndIn();
Set<Pair<Date, String>> result = new HashSet<>();
Expand Down
Loading

0 comments on commit 13b2303

Please sign in to comment.