Skip to content

Commit

Permalink
Added Observables - a way to convert callback base operations into pu…
Browse files Browse the repository at this point in the history
…sh based ones.

Added Observable, Subscription and Observer interfaces.
Added Observables factory that converts callback based operations into observable ones.

JAVA-1805
  • Loading branch information
rozza committed May 15, 2015
1 parent 7672547 commit a6bf6d2
Show file tree
Hide file tree
Showing 14 changed files with 2,026 additions and 0 deletions.
149 changes: 149 additions & 0 deletions docs/reference/content/driver-async/reference/observables.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
+++
date = "2015-05-14T08:01:00-00:00"
title = "Observables"
[menu.main]
parent = "Async Reference"
identifier = "Observables"
weight = 90
pre = "<i class='fa'></i>"
+++

## Observables

The MongoDB Async Driver is fully callback based and makes extensive use of [`SingleResultCallback<T>`]({{< apiref "com/mongodb/async/client/SingleResultCallback" >}}) to achieve this. The `SingleResultCallback<T>` interface requires the implementation of a single method `onResult(T result, Throwable t)` which is called once the operation has completed or errored. The `result` parameter contains the result of the operation if successful. If the operation failed for any reason then the `t` contains the `Throwable` reason for the failure. This pattern allows the users application logic to be deferred until the underlying network IO to MongoDB has been completed.

The callback pattern is extremely flexible but can quickly become unwieldy if the application logic requires a chain of operations. Nesting of callbacks can make code harder to read and give the appearance of making the codebase more complex that it actually is. To help with this we also have released two observable based asynchronous drivers:

