Filter Pipes

okram edited this page Sep 8, 2011 · 8 revisions

A common Pipes pattern is the filtering of objects. A filter-based pipe will consume objects and either emit them or not. If they are not emitted, then they are considered filtered by the pipe. A useful interface to implement that describes this behavior is FilterPipe<S> extends Pipe<S,S>.

Generic Filter Pipe

The generic filter pipe is FilterFunctionPipe. A FilterFunctionPipe takes a PipeFunction (see Pipe Types) that computes on S and either emits it or not. An example PipeFunction is provided below:

public class CharCountPipeFunction implements PipeFunction<String,Boolean> {
  private final int number;
  public CharCountPipeFunction(int number) {
    this.number = number;

  public Boolean compute(String argument) {
     return argument.length() == this.number;

When put in the context of a FilterFunctionPipe, the code looks as follows:

Pipe<String, String> pipe = new FilterFunctionPipe<String>(new CharCountPipeFunction(4));
pipe.setStarts(Arrays.asList("tell", "me", "your", "name"));
// the results of the iteration are: "tell", "your", "name"

Basic Filtering

The RandomFilterPipe comes with the Pipes distribution. RandomFilterPipe will only allow a consumed object to be emitted if a biased coin toss lands on “heads.” At the extremes, if bias is 0.0 then no incoming objects are emitted and if bias is 1.0, then every incoming object is emitted.

public class RandomFilterPipe<S> extends AbstractPipe<S, S> implements FilterPipe<S> {
    private static final Random RANDOM = new Random();
    private final double bias;
    public SampleFilterPipe(final double bias) {
        this.bias = bias;
    protected S processNextStart() {
        while (this.starts.hasNext()) {
            S s =;
            if (bias >= RANDOM.nextDouble()) {
                return s;
        throw new NoSuchElementException();

The processNextStart() method structure above is a common pattern in filter-based pipes. In short, a while(this.starts.hasNext()) is evaluated until an incoming start object meets the criteria and is returned (i.e. emitted). If there are no more starts and the criteria is not met, then a NoSuchElementException is thrown. Beware of using recursion to accomplish a similar behavior. As the amount of data grows that is streaming through, recursion-based methods can easily yield a StackOverflowError.

Comparison-Based Filtering

More complicated filters can be created than what was presented above. These filters usually check objects to determine whether to emit them or not. There is an interface called ComparisonFilterPipe<S,T>. Implementations of this interface consume objects of type S. If a check of that object passes (i.e. return true), then the S object is emitted, else it is filtered. ComparisonFilterPipe has the following signature:

public interface ComparisonFilterPipe<S, T> extends FilterPipe<S> {
    public enum Filter {
    public boolean compareObjects(T leftObject, T rightObject);

The important method is compareObjects(). This method returns true if the left hand object is EQUAL, NOT_EQUAL, etc. to the right hand object.

Next, there is an abstract class called AbstractComparisonFilterPipe<S,T> that implements ComparisonFilterPipe. More specifically, it provides a standard implementation of ComparisonFilterPipe.compareObjects(). For most situations, this method is sufficient. However, if not, simply override the method with an implementation that meets the requirements of the designed pipe.

public boolean compareObjects(final T leftObject, final T rightObject) {
        switch (this.filter) {
            case EQUAL:
                if (null == leftObject)
                    return rightObject == null;
                return leftObject.equals(rightObject);
            case NOT_EQUAL:
                if (null == leftObject)
                    return rightObject != null;
                return !leftObject.equals(rightObject);
            case GREATER_THAN:
                if (null == leftObject || rightObject == null)
                    return false;
                return ((Comparable) leftObject).compareTo(rightObject) == 1;
            case LESS_THAN:
                if (null == leftObject || rightObject == null)
                    return false;
                return ((Comparable) leftObject).compareTo(rightObject) == -1;
            case GREATER_THAN_EQUAL:
                if (null == leftObject || rightObject == null)
                    return false;
                return ((Comparable) leftObject).compareTo(rightObject) >= 0;
            case LESS_THAN_EQUAL:
                if (null == leftObject || rightObject == null)
                    return false;
                return ((Comparable) leftObject).compareTo(rightObject) <= 0;
                throw new RuntimeException("Invalid state as no valid filter was provided");

Note: it many situations, such as ObjectFilterPipe, the right hand object to allow or disallow is stored in the pipe and compared with each object passed through it.