Skip to content

Commit

Permalink
Simplified PipesFluentPipeline interface.
Browse files Browse the repository at this point in the history
  • Loading branch information
okram committed Apr 4, 2012
1 parent 996be4d commit a2aa669
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.textile
Expand Up @@ -24,6 +24,7 @@ h3. Version 1.1 (NOT OFFICIALLY RELEASED YET)
* Merged @PathPipe@ and @PathFunctionPipe@ into one class called @PathPipe@
* Added @AbstractMetaPipe@ to encapsulate reused behavior in all @MetaPipe@ implementations
* The @CopySplitPipe@, @FairMergePipe@, and @ExhaustMergePipe@ all have functioning path calculations
* Removed unused extra methods for merging in @PipesFluentPipeline@ interface (and updated respective implementations)

h2. Pipes 0.X

Expand Down
13 changes: 9 additions & 4 deletions src/main/java/com/tinkerpop/pipes/branch/CopySplitPipe.java
Expand Up @@ -24,10 +24,15 @@ public class CopySplitPipe<S> extends AbstractMetaPipe<S, S> implements MetaPipe

public CopySplitPipe(final List<Pipe> pipes) {
for (final Pipe pipe : pipes) {
final Pipeline<S, ?> pipeline = new Pipeline<S, Object>();
pipeline.addPipe(new CopyExpandablePipe<S>(this));
pipeline.addPipe(pipe);
this.pipes.add(pipeline);
if (pipe instanceof Pipeline) {
((Pipeline) pipe).addPipe(0, new CopyExpandablePipe<S>(this));
this.pipes.add((Pipeline)pipe);
} else {
final Pipeline<S, ?> pipeline = new Pipeline<S, Object>();
pipeline.addPipe(new CopyExpandablePipe<S>(this));
pipeline.addPipe(pipe);
this.pipes.add(pipeline);
}
}
}

Expand Down
Expand Up @@ -26,15 +26,6 @@ public ExhaustMergePipe(final List<Pipe> pipes) {
this.total = pipes.size();
}

public ExhaustMergePipe(final Pipe... pipes) {
this(Arrays.asList(pipes));
}

@Override
public void setStarts(final Iterator<S> iterator) {

}

