# Open Data Hub + Trino:  <br>  Access and Explore your Data

## Introduction
**Open Data Hub**, a platform for data scientists and developers of intelligent applications, supports the full Machine Learning lifecycle by providing a robust, scalable platform and a flexible, interactive environment for teams to do their work. 

**Trino** (formerly PrestoSQL) provides a secure and performant single point of access to all of your data without having to first copy or move it to a central repository. Trino focuses on the first, and often most difficult problem teams face when starting a new project -- the acquisition and preparation of data. With Trino, data scientists and developers will be able to quickly and easily combine data from multiple sources to perform comprehensive analyses for their organizations.

This demonstration illustrates how quickly a data scientist can access data and pull it into the Open Data Hub Jupyter environment using Trino.

## Installing Required Packages
Open Data Hub provides images loaded with popular open source data science packages. These are also the notebook images we use when building our own intelligent applications. 

While these images typically have everything we need, we can always layer in specific package requirements using `pip` and a `requirements.txt` file. 

## Environment Initialization

### Import packages and environment variables
Let's import the package we installed in the previous step and assign environment variables we included while spawning our notebook. This way, we won't accidentally expose sensitive connection information!

Lastly, we use these variables and the `trino.dbapi.connect` function to create our Connection object. 

In [None]:
import os

import pandas
import trino


TRINO_USERNAME = os.environ.get('TRINO_USERNAME', 'trino')
TRINO_PASSWORD = os.environ.get('TRINO_PASSWORD', 'trino')
TRINO_HOSTNAME = os.environ.get('TRINO_HOSTNAME', 'trino-service')
TRINO_PORT = os.environ.get('TRINO_PORT', '8080')

In [None]:
conn = trino.dbapi.connect(
    host=TRINO_HOSTNAME,
    port=TRINO_PORT,
    user=TRINO_USERNAME,
    http_scheme='http',
)

In [None]:
def get_sql(sql, connector):
    """Return pandas DataFrame."""
    
    cur = connector.cursor()
    cur.execute(sql)
    response = pandas.DataFrame(
        cur.fetchall(), columns=[c[0] for c in cur.description]
    )
    return response

## What is Trino and how does it work? 
Trino is an incredibly efficient layer sitting between the data consumer and our data sources. It brings our data together so we can query sources individually or join them together in ways that previously required extensive ETL processes.

Let's use our connection object and SQL statements to interact with Trino. 

The following SQL statements help us understand our data sources:
* `'SHOW CATALOGS'` shows the data sources available to us at this time,
* `'SHOW SCHEMAS'` indicates how data tables are organized, and 
* `'SHOW TABLES'` exposes datasets within a catalog and schema.

In [None]:
sql = 'SHOW CATALOGS'
df = get_sql(sql, conn)
df.head()

In [None]:
sql = 'SHOW SCHEMAS from postgresql'
df = get_sql(sql, conn)
df.head()

In [None]:
sql = 'SHOW TABLES FROM postgresql.pg_catalog'
df = get_sql(sql, conn)
df.head()

**Please note**:
Your data sources will show up under `'SHOW CATALOGS'` after their respective connectors are configured in your catalog.

**Some useful terminology**:
* **data source** - your data stored in a database, bucket, or other. Trino has connectors for most sources already. 
* **connector** - connectors configure your catalog. They give you access to your data sources. They are similar to drivers, in a way.
* **catalog** - defines schemas and properties of connections so Trino knows how to query your data.
* **schema**  - how your tables are organized.
* **table**   - similar to tables in a relational database. A set of rows and columns representing your data based on connector properties.

