From 7cc3f1c0d04cbecdf2f3f39fafb3c030e55196ca Mon Sep 17 00:00:00 2001 From: Praveen Adlakha Date: Wed, 12 Oct 2016 15:38:01 +0530 Subject: [PATCH 1/2] FALCON-719,722,723 fixed --- common/src/main/resources/startup.properties | 2 ++ .../apache/falcon/messaging/JMSMessageConsumer.java | 3 ++- .../java/org/apache/falcon/logging/JobLogMover.java | 1 - .../org/apache/falcon/service/LogMoverService.java | 13 +++++++++++-- .../falcon/plugin/GraphiteNotificationPlugin.java | 5 ++--- src/conf/startup.properties | 2 ++ 6 files changed, 19 insertions(+), 7 deletions(-) diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index aada5bfb3..ab720ae4a 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -348,3 +348,5 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle ### LogMoveService Properties *.falcon.logMoveService.threadCount=200 *.falcon.logMoveService.blockingQueue.length=50 +*.falcon.logMoveService.CorePoolSize=20 +##Note CorePoolSize should always be smaller than threadCount. diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java index 90bbdd3a4..11e9ec979 100644 --- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java +++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java @@ -156,7 +156,8 @@ private WorkflowExecutionContext createContext(TextMessage message) throws JMSEx wfProperties.put(WorkflowExecutionArgs.ENTITY_TYPE, entityTypePair.second.name()); wfProperties.put(WorkflowExecutionArgs.WORKFLOW_USER, message.getStringProperty("user")); wfProperties.put(WorkflowExecutionArgs.OPERATION, getOperation(appName).name()); - + wfProperties.put(WorkflowExecutionArgs.USER_SUBFLOW_ID, + json.getString("id").concat("@user-action")); String appType = message.getStringProperty("appType"); return WorkflowExecutionContext.create(wfProperties, WorkflowExecutionContext.Type.valueOf(appType)); diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java index 6ec2a2020..72c3dc502 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java +++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java @@ -92,7 +92,6 @@ public int run(WorkflowExecutionContext context) { LOG.error("Error getting jobinfo for: {}", context.getUserSubflowId(), e); return 0; } - //Assumption is - Each wf run will have a directory //the corresponding job logs are stored within the respective dir Path path = new Path(context.getLogDir() + "/" diff --git a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java index 7d1425a95..bc31f116f 100644 --- a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java +++ b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java @@ -46,8 +46,17 @@ public class LogMoverService implements WorkflowExecutionListener { private BlockingQueue blockingQueue = new ArrayBlockingQueue<>(Integer.parseInt( StartupProperties.get().getProperty("falcon.logMoveService.blockingQueue.length", "50"))); - private ExecutorService executorService = new ThreadPoolExecutor(20, getThreadCount(), 120, + private ExecutorService executorService = new ThreadPoolExecutor(getCorePoolSize(), getThreadCount(), 120, TimeUnit.SECONDS, blockingQueue); + + public int getCorePoolSize(){ + try{ + return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.CorePoolSize", "20")); + } catch (NumberFormatException e){ + LOG.error("Exception in LogMoverService", e); + return 20; + } + } public int getThreadCount() { try{ return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount", "200")); @@ -86,7 +95,7 @@ private void onEnd(WorkflowExecutionContext context){ if (Boolean.parseBoolean(ENABLE_POSTPROCESSING)) { return; } - while(0=blockingQueue.remainingCapacity()){ try { LOG.trace("Sleeping, no capacity in threadpool...."); TimeUnit.MILLISECONDS.sleep(500); diff --git a/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java index abe6777fb..81ade6384 100644 --- a/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java +++ b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java @@ -20,7 +20,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.falcon.aspect.ResourceMessage; -import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Process; @@ -50,10 +50,9 @@ public void monitor(ResourceMessage message) { String entityName = StringUtils.isNotBlank(message.getDimensions().get("entityName")) ? message.getDimensions().get("entityName") :message.getDimensions().get("entity-name"); String prefix = StartupProperties.get().getProperty("falcon.graphite.prefix"); - String separator = "."; LOG.debug("message:" + message.getAction()); if (entityType.equalsIgnoreCase(EntityType.PROCESS.name())) { - Entity entity = ConfigurationStore.get().get(EntityType.PROCESS, entityName); + Entity entity = EntityUtil.getEntity(EntityType.PROCESS, entityName); Process process = (Process) entity; String pipeline = StringUtils.isNotBlank(process.getPipelines()) ? process.getPipelines() : "default"; diff --git a/src/conf/startup.properties b/src/conf/startup.properties index a2c34b6c4..f200b41aa 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -365,3 +365,5 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ ### LogMoveService Properties *.falcon.logMoveService.threadCount=200 *.falcon.logMoveService.blockingQueue.length=50 +*.falcon.logMoveService.CorePoolSize=20 +##Note CorePoolSize should always be smaller than threadCount. From 09afe53b277efa39c9671b13135c15f33448538f Mon Sep 17 00:00:00 2001 From: Praveen Adlakha Date: Thu, 20 Oct 2016 12:52:05 +0530 Subject: [PATCH 2/2] comments addressed --- common/src/main/resources/startup.properties | 6 +++--- .../java/org/apache/falcon/service/LogMoverService.java | 9 +++++---- .../apache/falcon/plugin/GraphiteNotificationPlugin.java | 7 ++++--- src/conf/startup.properties | 6 +++--- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index ab720ae4a..d3f6e6936 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -346,7 +346,7 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle *.falcon.postprocessing.enable=true ### LogMoveService Properties -*.falcon.logMoveService.threadCount=200 +*.falcon.logMoveService.max.threadCount=200 *.falcon.logMoveService.blockingQueue.length=50 -*.falcon.logMoveService.CorePoolSize=20 -##Note CorePoolSize should always be smaller than threadCount. +##Note min threadCount should always be smaller than max threadCount. +*.falcon.logMoveService.min.threadCount=20 diff --git a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java index bc31f116f..7e4640e8f 100644 --- a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java +++ b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java @@ -51,7 +51,7 @@ public class LogMoverService implements WorkflowExecutionListener { public int getCorePoolSize(){ try{ - return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.CorePoolSize", "20")); + return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.min.threadCount", "20")); } catch (NumberFormatException e){ LOG.error("Exception in LogMoverService", e); return 20; @@ -59,10 +59,11 @@ public int getCorePoolSize(){ } public int getThreadCount() { try{ - return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount", "200")); + return Integer.parseInt(StartupProperties.get() + .getProperty("falcon.logMoveService.max.threadCount", "200")); } catch (NumberFormatException e){ LOG.error("Exception in LogMoverService", e); - return 50; + return 200; } } @@ -95,7 +96,7 @@ private void onEnd(WorkflowExecutionContext context){ if (Boolean.parseBoolean(ENABLE_POSTPROCESSING)) { return; } - while(0>=blockingQueue.remainingCapacity()){ + while(blockingQueue.remainingCapacity()<=0){ try { LOG.trace("Sleeping, no capacity in threadpool...."); TimeUnit.MILLISECONDS.sleep(500); diff --git a/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java index 81ade6384..06636272d 100644 --- a/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java +++ b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java @@ -20,7 +20,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.falcon.aspect.ResourceMessage; -import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Process; @@ -51,8 +51,9 @@ public void monitor(ResourceMessage message) { ? message.getDimensions().get("entityName") :message.getDimensions().get("entity-name"); String prefix = StartupProperties.get().getProperty("falcon.graphite.prefix"); LOG.debug("message:" + message.getAction()); - if (entityType.equalsIgnoreCase(EntityType.PROCESS.name())) { - Entity entity = EntityUtil.getEntity(EntityType.PROCESS, entityName); + if (entityType.equalsIgnoreCase(EntityType.PROCESS.name()) + && ConfigurationStore.get().get(EntityType.PROCESS, entityName) != null) { + Entity entity = ConfigurationStore.get().get(EntityType.PROCESS, entityName); Process process = (Process) entity; String pipeline = StringUtils.isNotBlank(process.getPipelines()) ? process.getPipelines() : "default"; diff --git a/src/conf/startup.properties b/src/conf/startup.properties index f200b41aa..d48520670 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -363,7 +363,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.falcon.postprocessing.enable=true ### LogMoveService Properties -*.falcon.logMoveService.threadCount=200 +*.falcon.logMoveService.max.threadCount=200 *.falcon.logMoveService.blockingQueue.length=50 -*.falcon.logMoveService.CorePoolSize=20 -##Note CorePoolSize should always be smaller than threadCount. +##Note min threadCount should always be smaller than max threadCount. +*.falcon.logMoveService.min.threadCount=20