public S processNextStart() {
while (true) {
final Pipe pipe = this.pipes.get(this.current);
Expand Down
19 changes: 7 additions & 12 deletions src/main/java/com/tinkerpop/pipes/branch/FairMergePipe.java
Expand Up @@ -5,8 +5,6 @@
import com.tinkerpop.pipes.util.MetaPipe;
import com.tinkerpop.pipes.util.PipeHelper;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

Expand All @@ -26,15 +24,6 @@ public FairMergePipe(final List<Pipe> pipes) {
this.total = pipes.size();
}

public FairMergePipe(final Pipe... pipes) {
this(Arrays.asList(pipes));
}

@Override
public void setStarts(final Iterator<S> iterator) {

}

public S processNextStart() {
int counter = 0;
while (true) {
Expand All @@ -45,7 +34,13 @@ public S processNextStart() {
this.current = (this.current + 1) % this.total;
return s;
} else if (counter == this.total) {
throw new NoSuchElementException();
boolean quit = true;
for (Pipe pipe : this.pipes) {
if (pipe.hasNext())
quit = false;
}
if (quit)
throw new NoSuchElementException();
} else {
this.current = (this.current + 1) % this.total;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/tinkerpop/pipes/branch/LoopPipe.java
Expand Up @@ -59,7 +59,7 @@ protected S processNextStart() {
}

public List<Pipe> getPipes() {
return (List) Arrays.asList(pipe);
return (List) Arrays.asList(this.pipe);
}

public void setStarts(final Iterator<S> iterator) {
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/tinkerpop/pipes/util/FluentUtility.java
Expand Up @@ -77,6 +77,19 @@ public static List<Pipe> removePreviousPipes(final Pipeline pipeline, final Stri
return previousPipes;
}

public static Pipe getPreviousPipe(final Pipeline pipeline) {
return pipeline.get(pipeline.size() - 1);
}

public static List<Pipe> getPreviousPipes(final Pipeline pipeline, final int numberedStep) {
final List<Pipe> previousPipes = new ArrayList<Pipe>();
int pipelineSize = pipeline.size();
for (int i = 0; i < numberedStep; i++) {
previousPipes.add(pipeline.get(pipelineSize - i + 1));
}
return previousPipes;
}

public static void setStarts(final Pipeline pipeline, final Object starts) {
if (starts instanceof Iterator) {
pipeline.setStarts((Iterator) starts);
Expand Down
14 changes: 5 additions & 9 deletions src/main/java/com/tinkerpop/pipes/util/Pipeline.java
Expand Up @@ -65,15 +65,6 @@ protected void setPipes(final List<Pipe> pipes) {
}
}

/**
* Useful for constructing the pipeline chain without making use of the constructor.
*
* @param pipes the ordered array of pipes to chain together into a pipeline
*/
protected void setPipes(final Pipe... pipes) {
this.setPipes(Arrays.asList(pipes));
}

/**
* Adds a new pipe to the end of the pipeline and then reconstructs the pipeline chain.
*
Expand All @@ -84,6 +75,11 @@ public void addPipe(final Pipe pipe) {
this.setPipes(this.pipes);
}

public void addPipe(final int location, final Pipe pipe) {
this.pipes.add(location, pipe);
this.setPipes(this.pipes);
}

public void setStarts(final Iterator<S> starts) {
this.starts = starts;
this.startPipe.setStarts(starts);
Expand Down
18 changes: 0 additions & 18 deletions src/main/java/com/tinkerpop/pipes/util/PipesFluentPipeline.java
Expand Up @@ -50,15 +50,6 @@ public interface PipesFluentPipeline<S, E> {
*/
public PipesFluentPipeline<S, ?> copySplit(final Pipe<E, ?>... pipes);

/**
* Add an ExhaustMergePipe to the end of the pipeline.
* The provided pipes' emitted objects are merged where the first pipe's objects are exhausted, then the second, etc.
*
* @param pipes the internal pipes ExhaustMergePipe
* @return the extended Pipeline
*/
public PipesFluentPipeline<S, ?> exhaustMerge(final Pipe<E, ?>... pipes);

/**
* Add an ExhaustMergePipe to the end of the pipeline.
* The one-step previous MetaPipe in the pipeline's pipes are used as the internal pipes.
Expand All @@ -68,15 +59,6 @@ public interface PipesFluentPipeline<S, E> {
*/
public PipesFluentPipeline<S, ?> exhaustMerge();

/**
* Add a FairMergePipe to the end of the pipeline.
* The provided pipes' emitted objects are merged in a round robin fashion.
*
* @param pipes the internal pipes of the FairMergePipe
* @return the extended Pipeline
*/
public PipesFluentPipeline<S, ?> fairMerge(final Pipe<E, ?>... pipes);

/**
* Add a FairMergePipe to the end of the Pipeline.
* The one-step previous MetaPipe in the pipeline's pipes are used as the internal pipes.
Expand Down
12 changes: 2 additions & 10 deletions src/main/java/com/tinkerpop/pipes/util/PipesPipeline.java
Expand Up @@ -86,20 +86,12 @@ public <T> PipesPipeline<S, T> step(final Pipe<E, T> pipe) {
return this.add(new CopySplitPipe(pipes));
}

public PipesPipeline<S, ?> exhaustMerge(final Pipe... pipes) {
return this.add(new ExhaustMergePipe(pipes));
}

public PipesPipeline<S, ?> exhaustMerge() {
return this.add(new ExhaustMergePipe((((MetaPipe) FluentUtility.removePreviousPipes(this, 1).get(0)).getPipes())));
}

public PipesPipeline<S, ?> fairMerge(final Pipe... pipes) {
return this.add(new FairMergePipe(pipes));
return this.add(new ExhaustMergePipe((((MetaPipe) FluentUtility.getPreviousPipe(this)).getPipes())));
}

public PipesPipeline<S, ?> fairMerge() {
return this.add(new FairMergePipe((((MetaPipe) FluentUtility.removePreviousPipes(this, 1).get(0)).getPipes())));
return this.add(new FairMergePipe((((MetaPipe) FluentUtility.getPreviousPipe(this)).getPipes())));
}

public PipesPipeline<S, ?> ifThenElse(final PipeFunction<E, Boolean> ifFunction, final PipeFunction<E, ?> thenFunction, final PipeFunction<E, ?> elseFunction) {
Expand Down

0 comments on commit a2aa669

Please sign in to comment.