**Useful links**:
[Trino Documentation](https://trino.io/docs/current/index.html)

## Raw data tables

In [None]:
bucket_to_columns = {
    'accounts': ('customer_id', 'snapshotdate', 'Feature_9'),
    'demographics': ('customer_id', 'snapshotdate', 'Feature_5', 'Feature_6', 'Feature_7', 'Feature_8'),
    'creditcards': ('customer_id', 'snapshotdate', 'Feature_2', 'Feature_3', 'Feature_4'),
    'loans': ('customer_id', 'snapshotdate', 'Feature_0', 'Feature_1'),
    'labels': ('customer_id', 'snapshotdate', 'label'),
}

In [None]:
def create_schema(bucket_name):
    print(f'Creating schema for bucket "{bucket_name}".')
    sql = f"CREATE SCHEMA hive.{bucket_name} WITH (location = 's3a://{bucket_name}/')"
    df = get_sql(sql, conn)
    df.head()


def create_table(bucket_name, column_names):
    print(f'Creating table for bucket "{bucket_name}" with columns "{column_names}".')
    table_arguments = [
        f'{column_name} VARCHAR' for column_name in column_names
    ]
    table_argument = ','.join(table_arguments)

    sql = f"""
    CREATE TABLE hive.{bucket_name}.raw_{bucket_name} ( 
       {table_argument}
    ) WITH ( 
      EXTERNAL_LOCATION = 's3a://{bucket_name}/',
      FORMAT = 'CSV',
      skip_header_line_count=1
    )

    """
    df = get_sql(sql, conn)
    df.head()

In [None]:
for bucket_name, column_names in bucket_to_columns.items():
    create_schema(bucket_name)
    create_table(bucket_name, column_names)

In [None]:
sql = 'DESCRIBE hive.creditcards.raw_creditcards'
df = get_sql(sql, conn)
df.head()

In [None]:
sql = 'SELECT * FROM hive.creditcards.raw_creditcards limit 40'
df = get_sql(sql, conn)
print(df)

## Fact tables

In [None]:
table_to_columns = {
    'accounts': ('customer_id', 'snapshotdate', 'Feature_9'),
    'demographics': ('customer_id', 'snapshotdate', 'Feature_5', 'Feature_6', 'Feature_7', 'Feature_8'),
    'creditcards': ('customer_id', 'snapshotdate', 'Feature_2', 'Feature_3', 'Feature_4'),
    'loans': ('customer_id', 'snapshotdate', 'Feature_0', 'Feature_1'),
    'labels': ('customer_id', 'snapshotdate', 'label'),
}

column_data_types = {
    'customer_id': 'BIGINT',
    'snapshotdate': 'VARCHAR',
    'label': 'TINYINT',
    'Feature_0': 'DOUBLE',
    'Feature_1': 'DOUBLE',
    'Feature_2': 'DOUBLE',
    'Feature_3': 'DOUBLE',
    'Feature_4': 'DOUBLE',
    'Feature_5': 'DOUBLE',
    'Feature_6': 'DOUBLE',
    'Feature_7': 'DOUBLE',
    'Feature_8': 'DOUBLE',
    'Feature_9': 'DOUBLE',    
}

In [None]:
# for renaming existing tables

#for table in table_to_columns.keys():
#    sql = f'ALTER TABLE postgresql.{table}.fact_{table} RENAME TO postgresql.{table}.fact_{table}_old3'
#    df = get_sql(sql, conn)
#    df.head()

In [None]:
def create_schema(table_name):
    print(f'Creating schema for table "{table_name}".')
    sql = f"CREATE SCHEMA postgresql.{table_name}"
    df = get_sql(sql, conn)
    df.head()


def create_table(table_name, column_names):
    print(f'Creating table for "{table_name}" with columns "{column_names}".')
    table_arguments = [
        f'{column_name} {column_data_types[column_name]}'
        for column_name in column_names
    ]
    table_argument = ','.join(table_arguments)

    sql = f"""
    CREATE TABLE postgresql.{table_name}.fact_{table_name} ( 
       {table_argument}
    )

    """
    df = get_sql(sql, conn)
    df.head()

In [None]:
for table_name, column_names in table_to_columns.items():
    create_schema(table_name)
    create_table(table_name, column_names)

In [None]:
sql = 'DESCRIBE postgresql.loans.fact_loans'
df = get_sql(sql, conn)
df.head()

In [None]:
sql = 'SELECT * FROM postgresql.loans.fact_loans limit 40'
df = get_sql(sql, conn)
print(df)

## Analytical dataset

In [None]:
print(f'Creating schema for analytical dataset.')
sql = f"CREATE SCHEMA postgresql.analytical_dataset"
df = get_sql(sql, conn)
df.head()

print(f'Creating table for the analytical dataset with columns "{column_data_types.items()}".')
table_arguments = [
        f'{column_name} {data_type}'
        for column_name, data_type in column_data_types.items()
    ]
table_argument = ','.join(table_arguments)

sql = f"""
CREATE TABLE postgresql.analytical_dataset.analytical_dataset ( 
   {table_argument}
)

"""
print(f'Running query against Trino : {sql}')
df = get_sql(sql, conn)
df.head()

In [None]:
sql = 'DESCRIBE postgresql.analytical_dataset.analytical_dataset'
df = get_sql(sql, conn)
print(df)

In [None]:
sql = 'SELECT * FROM postgresql.analytical_dataset.analytical_dataset limit 40'
df = get_sql(sql, conn)
print(df)