Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala committed Nov 14, 2016
2 parents d0393e9 + c980aa8 commit 250cc46
Showing 28 changed files with 597 additions and 311 deletions.
89 changes: 82 additions & 7 deletions common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -1024,18 +1024,69 @@ public static Date getPreviousInstanceTime(Date startTime, Frequency frequency,
*/
public static List<Date> getEntityInstanceTimes(Entity entity, String clusterName, Date startRange, Date endRange) {
Date start = null;
Date end = null;

switch (entity.getEntityType()) {

case FEED:
Feed feed = (Feed) entity;
start = FeedHelper.getCluster(feed, clusterName).getValidity().getStart();
org.apache.falcon.entity.v0.feed.Validity feedValidity =
FeedHelper.getCluster(feed, clusterName).getValidity();
start = feedValidity.getStart();
end = feedValidity.getEnd().before(endRange) ? feedValidity.getEnd() : endRange;
return getInstanceTimes(start, feed.getFrequency(), feed.getTimezone(),
startRange, endRange);
startRange, end);

case PROCESS:

Process process = (Process) entity;
start = ProcessHelper.getCluster(process, clusterName).getValidity().getStart();
org.apache.falcon.entity.v0.process.Validity processValidity =
ProcessHelper.getCluster(process, clusterName).getValidity();
start = processValidity.getStart();
end = processValidity.getEnd().before(endRange) ? processValidity.getEnd() : endRange;
return getInstanceTimes(start, process.getFrequency(),
process.getTimezone(), startRange, end);

default:
throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
}
}
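
For context, the change above caps the computed range at the cluster validity end: with a feed whose validity on the given cluster ends at 2016-11-01T00:00Z and an endRange of 2016-12-01T00:00Z, end resolves to the validity end, so no instance times beyond 2016-11-01 are generated (the dates here are illustrative).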

/**
* Find the entity instance times within the given time range.
* <p/>
* Both startRange and endRange are inclusive.
*
* @param entity feed or process entity whose instance times are to be found
* @param clusterName name of the cluster
* @param startRange start time for the input range
* @param endRange end time for the input range
* @return List of instance times in between the given time range
*/
public static List<Date> getEntityInstanceTimesInBetween(Entity entity, String clusterName, Date startRange,
Date endRange) {
Date start = null;
Date end = null;


switch (entity.getEntityType()) {
case FEED:
Feed feed = (Feed) entity;
org.apache.falcon.entity.v0.feed.Validity feedValidity =
FeedHelper.getCluster(feed, clusterName).getValidity();
start = feedValidity.getStart();
end = feedValidity.getEnd();
return getInstancesInBetween(start, end, feed.getFrequency(), feed.getTimezone(),
startRange, endRange);

case PROCESS:
Process process = (Process) entity;
org.apache.falcon.entity.v0.process.Validity processValidity =
ProcessHelper.getCluster(process, clusterName).getValidity();
start = processValidity.getStart();
end = processValidity.getEnd();

return getInstancesInBetween(start, end, process.getFrequency(),
process.getTimezone(), startRange, endRange);

default:
@@ -1066,13 +1117,37 @@ public static List<Date> getInstanceTimes(Date startTime, Frequency frequency, T

Date current = getPreviousInstanceTime(startTime, frequency, timeZone, startRange);
while (true) {
Date nextStartTime = getNextStartTime(startTime, frequency, timeZone, current);
if (nextStartTime.after(endRange)){
Date nextInstanceTime = getNextStartTime(startTime, frequency, timeZone, current);
if (nextInstanceTime.after(endRange)){
break;
}
result.add(nextStartTime);
result.add(nextInstanceTime);
// this is required because getNextStartTime returns a time greater than or equal to referenceTime
current = new Date(nextStartTime.getTime() + ONE_MS); // one millisecond later
current = new Date(nextInstanceTime.getTime() + ONE_MS); // one millisecond later
}
return result;
}


public static List<Date> getInstancesInBetween(Date startTime, Date endTime, Frequency frequency, TimeZone timeZone,
Date startRange, Date endRange) {
List<Date> result = new LinkedList<>();
if (endRange.before(startRange)) {
return result;
}
if (timeZone == null) {
timeZone = TimeZone.getTimeZone("UTC");
}
Date current = getPreviousInstanceTime(startTime, frequency, timeZone, startRange);
while (true) {
if (!current.before(startRange) && !current.after(endRange)
&& current.before(endTime) && !current.before(startTime)) {
result.add(current);
}
current = getNextInstanceTime(current, frequency, timeZone, 1);
if (current.after(endRange)){
break;
}
}
return result;
}
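
For reference, a minimal caller-side sketch of the new helper. The wrapper class, the Frequency string constructor (new Frequency("hours(1)")) and all dates below are illustrative assumptions, not part of this change:

import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Frequency;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

public class InstanceTimesSketch {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));

        // Hypothetical validity window and query range for an hourly entity.
        Date validityStart = fmt.parse("2016-11-01T00:00Z");
        Date validityEnd = fmt.parse("2016-11-02T00:00Z");
        Date startRange = fmt.parse("2016-11-01T05:00Z");
        Date endRange = fmt.parse("2016-11-01T08:00Z");

        List<Date> instances = EntityUtil.getInstancesInBetween(validityStart, validityEnd,
                new Frequency("hours(1)"), TimeZone.getTimeZone("UTC"), startRange, endRange);
        // Expect the 05:00, 06:00, 07:00 and 08:00 instances: both range boundaries are
        // inclusive, while the validity end is treated as exclusive by the loop above.
        System.out.println(instances);
    }
}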
@@ -31,6 +31,7 @@

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

@@ -94,6 +95,16 @@ public void publish(String metricsName, Long value){
}
}

public void deleteMetric(String metricName){
synchronized (this){
SortedMap<String, Gauge> gaugeMap = metricRegistry.getGauges();
if (gaugeMap.get(metricName) != null){
metricRegistry.remove(metricName);
metricMap.remove(metricName);
}
}
}
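
For context, deleteMetric follows the check-then-remove idiom against the metric registry; the getGauges()/remove() calls correspond to the Dropwizard Metrics API. A standalone sketch of the same idiom, with an illustrative metric name:

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

public class GaugeRemovalSketch {
    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        String metricName = "falcon.sample.backlog";   // hypothetical metric name
        registry.register(metricName, new Gauge<Long>() {
            @Override
            public Long getValue() {
                return 5L;
            }
        });

        // Remove the gauge only if it is actually registered, mirroring deleteMetric above.
        if (registry.getGauges().get(metricName) != null) {
            registry.remove(metricName);
        }
    }
}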

private static class MetricGauge implements Gauge<Long> {

private Long value=0L;
@@ -35,7 +35,7 @@

//SUSPEND CHECKSTYLE CHECK LineLengthCheck
/**
* Entity SLA monitoring.
* Feed SLA monitoring.
* */
@Entity
@NamedQueries({
@@ -148,12 +148,12 @@ public void setIsSLAHighMissed(Boolean isSLAHighMissed) {
this.isSLAHighMissed = isSLAHighMissed;
}

public static final String ENTITYNAME = "entityName";
public static final String ENTITY_NAME = "entityName";

public static final String CLUSTERNAME = "clusterName";
public static final String CLUSTER_NAME = "clusterName";

public static final String ENTITYTYPE = "entityType";
public static final String ENTITY_TYPE = "entityType";

public static final String NOMINALTIME = "nominalTime";
public static final String NOMINAL_TIME = "nominalTime";

}
@@ -29,22 +29,26 @@
import javax.persistence.Column;
import javax.persistence.Basic;
import javax.validation.constraints.NotNull;
import java.util.Date;

//SUSPEND CHECKSTYLE CHECK LineLengthCheck
/**
* The Feeds that are to be monitered will be stored in the db.
* The entities that are to be monitored will be stored in the MONITORED_ENTITY table.
* */

@Entity
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
@NamedQuery(name = PersistenceConstants.GET_MONITORED_ENTITY, query = "select OBJECT(a) from "
+ "MonitoredEntityBean a where a.entityName = :entityName and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredEntityBean "
@NamedQuery(name = PersistenceConstants.DELETE_MONITORED_ENTITIES, query = "delete from MonitoredEntityBean "
+ "a where a.entityName = :entityName and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE, query = "select OBJECT(a) "
@NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITIES_FOR_TYPE, query = "select OBJECT(a) "
+ "from MonitoredEntityBean a where a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITY, query = "select OBJECT(a) "
+ "from MonitoredEntityBean a")
+ "from MonitoredEntityBean a"),
@NamedQuery(name = PersistenceConstants.UPDATE_LAST_MONITORED_TIME, query = "update MonitoredEntityBean a "
+ "set a.lastMonitoredTime = :lastMonitoredTime where a.entityName = :entityName and a.entityType = "
+ ":entityType")
})
@Table(name="MONITORED_ENTITY")
//RESUME CHECKSTYLE CHECK LineLengthCheck
@@ -73,12 +77,25 @@ public void setEntityType(String entityType) {
@Column(name = "entity_type")
private String entityType;

public String getFeedName() {
public String getEntityName() {
return entityName;
}

public void setEntityName(String feedName) {
this.entityName = feedName;
public void setEntityName(String entityName) {
this.entityName = entityName;
}

@Basic
@NotNull
@Column(name = "last_monitored_time")
private Date lastMonitoredTime;

public Date getLastMonitoredTime() {
return lastMonitoredTime;
}

public void setLastMonitoredTime(Date lastMonitoredTime) {
this.lastMonitoredTime = lastMonitoredTime;
}

public String getId() {
@@ -89,8 +106,10 @@ public void setId(String id) {
this.id = id;
}

public static final String ENTITYNAME = "entityName";
public static final String ENTITY_NAME = "entityName";

public static final String ENTITY_TYPE = "entityType";

public static final String ENTITYTYPE = "entityType";
public static final String LAST_MONITORED_TIME = "lastMonitoredTime";

}
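
As a hedged illustration of the new UPDATE_LAST_MONITORED_TIME named query in use; how the EntityManager is obtained and the transaction handling are assumptions here, not part of this change:

import javax.persistence.EntityManager;
import java.util.Date;

public class MonitoredEntitySketch {
    // Illustrative only: bump last_monitored_time for one monitored entity.
    public static void touchMonitoredEntity(EntityManager em, String entityName, String entityType) {
        em.getTransaction().begin();
        em.createNamedQuery(PersistenceConstants.UPDATE_LAST_MONITORED_TIME)
                .setParameter(MonitoredEntityBean.LAST_MONITORED_TIME, new Date())
                .setParameter(MonitoredEntityBean.ENTITY_NAME, entityName)
                .setParameter(MonitoredEntityBean.ENTITY_TYPE, entityType)
                .executeUpdate();
        em.getTransaction().commit();
    }
}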
@@ -40,9 +40,9 @@
@NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
@NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a "),
@NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a order by a.nominalTime asc"),
@NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
})
@Table(name = "PENDING_INSTANCES")
@@ -114,12 +114,12 @@ public void setEntityName(String entityName) {
this.entityName = entityName;
}

public static final String ENTITYNAME = "entityName";
public static final String ENTITY_NAME = "entityName";

public static final String CLUSTERNAME = "clusterName";
public static final String CLUSTER_NAME = "clusterName";

public static final String NOMINALTIME = "nominalTime";
public static final String NOMINAL_TIME = "nominalTime";

public static final String ENTITYTYPE = "entityType";
public static final String ENTITY_TYPE = "entityType";

}
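
Similarly, a short sketch of reading pending instances, which the modified GET_ALL_PENDING_INSTANCES query now returns oldest nominal time first (the EntityManager is again assumed):

import javax.persistence.EntityManager;
import java.util.List;

public class PendingInstancesSketch {
    public static List<PendingInstanceBean> allPending(EntityManager em) {
        // Ordered ascending by nominal time because of the "order by a.nominalTime asc" clause above.
        return em.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES,
                PendingInstanceBean.class).getResultList();
    }
}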
@@ -24,13 +24,16 @@ public final class PersistenceConstants {
private PersistenceConstants(){

}
public static final String GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE";
public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES";
public static final String GET_ALL_MONITORING_ENTITY_FOR_TYPE = "GET_ALL_MONITORING_ENTITY_FOR_TYPE";
public static final String GET_MONITORED_ENTITY = "GET_MONITORED_ENTITY";
public static final String DELETE_MONITORED_ENTITIES = "DELETE_MONITORED_ENTITIES";
public static final String GET_ALL_MONITORING_ENTITIES_FOR_TYPE = "GET_ALL_MONITORING_ENTITIES_FOR_TYPE";
public static final String GET_ALL_MONITORING_ENTITY = "GET_ALL_MONITORING_ENTITY";
public static final String UPDATE_LAST_MONITORED_TIME = "UPDATE_LAST_MONITORED_TIME";

public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES";
public static final String GET_PENDING_INSTANCE = "GET_PENDING_INSTANCE";
public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES";
public static final String DELETE_ALL_INSTANCES_FOR_ENTITY = "DELETE_ALL_INSTANCES_FOR_ENTITY";
public static final String DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY = "DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY";
public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES";
public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES";
public static final String GET_ENTITY = "GET_ENTITY";
@@ -63,5 +66,5 @@ private PersistenceConstants(){
public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE";
public static final String DELETE_BACKLOG_METRIC_INSTANCE = "DELETE_BACKLOG_METRIC_INSTANCE";
public static final String GET_ALL_BACKLOG_INSTANCES = "GET_ALL_BACKLOG_INSTANCES";
public static final String GET_ALL_MONITORING_ENTITY = "GET_ALL_MONITORING_ENTITY";
public static final String DELETE_ALL_BACKLOG_ENTITY_INSTANCES ="DELETE_ALL_BACKLOG_ENTITY_INSTANCES";
}
@@ -56,7 +56,7 @@ public static AbstractWorkflowEngine getWorkflowEngine(Entity entity) throws Fal
LOG.debug("Returning native workflow engine for entity {}", entity.getName());
return nativeWorkflowEngine;
}
LOG.debug("Returning configured workflow engine for entity {}.", entity);
LOG.debug("Returning configured workflow engine for entity {}", (entity == null)? null : entity.getName());
return getWorkflowEngine();
}

@@ -70,7 +70,7 @@ public static AbstractWorkflowEngine getWorkflowEngine(Entity entity, Map<String
throws FalconException {
// If entity is null or not schedulable and the engine property is not specified, return the configured WE.
if (entity == null || !entity.getEntityType().isSchedulable()) {
LOG.debug("Returning configured workflow engine for entity {}.", entity);
LOG.debug("Returning configured workflow engine for entity {}", (entity == null)? null : entity.getName());
return getWorkflowEngine();
}

19 changes: 9 additions & 10 deletions common/src/main/resources/startup.properties
@@ -69,6 +69,10 @@
org.apache.falcon.entity.store.FeedLocationStore,\
org.apache.falcon.service.EntitySLAMonitoringService,\
org.apache.falcon.service.SharedLibraryHostingService

## If you wish to use the BacklogMetricEmitterService, add it as a configstore listener (see the example below). ##
# org.apache.falcon.service.BacklogMetricEmitterService
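An illustrative listener list with the emitter enabled; the *.configstore.listeners key and the entries other than BacklogMetricEmitterService are taken from the surrounding defaults and may differ per deployment:

*.configstore.listeners=org.apache.falcon.entity.store.FeedLocationStore,\
                        org.apache.falcon.service.EntitySLAMonitoringService,\
                        org.apache.falcon.service.SharedLibraryHostingService,\
                        org.apache.falcon.service.BacklogMetricEmitterService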

## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
# org.apache.falcon.state.store.jdbc.JdbcStateStore

@@ -83,7 +87,9 @@

##### Workflow Job Execution Completion listeners #####
*.workflow.execution.listeners=
#org.apache.falcon.handler.SLAMonitoringHandler
#org.apache.falcon.service.LogMoverService
#org.apache.falcon.service.BacklogMetricEmitterService

######### Implementation classes #########

@@ -244,15 +250,6 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
# Authorization Enabled flag: false (default)|true
*.falcon.security.authorization.enabled=false

# CSRF filter enabled flag: false (default) | true
*.falcon.security.csrf.enabled=false

# Custom header for CSRF filter
*.falcon.security.csrf.header=FALCON-CSRF-FILTER

# Browser user agents to be filtered
*.falcon.security.csrf.browser=^Mozilla.*,^Opera.*

# The name of the group of super-users
*.falcon.security.authorization.superusergroup=falcon

@@ -351,5 +348,7 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
*.falcon.postprocessing.enable=true

### LogMoveService Properties
*.falcon.logMoveService.threadCount=200
*.falcon.logMoveService.max.threadCount=200
*.falcon.logMoveService.blockingQueue.length=50
## Note: min threadCount should always be smaller than max threadCount.
*.falcon.logMoveService.min.threadCount=20
4 changes: 4 additions & 0 deletions common/src/main/resources/statestore.properties
@@ -24,6 +24,10 @@
## Falcon currently supports derby, mysql and postgreSQL, change url based on DB.
#*.falcon.statestore.jdbc.url=jdbc:derby:data/falcon.db;create=true

## Note: If you are using MySQL or PostgreSQL, specify the URL as
## jdbc:mysql://localhost:3306/dbname?serverTimezone=UTC
## serverTimezone should be the same as that of the Oozie server.

## StateStore credentials file where username, password and other properties can be stored securely.
## Set this credentials file permission to 400 and make sure only the user who starts Falcon can read it.
## Give the absolute path to the credentials file (including the file name), or put it on the classpath with the filename statestore.credentials.