diff --git a/azure-documentdb-examples/pom.xml b/azure-documentdb-examples/pom.xml
new file mode 100644
index 000000000000..c4a3891f91ee
--- /dev/null
+++ b/azure-documentdb-examples/pom.xml
@@ -0,0 +1,97 @@
+
+ 4.0.0
+
+ com.microsoft.azure
+ azure-documentdb-examples
+ 0.0.1-SNAPSHOT
+ jar
+
+ azure-documentdb-examples
+ http://azure.microsoft.com/en-us/services/documentdb/
+
+
+ MIT License
+ http://www.opensource.org/licenses/mit-license.php
+
+
+
+
+ UTF-8
+ 1.7.6
+ 1.2.17
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.6.0
+
+ 1.8
+ 1.8
+
+
+
+ org.apache.maven.plugins
+ maven-eclipse-plugin
+ 2.8
+
+
+
+ org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8
+
+
+
+
+
+
+
+
+ com.microsoft.azure
+ azure-documentdb-rx
+ 0.9.0-SNAPSHOT
+
+
+ io.reactivex
+ rxjava-guava
+ 1.0.3
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+ org.mockito
+ mockito-core
+ 1.10.19
+ test
+
+
+ org.hamcrest
+ hamcrest-all
+ 1.3
+ test
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+ test
+
+
+ org.slf4j
+ slf4j-log4j12
+ ${slf4j.version}
+ test
+
+
+ log4j
+ log4j
+ ${log4j.version}
+ test
+
+
+
diff --git a/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/DatabaseAndCollectionCreationAsyncAPITest.java b/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/DatabaseAndCollectionCreationAsyncAPITest.java
new file mode 100644
index 000000000000..1ea1cf0a7140
--- /dev/null
+++ b/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/DatabaseAndCollectionCreationAsyncAPITest.java
@@ -0,0 +1,256 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.rx.examples;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.microsoft.azure.documentdb.ConnectionPolicy;
+import com.microsoft.azure.documentdb.ConsistencyLevel;
+import com.microsoft.azure.documentdb.Database;
+import com.microsoft.azure.documentdb.DocumentClientException;
+import com.microsoft.azure.documentdb.DocumentCollection;
+import com.microsoft.azure.documentdb.FeedResponsePage;
+import com.microsoft.azure.documentdb.ResourceResponse;
+import com.microsoft.azure.documentdb.SqlParameter;
+import com.microsoft.azure.documentdb.SqlParameterCollection;
+import com.microsoft.azure.documentdb.SqlQuerySpec;
+import com.microsoft.azure.documentdb.rx.AsyncDocumentClient;
+import com.microsoft.azure.documentdb.rx.examples.TestConfigurations;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.observable.ListenableFutureObservable;
+
+/**
+ * This integration test class demonstrates how to use Async API to create,
+ * delete, replace, and update.
+ *
+ * NOTE: you can use rxJava based async api with java8 lambda expression. Using of
+ * rxJava based async APIs with java8 lambda expressions is much prettier.
+ *
+ * You can also use the async API without java8 lambda expression support.
+ *
+ * For example
+ *
+ *
{@link #testCreateDatabase_Async()} demonstrates how to use async api with
+ * java8 lambda expression.
+ *
+ *
{@link #testCreateDatabase_Async_withoutLambda()} demonstrates how to the same
+ * thing without lambda expression.
+ *
+ *
+ * Also if you need to work with Future or ListenableFuture it is possible to transform
+ * an observable to ListenableFuture. Please see {@link #testTransformObservableToGoogleGuavaListenableFuture()}
+ *
+ */
+public class DatabaseAndCollectionCreationAsyncAPITest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseAndCollectionCreationAsyncAPITest.class);
+
+ private static final String DATABASE_ID = "async-test-db";
+ private DocumentCollection collectionDefinition;
+ private Database databaseDefinition;
+
+ private AsyncDocumentClient asyncClient;
+
+ @Before
+ public void setUp() throws DocumentClientException {
+
+ asyncClient = new AsyncDocumentClient.Builder()
+ .withServiceEndpoint(TestConfigurations.HOST)
+ .withMasterKey(TestConfigurations.MASTER_KEY)
+ .withConnectionPolicy(ConnectionPolicy.GetDefault())
+ .withConsistencyLevel(ConsistencyLevel.Session)
+ .build();
+
+ // Clean up before setting up
+ this.cleanUpGeneratedDatabases();
+
+ databaseDefinition = new Database();
+ databaseDefinition.setId(DATABASE_ID);
+
+ collectionDefinition = new DocumentCollection();
+ collectionDefinition.setId(UUID.randomUUID().toString());
+ }
+
+ @After
+ public void shutdown() throws DocumentClientException {
+ asyncClient.close();
+ }
+
+ @Test
+ public void testCreateDatabase_Async() throws Exception {
+
+ // create a database using async api
+ // this test uses java8 lambda expression see testCreateDatabase_Async_withoutLambda
+
+ Observable> createDatabaseObservable = asyncClient
+ .createDatabase(databaseDefinition, null);
+
+ final CountDownLatch doneLatch = new CountDownLatch(1);
+
+ createDatabaseObservable
+ .single() // we know there is only single result
+ .subscribe(
+ databaseResourceResponse -> {
+ System.out.println(databaseResourceResponse.getActivityId());
+ doneLatch.countDown();
+ },
+
+ error -> {
+ System.err.println("an error happened in database creation: actual cause: " + error.getMessage());
+ }
+ );
+
+ // wait till database creation completes
+ doneLatch.await();
+ }
+
+ @Test
+ public void testCreateDatabase_Async_withoutLambda() throws Exception {
+
+ // create a database using async api
+
+ Observable> createDatabaseObservable = asyncClient
+ .createDatabase(databaseDefinition, null);
+
+ final CountDownLatch successfulCompletionLatch = new CountDownLatch(1);
+ Action1> onDatabaseCreationAction = new Action1>() {
+
+ @Override
+ public void call(ResourceResponse resourceResponse) {
+ // Database is created
+ System.out.println(resourceResponse.getActivityId());
+ successfulCompletionLatch.countDown();
+ }
+ };
+
+ Action1 onError = new Action1() {
+ @Override
+ public void call(Throwable error) {
+ System.err.println("an error happened in database creation: actual cause: " + error.getMessage());
+ }
+ };
+
+ createDatabaseObservable
+ .single() //we know there is only a single event
+ .subscribe(onDatabaseCreationAction, onError);
+
+ // wait till database creation completes
+ successfulCompletionLatch.await();
+ }
+
+
+ @Test
+ public void testCreateDatabase_toBlocking() throws DocumentClientException {
+
+ // create a database
+ // toBlocking() converts the observable to a blocking observable
+
+ Observable> createDatabaseObservable = asyncClient
+ .createDatabase(databaseDefinition, null);
+
+ // toBlocking() converts to a blocking observable
+ // single() gets the only result
+ createDatabaseObservable.toBlocking().single();
+ }
+
+ @Test
+ public void testCreateDatabase_toBlocking_DatabaseAlreadyExists_Fails() throws DocumentClientException {
+
+ // attempt to create a database which already exists
+ // - first create a database
+ // - Using the async api generate an async database creation observable
+ // - Converts the Observable to blocking using Observable.toBlocking() api
+ // - catch already exist failure (409)
+
+ asyncClient.createDatabase(databaseDefinition, null).toBlocking().single();
+
+ // Create the database for test.
+ Observable> databaseForTestObservable = asyncClient
+ .createDatabase(databaseDefinition, null);
+
+ try {
+ databaseForTestObservable
+ .toBlocking() //blocks
+ .single(); //gets the single result
+ assertThat("Should not reach here", false);
+ } catch (Exception e) {
+ assertThat("Database already exists.",
+ ((DocumentClientException) e.getCause()).getStatusCode(), equalTo(409));
+ }
+ }
+
+ @Test
+ public void testTransformObservableToGoogleGuavaListenableFuture() throws Exception {
+
+ // You can convert an Observable to a ListenableFuture.
+ // ListenableFuture (part of google guava library) is a popular extension
+ // of Java's Future which allows registering listener callbacks:
+ // https://github.com/google/guava/wiki/ListenableFutureExplained
+
+ Observable> createDatabaseObservable = asyncClient.createDatabase(databaseDefinition, null);
+ ListenableFuture> future = ListenableFutureObservable.to(createDatabaseObservable);
+
+ ResourceResponse rrd = future.get();
+
+ assertThat(rrd.getRequestCharge(), greaterThan((double) 0));
+ System.out.print(rrd.getRequestCharge());
+ }
+
+ private void cleanUpGeneratedDatabases() throws DocumentClientException {
+ LOGGER.info("cleanup databases invoked");
+
+ String[] allDatabaseIds = { DATABASE_ID };
+
+ for (String id : allDatabaseIds) {
+ try {
+ List> feedResponsePages = asyncClient
+ .queryDatabases(new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id",
+ new SqlParameterCollection(new SqlParameter("@id", id))), null).toList().toBlocking().single();
+
+
+ if (!feedResponsePages.get(0).getResults().isEmpty()) {
+ Database res = feedResponsePages.get(0).getResults().get(0);
+ LOGGER.info("deleting a database " + feedResponsePages.get(0));
+ asyncClient.deleteDatabase(res.getSelfLink(), null).toBlocking().single();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/DocumentCRUDAsyncAPITest.java b/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/DocumentCRUDAsyncAPITest.java
new file mode 100644
index 000000000000..fd9962c17117
--- /dev/null
+++ b/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/DocumentCRUDAsyncAPITest.java
@@ -0,0 +1,485 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.rx.examples;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.microsoft.azure.documentdb.ConnectionPolicy;
+import com.microsoft.azure.documentdb.ConsistencyLevel;
+import com.microsoft.azure.documentdb.Database;
+import com.microsoft.azure.documentdb.Document;
+import com.microsoft.azure.documentdb.DocumentClientException;
+import com.microsoft.azure.documentdb.DocumentCollection;
+import com.microsoft.azure.documentdb.FeedResponsePage;
+import com.microsoft.azure.documentdb.ResourceResponse;
+import com.microsoft.azure.documentdb.SqlParameter;
+import com.microsoft.azure.documentdb.SqlParameterCollection;
+import com.microsoft.azure.documentdb.SqlQuerySpec;
+import com.microsoft.azure.documentdb.rx.AsyncDocumentClient;
+import com.microsoft.azure.documentdb.rx.examples.TestConfigurations;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.observable.ListenableFutureObservable;
+
+/**
+ * This integration test class demonstrates how to use Async API to create,
+ * delete, replace, and upsert. If you are interested in examples for querying
+ * for documents please see {@link DocumentQueryAsyncAPITest}
+ *
+ * NOTE: you can use rxJava based async api with java8 lambda expression. Using
+ * of rxJava based async APIs with java8 lambda expressions is much prettier.
+ *
+ * You can also use the async API without java8 lambda expression.
+ *
+ * For example
+ *
+ *
{@link #testCreateDocument_Async()} demonstrates how to use async api
+ * with java8 lambda expression.
+ *
+ *
{@link #testCreateDocument_Async_withoutLambda()} demonstrates how to the same
+ * thing without lambda expression.
+ *
+ *
+ * Also if you need to work with Future or ListenableFuture it is possible to
+ * transform an observable to ListenableFuture. Please see
+ * {@link #testTransformObservableToGoogleGuavaListenableFuture()}
+ *
+ */
+public class DocumentCRUDAsyncAPITest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DocumentCRUDAsyncAPITest.class);
+
+ private static final String DATABASE_ID = "async-test-db";
+
+ private AsyncDocumentClient asyncClient;
+ private DocumentCollection createdCollection;
+
+ @Before
+ public void setUp() throws DocumentClientException {
+
+ // sets up the requirements for each test
+
+ asyncClient = new AsyncDocumentClient.Builder()
+ .withServiceEndpoint(TestConfigurations.HOST)
+ .withMasterKey(TestConfigurations.MASTER_KEY)
+ .withConnectionPolicy(ConnectionPolicy.GetDefault())
+ .withConsistencyLevel(ConsistencyLevel.Session)
+ .build();
+ // Clean up the database.
+ this.cleanUpGeneratedDatabases();
+
+ Database databaseDefinition = new Database();
+ databaseDefinition.setId(DATABASE_ID);
+
+ DocumentCollection collectionDefinition = new DocumentCollection();
+ collectionDefinition.setId(UUID.randomUUID().toString());
+
+ // create database
+ ResourceResponse databaseCreationResponse = asyncClient.createDatabase(databaseDefinition, null)
+ .toBlocking().single();
+
+ // create collection
+ createdCollection = asyncClient
+ .createCollection(databaseCreationResponse.getResource().getSelfLink(), collectionDefinition, null)
+ .toBlocking().single().getResource();
+ }
+
+ @After
+ public void shutdown() throws DocumentClientException {
+ asyncClient.close();
+ }
+
+ @Test
+ public void testCreateDocument_Async() throws Exception {
+
+ // create a document
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", 1, 1));
+ Observable> createDocumentObservable = asyncClient.createDocument(createdCollection.getSelfLink(), doc, null, true);
+
+ final CountDownLatch doneLatch = new CountDownLatch(1);
+
+ // subscribe to events emitted by the observable
+ createDocumentObservable
+ .single() // we know there will be one response
+ .subscribe(
+
+ documentResourceResponse -> {
+ System.out.println(documentResourceResponse.getActivityId());
+ doneLatch.countDown();
+ },
+
+ error -> {
+ System.err.println("an error happened in document creation: actual cause: " + error.getMessage());
+ });
+
+ // wait till document creation completes
+ doneLatch.await();
+ }
+
+ @Test
+ public void testCreateDocument_Async_withoutLambda() throws Exception {
+
+ // create a document in without java8 lambda expressions
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", 1, 1));
+ Observable> createDocumentObservable = asyncClient.createDocument(createdCollection.getSelfLink(), doc, null, true);
+
+ final CountDownLatch doneLatch = new CountDownLatch(1);
+
+ Action1> onNext = new Action1>() {
+
+ @Override
+ public void call(ResourceResponse documentResourceResponse) {
+ System.out.println(documentResourceResponse.getActivityId());
+ doneLatch.countDown();
+ }
+ };
+
+ Action1 onError = new Action1() {
+
+ @Override
+ public void call(Throwable error) {
+ System.err.println("an error happened in document creation: actual cause: " + error.getMessage());
+ }
+ };
+
+ // subscribe to events emitted by the observable
+ createDocumentObservable
+ .single() // we know there will be one response
+ .subscribe(onNext, onError);
+
+ // wait till document creation completes
+ doneLatch.await();
+ }
+
+ @Test
+ public void testCreateDocument_toBlocking() throws DocumentClientException {
+
+ // create a document
+ // toBlocking() converts the observable to a blocking observable
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", 1, 1));
+ Observable> createDocumentObservable =
+ asyncClient.createDocument(createdCollection.getSelfLink(), doc, null, true);
+
+
+ // toBlocking() converts to a blocking observable
+ // single() gets the only result
+ createDocumentObservable
+ .toBlocking() //converts the observable to a blocking observable
+ .single(); //gets the single result
+ }
+
+ @Test
+ public void testDocumentCreation_SumUpRequestCharge() throws Exception {
+
+ // create 10 documents and sum up all the documents creation request charges
+
+ // create 10 documents
+ List>> listOfCreateDocumentObservables = new ArrayList<>();
+ for(int i = 0; i < 10; i++) {
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", i, i));
+
+ Observable> createDocumentObservable =
+ asyncClient.createDocument(createdCollection.getSelfLink(), doc, null, false);
+ listOfCreateDocumentObservables.add(createDocumentObservable);
+ }
+
+ // merge all document creation observables into one observable
+ Observable> mergedObservable = Observable.merge(listOfCreateDocumentObservables);
+
+ // create a new observable emitting the total charge of creating all 10 documents
+ Observable totalChargeObservable = mergedObservable
+ .map(ResourceResponse::getRequestCharge) //map to request charge
+ .reduce((totalCharge, charge) -> totalCharge + charge); //sum up all the charges
+
+ final CountDownLatch doneLatch = new CountDownLatch(1);
+
+ // subscribe to the total request charge observable
+ totalChargeObservable.subscribe(totalCharge -> {
+ // print the total charge
+ System.out.println(totalCharge);
+ doneLatch.countDown();
+ });
+
+ doneLatch.await();
+ }
+
+ @Test
+ public void testCreateDocument_toBlocking_DocumentAlreadyExists_Fails() throws DocumentClientException {
+
+ // attempt to create a document which already exists
+ // - first create a document
+ // - Using the async api generate an async document creation observable
+ // - Converts the Observable to blocking using Observable.toBlocking() api
+ // - catch already exist failure (409)
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", 1, 1));
+ asyncClient.createDocument(createdCollection.getSelfLink(), doc, null, false).toBlocking().single();
+
+ // Create the document
+ Observable> createDocumentObservable = asyncClient
+ .createDocument(createdCollection.getSelfLink(), doc, null, false);
+
+ try {
+ createDocumentObservable
+ .toBlocking() //converts the observable to a blocking observable
+ .single(); //gets the single result
+ Assert.fail("Document Already Exists. Document Creation must fail");
+ } catch (Exception e) {
+ assertThat("Document already exists.",
+ ((DocumentClientException) e.getCause()).getStatusCode(), equalTo(409));
+ }
+ }
+
+ @Test
+ public void testCreateDocument_Async_DocumentAlreadyExists_Fails() throws Exception {
+
+ // attempt to create a document which already exists
+ // - first create a document
+ // - Using the async api generate an async document creation observable
+ // - Converts the Observable to blocking using Observable.toBlocking() api
+ // - catch already exist failure (409)
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", 1, 1));
+ asyncClient.createDocument(createdCollection.getSelfLink(), doc, null, false).toBlocking().single();
+
+ // Create the document
+ Observable> createDocumentObservable = asyncClient
+ .createDocument(createdCollection.getSelfLink(), doc, null, false);
+
+ List errorList = Collections.synchronizedList(new ArrayList());
+
+ createDocumentObservable.subscribe(
+ resourceResponse -> {},
+
+ error -> {
+ errorList.add(error);
+ System.err.println("failed to create a document due to: " + error.getMessage());
+ }
+ );
+
+ Thread.sleep(2000);
+ assertThat(errorList, hasSize(1));
+ assertThat(errorList.get(0), is(instanceOf(DocumentClientException.class)));
+ assertThat(((DocumentClientException) errorList.get(0)).getStatusCode(), equalTo(409));
+ }
+
+ @Test
+ public void testDocumentReplace_Async() throws Exception {
+
+ // replace a document
+
+ // create a document
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", 1, 1));
+ String documentLink = asyncClient.createDocument(createdCollection.getSelfLink(), doc, null, false).toBlocking().single().getResource().getSelfLink();
+
+ // try to replace the existing document
+ Document replacingDocument = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d', 'new-prop' : '2'}", 1, 1));
+ Observable> replaceDocumentObservable = asyncClient
+ .replaceDocument(documentLink, replacingDocument, null);
+
+ List> capturedResponse = Collections.synchronizedList(new ArrayList>());
+
+ replaceDocumentObservable.subscribe(
+ resourceResponse -> {
+ capturedResponse.add(resourceResponse);
+ }
+
+ );
+
+ Thread.sleep(2000);
+
+ assertThat(capturedResponse, hasSize(1));
+ assertThat(capturedResponse.get(0).getResource().get("new-prop"), equalTo("2"));
+ }
+
+ @Test
+ public void testDocumentUpsert_Async() throws Exception {
+
+ // upsert a document
+
+ // create a document
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", 1, 1));
+ asyncClient.createDocument(createdCollection.getSelfLink(), doc, null, false).toBlocking().single();
+
+ // upsert the existing document
+ Document upsertingDocument = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d', 'new-prop' : '2'}", 1, 1));
+ Observable> upsertDocumentObservable = asyncClient
+ .upsertDocument(createdCollection.getSelfLink(), upsertingDocument, null, false);
+
+ List> capturedResponse = Collections.synchronizedList(new ArrayList>());
+
+ upsertDocumentObservable.subscribe(
+ resourceResponse -> {
+ capturedResponse.add(resourceResponse);
+ }
+
+ );
+
+ Thread.sleep(4000);
+
+ assertThat(capturedResponse, hasSize(1));
+ assertThat(capturedResponse.get(0).getResource().get("new-prop"), equalTo("2"));
+ }
+
+ @Test
+ public void testDocumentDelete_Async() throws Exception {
+
+ // delete a document
+
+ // create a document
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", 1, 1));
+ String documentLink = asyncClient.createDocument(createdCollection.getSelfLink(), doc, null, false).toBlocking().single().getResource().getSelfLink();
+
+ // delete the existing document
+ Observable> deleteDocumentObservable = asyncClient
+ .deleteDocument(documentLink, null);
+
+ List> capturedResponse = Collections.synchronizedList(new ArrayList>());
+
+ deleteDocumentObservable.subscribe(
+ resourceResponse -> {
+ capturedResponse.add(resourceResponse);
+ }
+
+ );
+
+ Thread.sleep(2000);
+
+ assertThat(capturedResponse, hasSize(1));
+
+ // assert document is deleted
+ List listOfDocuments = asyncClient
+ .queryDocuments(createdCollection.getSelfLink(), "SELECT * FROM root", null)
+ .map(FeedResponsePage::getResults) //map page to its list of documents
+ .concatMap(Observable::from) //flatten the observable
+ .toList() //transform to a observable
+ .toBlocking() //block
+ .single(); //gets the List
+
+ // assert that there is no document found
+ assertThat(listOfDocuments, hasSize(0));
+ }
+
+ @Test
+ public void testDocumentRead_Async() throws Exception {
+
+ // read a document
+
+ //create a document
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", 1, 1));
+ String documentLink = asyncClient.createDocument(createdCollection.getSelfLink(), doc, null, false).toBlocking().single().getResource().getSelfLink();
+
+ // read the document
+ Observable> readDocumentObservable = asyncClient
+ .readDocument(documentLink, null);
+
+ List> capturedResponse = Collections.synchronizedList(new ArrayList>());
+
+ readDocumentObservable.subscribe(
+ resourceResponse -> {
+ capturedResponse.add(resourceResponse);
+ }
+
+ );
+
+ Thread.sleep(2000);
+
+ // assert document is retrieved
+ assertThat(capturedResponse, hasSize(1));
+ }
+
+ @Test
+ public void testTransformObservableToFuture() throws Exception {
+
+ // You can convert an Observable to a Future.
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", 1, 1));
+ Observable> createDocumentObservable = asyncClient
+ .createDocument(createdCollection.getSelfLink(), doc, null, false);
+ Future> future = createDocumentObservable.toBlocking().toFuture();
+
+ ResourceResponse rrd = future.get();
+
+ assertThat(rrd.getRequestCharge(), greaterThan((double) 0));
+ System.out.print(rrd.getRequestCharge());
+ }
+
+ @Test
+ public void testTransformObservableToGoogleGuavaListenableFuture() throws Exception {
+
+ // You can convert an Observable to a ListenableFuture.
+ // ListenableFuture (part of google guava library) is a popular extension
+ // of Java's Future which allows registering listener callbacks:
+ // https://github.com/google/guava/wiki/ListenableFutureExplained
+ Document doc = new Document(String.format("{ 'id': 'doc%d', 'counter': '%d'}", 1, 1));
+ Observable> createDocumentObservable = asyncClient
+ .createDocument(createdCollection.getSelfLink(), doc, null, false);
+ ListenableFuture> listenableFuture = ListenableFutureObservable.to(createDocumentObservable);
+
+ ResourceResponse rrd = listenableFuture.get();
+
+ assertThat(rrd.getRequestCharge(), greaterThan((double) 0));
+ System.out.print(rrd.getRequestCharge());
+ }
+
+ private void cleanUpGeneratedDatabases() throws DocumentClientException {
+ LOGGER.info("cleanup databases invoked");
+
+ String[] allDatabaseIds = { DATABASE_ID };
+
+ for (String id : allDatabaseIds) {
+ try {
+ List> feedResponsePages = asyncClient
+ .queryDatabases(new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id",
+ new SqlParameterCollection(new SqlParameter("@id", id))), null).toList().toBlocking().single();
+
+
+ if (!feedResponsePages.get(0).getResults().isEmpty()) {
+ Database res = feedResponsePages.get(0).getResults().get(0);
+ LOGGER.info("deleting a database " + feedResponsePages.get(0));
+ asyncClient.deleteDatabase(res.getSelfLink(), null).toBlocking().single();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/DocumentQueryAsyncAPITest.java b/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/DocumentQueryAsyncAPITest.java
new file mode 100644
index 000000000000..0802156d5742
--- /dev/null
+++ b/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/DocumentQueryAsyncAPITest.java
@@ -0,0 +1,516 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.rx.examples;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.microsoft.azure.documentdb.ConnectionPolicy;
+import com.microsoft.azure.documentdb.ConsistencyLevel;
+import com.microsoft.azure.documentdb.Database;
+import com.microsoft.azure.documentdb.Document;
+import com.microsoft.azure.documentdb.DocumentClientException;
+import com.microsoft.azure.documentdb.DocumentCollection;
+import com.microsoft.azure.documentdb.FeedOptions;
+import com.microsoft.azure.documentdb.FeedResponsePage;
+import com.microsoft.azure.documentdb.PartitionKeyDefinition;
+import com.microsoft.azure.documentdb.RequestOptions;
+import com.microsoft.azure.documentdb.ResourceResponse;
+import com.microsoft.azure.documentdb.SqlParameter;
+import com.microsoft.azure.documentdb.SqlParameterCollection;
+import com.microsoft.azure.documentdb.SqlQuerySpec;
+import com.microsoft.azure.documentdb.internal.HttpConstants;
+import com.microsoft.azure.documentdb.rx.AsyncDocumentClient;
+import com.microsoft.azure.documentdb.rx.examples.TestConfigurations;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.observable.ListenableFutureObservable;
+
+/**
+ * This integration test class demonstrates how to use Async API to query for
+ * documents.
+ *
+ * NOTE: you can use rxJava based async api with java8 lambda expression. Using of
+ * rxJava based async APIs with java8 lambda expressions is much prettier.
+ *
+ * You can also use the async API without java8 lambda expression.
+ *
+ * For example
+ *
+ *
{@link #testQueryDocuments_Async()} demonstrates how to use async api
+ * with java8 lambda expression.
+ *
+ *
{@link #testQueryDocuments_Async_withoutLambda()} demonstrates how to the same
+ * thing without lambda expression.
+ *
+ *
+ * Also if you need to work with Future or ListenableFuture it is possible to transform
+ * an observable to ListenableFuture. Please see {@link #testTransformObservableToGoogleGuavaListenableFuture()}
+ *
+ */
+public class DocumentQueryAsyncAPITest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DocumentQueryAsyncAPITest.class);
+
+ private static final String DATABASE_ID = "async-test-db";
+
+ private AsyncDocumentClient asyncClient;
+
+ private DocumentCollection createdCollection;
+ private Database createdDatabase;
+
+ private int numberOfDocuments;
+
+ @Before
+ public void setUp() throws DocumentClientException {
+
+ asyncClient = new AsyncDocumentClient.Builder()
+ .withServiceEndpoint(TestConfigurations.HOST)
+ .withMasterKey(TestConfigurations.MASTER_KEY)
+ .withConnectionPolicy(ConnectionPolicy.GetDefault())
+ .withConsistencyLevel(ConsistencyLevel.Session)
+ .build();
+
+ // Clean up the database.
+ this.cleanUpGeneratedDatabases();
+
+ Database databaseDefinition = new Database();
+ databaseDefinition.setId(DATABASE_ID);
+
+ DocumentCollection collectionDefinition = new DocumentCollection();
+ collectionDefinition.setId(UUID.randomUUID().toString());
+
+ // create database
+ ResourceResponse databaseCreationResponse = asyncClient.createDatabase(databaseDefinition, null)
+ .toBlocking().single();
+
+ createdDatabase = databaseCreationResponse.getResource();
+
+ // create collection
+ createdCollection = asyncClient
+ .createCollection(databaseCreationResponse.getResource().getSelfLink(), collectionDefinition, null)
+ .toBlocking().single().getResource();
+
+ numberOfDocuments = 20;
+ // add documents
+ for (int i = 0; i < numberOfDocuments; i++) {
+ Document doc = new Document(String.format("{ 'id': 'loc%d', 'counter': %d}", i, i));
+ asyncClient.createDocument(createdCollection.getSelfLink(), doc, null, true).toBlocking().single();
+ }
+ }
+
+ @After
+ public void shutdown() throws DocumentClientException {
+ asyncClient.close();
+ }
+
+ @Test
+ public void testQueryDocuments_Async() throws Exception {
+
+ // query for documents
+ // creates a document query observable and verifies the async behavior
+ // of document query observable
+
+ int requestPageSize = 3;
+ FeedOptions options = new FeedOptions();
+ options.setPageSize(requestPageSize);
+
+ Observable> documentQueryObservable = asyncClient
+ .queryDocuments(createdCollection.getSelfLink(), "SELECT * FROM root", options);
+
+ final CountDownLatch mainThreadBarrier = new CountDownLatch(1);
+
+ final CountDownLatch resultsCountDown = new CountDownLatch(numberOfDocuments);
+
+ // forEach(.) is an alias for subscribe(.)
+
+ documentQueryObservable.forEach(page -> {
+ try {
+ // waits on the barrier
+ mainThreadBarrier.await();
+ } catch (InterruptedException e) {
+ }
+
+ for (@SuppressWarnings("unused") Document d : page.getResults()) {
+ resultsCountDown.countDown();
+ }
+ });
+
+ // The following code will run concurrently
+
+ System.out.println("action is subscribed to the observable");
+
+ // release main thread barrier
+ System.out.println("after main thread barrier is released, subscribed observable action can continue");
+ mainThreadBarrier.countDown();
+
+ System.out.println("waiting for all the results using result count down latch");
+
+ resultsCountDown.await();
+ }
+
+ @Test
+ public void testQueryDocuments_Async_withoutLambda() throws Exception {
+
+ // query for documents
+ // creates a document query observable and verifies the async behavior
+ // of document query observable
+
+ // NOTE: does the same thing as testQueryDocuments_Async without java8 lambda expression
+
+ int requestPageSize = 3;
+ FeedOptions options = new FeedOptions();
+ options.setPageSize(requestPageSize);
+
+ Observable> documentQueryObservable = asyncClient
+ .queryDocuments(createdCollection.getSelfLink(), "SELECT * FROM root", options);
+
+ final CountDownLatch mainThreadBarrier = new CountDownLatch(1);
+
+ final CountDownLatch resultsCountDown = new CountDownLatch(numberOfDocuments);
+
+ Action1> actionPerPage = new Action1>() {
+
+ @SuppressWarnings("unused")
+ @Override
+ public void call(FeedResponsePage t) {
+
+ try {
+ // waits on the barrier
+ mainThreadBarrier.await();
+ } catch (InterruptedException e) {
+ }
+
+ for (Document d : t.getResults()) {
+ resultsCountDown.countDown();
+ }
+ }
+ };
+
+ // forEach(.) is an alias for subscribe(.)
+ documentQueryObservable.forEach(actionPerPage);
+ // the following code will run concurrently
+
+ System.out.println("action is subscribed to the observable");
+
+ // release main thread barrier
+ System.out.println("after main thread barrier is released, subscribed observable action can continue");
+ mainThreadBarrier.countDown();
+
+ System.out.println("waiting for all the results using result count down latch");
+
+ resultsCountDown.await();
+ }
+
+ @Test
+ public void testQueryDocuments_findTotalRequestCharge() throws Exception {
+
+ // queries for documents and sum up the total request charge
+
+ int requestPageSize = 3;
+ FeedOptions options = new FeedOptions();
+ options.setPageSize(requestPageSize);
+
+ Observable totalChargeObservable = asyncClient
+ .queryDocuments(createdCollection.getSelfLink(), "SELECT * FROM root", options)
+ .map(FeedResponsePage::getRequestCharge) // map the page to its request charge
+ .reduce((totalCharge, charge) -> totalCharge + charge); // sum up all the request charges
+
+ final CountDownLatch doneLatch = new CountDownLatch(1);
+
+ // subscribe(.) is the same as forEach(.)
+ totalChargeObservable.subscribe(totalCharge -> {
+ System.out.println(totalCharge);
+ doneLatch.countDown();
+ });
+
+ doneLatch.await();
+ }
+
+ @Test
+ public void testQueryDocuments_unsubscribeAfterFirstPage() throws Exception {
+
+ // subscriber unsubscribes after first page
+
+ int requestPageSize = 3;
+ FeedOptions options = new FeedOptions();
+ options.setPageSize(requestPageSize);
+
+ Observable> requestChargeObservable = asyncClient
+ .queryDocuments(createdCollection.getSelfLink(), "SELECT * FROM root", options);
+
+ AtomicInteger onNextCounter = new AtomicInteger();
+ AtomicInteger onCompletedCounter = new AtomicInteger();
+ AtomicInteger onErrorCounter = new AtomicInteger();
+
+ requestChargeObservable.subscribe(new Subscriber>() {
+
+ @Override
+ public void onCompleted() {
+ onCompletedCounter.incrementAndGet();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ onErrorCounter.incrementAndGet();
+ }
+
+ @Override
+ public void onNext(FeedResponsePage page) {
+ onNextCounter.incrementAndGet();
+ unsubscribe();
+ }
+ });
+
+ Thread.sleep(4000);
+
+ // after subscriber unsubscribes, it doesn't receive any more events.
+ assertThat(onNextCounter.get(), equalTo(1));
+ assertThat(onCompletedCounter.get(), equalTo(0));
+ assertThat(onErrorCounter.get(), equalTo(0));
+ }
+
+ @Test
+ public void testQueryDocuments_filterFetchedResults() throws Exception {
+ // queries for documents and filter out the fetched results
+
+ int requestPageSize = 3;
+ FeedOptions options = new FeedOptions();
+ options.setPageSize(requestPageSize);
+
+ Func1 isPrimeNumber = new Func1() {
+
+ @Override
+ public Boolean call(Document doc) {
+ int n = doc.getInt("counter");
+ if (n <= 1) return false;
+ for(int i = 2; 2*i < n; i++) {
+ if(n % i == 0)
+ return false;
+ }
+ return true;
+ }
+ };
+
+ List resultList = Collections.synchronizedList(new ArrayList());
+
+ asyncClient
+ .queryDocuments(createdCollection.getSelfLink(), "SELECT * FROM root", options)
+ .map(FeedResponsePage::getResults) // map the page to the list of documents
+ .concatMap(Observable::from) // flatten the observable> to observable
+ .filter(isPrimeNumber) // filter documents using isPrimeNumber predicate
+ .subscribe(doc -> resultList.add(doc)); // collect the results
+
+ Thread.sleep(4000);
+
+ int expectedNumberOfPrimes = 0;
+ // find all the documents with prime number counter
+ for(int i = 0; i < numberOfDocuments; i++) {
+ boolean isPrime = true;
+ if (i <= 1) isPrime = false;
+ for(int j = 2; 2*j < i; j++) {
+ if(i % j == 0) {
+ isPrime = false;
+ break;
+ }
+ }
+
+ if (isPrime) {
+ expectedNumberOfPrimes++;
+ }
+ }
+
+ // assert that we only collected what's expected
+ assertThat(resultList, hasSize(expectedNumberOfPrimes));
+ }
+
+ @Test
+ public void testQueryDocuments_toBlocking_toIterator() throws DocumentClientException {
+
+ // queries for documents
+ // converts the document query observable to blocking observable and
+ // uses that to find all documents
+
+ // query for documents
+ int requestPageSize = 3;
+ FeedOptions options = new FeedOptions();
+ options.setPageSize(requestPageSize);
+
+ Observable> documentQueryObservable = asyncClient
+ .queryDocuments(createdCollection.getSelfLink(), "SELECT * FROM root", options);
+
+ // covert the observable to a blocking observable, then convert the blocking observable to an iterator
+ Iterator> it = documentQueryObservable.toBlocking().getIterator();
+
+ int pageCounter = 0;
+ int numberOfResults = 0;
+ while (it.hasNext()) {
+ FeedResponsePage page = it.next();
+ pageCounter++;
+
+ String pageSizeAsString = page.getResponseHeaders().get(HttpConstants.HttpHeaders.ITEM_COUNT);
+ assertThat("header item count must be present", pageSizeAsString, notNullValue());
+ int pageSize = Integer.valueOf(pageSizeAsString);
+ assertThat("Result size must match header item count", page.getResults(), hasSize(pageSize));
+ numberOfResults += pageSize;
+ }
+ assertThat("number of total results", numberOfResults, equalTo(numberOfDocuments));
+ assertThat("number of result pages", pageCounter,
+ equalTo((numberOfDocuments + requestPageSize - 1) / requestPageSize));
+ }
+
+ @Test
+ public void testOrderBy_Async() throws Exception {
+ // create a partitioned collection
+ String collectionId = UUID.randomUUID().toString();
+ DocumentCollection multiPartitionCollection = createMultiPartitionCollection(createdDatabase.getSelfLink(), collectionId, "/key");
+
+ // insert documents
+ int totalNumberOfDocumentsInMultiPartitionCollection = 10;
+ for (int i = 0; i < totalNumberOfDocumentsInMultiPartitionCollection; i++) {
+
+ Document doc = new Document(String.format( "{\"id\":\"documentId%d\",\"key\":\"%s\",\"prop\":%d}",
+ i, RandomStringUtils.randomAlphabetic(2), i));
+ asyncClient.createDocument(multiPartitionCollection.getSelfLink(), doc, null, true).toBlocking().single();
+ }
+
+ // query for the documents order by the prop field
+ SqlQuerySpec query = new SqlQuerySpec("SELECT r.id FROM r ORDER BY r.prop", new SqlParameterCollection());
+ FeedOptions options = new FeedOptions();
+ options.setEnableCrossPartitionQuery(true);
+ options.setPageSize(1);
+
+ // get the observable order by query documents
+ Observable> documentQueryObservable = asyncClient
+ .queryDocuments(multiPartitionCollection.getSelfLink(), query, options);
+
+ List resultList = (List) Collections.synchronizedList(new ArrayList());
+
+ documentQueryObservable
+ .map(FeedResponsePage::getResults) // map the logical page to the list of documents in the page
+ .concatMap(Observable::from) // flatten the list of documents
+ .map(doc -> doc.getId()) // map to the document Id
+ .forEach(docId -> resultList.add(docId)); // add each document Id to the resultList
+
+ Thread.sleep(4000);
+
+ // assert we found all the results
+ assertThat(resultList, hasSize(totalNumberOfDocumentsInMultiPartitionCollection));
+ for(int i = 0; i < totalNumberOfDocumentsInMultiPartitionCollection; i++) {
+ String docId = resultList.get(i);
+ // assert that the order of the documents are valid
+ assertThat(docId, equalTo("documentId" + i));
+ }
+ }
+
+ @Test
+ public void testTransformObservableToGoogleGuavaListenableFuture() throws Exception {
+ // You can convert an Observable to a ListenableFuture.
+ // ListenableFuture (part of google guava library) is a popular extension
+ // of Java's Future which allows registering listener callbacks:
+ // https://github.com/google/guava/wiki/ListenableFutureExplained
+
+ int requestPageSize = 3;
+ FeedOptions options = new FeedOptions();
+ options.setPageSize(requestPageSize);
+
+ Observable> documentQueryObservable = asyncClient
+ .queryDocuments(createdCollection.getSelfLink(), "SELECT * FROM root", options);
+
+ // convert to observable of list of pages
+ Observable>> allPagesObservable = documentQueryObservable.toList();
+
+ // convert the observable of list of pages to a Future
+ ListenableFuture>> future = ListenableFutureObservable.to(allPagesObservable);
+
+ List> pageList = future.get();
+
+ int totalNumberOfRetrievedDocuments = 0;
+ for(FeedResponsePage page: pageList) {
+ totalNumberOfRetrievedDocuments += page.getResults().size();
+ }
+ assertThat(numberOfDocuments, equalTo(totalNumberOfRetrievedDocuments));
+ }
+
+ private void cleanUpGeneratedDatabases() throws DocumentClientException {
+ LOGGER.info("cleanup databases invoked");
+
+ String[] allDatabaseIds = { DATABASE_ID };
+
+ for (String id : allDatabaseIds) {
+ try {
+ List> feedResponsePages = asyncClient
+ .queryDatabases(new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id",
+ new SqlParameterCollection(new SqlParameter("@id", id))), null)
+ .toList().toBlocking().single();
+
+ if (!feedResponsePages.get(0).getResults().isEmpty()) {
+ Database res = feedResponsePages.get(0).getResults().get(0);
+ LOGGER.info("deleting a database " + feedResponsePages.get(0));
+ asyncClient.deleteDatabase(res.getSelfLink(), null).toBlocking().single();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private DocumentCollection createMultiPartitionCollection(String databaseLink, String collectionId,
+ String partitionKeyPath) throws DocumentClientException {
+ PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
+ ArrayList paths = new ArrayList();
+ paths.add(partitionKeyPath);
+ partitionKeyDef.setPaths(paths);
+
+ RequestOptions options = new RequestOptions();
+ options.setOfferThroughput(10100);
+ DocumentCollection collectionDefinition = new DocumentCollection();
+ collectionDefinition.setId(collectionId);
+ collectionDefinition.setPartitionKey(partitionKeyDef);
+ DocumentCollection createdCollection = asyncClient
+ .createCollection(databaseLink, collectionDefinition, options).toBlocking().single().getResource();
+
+ return createdCollection;
+ }
+}
diff --git a/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/TestConfigurations.java b/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/TestConfigurations.java
new file mode 100644
index 000000000000..15d8605b8dfc
--- /dev/null
+++ b/azure-documentdb-examples/src/test/java/com/microsoft/azure/documentdb/rx/examples/TestConfigurations.java
@@ -0,0 +1,35 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.rx.examples;
+
+/**
+ * Contains the configurations for test file
+ */
+public final class TestConfigurations {
+ // Replace MASTER_KEY and HOST with values from your DocumentDB account.
+ // The default values are credentials of the local emulator, which are not used in any production environment.
+ //
+ public static final String MASTER_KEY =
+ "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
+ public static final String HOST = "https://localhost:443/";
+}
diff --git a/azure-documentdb-examples/src/test/resources/log4j.properties b/azure-documentdb-examples/src/test/resources/log4j.properties
new file mode 100644
index 000000000000..7a8ae3aef1e5
--- /dev/null
+++ b/azure-documentdb-examples/src/test/resources/log4j.properties
@@ -0,0 +1,17 @@
+# this is the log4j configuration for tests
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# Set HTTP components' logger to INFO
+log4j.category.org.apache.http=WARN
+log4j.category.org.apache.http.wire=WARN
+log4j.category.org.apache.http.headers=WARN
+log4j.category.com.microsoft.azure.documentdb.internal.ServiceJNIWrapper=ERROR
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%d %5X{pid} [%t] %-5p %c - %m%n
\ No newline at end of file
diff --git a/azure-documentdb-rx/pom.xml b/azure-documentdb-rx/pom.xml
new file mode 100644
index 000000000000..4c5d417268ea
--- /dev/null
+++ b/azure-documentdb-rx/pom.xml
@@ -0,0 +1,302 @@
+
+ 4.0.0
+
+ com.microsoft.azure
+ azure-documentdb-rx
+ 0.9.0-SNAPSHOT
+ jar
+
+ azure-documentdb-rx
+ Java Reactive Extension (Rx) for Microsoft Azure DocumentDB SDK
+ http://azure.microsoft.com/en-us/services/documentdb/
+
+
+ MIT License
+ http://www.opensource.org/licenses/mit-license.php
+
+
+
+ UTF-8
+ 1.7.6
+ 1.2.17
+ 4.1.7.Final
+ 0.4.20
+ 1.2.5
+ 6.8.8
+ 1.7.0
+ 1.10.8
+ unit
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.6.0
+
+ 1.8
+ 1.8
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+ 1.5
+
+
+ sign-artifacts
+ verify
+
+ sign
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-eclipse-plugin
+ 2.8
+
+
+
+ org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.19.1
+
+ ${test.groups}
+
+
+
+ org.apache.maven.surefire
+ surefire-testng
+ 2.19.1
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ 2.19.1
+
+ ${test.groups}
+
+
+
+ maven-assembly-plugin
+ 2.2
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+ org.sonatype.plugins
+ nexus-staging-maven-plugin
+ 1.6.3
+ true
+
+ ossrh
+ https://oss.sonatype.org/
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 2.2.1
+
+
+ attach-sources
+
+ jar-no-fork
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 2.9.1
+
+
+ http://reactivex.io/RxJava/javadoc/
+ http://azure.github.io/azure-documentdb-java/
+
+
+ **/internal/**/*.java
+ **/*Internal.java
+
+
+
+
+ attach-javadocs
+
+ jar
+
+
+
+
+
+
+
+
+ default
+
+ default
+ unit
+
+
+ true
+
+
+
+ fast
+
+ default
+ unit,simple
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-report-plugin
+ 2.19.1
+
+
+ org.codehaus.mojo
+ findbugs-maven-plugin
+ 3.0.4
+
+
+ org.apache.maven.plugins
+ maven-jxr-plugin
+ 2.1
+
+
+
+
+
+ commons-io
+ commons-io
+ 2.5
+
+
+ io.reactivex
+ rxjava
+ ${rxjava.version}
+
+
+ io.reactivex
+ rxjava-string
+ 1.1.1
+
+
+ com.microsoft.azure
+ azure-documentdb
+ 1.9.7-SNAPSHOT
+
+
+ io.reactivex
+ rxnetty
+ ${rxnetty.version}
+
+
+ io.reactivex
+ rxnetty-servo
+ ${rxnetty.version}
+
+
+ io.netty
+ netty-codec-http
+ ${netty.version}
+
+
+ io.netty
+ netty-handler
+ ${netty.version}
+
+
+ io.netty
+ netty-transport
+ ${netty.version}
+
+
+ io.netty
+ netty-transport-native-epoll
+ ${netty.version}
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
+
+ org.apache.commons
+ commons-lang3
+ 3.5
+
+
+ org.testng
+ testng
+ ${testng.version}
+ test
+
+
+ org.assertj
+ assertj-core
+ ${assertj.version}
+ test
+
+
+ org.mockito
+ mockito-all
+ ${mockito.version}
+ test
+
+
+ org.slf4j
+ slf4j-log4j12
+ ${slf4j.version}
+ test
+
+
+ log4j
+ log4j
+ ${log4j.version}
+ test
+
+
+
+
+ DocumentDB Developer Platform Devs
+ docdbdevplatdevs@microsoft.com
+ Microsoft
+ http://www.microsoft.com/
+
+
+
+ scm:git:git@github.com:Azure/azure-documentdb-java.git
+ scm:git:git@github.com:Azure/azure-documentdb-java.git
+ git@github.com:Azure/azure-documentdb-java.git
+
+
diff --git a/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/BridgeInternal.java b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/BridgeInternal.java
new file mode 100644
index 000000000000..9531b542b928
--- /dev/null
+++ b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/BridgeInternal.java
@@ -0,0 +1,105 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import com.microsoft.azure.documentdb.internal.AbstractDocumentServiceRequest;
+import com.microsoft.azure.documentdb.internal.CollectionCacheInternal;
+import com.microsoft.azure.documentdb.internal.DocumentServiceResponse;
+import com.microsoft.azure.documentdb.internal.EndpointManager;
+import com.microsoft.azure.documentdb.internal.UserAgentContainer;
+import com.microsoft.azure.documentdb.internal.routing.ClientCollectionCache;
+import com.microsoft.azure.documentdb.rx.AsyncDocumentClient;
+import com.microsoft.azure.documentdb.rx.internal.Constants;
+import com.microsoft.azure.documentdb.rx.internal.RxDocumentClientImpl;
+
+/**
+ * This is meant to be used only internally as a bridge access to
+ * classes in com.microsoft.azure.documentdb
+ **/
+public class BridgeInternal {
+
+ public static Document documentFromObject(Object document) {
+ return Document.FromObject(document);
+ }
+
+ public static DocumentClient createDocumentClient(String serviceEndpoint, String masterKey, ConnectionPolicy connectionPolicy, ConsistencyLevel consistencyLevel) {
+ return new DocumentClient(serviceEndpoint, masterKey, connectionPolicy, consistencyLevel, null, null,
+ new UserAgentContainer(Constants.Versions.SDK_NAME, Constants.Versions.SDK_VERSION));
+ }
+
+ public static ResourceResponse toResourceResponse(DocumentServiceResponse response, Class cls) {
+ return new ResourceResponse(response, cls);
+ }
+
+ public static void validateResource(Resource resource){
+ DocumentClient.validateResource(resource);
+ }
+
+ public static void addPartitionKeyInformation(AbstractDocumentServiceRequest request,
+ Document document,
+ RequestOptions options, DocumentCollection collection){
+ DocumentClient.addPartitionKeyInformation(request, document, options, collection);
+ }
+
+ public static ClientCollectionCache createClientCollectionCache(AsyncDocumentClient asyncClient, ExecutorService executorService) {
+ CollectionCacheInternal collectionReader = new CollectionCacheInternal() {
+
+ @Override
+ public ResourceResponse readCollection(String collectionLink, RequestOptions options)
+ throws DocumentClientException {
+ return asyncClient.readCollection(collectionLink, options).toBlocking().single();
+ }
+ };
+ return new ClientCollectionCache(collectionReader, executorService);
+ }
+
+ public static Map getRequestHeaders(RequestOptions options) {
+ return DocumentClient.getRequestHeaders(options);
+ }
+
+ public static EndpointManager createGlobalEndpointManager(RxDocumentClientImpl asyncClient) {
+
+ DatabaseAccountManagerInternal databaseAccountManager = new DatabaseAccountManagerInternal() {
+
+ @Override
+ public URI getServiceEndpoint() {
+ return asyncClient.getServiceEndpoint();
+ }
+
+ @Override
+ public DatabaseAccount getDatabaseAccountFromEndpoint(URI endpoint) throws DocumentClientException {
+ return asyncClient.getDatabaseAccountFromEndpoint(endpoint).toBlocking().single();
+ }
+
+ @Override
+ public ConnectionPolicy getConnectionPolicy() {
+ return asyncClient.getConnectionPolicy();
+ }
+ };
+ return new GlobalEndpointManager(databaseAccountManager);
+ }
+}
diff --git a/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/FeedResponsePage.java b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/FeedResponsePage.java
new file mode 100644
index 000000000000..b1c8386eaa45
--- /dev/null
+++ b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/FeedResponsePage.java
@@ -0,0 +1,313 @@
+package com.microsoft.azure.documentdb;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.microsoft.azure.documentdb.internal.Constants;
+import com.microsoft.azure.documentdb.internal.HttpConstants;
+
+public class FeedResponsePage {
+
+ private final List results;
+ private final Map header;
+ private final HashMap usageHeaders;
+ private final HashMap quotaHeaders;
+
+ public FeedResponsePage(List results, Map header) {
+ this.results = results;
+ this.header = header;
+ this.usageHeaders = new HashMap();
+ this.quotaHeaders = new HashMap();
+ }
+
+ public List getResults() {
+ return results;
+ }
+
+ /**
+ * Max Quota.
+ *
+ * @return the database quota.
+ */
+ public long getDatabaseQuota() {
+ return this.getMaxQuotaHeader(Constants.Quota.DATABASE);
+ }
+
+ /**
+ * Current Usage.
+ *
+ * @return the current database usage.
+ */
+ public long getDatabaseUsage() {
+ return this.getCurrentQuotaHeader(Constants.Quota.DATABASE);
+ }
+
+ /**
+ * Max Quota.
+ *
+ * @return the collection quota.
+ */
+ public long getCollectionQuota() {
+ return this.getMaxQuotaHeader(Constants.Quota.COLLECTION);
+ }
+
+ /**
+ * Current Usage.
+ *
+ * @return the current collection usage.
+ */
+ public long getCollectionUsage() {
+ return this.getCurrentQuotaHeader(Constants.Quota.COLLECTION);
+ }
+
+ /**
+ * Max Quota.
+ *
+ * @return the user quota.
+ */
+ public long getUserQuota() {
+ return this.getMaxQuotaHeader(Constants.Quota.USER);
+ }
+
+ /**
+ * Current Usage.
+ *
+ * @return the current user usage.
+ */
+ public long getUserUsage() {
+ return this.getCurrentQuotaHeader(Constants.Quota.USER);
+ }
+
+ /**
+ * Max Quota.
+ *
+ * @return the permission quota.
+ */
+ public long getPermissionQuota() {
+ return this.getMaxQuotaHeader(Constants.Quota.PERMISSION);
+ }
+
+ /**
+ * Current Usage.
+ *
+ * @return the current permission usage.
+ */
+ public long getPermissionUsage() {
+ return this.getCurrentQuotaHeader(Constants.Quota.PERMISSION);
+ }
+
+ /**
+ * Max Quota.
+ *
+ * @return the collection size quota.
+ */
+ public long getCollectionSizeQuota() {
+ return this.getMaxQuotaHeader(Constants.Quota.COLLECTION_SIZE);
+ }
+
+ /**
+ * Current Usage.
+ *
+ * @return the current collection size usage.
+ */
+ public long getCollectionSizeUsage() {
+ return this.getCurrentQuotaHeader(Constants.Quota.COLLECTION_SIZE);
+ }
+
+ /**
+ * Max Quota.
+ *
+ * @return the stored procedure quota.
+ */
+ public long getStoredProceduresQuota() {
+ return this.getMaxQuotaHeader(Constants.Quota.STORED_PROCEDURE);
+ }
+
+ /**
+ * Current Usage.
+ *
+ * @return the current stored procedure usage.
+ */
+ public long getStoredProceduresUsage() {
+ return this.getCurrentQuotaHeader(Constants.Quota.STORED_PROCEDURE);
+ }
+
+ /**
+ * Max Quota.
+ *
+ * @return the triggers quota.
+ */
+ public long getTriggersQuota() {
+ return this.getMaxQuotaHeader(Constants.Quota.TRIGGER);
+ }
+
+ /**
+ * Current Usage.
+ *
+ * @return the current triggers usage.
+ */
+ public long getTriggersUsage() {
+ return this.getCurrentQuotaHeader(Constants.Quota.TRIGGER);
+ }
+
+ /**
+ * Max Quota.
+ *
+ * @return the user defined functions quota.
+ */
+ public long getUserDefinedFunctionsQuota() {
+ return this.getMaxQuotaHeader(Constants.Quota.USER_DEFINED_FUNCTION);
+ }
+
+ /**
+ * Current Usage.
+ *
+ * @return the current user defined functions usage.
+ */
+ public long getUserDefinedFunctionsUsage() {
+ return this.getCurrentQuotaHeader(Constants.Quota.USER_DEFINED_FUNCTION);
+ }
+
+ /**
+ * Gets the maximum size limit for this entity (in megabytes (MB) for server resources and in count for master
+ * resources).
+ *
+ * @return the max resource quota.
+ */
+ public String getMaxResourceQuota() {
+ return getValueOrNull(header,
+ HttpConstants.HttpHeaders.MAX_RESOURCE_QUOTA);
+ }
+
+ /**
+ * Gets the current size of this entity (in megabytes (MB) for server resources and in count for master resources).
+ *
+ * @return the current resource quota usage.
+ */
+ public String getCurrentResourceQuotaUsage() {
+ return getValueOrNull(header,
+ HttpConstants.HttpHeaders.CURRENT_RESOURCE_QUOTA_USAGE);
+ }
+
+ /**
+ * Gets the number of index paths (terms) generated by the operation.
+ *
+ * @return the request charge.
+ */
+ public double getRequestCharge() {
+ String value = getValueOrNull(header,
+ HttpConstants.HttpHeaders.REQUEST_CHARGE);
+ if (StringUtils.isEmpty(value)) {
+ return 0;
+ }
+ return Double.valueOf(value);
+ }
+
+ /**
+ * Gets the activity ID for the request.
+ *
+ * @return the activity id.
+ */
+ public String getActivityId() {
+ return getValueOrNull(header, HttpConstants.HttpHeaders.ACTIVITY_ID);
+ }
+
+ /**
+ * Gets the continuation token to be used for continuing the enumeration.
+ *
+ * @return the response continuation.
+ */
+ public String getResponseContinuation() {
+ return getValueOrNull(header, HttpConstants.HttpHeaders.CONTINUATION);
+ }
+
+ /**
+ * Gets the session token for use in session consistency.
+ *
+ * @return the session token.
+ */
+ public String getSessionToken() {
+ return getValueOrNull(header, HttpConstants.HttpHeaders.SESSION_TOKEN);
+ }
+
+ /**
+ * Gets the response headers.
+ *
+ * @return the response headers.
+ */
+ public Map getResponseHeaders() {
+ return header;
+ }
+
+ private long getCurrentQuotaHeader(String headerName) {
+ if (this.usageHeaders.size() == 0 && !this.getMaxResourceQuota().isEmpty() &&
+ !this.getCurrentResourceQuotaUsage().isEmpty()) {
+ this.populateQuotaHeader(this.getMaxResourceQuota(), this.getCurrentResourceQuotaUsage());
+ }
+
+ if (this.usageHeaders.containsKey(headerName)) {
+ return this.usageHeaders.get(headerName);
+ }
+
+ return 0;
+ }
+
+ private long getMaxQuotaHeader(String headerName) {
+ if (this.quotaHeaders.size() == 0 &&
+ !this.getMaxResourceQuota().isEmpty() &&
+ !this.getCurrentResourceQuotaUsage().isEmpty()) {
+ this.populateQuotaHeader(this.getMaxResourceQuota(), this.getCurrentResourceQuotaUsage());
+ }
+
+ if (this.quotaHeaders.containsKey(headerName)) {
+ return this.quotaHeaders.get(headerName);
+ }
+
+ return 0;
+ }
+
+ private void populateQuotaHeader(String headerMaxQuota,
+ String headerCurrentUsage) {
+ String[] headerMaxQuotaWords = headerMaxQuota.split(Constants.Quota.DELIMITER_CHARS, -1);
+ String[] headerCurrentUsageWords = headerCurrentUsage.split(Constants.Quota.DELIMITER_CHARS, -1);
+
+ for (int i = 0; i < headerMaxQuotaWords.length; ++i) {
+ if (headerMaxQuotaWords[i].equalsIgnoreCase(Constants.Quota.DATABASE)) {
+ this.quotaHeaders.put(Constants.Quota.DATABASE, Long.valueOf(headerMaxQuotaWords[i + 1]));
+ this.usageHeaders.put(Constants.Quota.DATABASE, Long.valueOf(headerCurrentUsageWords[i + 1]));
+ } else if (headerMaxQuotaWords[i].equalsIgnoreCase(Constants.Quota.COLLECTION)) {
+ this.quotaHeaders.put(Constants.Quota.COLLECTION, Long.valueOf(headerMaxQuotaWords[i + 1]));
+ this.usageHeaders.put(Constants.Quota.COLLECTION, Long.valueOf(headerCurrentUsageWords[i + 1]));
+ } else if (headerMaxQuotaWords[i].equalsIgnoreCase(Constants.Quota.USER)) {
+ this.quotaHeaders.put(Constants.Quota.USER, Long.valueOf(headerMaxQuotaWords[i + 1]));
+ this.usageHeaders.put(Constants.Quota.USER, Long.valueOf(headerCurrentUsageWords[i + 1]));
+ } else if (headerMaxQuotaWords[i].equalsIgnoreCase(Constants.Quota.PERMISSION)) {
+ this.quotaHeaders.put(Constants.Quota.PERMISSION, Long.valueOf(headerMaxQuotaWords[i + 1]));
+ this.usageHeaders.put(Constants.Quota.PERMISSION, Long.valueOf(headerCurrentUsageWords[i + 1]));
+ } else if (headerMaxQuotaWords[i].equalsIgnoreCase(Constants.Quota.COLLECTION_SIZE)) {
+ this.quotaHeaders.put(Constants.Quota.COLLECTION_SIZE, Long.valueOf(headerMaxQuotaWords[i + 1]));
+ this.usageHeaders.put(Constants.Quota.COLLECTION_SIZE, Long.valueOf(headerCurrentUsageWords[i + 1]));
+ } else if (headerMaxQuotaWords[i].equalsIgnoreCase(Constants.Quota.STORED_PROCEDURE)) {
+ this.quotaHeaders.put(Constants.Quota.STORED_PROCEDURE, Long.valueOf(headerMaxQuotaWords[i + 1]));
+ this.usageHeaders.put(Constants.Quota.STORED_PROCEDURE, Long.valueOf(headerCurrentUsageWords[i + 1]));
+ } else if (headerMaxQuotaWords[i].equalsIgnoreCase(Constants.Quota.TRIGGER)) {
+ this.quotaHeaders.put(Constants.Quota.TRIGGER, Long.valueOf(headerMaxQuotaWords[i + 1]));
+ this.usageHeaders.put(Constants.Quota.TRIGGER, Long.valueOf(headerCurrentUsageWords[i + 1]));
+ } else if (headerMaxQuotaWords[i].equalsIgnoreCase(Constants.Quota.USER_DEFINED_FUNCTION)) {
+ this.quotaHeaders.put(Constants.Quota.USER_DEFINED_FUNCTION, Long.valueOf(headerMaxQuotaWords[i + 1]));
+ this.usageHeaders.put(Constants.Quota.USER_DEFINED_FUNCTION,
+ Long.valueOf(headerCurrentUsageWords[i + 1]));
+ }
+ }
+ }
+
+ private static String getValueOrNull(Map map, String key) {
+ if (map != null) {
+ return map.get(key);
+ }
+ return null;
+ }
+}
diff --git a/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/internal/RetryPolicyBridgeInternal.java b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/internal/RetryPolicyBridgeInternal.java
new file mode 100644
index 000000000000..3a7877ae60a8
--- /dev/null
+++ b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/internal/RetryPolicyBridgeInternal.java
@@ -0,0 +1,51 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.internal;
+
+import com.microsoft.azure.documentdb.ConnectionPolicy;
+import com.microsoft.azure.documentdb.internal.routing.ClientCollectionCache;
+
+/**
+ * This is meant to be used only internally as a bridge access to
+ * classes in com.microsoft.azure.documentdb.internal
+ **/
+public class RetryPolicyBridgeInternal {
+
+ public static RetryPolicy createSessionReadRetryPolicy(EndpointManager globalEndpointManager, AbstractDocumentServiceRequest request) {
+ return new SessionReadRetryPolicy(globalEndpointManager, request);
+ }
+
+ public static RetryPolicy createEndpointDiscoveryRetryPolicy(ConnectionPolicy connectionPolicy, EndpointManager globalEndpointManager) {
+ return new EndpointDiscoveryRetryPolicy(connectionPolicy, globalEndpointManager);
+ }
+
+ public static RetryPolicy createResourceThrottleRetryPolicy(int maxRetryAttemptsOnThrottledRequests,
+ int maxRetryWaitTimeInSeconds) {
+ return new ResourceThrottleRetryPolicy(maxRetryAttemptsOnThrottledRequests, maxRetryWaitTimeInSeconds);
+ }
+
+ public static RetryPolicy createPartitionKeyMismatchRetryPolicy(String resourcePath,
+ ClientCollectionCache clientCollectionCache) {
+ return new PartitionKeyMismatchRetryPolicy(resourcePath, clientCollectionCache);
+ }
+}
diff --git a/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/AsyncDocumentClient.java b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/AsyncDocumentClient.java
new file mode 100644
index 000000000000..64384bfa49ab
--- /dev/null
+++ b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/AsyncDocumentClient.java
@@ -0,0 +1,1429 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.rx;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import com.microsoft.azure.documentdb.Attachment;
+import com.microsoft.azure.documentdb.Conflict;
+import com.microsoft.azure.documentdb.ConnectionMode;
+import com.microsoft.azure.documentdb.ConnectionPolicy;
+import com.microsoft.azure.documentdb.ConsistencyLevel;
+import com.microsoft.azure.documentdb.Database;
+import com.microsoft.azure.documentdb.DatabaseAccount;
+import com.microsoft.azure.documentdb.Document;
+import com.microsoft.azure.documentdb.DocumentClient;
+import com.microsoft.azure.documentdb.DocumentCollection;
+import com.microsoft.azure.documentdb.FeedOptions;
+import com.microsoft.azure.documentdb.FeedResponsePage;
+import com.microsoft.azure.documentdb.MediaOptions;
+import com.microsoft.azure.documentdb.MediaResponse;
+import com.microsoft.azure.documentdb.Offer;
+import com.microsoft.azure.documentdb.Permission;
+import com.microsoft.azure.documentdb.RequestOptions;
+import com.microsoft.azure.documentdb.ResourceResponse;
+import com.microsoft.azure.documentdb.SqlQuerySpec;
+import com.microsoft.azure.documentdb.StoredProcedure;
+import com.microsoft.azure.documentdb.StoredProcedureResponse;
+import com.microsoft.azure.documentdb.Trigger;
+import com.microsoft.azure.documentdb.User;
+import com.microsoft.azure.documentdb.UserDefinedFunction;
+import com.microsoft.azure.documentdb.rx.internal.RxDocumentClientImpl;
+import com.microsoft.azure.documentdb.rx.internal.RxWrapperDocumentClientImpl;
+
+import rx.Observable;
+
+/**
+ * Provides a client-side logical representation of the Azure DocumentDB
+ * database service. This async client is used to configure and execute requests
+ * against the service.
+ *
+ *
+ * {@link AsyncDocumentClient} async APIs return rxJava's {@code
+ * Observable}, and so you can use rxJava {@link Observable} functionalities.
+ * The async {@link Observable} based APIs perform the requested operation only after
+ * subscription.
+ *
+ *
+ * The service client encapsulates the endpoint and credentials used to access
+ * the DocumentDB service.
+ *
+ * To instantiate you can use the {@link Builder}
+ *
+ */
+ public static class Builder {
+
+ private String masterKey;
+ private ConnectionPolicy connectionPolicy;
+ private ConsistencyLevel desiredConsistencyLevel;
+ private URI serviceEndpoint;
+ private int eventLoopSize = -1;
+ private int separateComputationPoolSize = -1;
+
+ public Builder withServiceEndpoint(String serviceEndpoint) {
+ try {
+ this.serviceEndpoint = new URI(serviceEndpoint);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ return this;
+ }
+
+ public Builder withMasterKey(String masterKey) {
+ this.masterKey = masterKey;
+ return this;
+ }
+
+ public Builder withConsistencyLevel(ConsistencyLevel desiredConsistencyLevel) {
+ this.desiredConsistencyLevel = desiredConsistencyLevel;
+ return this;
+ }
+
+ /**
+ * NOTE: This is experimental.
+ * If sets, modifies the event loop size and the computation pool size.
+ *
+ * As a rule of thumb (eventLoopSize + separateComputationPoolSize) ~ #
+ * CPU cores. If you have 8 CPU cores, eventLoopSize = 5, and
+ * separateComputationPoolSize = 3 is a logical choice for better throughput.
+ *
+ * Computation intensive work, e.g., authentication token generation, is
+ * performed on the computation scheduler. If
+ * separateComputationPoolSize is set to 0, computation will take place
+ * on the observable subscription thread.
+ *
+ * @param eventLoopSize the size of the event loop (the number of event loop threads).
+ * @param separateComputationPoolSize the size the thread pool backing computation scheduler up.
+ * @return Builder
+ */
+ Builder withWorkers(int eventLoopSize, int separateComputationPoolSize) {
+ ifThrowIllegalArgException(eventLoopSize <= 0, "invalid event loop size");
+ ifThrowIllegalArgException(separateComputationPoolSize < 0, "invalid computation scheduler pool size");
+ this.eventLoopSize = eventLoopSize;
+ this.separateComputationPoolSize = separateComputationPoolSize;
+ return this;
+ }
+
+ public Builder withConnectionPolicy(ConnectionPolicy connectionPolicy) {
+ this.connectionPolicy = connectionPolicy;
+ return this;
+ }
+
+ private void ifThrowIllegalArgException(boolean value, String error) {
+ if (value) {
+ throw new IllegalArgumentException(error);
+ }
+ }
+
+ public AsyncDocumentClient build() {
+
+ ifThrowIllegalArgException(this.serviceEndpoint == null, "cannot build client without service endpoint");
+ ifThrowIllegalArgException(this.masterKey == null, "cannot build client without masterKey");
+
+ if (connectionPolicy != null && ConnectionMode.DirectHttps != connectionPolicy.getConnectionMode()) {
+ return new RxDocumentClientImpl(serviceEndpoint, masterKey, connectionPolicy, desiredConsistencyLevel,
+ eventLoopSize, separateComputationPoolSize);
+ } else {
+
+ ifThrowIllegalArgException(this.eventLoopSize != -1, "eventLoopSize is not applicable in direct mode");
+ ifThrowIllegalArgException(this.separateComputationPoolSize != -1, "separateComputationPoolSize is not applicable in direct mode");
+
+ // fall back to RX wrapper with blocking IO
+ return new RxWrapperDocumentClientImpl(
+ new DocumentClient(serviceEndpoint.toString(), masterKey, connectionPolicy, desiredConsistencyLevel));
+ }
+ }
+ }
+
+ /**
+ * Gets the default service endpoint as passed in by the user during construction.
+ *
+ * @return the service endpoint URI
+ */
+ URI getServiceEndpoint();
+
+ /**
+ * Gets the current write endpoint chosen based on availability and preference.
+ *
+ * @return the write endpoint URI
+ */
+ URI getWriteEndpoint();
+
+ /**
+ * Gets the current read endpoint chosen based on availability and preference.
+ *
+ * @return the read endpoint URI
+ */
+ URI getReadEndpoint();
+
+ /**
+ * Gets the connection policy
+ *
+ * @return the connection policy
+ */
+ ConnectionPolicy getConnectionPolicy();
+
+ /**
+ * Creates a database.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the created database.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param database the database.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the created database or an error.
+ */
+ Observable> createDatabase(Database database, RequestOptions options);
+
+ /**
+ * Deletes a database.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the deleted database.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param databaseLink the database link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the deleted database or an error.
+ */
+ Observable> deleteDatabase(String databaseLink, RequestOptions options);
+
+ /**
+ * Reads a database.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the read database.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param databaseLink the database link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the read database or an error.
+ */
+ Observable> readDatabase(String databaseLink, RequestOptions options);
+
+ /**
+ * Reads all databases.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response of the read databases.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of read databases or an error.
+ */
+ Observable> readDatabases(FeedOptions options);
+
+ /**
+ * Query for databases.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response of the read databases.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param query the query.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of read databases or an error.
+ */
+ Observable> queryDatabases(String query, FeedOptions options);
+
+ /**
+ * Query for databases.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response of the obtained databases.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param querySpec the SQL query specification.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained databases or an error.
+ */
+ Observable> queryDatabases(SqlQuerySpec querySpec, FeedOptions options);
+
+ /**
+ * Creates a document collection.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the created collection.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param databaseLink the database link.
+ * @param collection the collection.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the created collection or an error.
+ */
+ Observable> createCollection(String databaseLink, DocumentCollection collection,
+ RequestOptions options);
+
+ /**
+ * Replaces a document collection.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the replaced document collection.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collection the document collection to use.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the replaced document collection or an error.
+ */
+ Observable> replaceCollection(DocumentCollection collection, RequestOptions options);
+
+ /**
+ * Deletes a document collection by the collection link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response for the deleted database.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response for the deleted database or an error.
+
+ */
+ Observable> deleteCollection(String collectionLink, RequestOptions options);
+
+ /**
+ * Reads a document collection by the collection link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the read collection.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the read collection or an error.
+ */
+ Observable> readCollection(String collectionLink, RequestOptions options);
+
+ /**
+ * Reads all document collections in a database.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response of the read collections.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param databaseLink the database link.
+ * @param options the fee options.
+ * @return the feed response with the read collections.
+ * @return an {@link Observable} containing one or several feed response pages of the read collections or an error.
+ */
+ Observable> readCollections(String databaseLink, FeedOptions options);
+
+ /**
+ * Query for document collections in a database.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response of the obtained collections.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param databaseLink the database link.
+ * @param query the query.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained collections or an error.
+ */
+ Observable> queryCollections(String databaseLink, String query, FeedOptions options);
+
+ /**
+ * Query for document collections in a database.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response of the obtained collections.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param databaseLink the database link.
+ * @param querySpec the SQL query specification.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained collections or an error.
+ */
+ Observable> queryCollections(String databaseLink, SqlQuerySpec querySpec, FeedOptions options);
+
+ /**
+ * Creates a document.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the created document.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the link to the parent document collection.
+ * @param document the document represented as a POJO or Document object.
+ * @param options the request options.
+ * @param disableAutomaticIdGeneration the flag for disabling automatic id generation.
+ * @return an {@link Observable} containing the single resource response with the created document or an error.
+
+ */
+ Observable> createDocument(String collectionLink, Object document, RequestOptions options,
+ boolean disableAutomaticIdGeneration);
+
+ /**
+ * Upserts a document.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the upserted document.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the link to the parent document collection.
+ * @param document the document represented as a POJO or Document object to upsert.
+ * @param options the request options.
+ * @param disableAutomaticIdGeneration the flag for disabling automatic id generation.
+ * @return an {@link Observable} containing the single resource response with the upserted document or an error.
+ */
+ Observable> upsertDocument(String collectionLink, Object document, RequestOptions options,
+ boolean disableAutomaticIdGeneration);
+
+ /**
+ * Replaces a document using a POJO object.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the replaced document.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param documentLink the document link.
+ * @param document the document represented as a POJO or Document object.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the replaced document or an error.
+ */
+ Observable> replaceDocument(String documentLink, Object document, RequestOptions options);
+
+ /**
+ * Replaces a document with the passed in document.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the replaced document.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param document the document to replace (containing the document id).
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the replaced document or an error.
+ */
+ Observable> replaceDocument(Document document, RequestOptions options);
+
+ /**
+ * Deletes a document by the document link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response for the deleted document.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param documentLink the document link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response for the deleted document or an error.
+ */
+ Observable> deleteDocument(String documentLink, RequestOptions options);
+
+ /**
+ * Reads a document by the document link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the read document.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param documentLink the document link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the read document or an error.
+ */
+ Observable> readDocument(String documentLink, RequestOptions options);
+
+ /**
+ * Reads all documents in a document collection.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response of the read documents.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the read documents or an error.
+ */
+ Observable> readDocuments(String collectionLink, FeedOptions options);
+
+
+ /**
+ * Query for documents in a document collection.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response of the obtained documents.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the link to the parent document collection.
+ * @param query the query.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained document or an error.
+ */
+ Observable> queryDocuments(String collectionLink, String query, FeedOptions options);
+
+ /**
+ * Query for documents in a document collection with a partitionKey
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response of the obtained documents.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the link to the parent document collection.
+ * @param query the query.
+ * @param options the feed options.
+ * @param partitionKey the partitionKey.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained documents or an error.
+ */
+ Observable> queryDocuments(String collectionLink, String query, FeedOptions options,
+ Object partitionKey);
+
+ /**
+ * Query for documents in a document collection.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response of the obtained documents.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the link to the parent document collection.
+ * @param querySpec the SQL query specification.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained documents or an error.
+ */
+ Observable> queryDocuments(String collectionLink, SqlQuerySpec querySpec, FeedOptions options);
+
+ /**
+ * Query for documents in a document collection.
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response of the obtained documents.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the link to the parent document collection.
+ * @param querySpec the SQL query specification.
+ * @param options the feed options.
+ * @param partitionKey the partitionKey.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained documents or an error.
+ */
+ Observable> queryDocuments(String collectionLink, SqlQuerySpec querySpec, FeedOptions options,
+ Object partitionKey);
+
+ /**
+ * Creates a stored procedure.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the created stored procedure.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param storedProcedure the stored procedure to create.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the created stored procedure or an error.
+ */
+ Observable> createStoredProcedure(String collectionLink, StoredProcedure storedProcedure,
+ RequestOptions options);
+
+ /**
+ * Upserts a stored procedure.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the upserted stored procedure.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param storedProcedure the stored procedure to upsert.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the upserted stored procedure or an error.
+ */
+ Observable> upsertStoredProcedure(String collectionLink, StoredProcedure storedProcedure,
+ RequestOptions options);
+
+ /**
+ * Replaces a stored procedure.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the replaced stored procedure.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param storedProcedure the stored procedure to use.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the replaced stored procedure or an error.
+ */
+ Observable> replaceStoredProcedure(StoredProcedure storedProcedure, RequestOptions options);
+
+ /**
+ * Deletes a stored procedure by the stored procedure link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response for the deleted stored procedure.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param storedProcedureLink the stored procedure link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response for the deleted stored procedure or an error.
+ */
+ Observable> deleteStoredProcedure(String storedProcedureLink, RequestOptions options);
+
+ /**
+ * Read a stored procedure by the stored procedure link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the read stored procedure.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param storedProcedureLink the stored procedure link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the read stored procedure or an error.
+ */
+ Observable> readStoredProcedure(String storedProcedureLink, RequestOptions options);
+
+ /**
+ * Reads all stored procedures in a document collection link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the read stored procedures.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the read stored procedures or an error.
+ */
+ Observable> readStoredProcedures(String collectionLink, FeedOptions options);
+
+ /**
+ * Query for stored procedures in a document collection.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained stored procedures.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param query the query.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained stored procedures or an error.
+ */
+ Observable> queryStoredProcedures(String collectionLink, String query, FeedOptions options);
+
+ /**
+ * Query for stored procedures in a document collection.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained stored procedures.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param querySpec the SQL query specification.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained stored procedures or an error.
+ */
+ Observable> queryStoredProcedures(String collectionLink, SqlQuerySpec querySpec,
+ FeedOptions options);
+
+ /**
+ * Executes a stored procedure by the stored procedure link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the stored procedure response.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param storedProcedureLink the stored procedure link.
+ * @param procedureParams the array of procedure parameter values.
+ * @return an {@link Observable} containing the single resource response with the stored procedure response or an error.
+ */
+ Observable executeStoredProcedure(String storedProcedureLink, Object[] procedureParams);
+
+ /**
+ * Executes a stored procedure by the stored procedure link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the stored procedure response.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param storedProcedureLink the stored procedure link.
+ * @param options the request options.
+ * @param procedureParams the array of procedure parameter values.
+ * @return an {@link Observable} containing the single resource response with the stored procedure response or an error.
+ */
+ Observable executeStoredProcedure(String storedProcedureLink, RequestOptions options,
+ Object[] procedureParams);
+
+ /**
+ * Creates a trigger.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the created trigger.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param trigger the trigger.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the created trigger or an error.
+ */
+ Observable> createTrigger(String collectionLink, Trigger trigger, RequestOptions options);
+
+ /**
+ * Upserts a trigger.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the upserted trigger.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param trigger the trigger to upsert.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the upserted trigger or an error.
+ */
+ Observable> upsertTrigger(String collectionLink, Trigger trigger, RequestOptions options);
+
+ /**
+ * Replaces a trigger.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the replaced trigger.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param trigger the trigger to use.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the replaced trigger or an error.
+ */
+ Observable> replaceTrigger(Trigger trigger, RequestOptions options);
+
+ /**
+ * Deletes a trigger.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response for the deleted trigger.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param triggerLink the trigger link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response for the deleted trigger or an error.
+ */
+ Observable> deleteTrigger(String triggerLink, RequestOptions options);
+
+ /**
+ * Reads a trigger by the trigger link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response for the read trigger.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param triggerLink the trigger link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response for the read trigger or an error.
+ */
+ Observable> readTrigger(String triggerLink, RequestOptions options);
+
+ /**
+ * Reads all triggers in a document collection.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the read triggers.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the read triggers or an error.
+ */
+ Observable> readTriggers(String collectionLink, FeedOptions options);
+
+ /**
+ * Query for triggers.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained triggers.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param query the query.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained triggers or an error.
+ */
+ Observable> queryTriggers(String collectionLink, String query, FeedOptions options);
+
+ /**
+ * Query for triggers.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained triggers.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param querySpec the SQL query specification.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained triggers or an error.
+ */
+ Observable> queryTriggers(String collectionLink, SqlQuerySpec querySpec, FeedOptions options);
+
+ /**
+ * Creates a user defined function.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the created user defined function.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param udf the user defined function.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the created user defined function or an error.
+ */
+ Observable> createUserDefinedFunction(String collectionLink, UserDefinedFunction udf,
+ RequestOptions options);
+
+ /**
+ * Upserts a user defined function.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the upserted user defined function.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param udf the user defined function to upsert.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the upserted user defined function or an error.
+ */
+ Observable> upsertUserDefinedFunction(String collectionLink, UserDefinedFunction udf,
+ RequestOptions options);
+
+ /**
+ * Replaces a user defined function.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the replaced user defined function.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param udf the user defined function.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the replaced user defined function or an error.
+ */
+ Observable> replaceUserDefinedFunction(UserDefinedFunction udf, RequestOptions options);
+
+ /**
+ * Deletes a user defined function.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response for the deleted user defined function.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param udfLink the user defined function link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response for the deleted user defined function or an error.
+ */
+ Observable> deleteUserDefinedFunction(String udfLink, RequestOptions options);
+
+ /**
+ * Read a user defined function.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response for the read user defined function.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param udfLink the user defined function link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response for the read user defined function or an error.
+ */
+ Observable> readUserDefinedFunction(String udfLink, RequestOptions options);
+
+ /**
+ * Reads all user defined functions in a document collection.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the read user defined functions.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the read user defined functions or an error.
+ */
+ Observable> readUserDefinedFunctions(String collectionLink, FeedOptions options);
+
+ /**
+ * Query for user defined functions.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained user defined functions.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param query the query.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained user defined functions or an error.
+ */
+ Observable> queryUserDefinedFunctions(String collectionLink, String query,
+ FeedOptions options);
+
+ /**
+ * Query for user defined functions.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained user defined functions.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param querySpec the SQL query specification.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained user defined functions or an error.
+ */
+ Observable> queryUserDefinedFunctions(String collectionLink, SqlQuerySpec querySpec,
+ FeedOptions options);
+
+ /**
+ * Creates an attachment.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the created attachment.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param documentLink the document link.
+ * @param attachment the attachment to create.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the created attachment or an error.
+ */
+ Observable> createAttachment(String documentLink, Attachment attachment, RequestOptions options);
+
+ /**
+ * Upserts an attachment.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the upserted attachment.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param documentLink the document link.
+ * @param attachment the attachment to upsert.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the upserted attachment or an error.
+ */
+ Observable> upsertAttachment(String documentLink, Attachment attachment, RequestOptions options);
+
+ /**
+ * Replaces an attachment.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the replaced attachment.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param attachment the attachment to use.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the replaced attachment or an error.
+ */
+ Observable> replaceAttachment(Attachment attachment, RequestOptions options);
+
+ /**
+ * Deletes an attachment.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response for the deleted attachment.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param attachmentLink the attachment link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response for the deleted attachment or an error.
+ */
+ Observable> deleteAttachment(String attachmentLink, RequestOptions options);
+
+ /**
+ * Reads an attachment.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the read attachment.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param attachmentLink the attachment link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the read attachment or an error.
+ */
+ Observable> readAttachment(String attachmentLink, RequestOptions options);
+
+ /**
+ * Reads all attachments in a document.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the read attachments.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param documentLink the document link.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the read attachments or an error.
+ */
+ Observable> readAttachments(String documentLink, FeedOptions options);
+
+ /**
+ * Query for attachments.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained attachments.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param documentLink the document link.
+ * @param query the query.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained attachments or an error.
+ */
+ Observable> queryAttachments(String documentLink, String query, FeedOptions options);
+
+ /**
+ * Query for attachments.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained attachments.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param documentLink the document link.
+ * @param querySpec the SQL query specification.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained attachments or an error.
+ */
+ Observable> queryAttachments(String documentLink, SqlQuerySpec querySpec, FeedOptions options);
+
+ /**
+ * Creates an attachment.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the created attachment.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param documentLink the document link.
+ * @param mediaStream the media stream for creating the attachment.
+ * @param options the media options.
+ * @return an {@link Observable} containing the single resource response with the created attachment or an error.
+ */
+ Observable> createAttachment(String documentLink, InputStream mediaStream, MediaOptions options);
+
+ /**
+ * Upserts an attachment to the media stream
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the upserted attachment.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param documentLink the document link.
+ * @param mediaStream the media stream for upserting the attachment.
+ * @param options the media options.
+ * @return an {@link Observable} containing the single resource response with the upserted attachment or an error.
+ */
+ Observable> upsertAttachment(String documentLink, InputStream mediaStream, MediaOptions options);
+
+ /**
+ * Reads a media by the media link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single media response.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param mediaLink the media link.
+ * @return an {@link Observable} containing the single meadia response or an error.
+ */
+ Observable readMedia(String mediaLink);
+
+ /**
+ * Updates a media by the media link.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single media response.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param mediaLink the media link.
+ * @param mediaStream the media stream to upload.
+ * @param options the media options.
+ * @return an {@link Observable} containing the single meadia response or an error.
+ */
+ Observable updateMedia(String mediaLink, InputStream mediaStream, MediaOptions options);
+
+ /**
+ * Reads a conflict.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the read conflict.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param conflictLink the conflict link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the read conflict or an error.
+ */
+ Observable> readConflict(String conflictLink, RequestOptions options);
+
+ /**
+ * Reads all conflicts in a document collection.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the read conflicts.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the read conflicts or an error.
+ */
+ Observable> readConflicts(String collectionLink, FeedOptions options);
+
+ /**
+ * Query for conflicts.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained conflicts.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param query the query.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained conflicts or an error.
+ */
+ Observable> queryConflicts(String collectionLink, String query, FeedOptions options);
+
+ /**
+ * Query for conflicts.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained conflicts.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param collectionLink the collection link.
+ * @param querySpec the SQL query specification.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained conflicts or an error.
+ */
+ Observable> queryConflicts(String collectionLink, SqlQuerySpec querySpec, FeedOptions options);
+
+ /**
+ * Deletes a conflict.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response for the deleted conflict.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param conflictLink the conflict link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response for the deleted conflict or an error.
+ */
+ Observable> deleteConflict(String conflictLink, RequestOptions options);
+
+ /**
+ * Creates a user.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the created user.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param databaseLink the database link.
+ * @param user the user to create.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the created user or an error.
+ */
+ Observable> createUser(String databaseLink, User user, RequestOptions options);
+
+ /**
+ * Upserts a user.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the upserted user.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param databaseLink the database link.
+ * @param user the user to upsert.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the upserted user or an error.
+ */
+ Observable> upsertUser(String databaseLink, User user, RequestOptions options);
+
+ /**
+ * Replaces a user.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the replaced user.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param user the user to use.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the replaced user or an error.
+ */
+ Observable> replaceUser(User user, RequestOptions options);
+
+ /**
+ * Deletes a user.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response for the deleted user.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param userLink the user link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response for the deleted user or an error.
+ */
+ Observable> deleteUser(String userLink, RequestOptions options);
+
+ /**
+ * Reads a user.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the read user.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param userLink the user link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the read user or an error.
+ */
+ Observable> readUser(String userLink, RequestOptions options);
+
+ /**
+ * Reads all users in a database.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the read users.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param databaseLink the database link.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the read users or an error.
+ */
+ Observable> readUsers(String databaseLink, FeedOptions options);
+
+ /**
+ * Query for users.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained users.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param databaseLink the database link.
+ * @param query the query.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained users or an error.
+ */
+ Observable> queryUsers(String databaseLink, String query, FeedOptions options);
+
+ /**
+ * Query for users.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained users.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param databaseLink the database link.
+ * @param querySpec the SQL query specification.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained users or an error.
+ */
+ Observable> queryUsers(String databaseLink, SqlQuerySpec querySpec, FeedOptions options);
+
+ /**
+ * Creates a permission.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the created permission.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param userLink the user link.
+ * @param permission the permission to create.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the created permission or an error.
+ */
+ Observable> createPermission(String userLink, Permission permission, RequestOptions options);
+
+ /**
+ * Upserts a permission.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the upserted permission.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param userLink the user link.
+ * @param permission the permission to upsert.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the upserted permission or an error.
+ */
+ Observable> upsertPermission(String userLink, Permission permission, RequestOptions options);
+
+ /**
+ * Replaces a permission.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the replaced permission.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param permission the permission to use.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the replaced permission or an error.
+ */
+ Observable> replacePermission(Permission permission, RequestOptions options);
+
+ /**
+ * Deletes a permission.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response for the deleted permission.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param permissionLink the permission link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response for the deleted permission or an error.
+ */
+ Observable> deletePermission(String permissionLink, RequestOptions options);
+
+ /**
+ * Reads a permission.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the read permission.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param permissionLink the permission link.
+ * @param options the request options.
+ * @return an {@link Observable} containing the single resource response with the read permission or an error.
+ */
+ Observable> readPermission(String permissionLink, RequestOptions options);
+
+ /**
+ * Reads all permissions.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the read permissions.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param permissionLink the permission link.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the read permissions or an error.
+ */
+ Observable> readPermissions(String permissionLink, FeedOptions options);
+
+ /**
+ * Query for permissions.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained permissions.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param permissionLink the permission link.
+ * @param query the query.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained permissions or an error.
+ */
+ Observable> queryPermissions(String permissionLink, String query, FeedOptions options);
+
+ /**
+ * Query for permissions.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the obtained permissions.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param permissionLink the permission link.
+ * @param querySpec the SQL query specification.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained permissions or an error.
+ */
+ Observable> queryPermissions(String permissionLink, SqlQuerySpec querySpec, FeedOptions options);
+
+ /**
+ * Replaces an offer.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the replaced offer.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param offer the offer to use.
+ * @return an {@link Observable} containing the single resource response with the replaced offer or an error.
+ */
+ Observable> replaceOffer(Offer offer);
+
+ /**
+ * Reads an offer.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the read offer.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param offerLink the offer link.
+ * @return an {@link Observable} containing the single resource response with the read offer or an error.
+ */
+ Observable> readOffer(String offerLink);
+
+ /**
+ * Reads offers.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of the read offers.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the read offers or an error.
+ */
+ Observable> readOffers(FeedOptions options);
+
+ /**
+ * Query for offers in a database.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of obtained obtained offers.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param query the query.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained offers or an error.
+ */
+ Observable> queryOffers(String query, FeedOptions options);
+
+ /**
+ * Query for offers in a database.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} will contain one or several feed response pages of obtained obtained offers.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @param querySpec the query specification.
+ * @param options the feed options.
+ * @return an {@link Observable} containing one or several feed response pages of the obtained offers or an error.
+ */
+ Observable> queryOffers(SqlQuerySpec querySpec, FeedOptions options);
+
+ /**
+ * Gets database account information.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Observable} upon successful completion will contain a single resource response with the database account.
+ * In case of failure the {@link Observable} will error.
+ *
+ * @return an {@link Observable} containing the single resource response with the database account or an error.
+ */
+ Observable getDatabaseAccount();
+
+ /**
+ * Close this {@link AsyncDocumentClient} instance and cleans up the resources.
+ */
+ void close();
+
+}
\ No newline at end of file
diff --git a/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/Constants.java b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/Constants.java
new file mode 100644
index 000000000000..c07f2eadf13a
--- /dev/null
+++ b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/Constants.java
@@ -0,0 +1,31 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.rx.internal;
+
+public class Constants {
+
+ public static class Versions {
+ public static final String SDK_VERSION = "0.9.0-SNAPSHOT";
+ public static final String SDK_NAME = "documentdb-rxjava-sdk";
+ }
+}
diff --git a/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/CreateDocumentRetryHandler.java b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/CreateDocumentRetryHandler.java
new file mode 100644
index 000000000000..f19b7eaf69e8
--- /dev/null
+++ b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/CreateDocumentRetryHandler.java
@@ -0,0 +1,83 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.rx.internal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.documentdb.DocumentClientException;
+import com.microsoft.azure.documentdb.internal.HttpConstants;
+import com.microsoft.azure.documentdb.internal.RetryPolicy;
+import com.microsoft.azure.documentdb.internal.RetryPolicyBridgeInternal;
+import com.microsoft.azure.documentdb.internal.routing.ClientCollectionCache;
+
+import rx.Observable;
+
+class CreateDocumentRetryHandler implements RxRetryHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CreateDocumentRetryHandler.class);
+ private final RetryPolicy keyMismatchRetryPolicy;
+
+ public CreateDocumentRetryHandler(ClientCollectionCache clientCollectionCache,
+ String resourcePath) {
+
+ this.keyMismatchRetryPolicy = RetryPolicyBridgeInternal
+ .createPartitionKeyMismatchRetryPolicy(resourcePath, clientCollectionCache);
+ }
+
+ @Override
+ public Observable handleRetryAttempt(Throwable t, int attemptNumber) {
+
+ if (t instanceof DocumentClientException) {
+ try {
+ return handleRetryAttemptInternal((DocumentClientException) t, attemptNumber);
+ } catch (DocumentClientException e) {
+ return Observable.error(e);
+ }
+ } else {
+ return Observable.error(t);
+ }
+ }
+
+ private Observable handleRetryAttemptInternal(DocumentClientException e, int attemptNumber) throws DocumentClientException {
+
+ RetryPolicy retryPolicy = null;
+ if (e.getStatusCode() == HttpConstants.StatusCodes.BADREQUEST && e.getSubStatusCode() != null
+ && e.getSubStatusCode() == HttpConstants.SubStatusCodes.PARTITION_KEY_MISMATCH) {
+ // If HttpStatusCode is 404 (NotFound) and SubStatusCode is
+ // 1001 (PartitionKeyMismatch), invoke the partition key mismatch retry policy
+ retryPolicy = keyMismatchRetryPolicy;
+ }
+
+ if (retryPolicy == null || !retryPolicy.shouldRetry(e)) {
+ LOGGER.trace("Execution encontured exception: {}, status code {} sub status code {}. Won't retry!",
+ e.getMessage(), e.getStatusCode(), e.getSubStatusCode());
+ return Observable.error(e);
+ }
+ LOGGER.trace("Execution encontured exception: {}, status code {} sub status code {}. Will retry in {}ms",
+ e.getMessage(), e.getStatusCode(), e.getSubStatusCode(), retryPolicy.getRetryAfterInMilliseconds());
+
+ long waitTime = retryPolicy.getRetryAfterInMilliseconds();
+ return Observable.just(waitTime);
+ }
+}
diff --git a/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/ExecuteDocumentClientRequestRetryHandler.java b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/ExecuteDocumentClientRequestRetryHandler.java
new file mode 100644
index 000000000000..112d892a3e65
--- /dev/null
+++ b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/ExecuteDocumentClientRequestRetryHandler.java
@@ -0,0 +1,110 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.rx.internal;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.documentdb.DocumentClientException;
+import com.microsoft.azure.documentdb.internal.RetryPolicyBridgeInternal;
+import com.microsoft.azure.documentdb.internal.EndpointManager;
+import com.microsoft.azure.documentdb.internal.HttpConstants;
+import com.microsoft.azure.documentdb.internal.RetryPolicy;
+import com.microsoft.azure.documentdb.rx.AsyncDocumentClient;
+
+import rx.Observable;
+
+/**
+ * Provides a Retry handler for executing the code block and retry if needed.
+ */
+class ExecuteDocumentClientRequestRetryHandler implements RxRetryHandler {
+ private final static Logger LOGGER = LoggerFactory.getLogger(ExecuteDocumentClientRequestRetryHandler.class);
+
+ private final RetryPolicy discoveryRetryPolicy;
+ private final RetryPolicy throttleRetryPolicy;
+ private final RetryPolicy sessionReadRetryPolicy;
+
+ public ExecuteDocumentClientRequestRetryHandler(RxDocumentServiceRequest request,
+ EndpointManager globalEndpointManager, AsyncDocumentClient client) {
+
+ this.discoveryRetryPolicy = RetryPolicyBridgeInternal.createEndpointDiscoveryRetryPolicy(
+ client.getConnectionPolicy(),
+ globalEndpointManager);
+
+ this.throttleRetryPolicy = RetryPolicyBridgeInternal.createResourceThrottleRetryPolicy(
+ client.getConnectionPolicy().getRetryOptions().getMaxRetryAttemptsOnThrottledRequests(),
+ client.getConnectionPolicy().getRetryOptions().getMaxRetryWaitTimeInSeconds());
+
+ this.sessionReadRetryPolicy = RetryPolicyBridgeInternal.createSessionReadRetryPolicy(
+ globalEndpointManager, request);
+ }
+
+ @Override
+ public Observable handleRetryAttempt(Throwable t, int attemptNumber) {
+
+ if (t instanceof DocumentClientException) {
+ try {
+ return handleRetryAttemptInternal((DocumentClientException) t, attemptNumber);
+ } catch (Exception e) {
+ return Observable.error(e);
+ }
+ } else {
+ return Observable.error(t);
+ }
+ }
+
+ public Observable handleRetryAttemptInternal(DocumentClientException e, int attemptNumber) throws DocumentClientException {
+
+ LOGGER.trace("Executing DocumentClientRequest");
+
+ RetryPolicy retryPolicy = null;
+ if (e.getStatusCode() == HttpConstants.StatusCodes.FORBIDDEN && e.getSubStatusCode() != null
+ && e.getSubStatusCode() == HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN) {
+ // If HttpStatusCode is 403 (Forbidden) and SubStatusCode is
+ // 3 (WriteForbidden),
+ // invoke the endpoint discovery retry policy
+ retryPolicy = discoveryRetryPolicy;
+ } else if (e.getStatusCode() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS) {
+ // If HttpStatusCode is 429 (Too Many Requests), invoke the
+ // throttle retry policy
+ retryPolicy = throttleRetryPolicy;
+ } else if (e.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND && e.getSubStatusCode() != null
+ && e.getSubStatusCode() == HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE) {
+ // If HttpStatusCode is 404 (NotFound) and SubStatusCode is
+ // 1002 (ReadSessionNotAvailable), invoke the session read retry policy
+ retryPolicy = sessionReadRetryPolicy;
+ }
+
+ if (retryPolicy == null || !retryPolicy.shouldRetry(e)) {
+ LOGGER.trace("Execution encontured exception: {}, status code {} sub status code {}. Won't retry!",
+ e.getMessage(), e.getStatusCode(), e.getSubStatusCode());
+ return Observable.error(e);
+ }
+ LOGGER.trace("Execution encontured exception: {}, status code {} sub status code {}. Will retry in {}ms",
+ e.getMessage(), e.getStatusCode(), e.getSubStatusCode(), retryPolicy.getRetryAfterInMilliseconds());
+
+ return Observable.timer(retryPolicy.getRetryAfterInMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/RetryFunctionFactory.java b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/RetryFunctionFactory.java
new file mode 100644
index 000000000000..7acf997d9871
--- /dev/null
+++ b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/RetryFunctionFactory.java
@@ -0,0 +1,88 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.rx.internal;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.documentdb.DocumentClientException;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+class RetryFunctionFactory {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(RetryFunctionFactory.class);
+
+ // this is just a safe guard, to ensure even if the retry policy doesn't give up we avoid infinite retries.
+ private final static int MAX_RETRIES_LIMIT = 200;
+
+ public static Func1, Observable> from(RxRetryHandler retryPolicy) {
+ return new Func1, Observable>() {
+
+ @Override
+ public Observable call(final Observable extends Throwable> failures) {
+
+ return failures
+ .zipWith(Observable.range(1, MAX_RETRIES_LIMIT),
+ (err, attempt) ->
+ attempt < MAX_RETRIES_LIMIT ?
+ handleRetryAttempt(err, attempt, retryPolicy) :
+ Observable.error(extractDocumentClientCause(err, attempt)) )
+ .flatMap(x -> x);
+ }
+ };
+ }
+
+ private static Throwable extractDocumentClientCause(Throwable t, int attemptNumber) {
+ if (t instanceof DocumentClientException) {
+ return t;
+ } else if (t instanceof RuntimeException && t.getCause() instanceof DocumentClientException) {
+ return t.getCause();
+ } else {
+ LOGGER.warn("unknown failure, cannot retry [{}], attempt number [{}]", t.getMessage(), attemptNumber, t);
+ return t;
+ }
+ }
+
+ private static Observable handleRetryAttempt(Throwable t, int attemptNumber, RxRetryHandler retryPolicy) {
+ Throwable cause = extractDocumentClientCause(t, attemptNumber);
+
+ if (LOGGER.isDebugEnabled()) {
+ if (cause instanceof DocumentClientException) {
+ DocumentClientException ex = (DocumentClientException) cause;
+ LOGGER.debug("Handling Failure Attempt [{}], StatusCode [{}], SubStatusCode,"
+ + " Error: [{}] ", attemptNumber, ex.getStatusCode(), ex.getSubStatusCode(), ex.getError(), ex);
+ } else {
+ LOGGER.debug("Handling Failure Attempt [{}], req [{}]", attemptNumber, cause);
+ }
+ }
+
+ try {
+ return retryPolicy.handleRetryAttempt(cause, attemptNumber);
+ } catch (Exception e) {
+ return Observable.error(e);
+ }
+ }
+}
diff --git a/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/RxDocumentClientImpl.java b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/RxDocumentClientImpl.java
new file mode 100644
index 000000000000..900e34880953
--- /dev/null
+++ b/azure-documentdb-rx/src/main/java/com/microsoft/azure/documentdb/rx/internal/RxDocumentClientImpl.java
@@ -0,0 +1,1638 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.rx.internal;
+
+import static com.microsoft.azure.documentdb.BridgeInternal.documentFromObject;
+import static com.microsoft.azure.documentdb.BridgeInternal.toResourceResponse;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.documentdb.Attachment;
+import com.microsoft.azure.documentdb.BridgeInternal;
+import com.microsoft.azure.documentdb.Conflict;
+import com.microsoft.azure.documentdb.ConnectionMode;
+import com.microsoft.azure.documentdb.ConnectionPolicy;
+import com.microsoft.azure.documentdb.ConsistencyLevel;
+import com.microsoft.azure.documentdb.Database;
+import com.microsoft.azure.documentdb.DatabaseAccount;
+import com.microsoft.azure.documentdb.Document;
+import com.microsoft.azure.documentdb.DocumentClient;
+import com.microsoft.azure.documentdb.DocumentClientException;
+import com.microsoft.azure.documentdb.DocumentCollection;
+import com.microsoft.azure.documentdb.FeedOptions;
+import com.microsoft.azure.documentdb.FeedResponsePage;
+import com.microsoft.azure.documentdb.MediaOptions;
+import com.microsoft.azure.documentdb.MediaResponse;
+import com.microsoft.azure.documentdb.Offer;
+import com.microsoft.azure.documentdb.Permission;
+import com.microsoft.azure.documentdb.RequestOptions;
+import com.microsoft.azure.documentdb.Resource;
+import com.microsoft.azure.documentdb.ResourceResponse;
+import com.microsoft.azure.documentdb.SqlQuerySpec;
+import com.microsoft.azure.documentdb.StoredProcedure;
+import com.microsoft.azure.documentdb.StoredProcedureResponse;
+import com.microsoft.azure.documentdb.Trigger;
+import com.microsoft.azure.documentdb.User;
+import com.microsoft.azure.documentdb.UserDefinedFunction;
+import com.microsoft.azure.documentdb.internal.BaseAuthorizationTokenProvider;
+import com.microsoft.azure.documentdb.internal.DocumentServiceResponse;
+import com.microsoft.azure.documentdb.internal.EndpointManager;
+import com.microsoft.azure.documentdb.internal.HttpConstants;
+import com.microsoft.azure.documentdb.internal.OperationType;
+import com.microsoft.azure.documentdb.internal.Paths;
+import com.microsoft.azure.documentdb.internal.QueryCompatibilityMode;
+import com.microsoft.azure.documentdb.internal.ResourceType;
+import com.microsoft.azure.documentdb.internal.RuntimeConstants;
+import com.microsoft.azure.documentdb.internal.SessionContainer;
+import com.microsoft.azure.documentdb.internal.UserAgentContainer;
+import com.microsoft.azure.documentdb.internal.Utils;
+import com.microsoft.azure.documentdb.internal.routing.ClientCollectionCache;
+import com.microsoft.azure.documentdb.rx.AsyncDocumentClient;
+
+import io.netty.buffer.ByteBuf;
+import io.reactivex.netty.RxNetty;
+import io.reactivex.netty.channel.RxEventLoopProvider;
+import io.reactivex.netty.channel.SingleNioLoopProvider;
+import io.reactivex.netty.client.RxClient.ClientConfig;
+import io.reactivex.netty.pipeline.ssl.DefaultFactories;
+import io.reactivex.netty.protocol.http.client.HttpClient;
+import io.reactivex.netty.protocol.http.client.HttpClientBuilder;
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+import rx.internal.util.RxThreadFactory;
+import rx.schedulers.Schedulers;
+
+public class RxDocumentClientImpl implements AsyncDocumentClient {
+
+ private final static int MAX_COLLECTION_CACHE_CONCURRENCY = 10;
+ private final Logger logger = LoggerFactory.getLogger(RxDocumentClientImpl.class);
+ private final String masterKey;
+ private final ExecutorService collectionCacheExecutorService;
+ private final URI serviceEndpoint;
+ private final ConnectionPolicy connectionPolicy;
+ private final SessionContainer sessionContainer;
+ private final ConsistencyLevel consistencyLevel;
+ private final BaseAuthorizationTokenProvider authorizationTokenProvider;
+ private final ClientCollectionCache collectionCache;
+ private final RxGatewayStoreModel gatewayProxy;
+ private final RxWrapperDocumentClientImpl rxWrapperClient;
+ private final Scheduler computationScheduler;
+ private Map resourceTokens;
+ /**
+ * Compatibility mode: Allows to specify compatibility mode used by client
+ * when making query requests. Should be removed when application/sql is no
+ * longer supported.
+ */
+ private final QueryCompatibilityMode queryCompatibilityMode = QueryCompatibilityMode.Default;
+ private final HttpClient rxClient;
+ private final EndpointManager globalEndpointManager;
+ private final ExecutorService computationExecutor;
+
+ public RxDocumentClientImpl(URI serviceEndpoint, String masterKey, ConnectionPolicy connectionPolicy,
+ ConsistencyLevel consistencyLevel, int eventLoopSize, int computationPoolSize) {
+
+ logger.info("Initializing DocumentClient with"
+ + " serviceEndpoint [{}], ConnectionPolicy [{}], ConsistencyLevel [{}]",
+ serviceEndpoint, connectionPolicy, consistencyLevel);
+
+ this.masterKey = masterKey;
+ this.serviceEndpoint = serviceEndpoint;
+
+ if (connectionPolicy != null) {
+ this.connectionPolicy = connectionPolicy;
+ } else {
+ this.connectionPolicy = new ConnectionPolicy();
+ }
+
+ this.sessionContainer = new SessionContainer(this.serviceEndpoint.getHost());
+ this.consistencyLevel = consistencyLevel;
+
+ UserAgentContainer userAgentContainer = new UserAgentContainer(Constants.Versions.SDK_NAME, Constants.Versions.SDK_VERSION);
+ String userAgentSuffix = this.connectionPolicy.getUserAgentSuffix();
+ if (userAgentSuffix != null && userAgentSuffix.length() > 0) {
+ userAgentContainer.setSuffix(userAgentSuffix);
+ }
+
+ if (eventLoopSize <= 0) {
+ int cpuCount = Runtime.getRuntime().availableProcessors();
+ if (cpuCount >= 4) {
+ // do authentication token generation on a scheduler
+ computationPoolSize = (cpuCount / 4);
+ eventLoopSize = cpuCount - computationPoolSize;
+ } else {
+ // do authentication token generation on subscription thread
+ computationPoolSize = 0;
+ eventLoopSize = cpuCount;
+ }
+ logger.debug("Auto configuring eventLoop size and computation pool size. CPU cores {[]}, eventLoopSize [{}], computationPoolSize [{}]",
+ cpuCount, eventLoopSize, computationPoolSize);
+ }
+
+ logger.debug("EventLoop size [{}]", eventLoopSize);
+
+ synchronized (RxDocumentClientImpl.class) {
+ SingleNioLoopProvider rxEventLoopProvider = new SingleNioLoopProvider(1, eventLoopSize);
+ RxEventLoopProvider oldEventLoopProvider = RxNetty.useEventLoopProvider(rxEventLoopProvider);
+ this.rxClient = httpClientBuilder().build();
+ RxNetty.useEventLoopProvider(oldEventLoopProvider);
+ }
+
+ if (computationPoolSize > 0) {
+ logger.debug("Intensive computation configured on a computation scheduler backed by thread pool size [{}]", computationPoolSize);
+ this.computationExecutor = new ThreadPoolExecutor(computationPoolSize, computationPoolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue(2),
+ new RxThreadFactory("rxdocdb-computation"), new CallerRunsPolicy());
+
+ this.computationScheduler = Schedulers.from(this.computationExecutor);
+ } else {
+ logger.debug("Intensive computation configured on the subscription thread");
+ this.computationExecutor = null;
+ this.computationScheduler = Schedulers.immediate();
+ }
+
+ this.authorizationTokenProvider = new BaseAuthorizationTokenProvider(this.masterKey);
+ this.collectionCacheExecutorService = new ThreadPoolExecutor(1, MAX_COLLECTION_CACHE_CONCURRENCY, 10,
+ TimeUnit.MINUTES, new ArrayBlockingQueue(MAX_COLLECTION_CACHE_CONCURRENCY, true),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+
+ this.collectionCache = BridgeInternal.createClientCollectionCache(this, collectionCacheExecutorService);
+
+ this.globalEndpointManager = BridgeInternal.createGlobalEndpointManager(this);
+
+ this.gatewayProxy = new RxGatewayStoreModel(this.connectionPolicy, consistencyLevel, this.queryCompatibilityMode,
+ this.masterKey, this.resourceTokens, userAgentContainer, this.globalEndpointManager, this.rxClient);
+
+ this.rxWrapperClient = new RxWrapperDocumentClientImpl(
+ new DocumentClient(serviceEndpoint.toString(), masterKey, connectionPolicy, consistencyLevel));
+
+ // If DirectHttps mode is configured in AsyncDocumentClient.Builder we fallback
+ // to RxWrapperDocumentClientImpl. So we should never get here
+
+ if (this.connectionPolicy.getConnectionMode() == ConnectionMode.DirectHttps) {
+ throw new UnsupportedOperationException("Direct Https is not supported");
+ }
+ }
+
+ private HttpClientBuilder httpClientBuilder() {
+ HttpClientBuilder builder = RxNetty
+ .newHttpClientBuilder(this.serviceEndpoint.getHost(), this.serviceEndpoint.getPort())
+ .withSslEngineFactory(DefaultFactories.trustAll()).withMaxConnections(connectionPolicy.getMaxPoolSize())
+ .withIdleConnectionsTimeoutMillis(this.connectionPolicy.getIdleConnectionTimeout() * 1000);
+
+ ClientConfig config = new ClientConfig.Builder()
+ .readTimeout(connectionPolicy.getRequestTimeout(), TimeUnit.SECONDS).build();
+ return builder.config(config);
+ }
+
+ @Override
+ public URI getServiceEndpoint() {
+ return this.serviceEndpoint;
+ }
+
+ @Override
+ public URI getWriteEndpoint() {
+ return this.globalEndpointManager.getWriteEndpoint();
+ }
+
+ @Override
+ public URI getReadEndpoint() {
+ return this.globalEndpointManager.getReadEndpoint();
+ }
+
+ @Override
+ public ConnectionPolicy getConnectionPolicy() {
+ return this.connectionPolicy;
+ }
+
+ @Override
+ public Observable> createDatabase(Database database, RequestOptions options) {
+
+ return Observable.defer(() -> {
+ try {
+
+ if (database == null) {
+ throw new IllegalArgumentException("Database");
+ }
+
+ logger.debug("Creating a Database. id: [{}]", database.getId());
+ validateResource(database);
+
+ Map requestHeaders = this.getRequestHeaders(options);
+ RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Create,
+ ResourceType.Database, Paths.DATABASES_ROOT, database, requestHeaders);
+
+ return this.doCreate(request).map(response -> toResourceResponse(response, Database.class));
+ } catch (Exception e) {
+ logger.debug("Failure in creating a database. due to [{}]", e.getMessage(), e);
+ return Observable.error(e);
+ }
+ });
+ }
+
+ @Override
+ public Observable> deleteDatabase(String databaseLink, RequestOptions options) {
+ return Observable.defer(() -> {
+ try {
+ if (StringUtils.isEmpty(databaseLink)) {
+ throw new IllegalArgumentException("databaseLink");
+ }
+
+ logger.debug("Deleting a Database. databaseLink: [{}]", databaseLink);
+ String path = Utils.joinPath(databaseLink, null);
+ Map requestHeaders = this.getRequestHeaders(options);
+ RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Delete,
+ ResourceType.Database, path, requestHeaders);
+
+ return this.doDelete(request).map(response -> toResourceResponse(response, Database.class));
+ } catch (Exception e) {
+ logger.debug("Failure in deleting a database. due to [{}]", e.getMessage(), e);
+ return Observable.error(e);
+ }
+ });
+ }
+
+ @Override
+ public Observable> readDatabase(String databaseLink, RequestOptions options) {
+
+ return Observable.defer(() -> {
+ try {
+ if (StringUtils.isEmpty(databaseLink)) {
+ throw new IllegalArgumentException("databaseLink");
+ }
+
+ logger.debug("Reading a Database. databaseLink: [{}]", databaseLink);
+ String path = Utils.joinPath(databaseLink, null);
+ Map requestHeaders = this.getRequestHeaders(options);
+ RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Read,
+ ResourceType.Database, path, requestHeaders);
+
+ return this.doRead(request).map(response -> toResourceResponse(response, Database.class));
+ } catch (Exception e) {
+ logger.debug("Failure in reading a database. due to [{}]", e.getMessage(), e);
+ return Observable.error(e);
+ }
+ });
+ }
+
+ @Override
+ public Observable> readDatabases(FeedOptions options) {
+ return this.rxWrapperClient.readDatabases(options);
+ }
+
+ @Override
+ public Observable> queryDatabases(String query, FeedOptions options) {
+ return this.rxWrapperClient.queryDatabases(query, options);
+ }
+
+ @Override
+ public Observable> queryDatabases(SqlQuerySpec querySpec, FeedOptions options) {
+ return this.rxWrapperClient.queryDatabases(querySpec, options);
+ }
+
+ @Override
+ public Observable> createCollection(String databaseLink,
+ DocumentCollection collection, RequestOptions options) {
+
+ return Observable.defer(() -> {
+ try {
+ if (StringUtils.isEmpty(databaseLink)) {
+ throw new IllegalArgumentException("databaseLink");
+ }
+ if (collection == null) {
+ throw new IllegalArgumentException("collection");
+ }
+
+ logger.debug("Creating a Collection. databaseLink: [{}], Collection id: [{}]", databaseLink,
+ collection.getId());
+ validateResource(collection);
+
+ String path = Utils.joinPath(databaseLink, Paths.COLLECTIONS_PATH_SEGMENT);
+ Map requestHeaders = this.getRequestHeaders(options);
+ RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Create,
+ ResourceType.DocumentCollection, path, collection, requestHeaders);
+ return this.doCreate(request).map(response -> toResourceResponse(response, DocumentCollection.class));
+ } catch (Exception e) {
+ logger.debug("Failure in creating a collection. due to [{}]", e.getMessage(), e);
+ return Observable.error(e);
+ }
+ });
+ }
+
+ @Override
+ public Observable> replaceCollection(DocumentCollection collection,
+ RequestOptions options) {
+ return Observable.defer(() -> {
+ try {
+ if (collection == null) {
+ throw new IllegalArgumentException("collection");
+ }
+
+ logger.debug("Replacing a Collection. id: [{}]", collection.getId());
+ validateResource(collection);
+
+ String path = Utils.joinPath(collection.getSelfLink(), null);
+ Map requestHeaders = this.getRequestHeaders(options);
+
+ RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Replace,
+ ResourceType.DocumentCollection, path, collection, requestHeaders);
+
+ return this.doReplace(request).map(response -> toResourceResponse(response, DocumentCollection.class));
+
+ } catch (Exception e) {
+ logger.debug("Failure in replacing a collection. due to [{}]", e.getMessage(), e);
+ return Observable.error(e);
+ }
+ });
+ }
+
+ @Override
+ public Observable> deleteCollection(String collectionLink,
+ RequestOptions options) {
+ return Observable.defer(() -> {
+ try {
+ if (StringUtils.isEmpty(collectionLink)) {
+ throw new IllegalArgumentException("collectionLink");
+ }
+
+ logger.debug("Deleting a Collection. collectionLink: [{}]", collectionLink);
+ String path = Utils.joinPath(collectionLink, null);
+ Map requestHeaders = this.getRequestHeaders(options);
+ RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Delete,
+ ResourceType.DocumentCollection, path, requestHeaders);
+ return this.doDelete(request).map(response -> toResourceResponse(response, DocumentCollection.class));
+
+ } catch (Exception e) {
+ logger.debug("Failure in deleting a collection, due to [{}]", e.getMessage(), e);
+ return Observable.error(e);
+ }
+ });
+ }
+
+ private Observable doDelete(RxDocumentServiceRequest request)
+ throws DocumentClientException {
+
+ Observable responseObservable = Observable.defer(() -> {
+ try {
+ return this.gatewayProxy.doDelete(request).doOnNext(response -> {
+ if (request.getResourceType() != ResourceType.DocumentCollection) {
+ this.captureSessionToken(request, response);
+ } else {
+ this.clearToken(request, response);
+ }
+ });
+ } catch (Exception e) {
+ return Observable.error(e);
+ }
+ }).retryWhen(createExecuteRequestRetryHandler(request));
+
+ return createPutMoreContentObservable(request, HttpConstants.HttpMethods.DELETE)
+ .doOnNext(req -> this.applySessionToken(request))
+ .flatMap(req -> responseObservable);
+ }
+
+ private Observable doRead(RxDocumentServiceRequest request)
+ throws DocumentClientException {
+
+ Observable responseObservable = Observable.defer(() -> {
+ try {
+ return this.gatewayProxy.processMessage(request).doOnNext(response -> {
+ this.captureSessionToken(request, response);
+ });
+ } catch (Exception e) {
+ return Observable.error(e);
+ }
+ }).retryWhen(createExecuteRequestRetryHandler(request));
+
+ return createPutMoreContentObservable(request, HttpConstants.HttpMethods.GET)
+ .doOnNext(req -> this.applySessionToken(request))
+ .flatMap(req -> responseObservable);
+ }
+
+ @Override
+ public Observable> readCollection(String collectionLink,
+ RequestOptions options) {
+
+ return Observable.defer(() -> {
+ // we are using an observable factory here
+ // observable will be created fresh upon subscription
+ // this is to ensure we capture most up to date information (e.g.,
+ // session)
+ try {
+ if (StringUtils.isEmpty(collectionLink)) {
+ throw new IllegalArgumentException("collectionLink");
+ }
+
+ logger.debug("Reading a Collection. collectionLink: [{}]", collectionLink);
+ String path = Utils.joinPath(collectionLink, null);
+ Map requestHeaders = this.getRequestHeaders(options);
+ RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Read,
+ ResourceType.DocumentCollection, path, requestHeaders);
+
+ return this.doRead(request).map(response -> toResourceResponse(response, DocumentCollection.class));
+ } catch (Exception e) {
+ // this is only in trace level to capture what's going on
+ logger.debug("Failure in reading a collection, due to [{}]", e.getMessage(), e);
+ return Observable.error(e);
+ }
+ });
+ }
+
+ @Override
+ public Observable> readCollections(String databaseLink, FeedOptions options) {
+ return this.rxWrapperClient.readCollections(databaseLink, options);
+ }
+
+ @Override
+ public Observable> queryCollections(String databaseLink, String query,
+ FeedOptions options) {
+ return this.rxWrapperClient.queryCollections(databaseLink, query, options);
+ }
+
+ @Override
+ public Observable> queryCollections(String databaseLink,
+ SqlQuerySpec querySpec, FeedOptions options) {
+ return this.rxWrapperClient.queryCollections(databaseLink, querySpec, options);
+ }
+
+ private String getTargetDocumentCollectionLink(String collectionLink, Object document) {
+ if (StringUtils.isEmpty(collectionLink)) {
+ throw new IllegalArgumentException("collectionLink");
+ }
+ if (document == null) {
+ throw new IllegalArgumentException("document");
+ }
+
+ String documentCollectionLink = collectionLink;
+ if (Utils.isDatabaseLink(collectionLink)) {
+
+ // TODO: not supported yet
+
+ // // Gets the partition resolver(if it exists) for the specified
+ // database link
+ // PartitionResolver partitionResolver =
+ // this.getPartitionResolver(collectionLink);
+ //
+ // // If the partition resolver exists, get the collection to which
+ // the Create/Upsert should be directed using the partition key
+ // if (partitionResolver != null) {
+ // documentCollectionLink =
+ // partitionResolver.resolveForCreate(document);
+ // } else {
+ // throw new
+ // IllegalArgumentException(PartitionResolverErrorMessage);
+ // }
+ }
+
+ return documentCollectionLink;
+ }
+
+ private static void validateResource(Resource resource) {
+ BridgeInternal.validateResource(resource);
+ }
+
+ private Map getRequestHeaders(RequestOptions options) {
+ return BridgeInternal.getRequestHeaders(options);
+ }
+
+ private void addPartitionKeyInformation(RxDocumentServiceRequest request, Document document, RequestOptions options,
+ DocumentCollection collection) {
+ BridgeInternal.addPartitionKeyInformation(request, document, options, collection);
+ }
+
+ private RxDocumentServiceRequest getCreateDocumentRequest(String documentCollectionLink, Object document,
+ RequestOptions options, boolean disableAutomaticIdGeneration, OperationType operationType) {
+
+ if (StringUtils.isEmpty(documentCollectionLink)) {
+ throw new IllegalArgumentException("documentCollectionLink");
+ }
+ if (document == null) {
+ throw new IllegalArgumentException("document");
+ }
+
+ Document typedDocument = documentFromObject(document);
+
+ RxDocumentClientImpl.validateResource(typedDocument);
+
+ if (typedDocument.getId() == null && !disableAutomaticIdGeneration) {
+ // We are supposed to use GUID. Basically UUID is the same as GUID
+ // when represented as a string.
+ typedDocument.setId(UUID.randomUUID().toString());
+ }
+ String path = Utils.joinPath(documentCollectionLink, Paths.DOCUMENTS_PATH_SEGMENT);
+ Map requestHeaders = this.getRequestHeaders(options);
+
+ RxDocumentServiceRequest request = RxDocumentServiceRequest.create(operationType, ResourceType.Document, path,
+ typedDocument, requestHeaders);
+
+ // NOTE: if the collection is not currently cached this will be a
+ // blocking call
+ DocumentCollection collection = this.collectionCache.resolveCollection(request);
+
+ this.addPartitionKeyInformation(request, typedDocument, options, collection);
+ return request;
+ }
+
+ private void putMoreContentIntoDocumentServiceRequest(RxDocumentServiceRequest request, String httpMethod) {
+ if (this.masterKey != null) {
+ final Date currentTime = new Date();
+ final SimpleDateFormat sdf = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US);
+ sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+ String xDate = sdf.format(currentTime);
+
+ request.getHeaders().put(HttpConstants.HttpHeaders.X_DATE, xDate);
+ }
+
+ if (this.masterKey != null || this.resourceTokens != null) {
+ String resourceName = request.getResourceFullName();
+ String authorization = this.getAuthorizationToken(resourceName, request.getPath(),
+ request.getResourceType(), httpMethod, request.getHeaders(), this.masterKey, this.resourceTokens);
+ try {
+ authorization = URLEncoder.encode(authorization, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalStateException("Failed to encode authtoken.", e);
+ }
+ request.getHeaders().put(HttpConstants.HttpHeaders.AUTHORIZATION, authorization);
+ }
+
+ if ((HttpConstants.HttpMethods.POST.equals(httpMethod) || HttpConstants.HttpMethods.PUT.equals(httpMethod))
+ && !request.getHeaders().containsKey(HttpConstants.HttpHeaders.CONTENT_TYPE)) {
+ request.getHeaders().put(HttpConstants.HttpHeaders.CONTENT_TYPE, RuntimeConstants.MediaTypes.JSON);
+ }
+
+ if (!request.getHeaders().containsKey(HttpConstants.HttpHeaders.ACCEPT)) {
+ request.getHeaders().put(HttpConstants.HttpHeaders.ACCEPT, RuntimeConstants.MediaTypes.JSON);
+ }
+ }
+
+ private String getAuthorizationToken(String resourceOrOwnerId, String path, ResourceType resourceType,
+ String requestVerb, Map headers, String masterKey, Map resourceTokens) {
+ if (masterKey != null) {
+ return this.authorizationTokenProvider.generateKeyAuthorizationSignature(requestVerb, resourceOrOwnerId, resourceType, headers);
+ } else {
+ assert resourceTokens != null;
+ return this.authorizationTokenProvider.getAuthorizationTokenUsingResourceTokens(resourceTokens, path, resourceOrOwnerId);
+ }
+ }
+
+ private void applySessionToken(RxDocumentServiceRequest request) {
+ Map headers = request.getHeaders();
+ if (headers != null && !StringUtils.isEmpty(headers.get(HttpConstants.HttpHeaders.SESSION_TOKEN))) {
+ return; // User is explicitly controlling the session.
+ }
+
+ String requestConsistency = request.getHeaders().get(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL);
+ boolean sessionConsistency = this.consistencyLevel == ConsistencyLevel.Session ||
+ (!StringUtils.isEmpty(requestConsistency) && StringUtils.equalsIgnoreCase(requestConsistency, ConsistencyLevel.Session.toString()));
+ if (!sessionConsistency) {
+ return; // Only apply the session token in case of session consistency
+ }
+
+ // Apply the ambient session.
+ if (!StringUtils.isEmpty(request.getResourceAddress())) {
+ String sessionToken = this.sessionContainer.resolveSessionToken(request);
+
+ if (!StringUtils.isEmpty(sessionToken)) {
+ headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, sessionToken);
+ }
+ }
+ }
+
+ void captureSessionToken(RxDocumentServiceRequest request, DocumentServiceResponse response) {
+ this.sessionContainer.setSessionToken(request, response);
+ }
+
+ void clearToken(RxDocumentServiceRequest request, DocumentServiceResponse response) {
+ this.sessionContainer.clearToken(request);
+ }
+
+ private Observable doCreate(RxDocumentServiceRequest request) {
+
+ Observable responseObservable =
+ Observable.defer(() -> {
+ try {
+ return this.gatewayProxy.processMessage(request)
+ .doOnNext(response -> {
+ this.captureSessionToken(request, response);
+ });
+ } catch (Exception e) {
+ return Observable.error(e);
+ }
+ })
+ .retryWhen(createExecuteRequestRetryHandler(request));
+
+ return createPutMoreContentObservable(request, HttpConstants.HttpMethods.POST)
+ .doOnNext(r -> applySessionToken(request)).flatMap(req -> responseObservable);
+
+ }
+
+ /**
+ * Creates an observable which does the CPU intensive operation of generating authentication token and putting more content in the request
+ *
+ * This observable runs on computationScheduler
+ * @param request
+ * @param method
+ * @return
+ */
+ private Observable