Skip to content

Commit

Permalink
Merge pull request #150 from tsegismont/issue/149
Browse files Browse the repository at this point in the history
Allow MongoIterableStream handlers to be set to null after the stream is closed
  • Loading branch information
karianna committed Jun 4, 2018
2 parents f9a2785 + e529d81 commit d48cd70
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
Expand Up @@ -41,7 +41,6 @@ class MongoIterableStream implements ReadStream<JsonObject> {

@Override
public synchronized MongoIterableStream exceptionHandler(Handler<Throwable> handler) {
checkClosed();
this.exceptionHandler = handler;
return this;
}
Expand All @@ -55,11 +54,9 @@ private void checkClosed() {

@Override
public synchronized MongoIterableStream handler(Handler<JsonObject> handler) {
checkClosed();
if (handler == null) {
if ((dataHandler = handler) == null) {
close();
} else {
dataHandler = handler;
SingleResultCallback<AsyncBatchCursor<JsonObject>> callback = (result, t) -> {
context.runOnContext(v -> {
synchronized (this) {
Expand Down Expand Up @@ -177,6 +174,9 @@ public synchronized MongoIterableStream endHandler(Handler<Void> handler) {

// Always called from a synchronized method or block
private void close() {
if (closed) {
return;
}
closed = true;
AtomicReference<AsyncBatchCursor> cursorRef = new AtomicReference<>();
context.executeBlocking(fut -> {
Expand Down
15 changes: 11 additions & 4 deletions src/test/java/io/vertx/ext/mongo/MongoClientTest.java
Expand Up @@ -3,14 +3,13 @@
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoDatabase;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.mongo.impl.codec.json.JsonObjectCodec;
import org.junit.After;
import org.junit.Before;
import io.vertx.core.streams.ReadStream;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -47,12 +46,17 @@ public void tearDown() throws Exception {
public void testFindBatch() throws Exception {
int numDocs = 3000;

AtomicReference<ReadStream<JsonObject>> streamReference = new AtomicReference<>();

String collection = randomCollection();
CountDownLatch latch = new CountDownLatch(1);
List<String> foos = new ArrayList<>();
mongoClient.createCollection(collection, onSuccess(res -> {
insertDocs(mongoClient, collection, numDocs, onSuccess(res2 -> {
mongoClient.findBatchWithOptions(collection, new JsonObject(), new FindOptions().setSort(new JsonObject().put("foo", 1)))
FindOptions findOptions = new FindOptions().setSort(new JsonObject().put("foo", 1));
ReadStream<JsonObject> stream = mongoClient.findBatchWithOptions(collection, new JsonObject(), findOptions);
streamReference.set(stream);
stream
.exceptionHandler(this::fail)
.endHandler(v -> latch.countDown())
.handler(result -> foos.add(result.getString("foo")));
Expand All @@ -62,6 +66,9 @@ public void testFindBatch() throws Exception {
assertEquals(numDocs, foos.size());
assertEquals("bar0", foos.get(0));
assertEquals("bar999", foos.get(numDocs - 1));

// Make sure stream handlers can be set to null after closing
streamReference.get().handler(null).exceptionHandler(null).endHandler(null);
}


Expand Down

0 comments on commit d48cd70

Please sign in to comment.