1. [MongoDB Reactive Streams Driver](http://mongodb.github.io/mongo-java-driver-reactivestreams/)
2. [MongoDB RxJava Driver](http://mongodb.github.io/mongo-java-driver-rx/)

These observable drivers follow similar patterns that split the logic into `onNext`, `onError` and `onComplete(d)` methods. These methods split out the logic used by `SingleResultCallback<T>.onResult(T result, Throwable t)` into individual components that can make the code easier to reason with.

The MongoDB Async Driver provides a factory and interfaces that do the heavy lifting of converting callback based operations into an observable operations. There are three interfaces [`Observable`]({{< apiref "com/mongodb/async/client/Observable" >}}), [`Subscription`]({{< apiref "com/mongodb/async/client/Subscription" >}}) and [`Observer`]({{< apiref "com/mongodb/async/client/Observer" >}}). The [`Observables`]({{< apiref "com/mongodb/async/client/Observables" >}}) helpers convert any callback based operations into observable operations.

{{% note %}}
The interfaces are the similar to `Publisher`, `Subscription` and `Subscriber` interfaces from the [reactive streams](http://www.reactive-streams.org/) JVM implementation. However, we prefer the name `Observerable` to `Publisher` and `Observer` to `Subscriber` for readability purposes.
{{% /note %}}

## Observable
The [`Observable`]({{< apiref "com/mongodb/async/client/Observable" >}}) represents a MongoDB operation which emits its results to the `Observer` based on demand requested by the `Subscription` to the `Observable`.

## Subscription

A [`Subscription`]({{< apiref "com/mongodb/async/client/Subscription" >}}) represents a one-to-one lifecycle of an `Observer` subscribing to an `Observable`. A `Subscription` to an `Observable` can only be used by a single `Observer`. The purpose of a `Subscription` is to control demand and to allow unsubscribing from the `Observable`.

## Observer

An [`Observer`]({{< apiref "com/mongodb/async/client/Observer" >}}) provides the mechanism for receiving push-based notifications from the `Observable`. Demand for these events is signalled by its `Subscription`. On subscription to an `Observable` the `Observer` will be passed the `Subscription` via the `onSubscribe(Subscription subscription)`.
Demand for results is signaled via the `Subscription` and any results are passed to the `onNext(TResult result)` method. If there is an error for any reason the `onError(Throwable e)` will be called and no more events passed to the `Observer`. Alternatively, when the `Observer` has consumed all the results from the `Observable` the `onComplete()` method is called.

## Wrapping a MongoIterable

With the [`Observables`]({{< apiref "com/mongodb/async/client/Observables" >}}) factory creating an `Observable` from a `MongoIterable` is simple.

In the following example we iterate and print out all json forms of documents in a collection:

```java
Observables.observe(collection.find()).subscribe(new Observer<Document>(){
@Override
void onSubscribe(final Subscription subscription) {
System.out.println("Subscribed and requesting all documents");
subscription.request(Long.MAX_VALUE);
}

@Override
void onNext(final Document document) {
System.out.println(document.toJson());
}

@Override
void onError(final Throwable e) {
System.out.println("There was an error: " + e.getMessage());
}

@Override
void onComplete() {
System.out.println("Finished iterating all documents");
}
});
```

## Wrapping a callback based method

Creating an `Observable` from any callback based methods requires the wrapping of the method inside a [`Block`]({{< apiref "com/mongodb/Block" >}}). This allows the execution of the operation to be delayed, until demand is request by the `Subscription`. The method *must* use the supplied callback to convert the results into an `Observable`.

In the following example we print out the count of the number of documents in a collection:


```java
Block<SingleResultCallback<Long>> operation = new Block<SingleResultCallback<Long>>() {
@Override
void apply(final SingleResultCallback<Long> callback) {
collection.count(callback);
}
};

// Or in Java 8 syntax:
operation = (Block<SingleResultCallback<Long>>) collection::count;

Observables.observe(operation).subscribe(new Observer<Long>(){
@Override
void onSubscribe(final Subscription subscription) {
System.out.println("Subscribed and requesting the count");
subscription.request(1);
}

@Override
void onNext(final Long count) {
System.out.println("The collection has " + count + " documents");
}

@Override
void onError(final Throwable e) {
System.out.println("There was an error: " + e.getMessage());
}

@Override
void onComplete() {
System.out.println("Finished");
}
});
```

## Back Pressure

In the following example, the `Subscription` is used to control demand when iterating an `Observable`. This is similar in concept to the `MongoIterable.forEach` method but allows demand-driven iteration:

```java
Observables.observe(collection.find()).subscribe(new Observer<Document>(){
private long batchSize = 10;
private long seen = 0;
private Subscription subscription;

@Override
void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
subscription.request(batchSize);
}

@Override
void onNext(final Document document) {
System.out.println(document.toJson());
seen += 1;
if (seen == batchSize) {
seen = 0;
subscription.request(batchSize);
}
}

@Override
void onError(final Throwable e) {
System.out.println("There was an error: " + e.getMessage());
}

@Override
void onComplete() {
System.out.println("Finished iterating all documents");
}
});
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright 2015 MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.async.client;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

abstract class AbstractSubscription<TResult> implements Subscription {

private final Observer<? super TResult> observer;

/* protected by `this` */
private boolean requestedData;
private boolean isProcessing;
private long requested = 0;
private boolean isUnsubscribed = false;
private boolean isTerminated = false;
/* protected by `this` */

private final ConcurrentLinkedQueue<TResult> resultsQueue = new ConcurrentLinkedQueue<TResult>();

public AbstractSubscription(final Observer<? super TResult> observer) {
this.observer = observer;
}

@Override
public void unsubscribe() {
boolean unsubscribe = false;

synchronized (this) {
if (!isUnsubscribed) {
unsubscribe = true;
isUnsubscribed = true;
isTerminated = true;
}
}

if (unsubscribe) {
postTerminate();
}
}

@Override
public boolean isUnsubscribed() {
return isUnsubscribed;
}

@Override
public void request(final long n) {
if (n < 1) {
throw new IllegalArgumentException("Number requested cannot be negative: " + n);
}

boolean requestData = false;
synchronized (this) {
if (requested + n < 1) {
requested = Long.MAX_VALUE;
} else {
requested += n;
}
if (!requestedData) {
requestedData = true;
requestData = true;
}
}

if (requestData) {
requestInitialData();
} else {
processResultsQueue();
}
}

abstract void requestInitialData();

void requestMoreData() {
}

void postTerminate() {
}

boolean checkCompleted() {
return true;
}

boolean isTerminated() {
return isTerminated;
}

long getRequested() {
return requested;
}

void addToQueue(final TResult result) {
if (result != null) {
resultsQueue.add(result);
}
processResultsQueue();
}

void addToQueue(final List<TResult> results) {
if (results != null) {
resultsQueue.addAll(results);
}
processResultsQueue();
}

void onError(final Throwable t) {
if (terminalAction()) {
postTerminate();
observer.onError(t);
}
}

void onNext(final TResult next) {
boolean isTerminated = false;
synchronized (this) {
isTerminated = this.isTerminated;
}

if (!isTerminated) {
try {
observer.onNext(next);
} catch (Throwable t) {
onError(t);
}
}
}

void onComplete() {
if (terminalAction()) {
postTerminate();
observer.onComplete();
}
}

private void processResultsQueue() {
boolean mustProcess = false;

synchronized (this) {
if (!isProcessing && !isTerminated) {
isProcessing = true;
mustProcess = true;
}
}

if (mustProcess) {
boolean requestMore = false;

long processedCount = 0;
boolean completed = false;
while (true) {
long localWanted = 0;

synchronized (this) {
requested -= processedCount;
if (resultsQueue.isEmpty()) {
completed = checkCompleted();
requestMore = requested > 0;
isProcessing = false;
break;
} else if (requested == 0) {
isProcessing = false;
break;
}
localWanted = requested;
}
processedCount = 0;

while (localWanted > 0) {
TResult item = resultsQueue.poll();
if (item == null) {
break;
} else {
onNext(item);
localWanted -= 1;
processedCount += 1;
}
}
}

if (completed) {
onComplete();
} else if (requestMore) {
requestMoreData();
}
}
}

private boolean terminalAction() {
boolean isTerminal = false;
synchronized (this) {
if (!isTerminated) {
isTerminated = true;
isTerminal = true;
}
}
return isTerminal;
}

}
Loading

0 comments on commit a6bf6d2

Please sign in to comment.