<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Processing-Query-Results-as-a-Stream-of-Records" data-toc-modified-id="Processing-Query-Results-as-a-Stream-of-Records-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Processing Query Results as a Stream of Records</a></span><ul class="toc-item"><li><span><a href="#Introduction" data-toc-modified-id="Introduction-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Introduction</a></span></li><li><span><a href="#Prerequisites" data-toc-modified-id="Prerequisites-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Prerequisites</a></span></li><li><span><a href="#Setup" data-toc-modified-id="Setup-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Setup</a></span><ul class="toc-item"><li><span><a href="#Ensure-database-is-running" data-toc-modified-id="Ensure-database-is-running-1.3.1"><span class="toc-item-num">1.3.1&nbsp;&nbsp;</span>Ensure database is running</a></span></li><li><span><a href="#Download-and-install-additional-components." 
data-toc-modified-id="Download-and-install-additional-components.-1.3.2"><span class="toc-item-num">1.3.2&nbsp;&nbsp;</span>Download and install additional components.</a></span></li><li><span><a href="#Initialize-Client" data-toc-modified-id="Initialize-Client-1.3.3"><span class="toc-item-num">1.3.3&nbsp;&nbsp;</span>Initialize Client</a></span><ul class="toc-item"><li><span><a href="#Initialize-event-loops-for-async-processing-mode" data-toc-modified-id="Initialize-event-loops-for-async-processing-mode-1.3.3.1"><span class="toc-item-num">1.3.3.1&nbsp;&nbsp;</span>Initialize event loops for async processing mode</a></span></li><li><span><a href="#Initialize-client-with-event-loops" data-toc-modified-id="Initialize-client-with-event-loops-1.3.3.2"><span class="toc-item-num">1.3.3.2&nbsp;&nbsp;</span>Initialize client with event loops</a></span></li></ul></li><li><span><a href="#Includes-and-Constants" data-toc-modified-id="Includes-and-Constants-1.3.4"><span class="toc-item-num">1.3.4&nbsp;&nbsp;</span>Includes and Constants</a></span></li><li><span><a href="#Populate-Test-Data." 
data-toc-modified-id="Populate-Test-Data.-1.3.5"><span class="toc-item-num">1.3.5&nbsp;&nbsp;</span>Populate Test Data.</a></span></li><li><span><a href="#Create-Indexes" data-toc-modified-id="Create-Indexes-1.3.6"><span class="toc-item-num">1.3.6&nbsp;&nbsp;</span>Create Indexes</a></span><ul class="toc-item"><li><span><a href="#Create-Secondary-Index" data-toc-modified-id="Create-Secondary-Index-1.3.6.1"><span class="toc-item-num">1.3.6.1&nbsp;&nbsp;</span>Create Secondary Index</a></span></li><li><span><a href="#Create-Set-Index" data-toc-modified-id="Create-Set-Index-1.3.6.2"><span class="toc-item-num">1.3.6.2&nbsp;&nbsp;</span>Create Set Index</a></span></li></ul></li></ul></li><li><span><a href="#Define-Convenience-Functions" data-toc-modified-id="Define-Convenience-Functions-1.4"><span class="toc-item-num">1.4&nbsp;&nbsp;</span>Define Convenience Functions</a></span></li></ul></li><li><span><a href="#Issue:-After-Key-Cursor-Does-Not-Behave-As-Expected-with-Sindex-Query" data-toc-modified-id="Issue:-After-Key-Cursor-Does-Not-Behave-As-Expected-with-Sindex-Query-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Issue: After-Key Cursor Does Not Behave As Expected with Sindex Query</a></span><ul class="toc-item"><li><span><a href="#After-key-with-Sindex-Query" data-toc-modified-id="After-key-with-Sindex-Query-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>After-key with Sindex Query</a></span></li><li><span><a href="#After-key-with-Set-Index-and-Primary-Index-Queries" data-toc-modified-id="After-key-with-Set-Index-and-Primary-Index-Queries-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>After-key with Set Index and Primary Index Queries</a></span></li></ul></li><li><span><a href="#Issue:-All-records-must-be-read-in-sync-mode-for-cursors-to-behave-correctly-for-the-next-retrieval." 
data-toc-modified-id="Issue:-All-records-must-be-read-in-sync-mode-for-cursors-to-behave-correctly-for-the-next-retrieval.-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Issue: All records must be read in sync mode for cursors to behave correctly for the next retrieval.</a></span></li><li><span><a href="#Issue:-Even-when-all-records-are-read-from-the-stream,-it-may-not-work-when-cursors-are-set-in-another-query." data-toc-modified-id="Issue:-Even-when-all-records-are-read-from-the-stream,-it-may-not-work-when-cursors-are-set-in-another-query.-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Issue: Even when all records are read from the stream, it may not work when cursors are set in another query.</a></span></li></ul></div>

# Processing Query Results as a Stream of Records
This tutorial shows how to process query results as a stream of records and demonstrates related capabilities.

This notebook requires an Aerospike Database running locally, a Java kernel, and the Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the [Aerospike Notebooks Repo](https://github.com/aerospike-examples/interactive-notebooks).

## Introduction
The notebook shows how to:
- process query results as a stream of records, 
- paginate over results,  
- partition a query for parallelism, and
- resume query execution at a later time.

Please refer to the companion blog post [Working with Query Result Streams](https://developer.aerospike.com/blog/query_streams) for additional discussion.

## Prerequisites
This tutorial assumes familiarity with the following topics:
- [Hello World](hello_world.ipynb)

## Setup

### Ensure database is running
This notebook requires that the Aerospike database is running. The cell below registers the shell magic and starts the server with `asd`.

In [1]:
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd

### Download and install additional components.
Install the Java client.

In [2]:
%%loadFromPOM
<dependencies>
  <dependency>
    <groupId>com.aerospike</groupId>
    <artifactId>aerospike-client</artifactId>
    <version>6.1.0</version>
  </dependency>
</dependencies>

### Initialize Client
Initialize the client that can be used for both sync and async processing modes.

#### Initialize event loops for async processing mode
We will use async processing with NIO event loops, although other event loop types may also be used. Event loop initialization is needed only if asynchronous API calls are used.

In [3]:
import java.util.concurrent.atomic.AtomicInteger;
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.EventLoop;
import com.aerospike.client.async.Throttles;
import com.aerospike.client.async.Monitor;
import com.aerospike.client.async.NioEventLoops;
import com.aerospike.client.listener.RecordSequenceListener;

// initialize event loops 
final int NumLoops = 2;
final int CommandsPerEventLoop = 50;
final int DelayQueueSize = 50;

EventPolicy eventPolicy = new EventPolicy();
eventPolicy.maxCommandsInProcess = CommandsPerEventLoop;
eventPolicy.maxCommandsInQueue = DelayQueueSize;
EventLoops eventLoops = new NioEventLoops(eventPolicy, NumLoops);

// initialize event loop throttles
Throttles throttles = new Throttles(NumLoops, CommandsPerEventLoop);

System.out.format("Throttles initialized for %s loops with %s concurrent operations per loop.\n", 
                    NumLoops, CommandsPerEventLoop);;


Throttles initialized for 2 loops with 50 concurrent operations per loop.
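
The idea of throttling can be pictured with a plain JDK semaphore that caps the number of commands in flight. The sketch below is only an analogy under that assumption: `SimpleThrottle` is a hypothetical class and does not reproduce the client's `Throttles` implementation.

```java
import java.util.concurrent.Semaphore;

// Analogy only (assumption): a counting semaphore that caps in-flight commands,
// similar in spirit to limiting concurrent commands per event loop.
final class SimpleThrottle {
    private final Semaphore slots;

    SimpleThrottle(int maxInFlight) {
        this.slots = new Semaphore(maxInFlight);
    }

    // Non-blocking: returns false when the cap has been reached.
    boolean tryStart() {
        return slots.tryAcquire();
    }

    // Call when a command completes, freeing one slot.
    void finish() {
        slots.release();
    }

    // Number of commands that could still be started right now.
    int available() {
        return slots.availablePermits();
    }
}
```

With a cap of 2, a third `tryStart()` is refused until one of the earlier commands calls `finish()`.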


#### Initialize client with event loops

In [4]:
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.policy.ClientPolicy;

ClientPolicy clientPolicy = new ClientPolicy();

// needed only if async apis are used
clientPolicy.eventLoops = eventLoops;
int concurrentMax = CommandsPerEventLoop * NumLoops;
if (clientPolicy.maxConnsPerNode < concurrentMax) {
   clientPolicy.maxConnsPerNode = concurrentMax; 
}

// initialize the client 
Host[] hosts = Host.parseHosts("localhost", 3000); 
AerospikeClient client = new AerospikeClient(clientPolicy, hosts);

System.out.println("Initialized the client and connected to the cluster.");;

Initialized the client and connected to the cluster.


### Includes and Constants

In [5]:
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.PartitionFilter;
import com.aerospike.client.query.PartitionStatus;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.Record;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Operation;
import com.aerospike.client.Value;

final String Namespace = "test";
final String SetIndexed = "indexed";
final String SetUnindexed = "unindexed";
final String KeyPrefix = "id-";
final Integer NumRecords = 10000; 


### Populate Test Data.
The test data consists of NumRecords records in each set, each with a user key "id-\<i\>", an integer bin "bin1" with value i, and an integer bin "bin2" with value 10*i, where 1 \<= i \<= NumRecords.

The set SetIndexed has a set index and an integer secondary index on "bin1". The set SetUnindexed has no set or secondary index, and is used to illustrate primary index query functionality.

In [6]:
// convenience function to truncate test data
void truncateTestData() {
    try {
        client.truncate(null, Namespace, null, null);
    }
    catch (AerospikeException e) {
        // ignore
    }
}

// convenience function to initialize test data
void initializeTestData() {
    truncateTestData();
    WritePolicy wPolicy = new WritePolicy(client.writePolicyDefault);
    wPolicy.sendKey = true;
    for (int i=0; i < NumRecords; i++) {
        Bin bin1 = new Bin("bin1", i+1);
        Bin bin2 = new Bin("bin2", 10*(i+1));
        Key key1 = new Key(Namespace, SetIndexed, KeyPrefix+(i+1));
        Key key2 = new Key(Namespace, SetUnindexed, KeyPrefix+(i+1));
        try {
            client.put(wPolicy, key1, bin1, bin2);
            client.put(wPolicy, key2, bin1, bin2);
        }
        catch (AerospikeException e) {
           System.out.format("%s", e);
        } 
    }
}
initializeTestData();
System.out.println("Test data populated.");;

Test data populated.
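
As a quick check on the record layout described above, the following JDK-only sketch rebuilds the user key and bins for a given `i`. `TestDataLayout` is a hypothetical helper used only for illustration. It does not touch the database.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the notebook's setup or the Aerospike client:
// models the user key and bins stored for a given i.
final class TestDataLayout {
    // User key has the form "id-<i>".
    static String userKey(int i) {
        return "id-" + i;
    }

    // bin1 holds i and bin2 holds 10*i; insertion order is preserved for printing.
    static Map<String, Integer> bins(int i) {
        Map<String, Integer> bins = new LinkedHashMap<>();
        bins.put("bin1", i);
        bins.put("bin2", 10 * i);
        return bins;
    }

    // One-line description of the record for a given i.
    static String describe(int i) {
        return userKey(i) + " -> " + bins(i);
    }
}
```

For example, `TestDataLayout.describe(260)` yields `id-260 -> {bin1=260, bin2=2600}`, which matches the bins printed for that record in the query results later in this notebook.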


### Create Indexes
The system-defined primary index already exists for the namespace. We will create a secondary index and a set index on the set SetIndexed to show secondary index and set index query (scan) capabilities using this set.

The set SetUnindexed does not have a secondary or set index, which means a query (scan) of this set must use the primary index. We will use this set to show the primary index query (scan) capabilities.

#### Create Secondary Index

In [7]:
final String IndexName = "idx_indexed_bin1_number";

try {
    IndexTask task = client.createIndex(null, Namespace, SetIndexed, IndexName, 
                                        "bin1", IndexType.NUMERIC);
    task.waitTillComplete();
}
catch (AerospikeException ae) {
    if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
        throw ae;
    }
}

System.out.format("Created index %s on ns=%s set=%s bin=%s.", 
                                    IndexName, Namespace, SetIndexed, "bin1");;

Created index idx_indexed_bin1_number on ns=test set=indexed bin=bin1.

#### Create Set Index

In [8]:
// Enable set index on the set 'indexed'.
%sh asinfo -v "set-config:context=namespace;id=test;set=indexed;enable-index=true"
System.out.println("Set index created on set 'indexed'.");;

Set index created on set 'indexed'.


## Define Convenience Functions
Define convenience functions to process results, which simply involves printing them. 

In [9]:
// a convenience function to process a record which simply prints its user key and bins
void processRecord(Key key, Record rec) {
    System.out.format("Record key: %s, bins: %s\n", key.userKey, rec.bins);             
}

// a convenience function to process results 
void processResults(RecordSet rs) {
    int recs = 0;
    try {
        while (rs.next()) {
            recs++;
            Key key = rs.getKey();
            Record rec = rs.getRecord();
            processRecord(key, rec);
        }
    }
    finally {
        rs.close();
    }  
}

# Issue: After-Key Cursor Does Not Behave As Expected with Sindex Query
Secondary index query results include the cursor record passed to `after(key)`, which they should not. Set index and primary index queries return only the partition's records after that record, as expected.

Below, the after-key cursor is set in sindex, set index, and primary index queries. 

Each query should retrieve results in that one partition **after** the key. Note, however, that the sindex query results include the record with the key. This is a bug.
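
The difference between the expected and the observed behavior can be pictured with a small JDK-only model. It assumes, as a simplification, that the records of one partition are visited in a fixed sorted order, represented here by integers. `AfterKeyModel` is hypothetical and says nothing about how the server implements the cursor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Simplified model (assumption): one partition's records in a fixed sorted order.
final class AfterKeyModel {
    // Expected semantics: strictly after the cursor, so the cursor record is excluded.
    static List<Integer> afterExclusive(TreeSet<Integer> partition, int cursor) {
        return new ArrayList<>(partition.tailSet(cursor, false));
    }

    // Behavior seen with the sindex query: the cursor record itself is included.
    static List<Integer> afterInclusive(TreeSet<Integer> partition, int cursor) {
        return new ArrayList<>(partition.tailSet(cursor, true));
    }
}
```

For a partition holding 10, 20, 30, 40 and a cursor at 20, the exclusive form returns 30 and 40, while the inclusive form also returns 20.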


## After-key with Sindex Query
Note the "after-key" record with key="id-260" IS returned in the results. 

In [10]:
// set the cursor after a record  
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setMaxRecords(10);

System.out.format("Sindex query results after key id-260: \n");
stmt.setFilter(Filter.range("bin1", 1, 10000)); // entire range; range filter uses the secondary index on bin1
Key key = new Key(Namespace, SetIndexed, "id-260"); 
PartitionFilter pFilter = PartitionFilter.after(key);
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
// process record stream
processResults(rs);

Sindex query results after key id-260: 
Record key: id-260, bins: {bin1=260, bin2=2600}
Record key: id-2176, bins: {bin1=2176, bin2=21760}
Record key: id-2426, bins: {bin1=2426, bin2=24260}
Record key: id-4124, bins: {bin1=4124, bin2=41240}


## After-key with Set Index and Primary Index Queries
Note that the "after-key" records with key="id-2176" (set index query) and key="id-3522" (primary index query) are NOT returned in the results.

In [11]:
// set index scan
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setMaxRecords(10);

System.out.format("Set index query results after key id-2176: \n");
stmt.setFilter(null);   // no filter specified, uses set index
Key key = new Key(Namespace, SetIndexed, "id-2176"); 
PartitionFilter pFilter = PartitionFilter.after(key);
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
// process record stream
processResults(rs);

// primary index scan
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetUnindexed);  // no set index defined
stmt.setMaxRecords(100);

System.out.format("\nPrimary index query results after key id-3522: \n");
stmt.setFilter(null);   // no filter specified, uses primary index
Key key = new Key(Namespace, SetUnindexed, "id-3522"); 
PartitionFilter pFilter = PartitionFilter.after(key);
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
// process record stream
processResults(rs);


Set index query results after key id-2176: 
Record key: id-260, bins: {bin1=260, bin2=2600}
Record key: id-4124, bins: {bin1=4124, bin2=41240}
Record key: id-2426, bins: {bin1=2426, bin2=24260}

Primary index query results after key id-3522: 
Record key: id-2741, bins: {bin1=2741, bin2=27410}
Record key: id-209, bins: {bin1=209, bin2=2090}
Record key: id-5820, bins: {bin1=5820, bin2=58200}


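# Issue: All records must be read in sync mode for cursors to behave correctly for the next retrieval.
Each iteration below resets the cursors, reads only the first `i*CHUNK_SIZE` records of a query before closing the stream, then reissues the query with the same `PartitionFilter` and counts the records returned.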

In [14]:
// count results instead of printing
void countResults(RecordSet rs, int max) {
    int recs = 0;
    try {
        while ((max == 0 || recs < max) && rs.next()) {
            recs++;
            Key key = rs.getKey();
            Record rec = rs.getRecord();
        }
    }
    finally {
        rs.close();
    }  
    System.out.format("Read %d records\n", recs);
}

In [15]:
int MAX_RECORDS = 7000;
int CHUNK_SIZE = 1000;
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setMaxRecords(MAX_RECORDS);
QueryPolicy qPolicy = new QueryPolicy();

stmt.setFilter(Filter.range("bin1", 1, 10000));
PartitionFilter pFilter = PartitionFilter.all();

for (int i=1; i<=MAX_RECORDS/CHUNK_SIZE; i++) {
    System.out.format("Iteration %d \n", i);
    pFilter.setPartitions(null);
    RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
    countResults(rs, i*CHUNK_SIZE);

    rs = client.queryPartitions(qPolicy, stmt, pFilter);
    countResults(rs, 10000);
}

Iteration 1 
Read 1000 records
Read 7000 records
Iteration 2 
Read 2000 records
Read 7000 records
Iteration 3 
Read 3000 records
Read 6993 records
Iteration 4 
Read 4000 records
Read 5976 records
Iteration 5 
Read 5000 records
Read 4989 records
Iteration 6 
Read 6000 records
Read 3955 records
Iteration 7 
Read 7000 records
Read 3001 records
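
One way to read the output above is to compare each second count with what it would be if the cursors had advanced by exactly the number of records consumed. That assumption is an interpretation and is not stated in the notebook. `CursorCheck` is a hypothetical helper that only does the arithmetic.

```java
// Hypothetical helper for reading the output above; not part of the Aerospike client.
final class CursorCheck {
    static final int TOTAL_RECORDS = 10000; // NumRecords in this notebook
    static final int MAX_RECORDS = 7000;    // stmt.setMaxRecords(MAX_RECORDS)
    static final int CHUNK_SIZE = 1000;     // records consumed per iteration step

    // Assumption: cursors advance by exactly the number of records consumed,
    // so the second read returns the remainder, capped at MAX_RECORDS.
    static int expectedSecondRead(int iteration) {
        int consumed = iteration * CHUNK_SIZE;
        return Math.min(MAX_RECORDS, TOTAL_RECORDS - consumed);
    }

    // Positive: more records than expected; negative: fewer.
    static int deviation(int iteration, int observedSecondRead) {
        return observedSecondRead - expectedSecondRead(iteration);
    }
}
```

Under this reading, iterations 1 and 2 match the expected 7000, while iterations 3 through 7 deviate by -7, -24, -11, -45, and +1 records respectively.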


# Issue: Even when all records are read from the stream, it may not work when cursors are set in another query.
A query is partially processed using one query instance and then resumed using another, identical query instance. Getting the cursors from one query and setting them in another does not work reliably: most iterations below fail with an `InvalidNode` error.
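
The intended behavior can be sketched with a toy JDK-only model in which saved cursors are handed to a second, identical scan. `ResumableScan` and its integer cursors are assumptions made for illustration. The real client tracks per-partition state in `PartitionStatus` objects, whose internals are not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of resumable paging (assumption, for illustration only).
final class ResumableScan {
    private final int[][] partitions; // each partition's records, in a fixed order

    ResumableScan(int[][] partitions) {
        this.partitions = partitions;
    }

    // Reads up to maxRecords starting from the given cursors, advancing them in place.
    // cursors[p] is the number of records already returned from partition p.
    List<Integer> read(int[] cursors, int maxRecords) {
        List<Integer> out = new ArrayList<>();
        for (int p = 0; p < partitions.length && out.size() < maxRecords; p++) {
            while (cursors[p] < partitions[p].length && out.size() < maxRecords) {
                out.add(partitions[p][cursors[p]++]);
            }
        }
        return out;
    }
}
```

In this model, reading 3 of 5 records with one instance and then passing a copy of the cursors to a second instance returns exactly the remaining 2 records. That is the outcome the cell below expects from the second query, as in the iteration that reads 6000 and then 4000 records.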

In [27]:
import org.apache.commons.lang3.SerializationUtils;

for (int i=1; i<=10; i++) {
    System.out.format("Iteration %d \n", i);

    QueryPolicy qPolicy = new QueryPolicy();
    Statement stmt = new Statement();
    stmt.setNamespace(Namespace);
    stmt.setSetName(SetIndexed);
    stmt.setFilter(Filter.range("bin1", 1, 10000));
    stmt.setMaxRecords(6000);
    PartitionFilter pFilter = PartitionFilter.all();
    pFilter.setPartitions(null);
    RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
    //PartitionStatus[] cursors = pFilter.getPartitions();
    countResults(rs, 6000);
    PartitionStatus[] cursors = pFilter.getPartitions();

    // resume the same query from a different query instance
    QueryPolicy qPolicy2 = new QueryPolicy();
    Statement stmt2 = new Statement();
    stmt2.setNamespace(Namespace);
    stmt2.setSetName(SetIndexed);
    stmt2.setFilter(Filter.range("bin1", 1, 10000));
    stmt2.setMaxRecords(6000);
    PartitionFilter pFilter2 = PartitionFilter.all();
    pFilter2.setPartitions(cursors);
    try {
        RecordSet rs2 = client.queryPartitions(qPolicy2, stmt2, pFilter2);
        countResults(rs2, 10000);
    }
    catch (AerospikeException ae) {
        System.out.format("Error %s \n", ae);
    }

}

Iteration 1 
Read 6000 records
Error com.aerospike.client.AerospikeException$InvalidNode: Error -3,1: No nodes were assigned 
Iteration 2 
Read 6000 records
Read 4000 records
Iteration 3 
Read 6000 records
Error com.aerospike.client.AerospikeException$InvalidNode: Error -3,1: No nodes were assigned 
Iteration 4 
Read 6000 records
Error com.aerospike.client.AerospikeException$InvalidNode: Error -3,1: No nodes were assigned 
Iteration 5 
Read 6000 records
Error com.aerospike.client.AerospikeException$InvalidNode: Error -3,1: No nodes were assigned 
Iteration 6 
Read 6000 records
Error com.aerospike.client.AerospikeException$InvalidNode: Error -3,1: No nodes were assigned 
Iteration 7 
Read 6000 records
Error com.aerospike.client.AerospikeException$InvalidNode: Error -3,1: No nodes were assigned 
Iteration 8 
Read 6000 records
Error com.aerospike.client.AerospikeException$InvalidNode: Error -3,1: No nodes were assigned 
Iteration 9 
Read 6000 records
Error com.aerospike.client.AerospikeEx