Skip to content

Commit

Permalink
Merge pull request apache#32 from PraveenAdlakha/inmobi_0.10
Browse files Browse the repository at this point in the history
FALCON-719,722,723 fixed
  • Loading branch information
pallavi-rao committed Oct 20, 2016
2 parents 0a204b1 + 09afe53 commit ad5ad64
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 10 deletions.
4 changes: 3 additions & 1 deletion common/src/main/resources/startup.properties
Original file line number Diff line number Diff line change
Expand Up @@ -346,5 +346,7 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
*.falcon.postprocessing.enable=true

### LogMoveService Properties
*.falcon.logMoveService.threadCount=200
*.falcon.logMoveService.max.threadCount=200
*.falcon.logMoveService.blockingQueue.length=50
##Note min threadCount should always be smaller than max threadCount.
*.falcon.logMoveService.min.threadCount=20
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ private WorkflowExecutionContext createContext(TextMessage message) throws JMSEx
wfProperties.put(WorkflowExecutionArgs.ENTITY_TYPE, entityTypePair.second.name());
wfProperties.put(WorkflowExecutionArgs.WORKFLOW_USER, message.getStringProperty("user"));
wfProperties.put(WorkflowExecutionArgs.OPERATION, getOperation(appName).name());

wfProperties.put(WorkflowExecutionArgs.USER_SUBFLOW_ID,
json.getString("id").concat("@user-action"));
String appType = message.getStringProperty("appType");
return WorkflowExecutionContext.create(wfProperties, WorkflowExecutionContext.Type.valueOf(appType));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public int run(WorkflowExecutionContext context) {
LOG.error("Error getting jobinfo for: {}", context.getUserSubflowId(), e);
return 0;
}

//Assumption is - Each wf run will have a directory
//the corresponding job logs are stored within the respective dir
Path path = new Path(context.getLogDir() + "/"
Expand Down
18 changes: 14 additions & 4 deletions oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,24 @@ public class LogMoverService implements WorkflowExecutionListener {

private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(Integer.parseInt(
StartupProperties.get().getProperty("falcon.logMoveService.blockingQueue.length", "50")));
private ExecutorService executorService = new ThreadPoolExecutor(20, getThreadCount(), 120,
private ExecutorService executorService = new ThreadPoolExecutor(getCorePoolSize(), getThreadCount(), 120,
TimeUnit.SECONDS, blockingQueue);

public int getCorePoolSize(){
try{
return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.min.threadCount", "20"));
} catch (NumberFormatException e){
LOG.error("Exception in LogMoverService", e);
return 20;
}
}
public int getThreadCount() {
try{
return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount", "200"));
return Integer.parseInt(StartupProperties.get()
.getProperty("falcon.logMoveService.max.threadCount", "200"));
} catch (NumberFormatException e){
LOG.error("Exception in LogMoverService", e);
return 50;
return 200;
}
}

Expand Down Expand Up @@ -86,7 +96,7 @@ private void onEnd(WorkflowExecutionContext context){
if (Boolean.parseBoolean(ENABLE_POSTPROCESSING)) {
return;
}
while(0<blockingQueue.remainingCapacity()){
while(blockingQueue.remainingCapacity()<=0){
try {
LOG.trace("Sleeping, no capacity in threadpool....");
TimeUnit.MILLISECONDS.sleep(500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public void monitor(ResourceMessage message) {
String entityName = StringUtils.isNotBlank(message.getDimensions().get("entityName"))
? message.getDimensions().get("entityName") :message.getDimensions().get("entity-name");
String prefix = StartupProperties.get().getProperty("falcon.graphite.prefix");
String separator = ".";
LOG.debug("message:" + message.getAction());
if (entityType.equalsIgnoreCase(EntityType.PROCESS.name())) {
if (entityType.equalsIgnoreCase(EntityType.PROCESS.name())
&& ConfigurationStore.get().get(EntityType.PROCESS, entityName) != null) {
Entity entity = ConfigurationStore.get().get(EntityType.PROCESS, entityName);
Process process = (Process) entity;
String pipeline = StringUtils.isNotBlank(process.getPipelines()) ? process.getPipelines() : "default";
Expand Down
4 changes: 3 additions & 1 deletion src/conf/startup.properties
Original file line number Diff line number Diff line change
Expand Up @@ -363,5 +363,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
*.falcon.postprocessing.enable=true

### LogMoveService Properties
*.falcon.logMoveService.threadCount=200
*.falcon.logMoveService.max.threadCount=200
*.falcon.logMoveService.blockingQueue.length=50
##Note min threadCount should always be smaller than max threadCount.
*.falcon.logMoveService.min.threadCount=20

0 comments on commit ad5ad64

Please sign in to comment.