Skip to content

Commit

Permalink
Merge pull request ReactiveX#343 from benjchristensen/covariant-support
Browse files Browse the repository at this point in the history
Covariant Support with super/extends and OnSubscribeFunc
  • Loading branch information
benjchristensen committed Sep 4, 2013
2 parents 2d58f7c + 9215afa commit 65ac7c2
Show file tree
Hide file tree
Showing 84 changed files with 1,521 additions and 1,029 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.groovy;

import groovy.lang.Closure;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;

/**
* Concrete wrapper that accepts a {@link Closure} and produces a {@link OnSubscribeFunc}.
*
* @param <T>
*/
public class GroovyOnSubscribeFuncWrapper<T> implements OnSubscribeFunc<T> {

private final Closure<Subscription> closure;

public GroovyOnSubscribeFuncWrapper(Closure<Subscription> closure) {
this.closure = closure;
}

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return closure.call(observer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.codehaus.groovy.runtime.metaclass.MetaClassRegistryImpl;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.observables.BlockingObservable;
import rx.util.functions.Action;
import rx.util.functions.Function;
Expand All @@ -52,33 +53,6 @@ public RxGroovyExtensionModule() {
super("RxGroovyExtensionModule", "1.0");
}

/**
* Keeping this code around a little while as it was hard to figure out ... and I'm still messing with it while debugging.
*
* Once the rest of this ExtensionModule stuff is working I'll delete this method.
*
* This is used for manually initializing rather than going via the org.codehaus.groovy.runtime.ExtensionModule properties file.
*/
public static void initializeManuallyForTesting() {
System.out.println("initialize");
MetaClassRegistryImpl mcRegistry = ((MetaClassRegistryImpl) GroovySystem.getMetaClassRegistry());
// RxGroovyExtensionModule em = new RxGroovyExtensionModule();

Properties p = new Properties();
p.setProperty("moduleFactory", "rx.lang.groovy.RxGroovyPropertiesModuleFactory");
Map<CachedClass, List<MetaMethod>> metaMethods = new HashMap<CachedClass, List<MetaMethod>>();
mcRegistry.registerExtensionModuleFromProperties(p, RxGroovyExtensionModule.class.getClassLoader(), metaMethods);

for (ExtensionModule m : mcRegistry.getModuleRegistry().getModules()) {
System.out.println("Module: " + m.getName());
}

for (CachedClass cc : metaMethods.keySet()) {
System.out.println("Adding MetaMethods to CachedClass: " + cc);
cc.addNewMopMethods(metaMethods.get(cc));
}
}

@SuppressWarnings("rawtypes")
@Override
public List<MetaMethod> getMetaMethods() {
Expand Down Expand Up @@ -135,6 +109,8 @@ public Object invoke(Object object, Object[] arguments) {
if (o instanceof Closure) {
if (Action.class.isAssignableFrom(m.getParameterTypes()[i])) {
newArgs[i] = new GroovyActionWrapper((Closure) o);
} else if(OnSubscribeFunc.class.isAssignableFrom(m.getParameterTypes()[i])) {
newArgs[i] = new GroovyOnSubscribeFuncWrapper((Closure) o);
} else {
newArgs[i] = new GroovyFunctionWrapper((Closure) o);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.mockito.MockitoAnnotations;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
Expand Down Expand Up @@ -296,9 +297,9 @@ def class ObservableTests {
}


def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {
def class AsyncObservable implements OnSubscribeFunc {

public Subscription call(final Observer<Integer> observer) {
public Subscription onSubscribe(final Observer<Integer> observer) {
new Thread(new Runnable() {
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ object RxImplicits {
import java.{ lang => jlang }
import language.implicitConversions

import rx.Observable
import rx.{ Observable, Observer, Subscription }
import rx.Observable.OnSubscribeFunc
import rx.observables.BlockingObservable
import rx.util.functions._

Expand Down Expand Up @@ -56,7 +57,7 @@ object RxImplicits {
}

/**
* Converts a function shaped ilke compareTo into the equivalent Rx Func2
* Converts a function shaped like compareTo into the equivalent Rx Func2
*/
implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] =
new Func2[A, A, jlang.Integer] {
Expand Down Expand Up @@ -100,13 +101,18 @@ object RxImplicits {
def call(a: A, b: B, c: C, d: D) = f(a, b, c, d)
}

implicit def onSubscribeFunc[A](f: (Observer[_ >: A]) => Subscription): OnSubscribeFunc[A] =
new OnSubscribeFunc[A] {
override def onSubscribe(a: Observer[_ >: A]) = f(a)
}

/**
* This implicit class implements all of the methods necessary for including Observables in a
* for-comprehension. Note that return type is always Observable, so that the ScalaObservable
* type never escapes the for-comprehension
*/
implicit class ScalaObservable[A](wrapped: Observable[A]) {
def map[B](f: A => B): Observable[B] = wrapped.map(f)
def map[B](f: A => B): Observable[B] = wrapped.map[B](f)
def flatMap[B](f: A => Observable[B]): Observable[B] = wrapped.mapMany(f)
def foreach(f: A => Unit): Unit = wrapped.toBlockingObservable.forEach(f)
def withFilter(p: A => Boolean): WithFilter = new WithFilter(p)
Expand All @@ -131,7 +137,9 @@ class UnitTestSuite extends JUnitSuite {
import org.mockito.Mockito._
import org.mockito.{ MockitoAnnotations, Mock }
import rx.{ Notification, Observer, Observable, Subscription }
import rx.Observable.OnSubscribeFunc
import rx.observables.GroupedObservable
import rx.subscriptions.Subscriptions
import collection.mutable.ArrayBuffer
import collection.JavaConverters._

Expand All @@ -147,7 +155,7 @@ class UnitTestSuite extends JUnitSuite {
class ObservableWithException(s: Subscription, values: String*) extends Observable[String] {
var t: Thread = null

override def subscribe(observer: Observer[String]): Subscription = {
override def subscribe(observer: Observer[_ >: String]): Subscription = {
println("ObservableWithException subscribed to ...")
t = new Thread(new Runnable() {
override def run() {
Expand Down Expand Up @@ -175,7 +183,6 @@ class UnitTestSuite extends JUnitSuite {
}

// tests of static methods

@Test def testSingle {
assertEquals(1, Observable.from(1).toBlockingObservable.single)
}
Expand Down Expand Up @@ -208,6 +215,11 @@ class UnitTestSuite extends JUnitSuite {
case ex: Throwable => fail("Caught unexpected exception " + ex.getCause + ", expected IllegalStateException")
}
}

@Test def testCreateFromOnSubscribeFunc {
val created = Observable.create((o: Observer[_ >: Integer]) => Subscriptions.empty)
//no assertions on subscription, just testing the implicit
}

@Test def testFromJavaInterop {
val observable = Observable.from(List(1, 2, 3).asJava)
Expand Down Expand Up @@ -248,7 +260,7 @@ class UnitTestSuite extends JUnitSuite {

@Test def testFlattenMerge {
val observable = Observable.from(Observable.from(1, 2, 3))
val merged = Observable.merge(observable)
val merged = Observable.merge[Int](observable)
assertSubscribeReceives(merged)(1, 2, 3)
}

Expand All @@ -272,6 +284,18 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(synchronized)(1, 2, 3)
}

@Test def testZip2() {
val colors: Observable[String] = Observable.from("red", "green", "blue")
val names: Observable[String] = Observable.from("lion-o", "cheetara", "panthro")

case class Character(color: String, name: String)

val cheetara = Character("green", "cheetara")
val panthro = Character("blue", "panthro")
val characters = Observable.zip[String, String, Character](colors, names, Character.apply _)
assertSubscribeReceives(characters)(cheetara, panthro)
}

@Test def testZip3() {
val numbers = Observable.from(1, 2, 3)
val colors = Observable.from("red", "green", "blue")
Expand All @@ -283,7 +307,7 @@ class UnitTestSuite extends JUnitSuite {
val cheetara = Character(2, "green", "cheetara")
val panthro = Character(3, "blue", "panthro")

val characters = Observable.zip(numbers, colors, names, Character.apply _)
val characters = Observable.zip[Int, String, String, Character](numbers, colors, names, Character.apply _)
assertSubscribeReceives(characters)(liono, cheetara, panthro)
}

Expand All @@ -299,7 +323,7 @@ class UnitTestSuite extends JUnitSuite {
val cheetara = Character(2, "green", "cheetara", false)
val panthro = Character(3, "blue", "panthro", false)

val characters = Observable.zip(numbers, colors, names, isLeader, Character.apply _)
val characters = Observable.zip[Int, String, String, Boolean, Character](numbers, colors, names, isLeader, Character.apply _)
assertSubscribeReceives(characters)(liono, cheetara, panthro)
}

Expand Down Expand Up @@ -338,7 +362,8 @@ class UnitTestSuite extends JUnitSuite {
@Test def testMap {
val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
val mappedNumbers = ArrayBuffer.empty[Int]
numbers.map((x: Int) => x * x).subscribe((squareVal: Int) => {
val mapped: Observable[Int] = numbers map ((x: Int) => x * x)
mapped.subscribe((squareVal: Int) => {
mappedNumbers.append(squareVal)
})
assertEquals(List(1, 4, 9, 16, 25, 36, 49, 64, 81), mappedNumbers.toList)
Expand Down Expand Up @@ -458,18 +483,9 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(skipped)(3, 4)
}

/**
* Both testTake and testTakeWhileWithIndex exposed a bug with unsubscribes not properly propagating.
* observable.take(2) produces onNext(first), onNext(second), and 4 onCompleteds
* it should produce onNext(first), onNext(second), and 1 onCompleted
*
* Switching to Observable.create(OperationTake.take(observable, 2)) works as expected
*/
@Test def testTake {
import rx.operators._

val observable = Observable.from(1, 2, 3, 4, 5)
val took = Observable.create(OperationTake.take(observable, 2))
val took = observable.take(2)
assertSubscribeReceives(took)(1, 2)
}

Expand All @@ -479,11 +495,11 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(took)(1, 3, 5)
}

/*@Test def testTakeWhileWithIndex {
val observable = Observable.from(1, 3, 5, 6, 7, 9, 11, 12, 13, 15, 17)
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx > 4)
assertSubscribeReceives(took)(9, 11)
}*/
@Test def testTakeWhileWithIndex {
val observable = Observable.from(1, 3, 5, 7, 9, 11, 12, 13, 15, 17)
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx < 8)
assertSubscribeReceives(took)(1, 3, 5, 7, 9, 11)
}

@Test def testTakeLast {
val observable = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
Expand Down Expand Up @@ -559,7 +575,7 @@ class UnitTestSuite extends JUnitSuite {

@Test def testFilterInForComprehension {
val doubler = (i: Int) => Observable.from(i, i)
val filteredObservable = for {
val filteredObservable: Observable[Int] = for {
i: Int <- Observable.from(1, 2, 3, 4)
j: Int <- doubler(i) if isOdd(i)
} yield j
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package rx.android.concurrency;

import android.os.Handler;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.annotation.Config;

import rx.Scheduler;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
Expand Down Expand Up @@ -39,7 +41,7 @@ public HandlerThreadScheduler(Handler handler) {
* See {@link #schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)}
*/
@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
return schedule(state, action, 0L, TimeUnit.MILLISECONDS);
}

Expand All @@ -56,7 +58,7 @@ public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscr
* @return A Subscription from which one can unsubscribe from.
*/
@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
final Scheduler _scheduler = this;
handler.postDelayed(new Runnable() {
Expand All @@ -76,6 +78,7 @@ public static final class UnitTest {
public void shouldScheduleImmediateActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
final Object state = new Object();
@SuppressWarnings("unchecked")
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
Expand All @@ -94,6 +97,7 @@ public void shouldScheduleImmediateActionOnHandlerThread() {
public void shouldScheduleDelayedActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
final Object state = new Object();
@SuppressWarnings("unchecked")
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private SwingScheduler() {
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
EventQueue.invokeLater(new Runnable() {
@Override
Expand All @@ -75,7 +75,7 @@ public void call() {
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
long delay = unit.toMillis(dueTime);
assertThatTheDelayIsValidForTheSwingTimer(delay);
Expand Down Expand Up @@ -113,7 +113,7 @@ public void call() {
}

@Override
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
final AtomicReference<Timer> timer = new AtomicReference<Timer>();

final long delay = unit.toMillis(period);
Expand Down
Loading

0 comments on commit 65ac7c2

Please sign in to comment.