## Impala Clients Example

#### You can use the Impyla library to connect directly to an Impala Data Warehouse in the Cloudera Data Warehouse Service

#### Notice that we are reusing the workload user and password variables we set up for the Phoenix client. Thanks to IDBroker, CDP lets you connect to different services with the same credentials, as long as your user and roles have been set at the environment/service level.

#### To find the Impala host, navigate to your CDW virtual warehouse and copy the URL with the "Copy JDBC URL" button.

![title](images/Impala_copyurl.png)

#### Paste the value into a text editor and keep only the part of the URL that starts after the "//" and runs up to and including ".cloudera.site".

#### Finally, set this value as an environment variable called "IMPALA_HOST".
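
The manual copy-and-trim step can also be done in code. The snippet below is a minimal sketch: the helper name `extract_impala_host` and the example URL are hypothetical and are not part of the workshop material, so substitute the JDBC URL copied from your own virtual warehouse.

```python
def extract_impala_host(jdbc_url: str) -> str:
    """Return the text after '//' up to and including '.cloudera.site'."""
    suffix = ".cloudera.site"
    start = jdbc_url.find("//")
    if start == -1:
        raise ValueError("JDBC URL does not contain '//'")
    start += 2  # skip past the two slashes
    end = jdbc_url.find(suffix, start)
    if end == -1:
        raise ValueError("JDBC URL does not contain '.cloudera.site'")
    return jdbc_url[start:end + len(suffix)]


# Hypothetical URL, shown only to illustrate the expected shape
example_url = (
    "jdbc:impala://coordinator-demo-vw.env-abc123.dw.example.cloudera.site:443/default;"
    "AuthMech=3;transportMode=http;httpPath=cliservice;ssl=1"
)
host = extract_impala_host(example_url)
print(host)

# To use it in this notebook session you could then run:
# import os; os.environ["IMPALA_HOST"] = host
```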

You can connect to the virtual warehouse as shown below and issue queries

In [1]:
import os
import pandas as pd
from impala.dbapi import connect
from impala.util import as_pandas

IMPALA_PORT = 443  # the virtual warehouse is reached over HTTPS

conn = connect(host=os.environ["IMPALA_HOST"],
               port=IMPALA_PORT,
               auth_mechanism="LDAP",
               user=os.environ["WORKLOAD_USER"],
               password=os.environ["WORKLOAD_PASSWORD"],
               use_http_transport=True,
               http_path="/cliservice",
               use_ssl=True)

# Execute a SQL statement and print each returned row
cursor = conn.cursor()
cursor.execute('show databases')
for row in cursor:
    print(row)

('_impala_builtins', 'System database for Impala builtin functions')
('default', 'Default Hive database')


Now we can load the other DataFrame that we are going to compare in notebook 3.

In [2]:
import pandas as pd
df = pd.read_parquet("data/fake_df_r.parquet", engine="pyarrow")

In [3]:
query = """
        CREATE TABLE IF NOT EXISTS CML_WORKSHOP_TABLE_RIGHT (
        unique_id VARCHAR,
        first_name VARCHAR,
        surname VARCHAR,
        dob VARCHAR,
        city  VARCHAR,
        email VARCHAR,
        groupp VARCHAR)
        """

In [4]:
cursor.execute(query)

In [5]:
def upsert_linkage(data):
    """Insert a batch of rows into CML_WORKSHOP_TABLE_RIGHT via the global cursor."""
    sql = """INSERT INTO CML_WORKSHOP_TABLE_RIGHT
             (unique_id, first_name, surname, dob, city, email, groupp)
             VALUES (?, ?, ?, ?, ?, ?, ?)"""
    cursor.executemany(sql, data)

In [6]:
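def upsert_data(data, records=100):
    """Insert the DataFrame rows in batches and report progress.

    Note: a batch is flushed once it holds records + 1 rows, so
    records=100 writes batches of 101 rows.
    """
    total_records = 0
    rows = []

    for _, row in data.iterrows():
        # Convert every value to a string to match the VARCHAR columns
        rows.append([
            f"{row['unique_id']}", f"{row['first_name']}", f"{row['surname']}",
            f"{row['dob']}", f"{row['city']}", f"{row['email']}", f"{row['group']}",
        ])
        total_records += 1

        # Flush the batch once it exceeds the requested size
        if len(rows) > records:
            upsert_linkage(rows)
            rows = []
            print(f"Ingested {total_records} records")

    # Flush whatever is left in the final partial batch
    if rows:
        upsert_linkage(rows)

    print(f"Ingested {total_records} records")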

In [7]:
upsert_data(df, records=100)

Ingested 101 records
Ingested 202 records
Ingested 303 records
Ingested 404 records
Ingested 505 records
Ingested 606 records
Ingested 707 records
Ingested 808 records
Ingested 819 records
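
The batching idea used by `upsert_data` can be separated from the database call. The sketch below is only an illustration: `batched_rows` is a hypothetical helper, not part of the notebook, and unlike the loop above it yields exactly `size` rows per batch.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def batched_rows(rows: Iterable, size: int) -> Iterator[List]:
    """Yield consecutive lists of at most `size` items from `rows`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return  # input exhausted
        yield batch


# Stand-in data; in the notebook the rows would come from the DataFrame
sample_rows = [[str(n), f"name-{n}"] for n in range(7)]
batch_sizes = [len(batch) for batch in batched_rows(sample_rows, size=3)]
print(batch_sizes)
```

With the DataFrame from this notebook, the rows could be produced with `df.astype(str).values.tolist()` and each batch passed to `upsert_linkage`, which avoids the per-row overhead of `iterrows`.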


#### Next, check that your insert has executed successfully

In [8]:
query = """
        SELECT COUNT(*) FROM CML_WORKSHOP_TABLE_RIGHT
        """
cursor.execute(query)
# Fetch the result so that the row count is actually displayed
print(cursor.fetchall())
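
One way to make this check explicit is to fetch the count and compare it with the number of rows you expected to load. The sketch below assumes only a DB-API 2.0 cursor. To stay runnable without a warehouse it uses an in-memory `sqlite3` database as a stand-in for Impala, and the names `count_rows`, `demo_conn`, `demo_cursor` and `demo_table` are hypothetical.

```python
import sqlite3


def count_rows(cursor, table_name: str) -> int:
    """Return the row count of `table_name` using any DB-API 2.0 cursor."""
    # The table name is interpolated directly, so pass only trusted identifiers
    cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
    return cursor.fetchone()[0]


# sqlite3 stands in for the Impala connection in this illustration
demo_conn = sqlite3.connect(":memory:")
demo_cursor = demo_conn.cursor()
demo_cursor.execute("CREATE TABLE demo_table (unique_id TEXT, email TEXT)")
demo_rows = [("1", "a@example.com"), ("2", "b@example.com")]
demo_cursor.executemany("INSERT INTO demo_table VALUES (?, ?)", demo_rows)

expected = len(demo_rows)
actual = count_rows(demo_cursor, "demo_table")
print(f"expected={expected} actual={actual} match={actual == expected}")
demo_conn.close()
```

In the notebook the same comparison would be `count_rows(cursor, "CML_WORKSHOP_TABLE_RIGHT") == len(df)`, provided the table was empty before the load.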

## The JayDeBeApi library is an effective alternative if you need to load data faster

You can also use the Ibis library. It has a richer set of options for interacting with pandas DataFrames.

In [9]:
def upsert_data_jaydebeapi(data, records=100):
    """Batch-insert the DataFrame rows through the JayDeBeApi cursor.

    The batching logic is identical to upsert_data, and upsert_linkage writes
    through the global cursor, so rebind cursor to the JayDeBeApi connection
    before calling this function.
    """
    upsert_data(data, records)

In [10]:
import os
import jaydebeapi
conn = jaydebeapi.connect("com.cloudera.impala.jdbc.DataSource",
                          "jdbc:impala://"+os.environ["IMPALA_HOST"]+":443/;ssl=1;transportMode=http;httpPath=cliservice;AuthMech=3;",
                          {'UID': os.environ["WORKLOAD_USER"], 'PWD': os.environ["WORKLOAD_PASSWORD"]},
                          '/home/cdsw/impala_drivers/ImpalaJDBC41.jar')
cursor = conn.cursor()

upsert_data_jaydebeapi(df, records=100)

# Close the cursor and the connection once the load has finished
cursor.close()
conn.close()

Ingested 101 records
Ingested 202 records
Ingested 303 records
Ingested 404 records
Ingested 505 records
Ingested 606 records
Ingested 707 records
Ingested 808 records
Ingested 819 records
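
The JDBC URL passed to `jaydebeapi.connect` above is built by string concatenation. As a minimal sketch, it could instead be assembled by a small helper. `build_impala_jdbc_url` is a hypothetical name, and the options simply mirror the ones used in the cell above.

```python
def build_impala_jdbc_url(host: str, port: int = 443) -> str:
    """Assemble the Impala JDBC URL with the options used in this notebook."""
    options = {
        "ssl": "1",
        "transportMode": "http",
        "httpPath": "cliservice",
        "AuthMech": "3",
    }
    option_string = ";".join(f"{key}={value}" for key, value in options.items())
    return f"jdbc:impala://{host}:{port}/;{option_string};"


# Hypothetical host, used only to show the resulting string
url = build_impala_jdbc_url("example-host.cloudera.site")
print(url)
```

Keeping the options in a dictionary makes it easier to see which settings are in effect and to change one of them without editing a long string.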