Skip to content

Commit

Permalink
Operation: Join
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Nov 21, 2013
1 parent d66fa11 commit e20f4fb
Show file tree
Hide file tree
Showing 3 changed files with 609 additions and 1 deletion.
23 changes: 22 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import rx.operators.OperationFirstOrDefault;
import rx.operators.OperationGroupBy;
import rx.operators.OperationInterval;
import rx.operators.OperationJoin;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
Expand Down Expand Up @@ -5662,5 +5663,25 @@ private boolean isInternalImplementation(Object o) {
return isInternal;
}
}

/**
* Correlates the elements of two sequences based on overlapping durations.
* @param right The right observable sequence to join elements for.
* @param leftDurationSelector A function to select the duration of each
* element of this observable sequence, used to
* determine overlap.
* @param rightDurationSelector A function to select the duration of each
* element of the right observable sequence,
* used to determine overlap.
* @param resultSelector A function invoked to compute a result element
* for any two overlapping elements of the left and
* right observable sequences.
* @return An observable sequence that contains result elements computed
* from source elements that have an overlapping duration.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229750.aspx'>MSDN: Observable.Join</a>
*/
public <TRight, TLeftDuration, TRightDuration, R> Observable<R> join(Observable<TRight> right, Func1<T, Observable<TLeftDuration>> leftDurationSelector,
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
Func2<T, TRight, R> resultSelector) {
return create(new OperationJoin<T, TRight, TLeftDuration, TRightDuration, R>(this, right, leftDurationSelector, rightDurationSelector, resultSelector));
}
}
277 changes: 277 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationJoin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.HashMap;
import java.util.Map;
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Correlates the elements of two sequences based on overlapping durations.
*/
public class OperationJoin<TLeft, TRight, TLeftDuration, TRightDuration, R> implements OnSubscribeFunc<R> {
final Observable<TLeft> left;
final Observable<TRight> right;
final Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector;
final Func1<TRight, Observable<TRightDuration>> rightDurationSelector;
final Func2<TLeft, TRight, R> resultSelector;
public OperationJoin(
Observable<TLeft> left,
Observable<TRight> right,
Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector,
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
Func2<TLeft, TRight, R> resultSelector) {
this.left = left;
this.right = right;
this.leftDurationSelector = leftDurationSelector;
this.rightDurationSelector = rightDurationSelector;
this.resultSelector = resultSelector;
}

@Override
public Subscription onSubscribe(Observer<? super R> t1) {
SerialSubscription cancel = new SerialSubscription();
ResultSink result = new ResultSink(t1, cancel);
cancel.setSubscription(result.run());
return cancel;
}
/** Manage the left and right sources. */
class ResultSink {
final Object gate = new Object();
final CompositeSubscription group = new CompositeSubscription();
boolean leftDone;
int leftId;
final Map<Integer, TLeft> leftMap = new HashMap<Integer, TLeft>();
boolean rightDone;
int rightId;
final Map<Integer, TRight> rightMap = new HashMap<Integer, TRight>();
final Observer<? super R> observer;
final Subscription cancel;
public ResultSink(Observer<? super R> observer, Subscription cancel) {
this.observer = observer;
this.cancel = cancel;
}
public Subscription run() {
SerialSubscription leftCancel = new SerialSubscription();
SerialSubscription rightCancel = new SerialSubscription();

group.add(leftCancel);
group.add(rightCancel);

leftCancel.setSubscription(left.subscribe(new LeftObserver(leftCancel)));
rightCancel.setSubscription(right.subscribe(new RightObserver(rightCancel)));

return group;
}
/** Observes the left values. */
class LeftObserver implements Observer<TLeft> {
final Subscription self;
public LeftObserver(Subscription self) {
this.self = self;
}
protected void expire(int id, Subscription resource) {
synchronized (gate) {
if (leftMap.remove(id) != null && leftMap.isEmpty() && leftDone) {
observer.onCompleted();
cancel.unsubscribe();
}
}
group.remove(resource);
}
@Override
public void onNext(TLeft args) {
int id;
synchronized (gate) {
id = leftId++;
leftMap.put(id, args);
}
SerialSubscription md = new SerialSubscription();
group.add(md);

Observable<TLeftDuration> duration;
try {
duration = leftDurationSelector.call(args);
} catch (Throwable t) {
observer.onError(t);
cancel.unsubscribe();
return;
}

md.setSubscription(duration.subscribe(new LeftDurationObserver(id, md)));

synchronized (gate) {
for (TRight r : rightMap.values()) {
R result;
try {
result = resultSelector.call(args, r);
} catch (Throwable t) {
observer.onError(t);
cancel.unsubscribe();
return;
}
observer.onNext(result);
}
}
}
@Override
public void onError(Throwable e) {
synchronized (gate) {
observer.onError(e);
cancel.unsubscribe();
}
}
@Override
public void onCompleted() {
synchronized (gate) {
leftDone = true;
if (rightDone || leftMap.isEmpty()) {
observer.onCompleted();
cancel.unsubscribe();
} else {
self.unsubscribe();
}
}
}
/** Observes the left duration. */
class LeftDurationObserver implements Observer<TLeftDuration> {
final int id;
final Subscription handle;
public LeftDurationObserver(int id, Subscription handle) {
this.id = id;
this.handle = handle;
}

@Override
public void onNext(TLeftDuration args) {
expire(id, handle);
}

@Override
public void onError(Throwable e) {
LeftObserver.this.onError(e);
}

@Override
public void onCompleted() {
expire(id, handle);
}

}
}
/** Observes the right values. */
class RightObserver implements Observer<TRight> {
final Subscription self;
public RightObserver(Subscription self) {
this.self = self;
}
void expire(int id, Subscription resource) {
synchronized (gate) {
if (rightMap.remove(id) != null && rightMap.isEmpty() && rightDone) {
observer.onCompleted();
cancel.unsubscribe();
}
}
group.remove(resource);
}
@Override
public void onNext(TRight args) {
int id = 0;
synchronized (gate) {
id = rightId++;
rightMap.put(id, args);
}
SerialSubscription md = new SerialSubscription();
group.add(md);

Observable<TRightDuration> duration;
try {
duration = rightDurationSelector.call(args);
} catch (Throwable t) {
observer.onError(t);
cancel.unsubscribe();
return;
}

md.setSubscription(duration.subscribe(new RightDurationObserver(id, md)));

synchronized (gate) {
for (TLeft lv : leftMap.values()) {
R result;
try {
result = resultSelector.call(lv, args);
} catch (Throwable t) {
observer.onError(t);
cancel.unsubscribe();
return;
}
observer.onNext(result);
}
}
}
@Override
public void onError(Throwable e) {
synchronized (gate) {
observer.onError(e);
cancel.unsubscribe();
}
}
@Override
public void onCompleted() {
synchronized (gate) {
rightDone = true;
if (leftDone || rightMap.isEmpty()) {
observer.onCompleted();
cancel.unsubscribe();
} else {
self.unsubscribe();
}
}
}
/** Observe the right duration. */
class RightDurationObserver implements Observer<TRightDuration> {
final int id;
final Subscription handle;
public RightDurationObserver(int id, Subscription handle) {
this.id = id;
this.handle = handle;
}

@Override
public void onNext(TRightDuration args) {
expire(id, handle);
}

@Override
public void onError(Throwable e) {
RightObserver.this.onError(e);
}

@Override
public void onCompleted() {
expire(id, handle);
}

}
}
}
}
Loading

0 comments on commit e20f4fb

Please sign in to comment.