Skip to content

Commit

Permalink
Make sure when taking just the first row, that we unsubscribe immedia…
Browse files Browse the repository at this point in the history
…tely so that we don't hold ono to a connection
  • Loading branch information
rchodava committed Oct 5, 2016
1 parent 0d29261 commit 841b39e
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import foundation.stack.datamill.configuration.Named;
import foundation.stack.datamill.db.impl.QueryBuilderImpl;
import foundation.stack.datamill.db.impl.RowImpl;
import foundation.stack.datamill.db.impl.UnsubscribeOnCompletedOperator;
import org.flywaydb.core.Flyway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -202,8 +203,8 @@ public <T> Observable<List<T>> getAs(Func1<Row, T> transformer) {
}

@Override
public <T> Observable<T> getFirstAs(Func1<Row, T> transformer) {
return stream().map(transformer).first();
public <T> Observable<T> firstAs(Func1<Row, T> transformer) {
return stream().map(transformer).take(1).lift(new UnsubscribeOnCompletedOperator<>());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
*/
public interface ResultBuilder {
<T> Observable<List<T>> getAs(Func1<Row, T> transformer);
<T> Observable<T> getFirstAs(Func1<Row, T> transformer);
<T> Observable<T> firstAs(Func1<Row, T> transformer);
Observable<Row> stream();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package foundation.stack.datamill.db.impl;

import rx.Observable;
import rx.Subscriber;
import rx.exceptions.Exceptions;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class UnsubscribeOnCompletedOperator<T> implements Observable.Operator<T, T> {
private final AtomicBoolean completed = new AtomicBoolean();

@Override
public Subscriber<? super T> call(Subscriber<? super T> subscriber) {
return new Subscriber<T>() {
@Override
public void onCompleted() {
this.unsubscribe();

if (!subscriber.isUnsubscribed()) {
if (completed.compareAndSet(false, true)) {
subscriber.onCompleted();
}
}
}

@Override
public void onError(Throwable e) {
this.unsubscribe();

if (!subscriber.isUnsubscribed()) {
if (completed.compareAndSet(false, true)) {
subscriber.onError(e);
}
}
}

@Override
public void onNext(T t) {
this.unsubscribe();

if (!subscriber.isUnsubscribed()) {
if (completed.compareAndSet(false, true)) {
try {
subscriber.onNext(t);
subscriber.onCompleted();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
subscriber.onError(e);
}
}
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import foundation.stack.datamill.db.ResultBuilder;
import foundation.stack.datamill.db.Row;
import foundation.stack.datamill.db.UpdateQueryExecution;
import foundation.stack.datamill.db.impl.UnsubscribeOnCompletedOperator;
import rx.Observable;
import rx.functions.Func1;

Expand Down Expand Up @@ -57,8 +58,8 @@ public <T> Observable<List<T>> getAs(Func1<Row, T> transformer) {
}

@Override
public <T> Observable<T> getFirstAs(Func1<Row, T> transformer) {
return stream().map(transformer).first();
public <T> Observable<T> firstAs(Func1<Row, T> transformer) {
return stream().map(transformer).take(1).lift(new UnsubscribeOnCompletedOperator<>());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.common.collect.ImmutableMap;
import foundation.stack.datamill.db.ResultBuilder;
import foundation.stack.datamill.db.Row;
import foundation.stack.datamill.db.UpdateQueryExecution;
import foundation.stack.datamill.reflection.Outline;
import foundation.stack.datamill.reflection.OutlineBuilder;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package foundation.stack.datamill.db.impl;

import org.junit.Test;
import rx.Observable;

import static org.junit.Assert.assertEquals;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class UnsubscribeOnCompletedOperatorTest {
@Test
public void operator() {
int[] unsubscribed = new int[] { 1 };
int result = Observable.just(1, 2, 3)
.doOnUnsubscribe(() -> unsubscribed[0] = unsubscribed[0] + 10)
.lift(new UnsubscribeOnCompletedOperator<>())
.doOnUnsubscribe(() -> unsubscribed[0] = unsubscribed[0] * 2)
.toBlocking()
.lastOrDefault(0);
assertEquals(1, result);
assertEquals(22, unsubscribed[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void rowBuilding() {
client.select(testModelOutline.member(m -> m.getProperty()))
.from(testModelOutline)
.all()
.getFirstAs(row -> testModelOutline.wrap(new TestModel())
.firstAs(row -> testModelOutline.wrap(new TestModel())
.set(m -> m.getProperty(), row.column(testModelOutline.member(m -> m.getProperty())))
.set(m -> m.getStringProperty(), row.column(testModelOutline.member(m -> m.getStringProperty())))
.unwrap())
Expand Down

0 comments on commit 841b39e

Please sign in to comment.