<h3 align="center" style="margin:0px">
    <img width="200" src="../_assets/images/logo_purple.png" alt="Kinetica Logo"/>
</h3>
<h5 align="center" style="margin:0px">
    <a href="https://www.kinetica.com/">Website</a>
    <span> | </span>
    <a href="https://docs.kinetica.com/7.2/">Docs</a>
    <span> | </span>
    <a href="https://docs.kinetica.com/7.2/api/">API Docs</a>
    <span> | </span>
    <a href="https://join.slack.com/t/kinetica-community/shared_invite/zt-1bt9x3mvr-uMKrXlSDXfy3oU~sKi84qg">Community Slack</a>   
</h5>

# Using GPUdbSqlIterator and GPUdbIngestor 

Learn about using `GPUdbSqlIterator` for conveniently retrieving large result sets and `GPUdbIngestor` for multi-head ingest.

## Overview

See the [Python API documentation](https://docs.kinetica.com/7.2/api/python/) for the full reference.

The [GPUdbIngestor](https://docs.kinetica.com/7.2/api/python/frame/source/gpudbingestor.html#) class facilitates high-speed multi-head ingest. In this example we use the Python `faker` package to generate test data and then bulk insert it with `GPUdbIngestor`.

The `GPUdbSqlIterator` class makes retrieval of large result sets easy because it transparently fetches batches and handles generation of result tables. It is accessed through convenience functions on `GPUdb`. This guide covers the following use cases:

* Using `GPUdb.execute()` to create a table.
* Using `GPUdb.query_one()` to get the result of a group by.
* Using `GPUdb.query()` and `GPUdbSqlIterator` to iterate through results.
* Fetching a result with SQL parameters.

In [1]:
# install Kinetica package and dataframe dependencies
%pip install -U -q 'gpudb>=7.2' typeguard pandas pyarrow

# packages used in this notebook
%pip install -U -q 'gpudb>=7.2' faker tqdm

# install packages needed for Jupyter widgets
%pip install -U -q ipykernel ipywidgets

Note: you may need to restart the kernel to use updated packages.


## Connect to Kinetica

In [2]:
from gpudb import GPUdb, GPUdbTable
import os

HOST = os.environ['KINETICA_URL']
USER = os.environ['KINETICA_USER']
PASSWORD = os.environ['KINETICA_PASSWD']

def create_kdbc(url: str, user: str, password: str) -> GPUdb:
    options = GPUdb.Options()
    options.username = user
    options.password = password
    options.skip_ssl_cert_verification = True
    options.disable_failover = True
    options.logging_level = 'INFO'
    kdbc = GPUdb(host=url, options = options)
    print(f"Connected to {kdbc.get_url()}. (version {str(kdbc.server_version)})")
    return kdbc

kdbc: GPUdb = create_kdbc(HOST, USER, PASSWORD)

TABLE_NAME = "python_dev_guide.test_users"



Connected to http://172.31.31.29:9191. (version 7.1.9.29)
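
Indexing `os.environ` directly raises a bare `KeyError` when a variable is missing. A minimal sketch of a friendlier check is shown here. `require_env` is a hypothetical helper, not part of the `gpudb` package, and the example values are placeholders rather than real credentials.

```python
import os
from typing import Dict, Mapping, Sequence


def require_env(names: Sequence[str],
                env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the requested variables, or raise one error naming all missing ones."""
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in names}


# An explicit mapping is passed so the sketch runs without real credentials.
# In the notebook you would omit `env` and read from os.environ.
example_env = {
    "KINETICA_URL": "http://localhost:9191",   # placeholder
    "KINETICA_USER": "admin",                  # placeholder
    "KINETICA_PASSWD": "secret",               # placeholder
}
settings = require_env(["KINETICA_URL", "KINETICA_USER", "KINETICA_PASSWD"],
                       env=example_env)
print(settings["KINETICA_URL"])
```

The returned values could then be passed to `create_kdbc()` in place of the module-level constants.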


## Loading data with GPUdbIngestor

### Use the faker package to create records

In [3]:
import faker
import datetime
from typing import Generator

def date_to_timestamp_ms(dt: datetime.date) -> int:
    """ Convert a date to a timestamp in milliseconds. """
    dts = datetime.datetime.fromordinal(dt.toordinal())
    ts = dts.timestamp()
    return int(ts) * 1000


def make_records(count: int) -> Generator:
    """ Yield `count` fake records. """
    faker.Faker.seed(5467)
    faker_inst = faker.Faker(locale="en-US")

    for id in range(0, count):
        rec = dict(id=id, **faker_inst.simple_profile())
        rec["birthdate"] = date_to_timestamp_ms(rec["birthdate"])
        yield rec

list(make_records(3))

[{'id': 0,
  'username': 'eduardo69',
  'name': 'Haley Beck',
  'sex': 'F',
  'address': '59836 Carla Causeway Suite 939\nPort Eugene, IN 32487',
  'mail': 'meltondenise@yahoo.com',
  'birthdate': 873954000000},
 {'id': 1,
  'username': 'lbarrera',
  'name': 'Joshua Stephens',
  'sex': 'M',
  'address': '3108 Christina Forges\nPort Timothychester, KY 72980',
  'mail': 'erica80@hotmail.com',
  'birthdate': -1440702000000},
 {'id': 2,
  'username': 'bburton',
  'name': 'Paula Kaiser',
  'sex': 'F',
  'address': 'Unit 7405 Box 3052\nDPO AE 09858',
  'mail': 'timothypotts@gmail.com',
  'birthdate': -1145991600000}]
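
`date_to_timestamp_ms()` builds a naive `datetime`, so `timestamp()` interprets it in the local time zone of the machine running the notebook. The first sample value, 873954000000, is 1997-09-11 05:00:00 UTC rather than midnight. If zone-independent values are preferred, one option is to pin the conversion to UTC. This is a sketch of an alternative, not the notebook's own function, and `date_to_utc_timestamp_ms` is a hypothetical name.

```python
import datetime


def date_to_utc_timestamp_ms(dt: datetime.date) -> int:
    """Convert a date to epoch milliseconds at midnight UTC."""
    midnight_utc = datetime.datetime(dt.year, dt.month, dt.day,
                                     tzinfo=datetime.timezone.utc)
    # For timezone-aware datetimes, timestamp() does not depend on the local zone
    # and also works for dates before 1970 (negative values).
    return int(midnight_utc.timestamp()) * 1000


print(date_to_utc_timestamp_ms(datetime.date(1997, 9, 11)))
```

With this variant the same birthdate produces the same value on every machine, at the cost of no longer matching the sample output shown above.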

### execute(): Create the table

You can use `GPUdb.execute()` for SQL statements that return no data.

In [4]:
sql=f"""
CREATE OR REPLACE TABLE {TABLE_NAME}
(
    "id" INTEGER NOT NULL,
    "username" VARCHAR(32) NOT NULL,
    "name" VARCHAR(64) NOT NULL,
    "sex" VARCHAR(2) NOT NULL,
    "address" VARCHAR(64) NOT NULL,
    "mail" VARCHAR(64) NOT NULL,
    "birthdate" TIMESTAMP NOT NULL
);
"""
count_affected = kdbc.execute(sql)

# We can use the GPUdbTable to return the schema as a dataframe.
table = GPUdbTable(db=kdbc, name=TABLE_NAME)
table.type_as_df()

|   | name | type | properties |
|---|------|------|------------|
| 0 | id | int | [data] |
| 1 | username | string | [char32, data] |
| 2 | name | string | [char64, data] |
| 3 | sex | string | [char2, data] |
| 4 | address | string | [char64, data] |
| 5 | mail | string | [char64, data] |
| 6 | birthdate | long | [data, timestamp] |
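
The `charN` properties cap the length of each string column, so it can be useful to check generated records before inserting them. The following is a minimal sketch, assuming the width counts characters. `max_char_width` and `oversized_columns` are hypothetical helpers, and the property lists are copied from the schema output above.

```python
import re
from typing import Any, Dict, List, Optional

# Column properties copied from the type_as_df() output above.
COLUMN_PROPERTIES: Dict[str, List[str]] = {
    "id": ["data"],
    "username": ["char32", "data"],
    "name": ["char64", "data"],
    "sex": ["char2", "data"],
    "address": ["char64", "data"],
    "mail": ["char64", "data"],
    "birthdate": ["data", "timestamp"],
}


def max_char_width(properties: List[str]) -> Optional[int]:
    """Return N for a 'charN' property, or None if the column has no such limit."""
    for prop in properties:
        match = re.fullmatch(r"char(\d+)", prop)
        if match:
            return int(match.group(1))
    return None


def oversized_columns(record: Dict[str, Any],
                      column_properties: Dict[str, List[str]]) -> List[str]:
    """List the columns whose string value is longer than the declared width."""
    too_long = []
    for column, properties in column_properties.items():
        width = max_char_width(properties)
        value = record.get(column)
        if width is not None and isinstance(value, str) and len(value) > width:
            too_long.append(column)
    return too_long


sample = {
    "id": 0,
    "username": "eduardo69",
    "name": "Haley Beck",
    "sex": "F",
    "address": "59836 Carla Causeway Suite 939\nPort Eugene, IN 32487",
    "mail": "meltondenise@yahoo.com",
    "birthdate": 873954000000,
}
print(oversized_columns(sample, COLUMN_PROPERTIES))
```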


### GPUdbIngestor: Bulk insert rows

See the [Python API docs](https://docs.kinetica.com/7.2/api/python/frame/source/gpudbingestor.html#) for more information about `GPUdbIngestor`.

> Note: If your client does not have direct access to all worker ports, set `use_head_node_only=True`.

In [5]:
from gpudb import GPUdbWorkerList, GPUdbIngestor, GPUdbRecord
from tqdm.auto import tqdm

# number of rows to generate
NUM_ROWS = 1000

# Get the worker list. These are the Kinetica nodes that will be used for the insert
workers = GPUdbWorkerList(kdbc, use_head_node_only=False)
display(f"Workers: {workers.get_worker_urls()}")

# Create an ingestor
table_type = table.get_table_type()
ingestor = GPUdbIngestor(kdbc, 
                        table_name=table.name, 
                        batch_size = 1000, 
                        workers = workers, 
                        options = dict(),
                        record_type=table_type)

# generate a progress bar with tqdm
for rec in tqdm(make_records(NUM_ROWS), total=NUM_ROWS):
    g_rec = GPUdbRecord(table_type, rec)
    ingestor.insert_record(g_rec)
ingestor.flush()

print(f"Inserted rows: {ingestor.get_count_inserted()}")

"Workers: ['http://172.31.31.29:9192', 'http://172.31.31.29:9193']"

  0%|          | 0/1000 [00:00<?, ?it/s]

Inserted rows: 1000
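
`GPUdbIngestor` queues records and sends them in batches, which is why the final `flush()` matters whenever the row count is not an exact multiple of `batch_size`. The following plain-Python sketch illustrates that batching idea only. It is not the ingestor's actual implementation, and `BatchBuffer` and its `send` callback are hypothetical.

```python
from typing import Callable, List


class BatchBuffer:
    """Collect records and hand them to `send` once `batch_size` is reached."""

    def __init__(self, batch_size: int,
                 send: Callable[[List[dict]], None]) -> None:
        self.batch_size = batch_size
        self.send = send
        self.queue: List[dict] = []

    def insert_record(self, record: dict) -> None:
        self.queue.append(record)
        if len(self.queue) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        """Send whatever is queued, even if the batch is only partly full."""
        if self.queue:
            self.send(self.queue)
            self.queue = []


sent_batches: List[List[dict]] = []
buffer = BatchBuffer(batch_size=400, send=sent_batches.append)

for i in range(1000):
    buffer.insert_record({"id": i})

print([len(batch) for batch in sent_batches])  # [400, 400]: 200 records still queued
buffer.flush()
print([len(batch) for batch in sent_batches])  # [400, 400, 200]
```

In the cell above, `NUM_ROWS` equals `batch_size`, so the explicit `flush()` is a safeguard. It becomes essential as soon as the two values differ.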


## Retrieve results with GPUdbSqlIterator

### query(): Using a for loop

In [6]:
sql = f"select * from {TABLE_NAME} order by id limit 10"

for row in kdbc.query(sql):
    print(row)

[0, 'eduardo69', 'Haley Beck', 'F', '59836 Carla Causeway Suite 939\nPort Eugene, IN 32487', 'meltondenise@yahoo.com', 873954000000]
[1, 'lbarrera', 'Joshua Stephens', 'M', '3108 Christina Forges\nPort Timothychester, KY 72980', 'erica80@hotmail.com', -1440702000000]
[2, 'bburton', 'Paula Kaiser', 'F', 'Unit 7405 Box 3052\nDPO AE 09858', 'timothypotts@gmail.com', -1145991600000]
[3, 'melissa49', 'Wendy Reese', 'F', '6408 Christopher Hill Apt. 459\nNew Benjamin, NJ 15096', 'dadams@gmail.com', 586242000000]
[4, 'melissacarter', 'Manuel Rios', 'M', '2241 Bell Gardens Suite 723\nScottside, CA 38463', 'williamayala@gmail.com', -1231696800000]
[5, 'james26', 'Patricia Potter', 'F', '7977 Jonathan Meadow\nJerryside, OH 55205', 'jpatrick@gmail.com', 1207544400000]
[6, 'vanessavalentine', 'Anthony Simpson', 'M', '405 Mathew Island\nPort Rebecca, AZ 03104', 'kristineparker@hotmail.com', -722628000000]
[7, 'andersonbridget', 'Jonathan Payne', 'M', '78393 Williams Heights Apt. 776\nPort Lisa, GU 1

### query(): Using a with statement

In this example we use a `with` statement to get an instance of the iterator so that we can access the fields `total_count` and `type_map` before starting the iteration. A progress bar is displayed as the rows are retrieved.

In [7]:
sql = f"select * from {TABLE_NAME} order by id limit 10"

with kdbc.query(sql) as sql_iter:
    # get columns and types
    print(f"Type Map: {sql_iter.type_map}")

    # generate a progress bar
    for rec in tqdm(iterable=sql_iter,
                total=sql_iter.total_count,
                desc='Fetching Records'):
        print(rec)


Type Map: {'id': 'int', 'username': 'char32', 'name': 'char64', 'sex': 'char2', 'address': 'char64', 'mail': 'char64', 'birthdate': 'timestamp'}


Fetching Records:   0%|          | 0/10 [00:00<?, ?it/s]

[0, 'eduardo69', 'Haley Beck', 'F', '59836 Carla Causeway Suite 939\nPort Eugene, IN 32487', 'meltondenise@yahoo.com', 873954000000]
[1, 'lbarrera', 'Joshua Stephens', 'M', '3108 Christina Forges\nPort Timothychester, KY 72980', 'erica80@hotmail.com', -1440702000000]
[2, 'bburton', 'Paula Kaiser', 'F', 'Unit 7405 Box 3052\nDPO AE 09858', 'timothypotts@gmail.com', -1145991600000]
[3, 'melissa49', 'Wendy Reese', 'F', '6408 Christopher Hill Apt. 459\nNew Benjamin, NJ 15096', 'dadams@gmail.com', 586242000000]
[4, 'melissacarter', 'Manuel Rios', 'M', '2241 Bell Gardens Suite 723\nScottside, CA 38463', 'williamayala@gmail.com', -1231696800000]
[5, 'james26', 'Patricia Potter', 'F', '7977 Jonathan Meadow\nJerryside, OH 55205', 'jpatrick@gmail.com', 1207544400000]
[6, 'vanessavalentine', 'Anthony Simpson', 'M', '405 Mathew Island\nPort Rebecca, AZ 03104', 'kristineparker@hotmail.com', -722628000000]
[7, 'andersonbridget', 'Jonathan Payne', 'M', '78393 Williams Heights Apt. 776\nPort Lisa, GU 1
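
Each row arrives as a plain list, so the column names from `type_map` can be used to turn it into a dictionary. The following is a minimal sketch, assuming the keys of `type_map` are in the same order as the row values, as they are in the output above. `row_to_dict` is a hypothetical helper, and the type map and row are copied from that output so the example runs without a database.

```python
import datetime
from typing import Any, Dict, List

# Copied from the "Type Map" output above; in a live session use sql_iter.type_map.
TYPE_MAP: Dict[str, str] = {
    "id": "int", "username": "char32", "name": "char64", "sex": "char2",
    "address": "char64", "mail": "char64", "birthdate": "timestamp",
}

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def row_to_dict(row: List[Any], type_map: Dict[str, str]) -> Dict[str, Any]:
    """Pair column names with row values and decode millisecond timestamps."""
    record = dict(zip(type_map.keys(), row))
    for column, column_type in type_map.items():
        if column_type == "timestamp":
            # Adding a timedelta also handles negative (pre-1970) values.
            record[column] = EPOCH + datetime.timedelta(milliseconds=record[column])
    return record


first_row = [0, "eduardo69", "Haley Beck", "F",
             "59836 Carla Causeway Suite 939\nPort Eugene, IN 32487",
             "meltondenise@yahoo.com", 873954000000]
record = row_to_dict(first_row, TYPE_MAP)
print(record["username"], record["birthdate"].isoformat())
```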

### query_one(): Fetch a single row

If a query returns only one row, `GPUdb.query_one()` is a convenience function that returns that row directly.

In [8]:
kdbc.query_one(sql = f"select count(1) from {TABLE_NAME}")

[1000]

### query_one(): passing SQL parameters

> Note: You can also use parameters with `query()` or `execute()`

In [9]:
kdbc.query_one(f"""select * from {TABLE_NAME} where id = $1""", 
                    sql_params=[3])

[3,
 'melissa49',
 'Wendy Reese',
 'F',
 '6408 Christopher Hill Apt. 459\nNew Benjamin, NJ 15096',
 'dadams@gmail.com',
 586242000000]
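
Parameters can be combined with iteration in the same way. The sketch below wraps a parameterized `query()` call in a function. It assumes `query()` accepts the same `sql_params` keyword shown for `query_one()` and that the returned iterator works in a `with` statement as in the earlier example. `users_by_sex` is a hypothetical helper name.

```python
from typing import Any, List


def users_by_sex(kdbc: Any, table_name: str, sex: str, max_id: int) -> List[list]:
    """Fetch id and username for rows matching `sex` with id below `max_id`."""
    # Values are bound through $1 and $2. Only the table name is interpolated,
    # so it must come from trusted code and never from user input.
    sql = f"""select id, username from {table_name}
              where sex = $1 and id < $2
              order by id"""
    with kdbc.query(sql, sql_params=[sex, max_id]) as sql_iter:
        return [row for row in sql_iter]
```

In the notebook this could be called as `users_by_sex(kdbc, TABLE_NAME, 'F', 10)`.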