Skip to content

Commit

Permalink
Input filtering in importer still shows progress
Browse files Browse the repository at this point in the history
Previously, filtering of input data could cause large sections of the
input to be filtered out, and so no progress was displayed. Since both progress
and processor assignments are based on the number of processed batches, the
behaviour of the iterator batcher has now changed so that it sends empty no-op
batches for the skipped entries.
  • Loading branch information
tinwelint committed Apr 18, 2017
1 parent 0568632 commit 008f2af
Showing 1 changed file with 38 additions and 8 deletions.
Expand Up @@ -33,38 +33,68 @@ public abstract class IteratorBatcherStep<T> extends IoProducerStep
{ {
private final Iterator<T> data; private final Iterator<T> data;
private final Class<T> itemClass; private final Class<T> itemClass;
protected long cursor;
private final Predicate<T> filter; private final Predicate<T> filter;


protected long cursor;
private T[] batch;
private int batchCursor;
private int skipped;

public IteratorBatcherStep( StageControl control, Configuration config, Iterator<T> data, Class<T> itemClass, public IteratorBatcherStep( StageControl control, Configuration config, Iterator<T> data, Class<T> itemClass,
Predicate<T> filter ) Predicate<T> filter )
{ {
super( control, config ); super( control, config );
this.data = data; this.data = data;
this.itemClass = itemClass; this.itemClass = itemClass;
this.filter = filter; this.filter = filter;
newBatch();
}

@SuppressWarnings( "unchecked" )
private void newBatch()
{
batchCursor = 0;
batch = (T[]) Array.newInstance( itemClass, batchSize );
} }


@Override @Override
protected Object nextBatchOrNull( long ticket, int batchSize ) protected Object nextBatchOrNull( long ticket, int batchSize )
{ {
@SuppressWarnings( "unchecked" ) while ( data.hasNext() )
T[] batch = (T[]) Array.newInstance( itemClass, batchSize );
int i = 0;
for ( ; i < batchSize && data.hasNext(); )
{ {
T candidate = data.next(); T candidate = data.next();
if ( filter.test( candidate ) ) if ( filter.test( candidate ) )
{ {
batch[i++] = candidate; batch[batchCursor++] = candidate;
cursor++; cursor++;
if ( batchCursor == batchSize )
{
T[] result = batch;
newBatch();
return result;
}
}
else
{
if ( ++skipped == batchSize )
{
skipped = 0;
return Array.newInstance( itemClass, 0 );
}
} }
} }


if ( i == 0 ) if ( batchCursor == 0 )
{ {
return null; // marks the end return null; // marks the end
} }
return i == batchSize ? batch : Arrays.copyOf( batch, i ); try
{
return batchCursor == batchSize ? batch : Arrays.copyOf( batch, batchCursor );
}
finally
{
batchCursor = 0;
}
} }
} }

0 comments on commit 008f2af

Please sign in to comment.