Refactors Fulgora and ScanJob (plus executors) to correspond to TP3 setup and parallel execution semantics. Specifically, each thread now uses its own instance of ScanJob or VertexScanJob, cloning it if necessary. Rather than using a ThreadPool, the StandardScannerExecutor now manages its worker threads directly.

There is now the notion of a workBlockSize, i.e. the number of rows that are worked on in one block of work. We use one instance of ScanJob per block and call workerIterationStart/End before/after each such block. This provides better control over the resources needed to execute a particular ScanJob.

Fixes #857. Fixes #865.
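
For illustration, a minimal job written against the refactored interface might look like the following sketch. RowCounterJob is hypothetical and not part of this commit; the catch-all SliceQuery built via BufferUtil and the ScanMetrics.incrementCustom call are assumptions about the surrounding API:

    import java.util.List;
    import java.util.Map;

    import com.google.common.collect.ImmutableList;
    import com.thinkaurelius.titan.diskstorage.EntryList;
    import com.thinkaurelius.titan.diskstorage.StaticBuffer;
    import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
    import com.thinkaurelius.titan.diskstorage.keycolumnvalue.SliceQuery;
    import com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.ScanJob;
    import com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.ScanMetrics;
    import com.thinkaurelius.titan.diskstorage.util.BufferUtil;

    public class RowCounterJob implements ScanJob {

        // Per-block state only: each worker thread owns its own clone of this job,
        // and the executor swaps in a fresh clone after every workBlockSize rows.
        private long rowsInBlock;

        @Override
        public void workerIterationStart(Configuration jobConfiguration,
                                         Configuration graphConfiguration, ScanMetrics metrics) {
            rowsInBlock = 0; // acquire per-block resources here
        }

        @Override
        public void process(StaticBuffer key, Map<SliceQuery, EntryList> entries, ScanMetrics metrics) {
            rowsInBlock++; // invoked once per row within the current block
        }

        @Override
        public void workerIterationEnd(ScanMetrics metrics) {
            // flush per-block state and release resources here
            metrics.incrementCustom("rows-counted", rowsInBlock);
        }

        @Override
        public List<SliceQuery> getQueries() {
            // fetch just the first column of every row
            return ImmutableList.of(new SliceQuery(
                    BufferUtil.zeroBuffer(1), BufferUtil.oneBuffer(32)).setLimit(1));
        }

        @Override
        public RowCounterJob clone() {
            return new RowCounterJob(); // no cross-block state to copy
        }
    }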
mbroecheler committed Mar 18, 2015
1 parent d5712c5 commit 93b25d1
Showing 19 changed files with 333 additions and 86 deletions.
@@ -349,7 +349,8 @@ private StandardScanner.Builder buildStoreIndexScanJob(String storeName) {
                 .setTimestampProvider(provider)
                 .setJobConfiguration(jobConfig)
                 .setGraphConfiguration(configuration)
-                .setNumProcessingThreads(1);
+                .setNumProcessingThreads(1)
+                .setWorkBlockSize(10000);
     }

     public StandardScanner.ScanResult getScanJobStatus(Object jobId) {
@@ -10,30 +10,40 @@
 import java.util.function.Predicate;

 /**
- * A computation over edgestore entries.
+ * A global computation over
  *
  * @author Matthias Broecheler (me@matthiasb.com)
  */
-public interface ScanJob {
+public interface ScanJob extends Cloneable {

     /**
-     * Invoked prior to any other method on this {@code ScanJob} instance.
-     * This will only be called once per instance.
+     * Invoked before a block of computation (i.e. multiple process() calls) is handed to this particular ScanJob.
+     * Can be used to initialize the iteration. This method is called exactly once before each block of computation.
+     * This method is semantically aligned with {@link com.tinkerpop.gremlin.process.computer.VertexProgram#workerIterationStart()}
      *
-     * @param config
-     * @param metrics
+     * This method may not be called if there is no data to be processed. Correspondingly, the end method won't be called either.
+     *
+     * No-op default implementation.
+     *
+     * @param jobConfiguration configuration for this particular job
+     * @param graphConfiguration configuration options for the entire graph against which this job is executed
+     * @param metrics {@link com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.ScanMetrics} for this job
      */
-    public default void setup(Configuration jobConfiguration,
-                              Configuration graphConfiguration, ScanMetrics metrics) {}
+    public default void workerIterationStart(Configuration jobConfiguration,
+                                             Configuration graphConfiguration, ScanMetrics metrics) {}

     /**
-     * After this method is invoked, no additional method calls on this
-     * {@code ScanJob} instance are permitted.
-     * This will only be called once per instance.
+     * Invoked after a block of computation (i.e. multiple process() calls) has been handed to this particular ScanJob.
+     * Can be used to close any resources held by this job. This method is called exactly once after each block of computation.
+     * This method is semantically aligned with {@link com.tinkerpop.gremlin.process.computer.VertexProgram#workerIterationEnd()}
      *
-     * @param metrics
+     * This method may not be called if there is no data to be processed. Correspondingly, the start method won't be called either.
+     *
+     * No-op default implementation.
+     *
+     * @param metrics {@link com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.ScanMetrics} for this job
      */
-    public default void teardown(ScanMetrics metrics) {}
+    public default void workerIterationEnd(ScanMetrics metrics) {}

     /**
      * Run this {@code ScanJob}'s computation on the supplied row-key and entries.
@@ -109,4 +119,12 @@ public default Predicate<StaticBuffer> getKeyFilter() {
         return b -> true; //No filter by default
     }

+    /**
+     * Returns a clone of this ScanJob. The clone will not yet be initialized for computation but all of
+     * its internal state (if any) must match that of the original copy.
+     *
+     * @return A clone of this {@link com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.ScanJob}
+     */
+    public ScanJob clone();
+
 }
@@ -72,8 +72,11 @@ public ScanResult getRunningJob(Object jobId) {

     public class Builder {

+        private static final int DEFAULT_WORKBLOCK_SIZE = 10000;
+
         private ScanJob job;
         private int numProcessingThreads;
+        private int workBlockSize;
         private TimestampProvider times;
         private Configuration graphConfiguration;
         private Configuration jobConfiguration;
@@ -83,6 +86,7 @@ public class Builder {

         private Builder() {
             numProcessingThreads = 1;
+            workBlockSize = DEFAULT_WORKBLOCK_SIZE;
             job = null;
             times = null;
             graphConfiguration = Configuration.EMPTY;
@@ -99,6 +103,12 @@ public Builder setNumProcessingThreads(int numThreads) {
             return this;
         }

+        public Builder setWorkBlockSize(int size) {
+            Preconditions.checkArgument(size>0, "Need to specify a positive work block size: %s",size);
+            this.workBlockSize = size;
+            return this;
+        }
+
         public Builder setTimestampProvider(TimestampProvider times) {
             Preconditions.checkArgument(times!=null);
             this.times=times;
@@ -178,7 +188,7 @@ public ScanResult execute() throws BackendException {
             openStores.add(kcvs);
             try {
                 StandardScannerExecutor executor = new StandardScannerExecutor(job, finishJob, kcvs, storeTx,
-                        manager.getFeatures(), numProcessingThreads, jobConfiguration, graphConfiguration);
+                        manager.getFeatures(), numProcessingThreads, workBlockSize, jobConfiguration, graphConfiguration);
                 addJob(jobId,executor);
                 new Thread(executor).start();
                 return executor;
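
For reference, a caller would configure the new knob through the builder roughly like this. This is a sketch: scanner.build(), setJob(), and ScanResult behaving as a Future are assumptions based on the surrounding code, and RowCounterJob is the hypothetical job sketched above:

    StandardScanner.ScanResult scan = scanner.build()
            .setJob(new RowCounterJob())
            .setNumProcessingThreads(4)    // four workers, each with its own ScanJob clone
            .setWorkBlockSize(25000)       // overrides DEFAULT_WORKBLOCK_SIZE = 10000
            .setTimestampProvider(times)
            .setJobConfiguration(jobConfig)
            .setGraphConfiguration(graphConfig)
            .execute();                    // spawns a StandardScannerExecutor on its own thread

    ScanMetrics result = scan.get();       // wait for the scan to finish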
@@ -9,6 +9,7 @@
 import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
 import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
 import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
+import com.thinkaurelius.titan.util.system.Threads;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -36,6 +37,7 @@ class StandardScannerExecutor extends AbstractFuture<ScanMetrics> implements Sta
     private final StoreTransaction storeTx;
     private final KeyColumnValueStore store;
     private final int numProcessors;
+    private final int workBlockSize;
     private final Configuration jobConfiguration;
     private final Configuration graphConfiguration;
     private final ScanMetrics metrics;
@@ -48,18 +50,19 @@ class StandardScannerExecutor extends AbstractFuture<ScanMetrics> implements Sta
     private List<BlockingQueue<SliceResult>> dataQueues;
     private DataPuller[] pullThreads;

     StandardScannerExecutor(final ScanJob job, final Consumer<ScanMetrics> finishJob,
                             final KeyColumnValueStore store, final StoreTransaction storeTx,
                             final StoreFeatures storeFeatures,
-                            final int numProcessors, final Configuration jobConfiguration,
+                            final int numProcessors, final int workBlockSize,
+                            final Configuration jobConfiguration,
                             final Configuration graphConfiguration) throws BackendException {
         this.job = job;
         this.finishJob = finishJob;
         this.store = store;
         this.storeTx = storeTx;
         this.storeFeatures = storeFeatures;
         this.numProcessors = numProcessors;
+        this.workBlockSize = workBlockSize;
         this.jobConfiguration = jobConfiguration;
         this.graphConfiguration = graphConfiguration;

@@ -80,7 +83,7 @@ private final DataPuller addDataPuller(SliceQuery sq, StoreTransaction stx) thro
     @Override
     public void run() {
         try {
-            job.setup(jobConfiguration, graphConfiguration, metrics);
+            job.workerIterationStart(jobConfiguration, graphConfiguration, metrics);

             queries = job.getQueries();
             numQueries = queries.size();
@@ -104,13 +107,19 @@ public void run() {
         } catch (Throwable e) {
             log.error("Exception trying to setup the job:", e);
             cleanupSilent();
-            job.teardown(metrics);
+            job.workerIterationEnd(metrics);
             setException(e);
             return;
         }

-        ThreadPoolExecutor processor = new ThreadPoolExecutor(numProcessors, numProcessors, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(QUEUE_SIZE));
-        processor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        BlockingQueue<Row> processorQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+
+        Processor[] processors = new Processor[numProcessors];
+        for (int i=0;i<processors.length;i++) {
+            processors[i]= new Processor(job.clone(),processorQueue);
+            processors[i].start();
+        }

         try {
             SliceResult[] currentResults = new SliceResult[numQueries];
             while (!interrupted) {
@@ -142,8 +151,7 @@ public void run() {
                     }
                     queryResults.put(query,entries);
                 }
-                processor.submit(new RowProcessor(key, queryResults));
-
+                processorQueue.put(new Row(key, queryResults));
             }

             for (int i = 0; i < pullThreads.length; i++) {
@@ -154,12 +162,13 @@ public void run() {
                 }
             }

-            processor.shutdown();
-            processor.awaitTermination(TIMEOUT_MS,TimeUnit.MILLISECONDS);
-            if (!processor.isTerminated()) log.error("Processor did not terminate in time");
+            for (int i=0; i<processors.length;i++) {
+                processors[i].finish();
+            }
+            if (!Threads.waitForCompletion(processors,TIMEOUT_MS)) log.error("Processor did not terminate in time");

             cleanup();
-            job.teardown(metrics);
+            job.workerIterationEnd(metrics);

             if (interrupted) {
                 setException(new InterruptedException("Scanner got interrupted"));
@@ -169,10 +178,10 @@ public void run() {
             }
         } catch (Throwable e) {
             log.error("Exception occured during job execution: {}",e);
-            job.teardown(metrics);
+            job.workerIterationEnd(metrics);
             setException(e);
         } finally {
-            processor.shutdownNow();
+            Threads.terminate(processors);
             cleanupSilent();
         }
     }
@@ -209,28 +218,75 @@ public ScanMetrics getIntermediateResult() {
         return metrics;
     }

-    private class RowProcessor implements Runnable {
+    private static class Row {

-        private final StaticBuffer key;
-        private final Map<SliceQuery,EntryList> entries;
+        final StaticBuffer key;
+        final Map<SliceQuery,EntryList> entries;

-        private RowProcessor(StaticBuffer key, Map<SliceQuery, EntryList> entries) {
+        private Row(StaticBuffer key, Map<SliceQuery, EntryList> entries) {
             this.key = key;
             this.entries = entries;
         }
+    }
+
+    private class Processor extends Thread {
+
+        private ScanJob job;
+        private final BlockingQueue<Row> processorQueue;
+
+        private volatile boolean finished;
+        private int numProcessed;
+
+        private Processor(ScanJob job, BlockingQueue<Row> processorQueue) {
+            this.job = job;
+            this.processorQueue = processorQueue;
+            this.finished = false;
+            this.numProcessed = 0;
+        }

         @Override
         public void run() {
             try {
-                job.process(key,entries,metrics);
-                metrics.increment(ScanMetrics.Metric.SUCCESS);
-            } catch (Throwable ex) {
-                log.error("Exception processing row ["+key+"]: ",ex);
-                metrics.increment(ScanMetrics.Metric.FAILURE);
+                job.workerIterationStart(jobConfiguration, graphConfiguration, metrics);
+                while (!finished) {
+                    Row row;
+                    while ((row=processorQueue.poll(100,TimeUnit.MILLISECONDS))!=null) {
+                        if (numProcessed>=workBlockSize) {
+                            //Setup new chunk of work
+                            job.workerIterationEnd(metrics);
+                            job = job.clone();
+                            job.workerIterationStart(jobConfiguration, graphConfiguration, metrics);
+                            numProcessed=0;
+                        }
+                        try {
+                            job.process(row.key,row.entries,metrics);
+                            metrics.increment(ScanMetrics.Metric.SUCCESS);
+                        } catch (Throwable ex) {
+                            log.error("Exception processing row ["+row.key+"]: ",ex);
+                            metrics.increment(ScanMetrics.Metric.FAILURE);
+                        }
+                        numProcessed++;
+                    }
+                }
+            } catch (InterruptedException e) {
+                log.error("Processing thread interrupted while waiting on queue or processing data", e);
+            } catch (Throwable e) {
+                log.error("Unexpected error processing data: {}",e);
+            } finally {
+                job.workerIterationEnd(metrics);
             }
         }
+
+        public void finish() {
+            this.finished=true;
+        }
     }

     private static class DataPuller extends Thread {

         private final BlockingQueue<SliceResult> queue;
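
Summarizing the Processor loop above, each worker thread drives its private ScanJob through the following call sequence (an illustrative outline of the code above, not additional commit content; the variable names are made up):

    ScanJob workerJob = job.clone();                 // one private instance per worker
    workerJob.workerIterationStart(jobConf, graphConf, metrics);
    int inBlock = 0;
    for (Row row : rowsPolledFromQueue) {            // rows taken off the shared queue
        if (inBlock >= workBlockSize) {              // block boundary: recycle the job
            workerJob.workerIterationEnd(metrics);
            workerJob = workerJob.clone();
            workerJob.workerIterationStart(jobConf, graphConf, metrics);
            inBlock = 0;
        }
        workerJob.process(row.key, row.entries, metrics);
        inBlock++;
    }
    workerJob.workerIterationEnd(metrics);           // the final block is always closed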
@@ -53,10 +53,16 @@ public class VertexJobConverter implements ScanJob {
     protected VertexJobConverter(TitanGraph graph, VertexScanJob job) {
         Preconditions.checkArgument(job!=null);
         this.graph = new GraphProvider();
-        this.graph.setGraph(graph);
+        if (graph!=null) this.graph.setGraph(graph);
         this.job = job;
     }

+    protected VertexJobConverter(VertexJobConverter copy) {
+        this.graph = new GraphProvider();
+        if (copy.graph.isProvided()) this.graph.setGraph(copy.graph.get());
+        this.job = copy.job.clone();
+    }
+
     public static ScanJob convert(TitanGraph graph, VertexScanJob vertexJob) {
         return new VertexJobConverter(graph,vertexJob);
     }
@@ -66,7 +72,7 @@ public static ScanJob convert(VertexScanJob vertexJob) {
     }

     @Override
-    public void setup(Configuration jobConfig, Configuration graphConfig, ScanMetrics metrics) {
+    public void workerIterationStart(Configuration jobConfig, Configuration graphConfig, ScanMetrics metrics) {
         graph.initializeGraph(graphConfig);
         idManager = graph.get().getIDManager();
         StandardTransactionBuilder txb = graph.get().buildTransaction().readOnly();
@@ -76,7 +82,7 @@ public void setup(Configuration jobConfig, Configuration graphConfig, ScanMetric
         txb.vertexCacheSize(500);
         try {
             tx = (StandardTitanTx)txb.start();
-            job.setup(graph.get(), jobConfig, metrics);
+            job.workerIterationStart(graph.get(), jobConfig, metrics);
         } catch (Throwable e) {
             close();
             throw e;
@@ -90,8 +96,8 @@ private void close() {
     }

     @Override
-    public void teardown(ScanMetrics metrics) {
-        job.teardown(metrics);
+    public void workerIterationEnd(ScanMetrics metrics) {
+        job.workerIterationEnd(metrics);
         close();
     }

@@ -154,6 +160,11 @@ public Predicate<StaticBuffer> getKeyFilter() {
         };
     }

+    @Override
+    public VertexJobConverter clone() {
+        return new VertexJobConverter(this);
+    }
+
     protected long getVertexId(StaticBuffer key) {
         return idManager.getKeyID(key);
     }
@@ -182,6 +193,10 @@ public void close() {
         }
     }

+    public boolean isProvided() {
+        return provided;
+    }
+
     public final StandardTitanGraph get() {
         Preconditions.checkState(graph!=null);
         return graph;

1 comment on commit 93b25d1

@mbroecheler (Member Author) commented:

@dalaro Have a look at this commit - it addresses some of the semantic differences in how the start and end of a computation are defined in TP3. Please verify that this doesn't mess with the work you are currently doing on GraphComputer.
