Skip to content

Commit

Permalink
Added StreamDelegate so we can access stream properties from closures
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Yates committed Apr 3, 2013
1 parent d715f7c commit 0b3ee32
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 35 deletions.
70 changes: 50 additions & 20 deletions src/main/groovy/groovy/stream/AbstractStream.java
Expand Up @@ -16,6 +16,7 @@
package groovy.stream ;

import groovy.lang.Closure ;
import groovy.lang.GroovyObjectSupport ;
import java.util.List ;
import java.util.Map ;
import java.util.HashMap ;
Expand All @@ -26,9 +27,45 @@
* @author Tim Yates
*/
abstract class AbstractStream<T,D> implements StreamInterface<T> {
protected static final Map<String,StreamStopper> stopDelegate = new HashMap<String,StreamStopper>() {{
put( "STOP", StreamStopper.getInstance() ) ;
}} ;
protected class StreamDelegate extends GroovyObjectSupport {
private Map<String,Object> backingMap ;
private Map<String,Object> currentMap ;

private StreamDelegate( Map<String,Object> using ) {
this( using, new HashMap<String,Object>() ) ;
}

private StreamDelegate( Map<String,Object> using, Map<String,Object> current ) {
this.backingMap = using ;
this.currentMap = current ;
}

public void propertyMissing( String name, Object value ) {
System.out.println( "PM: '" + name + "' " + value ) ;
if( currentMap.keySet().contains( name ) ) {
currentMap.put( name, value ) ;
}
else if( backingMap.keySet().contains( name ) ) {
backingMap.put( name, value ) ;
}
}

public Object propertyMissing( String name ) {
if( "streamIndex".equals( name ) ) { return getStreamIndex() ; }
if( "unfilteredIndex".equals( name ) ) { return getUnfilteredIndex() ; }
if( "exhausted".equals( name ) ) { return isExhausted() ; }
if( "STOP".equals( name ) ) { return StreamStopper.getInstance() ; }
if( currentMap.keySet().contains( name ) ) { return currentMap.get( name ) ; }
else { return backingMap.get( name ) ; }
}

@SuppressWarnings("unchecked")
protected StreamDelegate integrateCurrent( Map currentMap ) {
return new StreamDelegate( backingMap, currentMap ) ;
}
}

final StreamDelegate delegate ;
protected int streamIndex = -1 ;
protected int unfilteredIndex = -1 ;
protected boolean exhausted = false ;
Expand All @@ -44,21 +81,23 @@ abstract class AbstractStream<T,D> implements StreamInterface<T> {
protected AbstractStream( Closure<D> definition, Closure condition, Closure<T> transform, Map<String,Object> using, Closure<Boolean> until ) {
this.using = using ;

this.delegate = new StreamDelegate( this.using ) ;

this.definition = definition ;
this.definition.setDelegate( this.using ) ;
this.definition.setResolveStrategy( Closure.DELEGATE_FIRST ) ;
this.definition.setDelegate( this.delegate ) ;
this.definition.setResolveStrategy( Closure.DELEGATE_ONLY ) ;

this.condition = condition ;
this.condition.setDelegate( this.using ) ;
this.condition.setResolveStrategy( Closure.DELEGATE_FIRST ) ;
this.condition.setDelegate( this.delegate ) ;
this.condition.setResolveStrategy( Closure.DELEGATE_ONLY ) ;

this.transform = transform ;
this.transform.setDelegate( this.using ) ;
this.transform.setResolveStrategy( Closure.DELEGATE_FIRST ) ;
this.transform.setDelegate( this.delegate ) ;
this.transform.setResolveStrategy( Closure.DELEGATE_ONLY ) ;

this.until = until ;
this.until.setDelegate( this.using ) ;
this.until.setResolveStrategy( Closure.DELEGATE_FIRST ) ;
this.until.setDelegate( this.delegate ) ;
this.until.setResolveStrategy( Closure.DELEGATE_ONLY ) ;
}

protected Closure<D> getDefinition() { return definition ; }
Expand All @@ -83,15 +122,6 @@ public boolean isExhausted() {

protected abstract void loadNext() ;

@SuppressWarnings("unchecked")
protected Map generateMapDelegate( Map... subMaps ) {
Map ret = new HashMap() ;
for( Map m : subMaps ) {
ret.putAll( m ) ;
}
return ret ;
}

@Override
public boolean hasNext() {
if( !initialised ) {
Expand Down
8 changes: 3 additions & 5 deletions src/main/groovy/groovy/stream/MapStream.java
Expand Up @@ -56,7 +56,7 @@ private T cloneMap( Map m ) {
@Override
public T next() {
T ret = cloneMap( (Map)current ) ;
transform.setDelegate( generateMapDelegate( using, (Map)current ) ) ;
transform.setDelegate( delegate.integrateCurrent( (Map)ret ) ) ;
loadNext() ;
this.streamIndex++ ;
return transform.call( ret ) ;
Expand All @@ -75,22 +75,20 @@ private T getFirst() {
@SuppressWarnings("unchecked")
protected void loadNext() {
while( !exhausted ) {
this.unfilteredIndex++ ;
if( current == null ) {
current = getFirst() ;
this.unfilteredIndex++ ;
}
else {
for( int i = keys.size() - 1 ; i >= 0 ; i-- ) {
String key = keys.get( i ) ;
if( iterators.get( key ).hasNext() ) {
((Map)current).put( key, iterators.get( key ).next() ) ;
this.unfilteredIndex++ ;
break ;
}
else if( i > 0 ) {
iterators.put( key, initial.get( key ).iterator() ) ;
((Map)current).put( key, iterators.get( key ).next() ) ;
this.unfilteredIndex++ ;
}
else {
exhausted = true ;
Expand All @@ -101,7 +99,7 @@ else if( i > 0 ) {
exhausted = true ;
break ;
}
condition.setDelegate( generateMapDelegate( using, stopDelegate, (Map)current ) ) ;
condition.setDelegate( delegate.integrateCurrent( (Map)current ) ) ;
Object cond = condition.call( current ) ;
if( cond == StreamStopper.getInstance() ) {
exhausted = true ;
Expand Down
4 changes: 1 addition & 3 deletions src/main/groovy/groovy/stream/StreamImpl.java
Expand Up @@ -87,14 +87,13 @@ public T next() {
@Override
protected void loadNext() {
while( !exhausted ) {
this.unfilteredIndex++ ;
if( current == null && iterator.hasNext() ) {
current = iterator.next() ;
this.unfilteredIndex++ ;
}
else {
if( iterator.hasNext() ) {
current = iterator.next() ;
this.unfilteredIndex++ ;
}
else {
exhausted = true ;
Expand All @@ -104,7 +103,6 @@ protected void loadNext() {
exhausted = true ;
break ;
}
condition.setDelegate( generateMapDelegate( using, stopDelegate ) ) ;
Object cond = condition.call( current ) ;
if( cond == StreamStopper.getInstance() ) {
exhausted = true ;
Expand Down
33 changes: 33 additions & 0 deletions src/test/groovy/groovy/stream/DelegateTests.groovy
@@ -0,0 +1,33 @@
package groovy.stream

public class DelegateTests extends spock.lang.Specification {
def "unfilteredIndex map test"() {
setup:
def stream = Stream.from 1..4 map { it * unfilteredIndex }
when:
def result = stream.collect()
then:
"unfilteredIndex is incremented BEFORE map"
result == [ 1, 4, 9, 16 ]
}

def "unfilteredIndex filter test"() {
setup:
def stream = Stream.from 1..4 filter { 2 != unfilteredIndex }
when:
def result = stream.collect()
then:
"unfilteredIndex is incremented AFTER filter"
result == [ 1, 2, 4 ]
}

def "unfilteredIndex until test"() {
setup:
def stream = Stream.from 1..4 until { unfilteredIndex == 2 }
when:
def result = stream.collect()
then:
"unfilteredIndex is incremented BEFORE until"
result == [ 1, 2 ]
}
}
2 changes: 1 addition & 1 deletion src/test/groovy/groovy/stream/MapTests.groovy
Expand Up @@ -25,7 +25,7 @@ public class MapTests extends spock.lang.Specification {

def "Map with transformation"() {
setup:
def stream = Stream.from x:1..2, y:1..2 map { x + y }
def stream = Stream.from x:1..2, y:1..2 map { println "$x $y" ; x + y }

when:
def result = stream.collect()
Expand Down
12 changes: 6 additions & 6 deletions src/test/groovy/groovy/stream/RangeStreamTests.groovy
Expand Up @@ -13,7 +13,7 @@ public class RangeStreamTests extends spock.lang.Specification {
result.size() == range.size()
stream.exhausted
stream.streamIndex == range.size() - 1
stream.unfilteredIndex == range.size() - 1
stream.unfilteredIndex == range.size()
}

def "Limited Range usage"() {
Expand All @@ -30,7 +30,7 @@ public class RangeStreamTests extends spock.lang.Specification {
result == limit
stream.exhausted
stream.streamIndex == limit.size() - 1
stream.unfilteredIndex == range.size() - 1
stream.unfilteredIndex == range.size()
}

def "Transformed Range usage"() {
Expand All @@ -46,7 +46,7 @@ public class RangeStreamTests extends spock.lang.Specification {
result == range*.multiply( 2 )
stream.exhausted
stream.streamIndex == range.size() - 1
stream.unfilteredIndex == range.size() - 1
stream.unfilteredIndex == range.size()
}

def "Transformed, Even Range usage"() {
Expand All @@ -62,7 +62,7 @@ public class RangeStreamTests extends spock.lang.Specification {
result == expected
stream.exhausted
stream.streamIndex == expected.size() - 1
stream.unfilteredIndex == range.size() - 1
stream.unfilteredIndex == range.size()
}

def "Range with local variables"() {
Expand All @@ -77,7 +77,7 @@ public class RangeStreamTests extends spock.lang.Specification {
result == 1..upper
stream.exhausted
stream.streamIndex == upper - 1
stream.unfilteredIndex == upper - 1
stream.unfilteredIndex == upper
}

def "The kitchen sink"() {
Expand All @@ -94,7 +94,7 @@ public class RangeStreamTests extends spock.lang.Specification {
result == expected
stream.exhausted
stream.streamIndex == expected.size() - 1
stream.unfilteredIndex == upper - 1
stream.unfilteredIndex == upper
}

def "Ranged index access"() {
Expand Down

0 comments on commit 0b3ee32

Please sign in to comment.