Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added configuration property allowing job polling interval to be confi…

…gured per c.f.Flow via Flow#setJobPollingInterval().
  • Loading branch information...
commit 263bc257bdf402cb5c05e48e7f6a215c91ac226a 1 parent 8595a9e
@cwensel cwensel authored
View
3  CHANGES.txt
@@ -2,6 +2,9 @@ Cascading Change Log
unreleased
+ Added configuration property allowing job polling interval to be configured per c.f.Flow via
+ Flow#setJobPollingInterval().
+
Updated ant build to not hard-code hadoop/lib sub-dir names.
1.0.13
View
45 src/core/cascading/flow/Flow.java
@@ -38,8 +38,19 @@
import org.jgrapht.traverse.TopologicalOrderIterator;
import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
/**
* A {@link Pipe} assembly is connected to the necessary number of {@link Tap} sinks and
@@ -138,7 +149,7 @@ public static boolean getPreserveTemporaryFiles( Map<Object, Object> properties
}
/**
- * Propety stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the
+ * Property stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the
* underlying computing system supports it. Defaults to {@code true}.
*
* @param properties of type Map
@@ -160,6 +171,34 @@ public static boolean getStopJobsOnExit( Map<Object, Object> properties )
return Boolean.parseBoolean( Util.getProperty( properties, "cascading.flow.stopjobsonexit", Boolean.toString( true ) ) );
}
+ /**
+ * Property jobPollingInterval will set the time to wait between polling the remote server for the status of a job.
+ * The default value is 5000 msec (5 seconds).
+ *
+ * @param properties of type Map
+ * @param interval of type long
+ */
+ public static void setJobPollingInterval( Map<Object, Object> properties, long interval )
+ {
+ properties.put( "cascading.flow.job.pollinginterval", Long.toString( interval ) );
+ }
+
+ /**
+ * Returns property jobPollingInterval. The default is 5000 msec (5 seconds).
+ *
+ * @param properties of type Map
+ * @return a long
+ */
+ public static long getJobPollingInterval( Map<Object, Object> properties )
+ {
+ return Long.parseLong( Util.getProperty( properties, "cascading.flow.job.pollinginterval", Long.toString( 5000 ) ) );
+ }
+
+ static long getJobPollingInterval( JobConf jobConf )
+ {
+ return jobConf.getLong( "cascading.flow.job.pollinginterval", 5000 );
+ }
+
/** Used for testing. */
protected Flow()
{
View
14 src/core/cascading/flow/FlowStep.java
@@ -427,11 +427,13 @@ public FlowStepJob getFlowStepJob( JobConf parentConf ) throws IOException
private List<FlowStepJob> predecessors;
private final CountDownLatch latch = new CountDownLatch( 1 );
private boolean stop = false;
+ private long pollingInterval;
public FlowStepJob( String stepName, JobConf currentConf )
{
this.stepName = stepName;
this.currentConf = currentConf;
+ this.pollingInterval = Flow.getJobPollingInterval( currentConf );
}
public void stop()
@@ -480,7 +482,17 @@ public Throwable call()
currentJobClient = new JobClient( currentConf );
runningJob = currentJobClient.submitJob( currentConf );
- runningJob.waitForCompletion();
+ while( !runningJob.isComplete() )
+ {
+ try
+ {
+ Thread.sleep( pollingInterval );
+ }
+ catch( InterruptedException exception )
+ {
+ // ignore exception
+ }
+ }
if( !stop && !runningJob.isSuccessful() )
{
View
4 src/test/cascading/ClusterTestCase.java
@@ -21,14 +21,15 @@
package cascading;
+import cascading.flow.Flow;
import cascading.flow.MultiMapReducePlanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import java.io.File;
import java.io.IOException;
@@ -115,6 +116,7 @@ public void setUp() throws IOException
if( logger != null )
properties.put( "log4j.logger", logger );
+ Flow.setJobPollingInterval( properties, 500 ); // should speed up tests
MultiMapReducePlanner.setJobConf( properties, jobConf );
}
Please sign in to comment.
Something went wrong with that request. Please try again.