# Demo 3 - Data Ingestion

This notebook reads the inference results produced in Demo 2 from Ceph S3 storage and ingests them as a table in Trino. This table can then be used to create visualizations in Apache Superset.

In [1]:
import os
import pathlib
from dotenv import load_dotenv
import trino
import pandas as pd
import glob
import config
from src.data.s3_communication import S3Communication, S3FileType

## Injecting Credentials

In order to run this notebook, we need credentials to connect with S3 storage to retrieve data and the Trino server to create tables.

In an automated environment, the credentials can be specified in a pipeline's environment variables or through OpenShift secrets.

To run the notebook in a local environment, we define them as environment variables in a `credentials.env` file at the root of the project repository and load them using dotenv. An example of what the contents of `credentials.env` could look like is shown below:

```
# s3 credentials
S3_ENDPOINT=https://s3.us-east-1.amazonaws.com
S3_BUCKET=ocp-odh-os-demo-s3
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx

# trino credentials
TRINO_USER=xxx
TRINO_PASSWD=xxx
TRINO_HOST=trino-secure-odh-trino.apps.odh-cl1.apps.os-climate.org
TRINO_PORT=443
```

In [2]:
# Load credentials
dotenv_dir = "/opt/app-root/src/aicoe-osc-demo"
dotenv_path = pathlib.Path(dotenv_dir) / "credentials.env"
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path, override=True)
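
Because `load_dotenv` is only called when the file exists, a missing or incomplete `credentials.env` would otherwise surface later as a connection error. The following is a minimal sketch of an early check, assuming the variable names from the example file above. `missing_env_vars` is a hypothetical helper and not part of the project.

```python
import os

# Variable names taken from the example credentials.env shown above.
REQUIRED_ENV_VARS = (
    "S3_ENDPOINT",
    "S3_BUCKET",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "TRINO_USER",
    "TRINO_PASSWD",
    "TRINO_HOST",
    "TRINO_PORT",
)


def missing_env_vars(names=REQUIRED_ENV_VARS, environ=None):
    """Return the names that are unset or empty (hypothetical helper)."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


missing = missing_env_vars()
if missing:
    print("Missing credentials:", ", ".join(missing))
```

Passing a plain dict as `environ` makes the check easy to try out without touching the real environment.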

## Read Raw Data from S3

First, we will read some sample data from S3. We will then convert the column data types to ones that Trino understands and make sure that the column names are compatible with SQL naming conventions.

In [3]:
# init s3 connector
s3c = S3Communication(
    s3_endpoint_url=os.getenv("S3_ENDPOINT"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    s3_bucket=os.getenv("S3_BUCKET"),
)

In [4]:
if os.getenv("AUTOMATION"):
    if not os.path.exists(config.BASE_INFER_KPI_FOLDER):
        pathlib.Path(config.BASE_INFER_KPI_FOLDER).mkdir(parents=True, exist_ok=True)

    # Download a sample dataset file from s3
    s3c.download_files_in_prefix_to_dir(
        s3_prefix=config.BASE_INFER_KPI_S3_PREFIX,
        destination_dir=config.BASE_INFER_KPI_FOLDER
    )

In [5]:
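# collect the downloaded inference files
# (assumption: they are CSV files in BASE_INFER_KPI_FOLDER; adjust the pattern if not)
all_files = sorted(glob.glob(os.path.join(config.BASE_INFER_KPI_FOLDER, "*.csv")))

list_of_files = []

for filename in all_files:
    # drop the unnamed index column that was written out with each CSV
    df = pd.read_csv(filename, index_col=None, header=0).convert_dtypes().drop(columns=["Unnamed: 0"])
    list_of_files.append(df)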

preds_kpi = pd.concat(list_of_files, axis=0, ignore_index=True)

len_preds_kpi = len(preds_kpi)

# convert columns to specific data types
preds_kpi = preds_kpi.convert_dtypes().drop(['index'], axis=1, errors='ignore')
preds_kpi.head()

Unnamed: 0,pdf_name,kpi,kpi_id,answer,page,paragraph,source,score,no_ans_score,no_answer_score_plus_boost
0,413749035_Eversource Energy_2019-12-31,In which year was the annual report or the sus...,,2019,7.0,• Our core utility operations performed very w...,Text,13.372849,-10.76948,-25.76948
1,413749035_Eversource Energy_2019-12-31,In which year was the annual report or the sus...,,2019,34.0,The American Council for an Energy-Efficient E...,Text,12.66205,-9.417558,-24.417558
2,413749035_Eversource Energy_2019-12-31,In which year was the annual report or the sus...,,2019,12.0,The Eversource Internal Audit Department perfo...,Text,12.373636,-10.899869,-25.899869
3,413749035_Eversource Energy_2019-12-31,In which year was the annual report or the sus...,,2019,118.0,These are referenced throughout our 2019 Susta...,Text,12.245757,-10.556628,-25.556628
4,413749035_Eversource Energy_2019-12-31,What is the annual total production from coal?,,no_answer,,,Text,2.720188,,
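
The column names in this dataset already look SQL-friendly, so the cells above do not rename anything. For data with less regular headers, the renaming step mentioned at the start of this section could be sketched as follows. `to_sql_identifier` is a hypothetical helper, and the rules it applies (lowercase, underscores, no leading digit) are an assumption about what "compatible" means here rather than a Trino requirement taken from this notebook.

```python
import re


def to_sql_identifier(name):
    """Turn a column label into a lowercase, underscore-separated name (hypothetical helper)."""
    # Replace every run of characters other than letters and digits with one underscore.
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", str(name)).strip("_").lower()
    # Prefix names that are empty or start with a digit so they begin with a letter.
    if not cleaned or cleaned[0].isdigit():
        cleaned = f"col_{cleaned}"
    return cleaned


for label in ["pdf_name", "No Answer Score (+boost)", "2019 total"]:
    print(label, "->", to_sql_identifier(label))
```

Since `DataFrame.rename` accepts a callable for `columns`, the helper could be applied with `preds_kpi.rename(columns=to_sql_identifier)`.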


In [6]:
# Author: Erik Erlandson <eje@redhat.com>

_p2smap = {"string": "varchar", "Float64": "double", "Int64": "bigint"}


def pandas_type_to_sql(pt):
    st = _p2smap.get(pt)
    if st is not None:
        return st
    raise ValueError("unexpected pandas column type '{pt}'".format(pt=pt))


# add ability to specify optional dict for specific fields?
# if column name is present, use specified value?
def generate_table_schema_pairs(df):
    ptypes = [str(e) for e in df.dtypes.to_list()]
    stypes = [pandas_type_to_sql(e) for e in ptypes]
    pz = list(zip(df.columns.to_list(), stypes))
    return ",\n".join(["    {n} {t}".format(n=e[0], t=e[1]) for e in pz])
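
The comment above `generate_table_schema_pairs` asks whether an optional dict could override the type of specific fields. One way this might look is sketched below on a tiny hand-built frame. The `overrides` parameter and the function name `schema_pairs_with_overrides` are hypothetical, and the type map is copied from the cell above.

```python
import pandas as pd

# Same pandas-to-SQL type map as in the cell above.
P2S_MAP = {"string": "varchar", "Float64": "double", "Int64": "bigint"}


def schema_pairs_with_overrides(df, overrides=None):
    """Build the column list for a CREATE TABLE statement.

    `overrides` (hypothetical) maps a column name to the SQL type to use
    instead of the one derived from the pandas dtype.
    """
    overrides = overrides or {}
    lines = []
    for name, dtype in df.dtypes.items():
        sql_type = overrides.get(name) or P2S_MAP.get(str(dtype))
        if sql_type is None:
            raise ValueError(f"unexpected pandas column type '{dtype}'")
        lines.append(f"    {name} {sql_type}")
    return ",\n".join(lines)


# Small illustrative frame using the same nullable dtypes as preds_kpi.
sample = pd.DataFrame(
    {
        "pdf_name": pd.Series(["report"], dtype="string"),
        "page": pd.Series([7], dtype="Int64"),
        "score": pd.Series([13.37], dtype="Float64"),
    }
)
print(schema_pairs_with_overrides(sample, overrides={"page": "integer"}))
```

Columns without an override still go through the type map, so an unsupported dtype raises the same `ValueError` as before.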

In [7]:
# a way to examine the structure of a pandas data frame
preds_kpi.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 689 entries, 0 to 688
Data columns (total 10 columns):
 #   Column                      Non-Null Count  Dtype  
---  ------                      --------------  -----  
 0   pdf_name                    689 non-null    string 
 1   kpi                         689 non-null    string 
 2   kpi_id                      0 non-null      Int64  
 3   answer                      689 non-null    string 
 4   page                        555 non-null    Int64  
 5   paragraph                   555 non-null    string 
 6   source                      689 non-null    string 
 7   score                       689 non-null    Float64
 8   no_ans_score                555 non-null    Float64
 9   no_answer_score_plus_boost  555 non-null    Float64
dtypes: Float64(3), Int64(2), string(5)
memory usage: 57.3 KB


## Save Processed Data to S3

Now that our data is in a form that Trino can ingest, we will upload it back to our S3 bucket as parquet files. These files will be the data source for our Trino table.

In [8]:
# parquet has multiple options for appending or updating data
# including adding new files, or appending, sharding directory trees, etc
for filename in all_files:
    df = pd.read_csv(filename, index_col=None, header=0).convert_dtypes().drop(columns=["Unnamed: 0"])
    ret = s3c.upload_df_to_s3(
        df,
        s3_prefix=config.BASE_INFER_KPI_TABLE_S3_PREFIX,
        s3_key=f"{os.path.basename(filename).split('.')[0]}.parquet",
        filetype=S3FileType.PARQUET,
        index=False,
    )
    print(ret['ResponseMetadata']['HTTPStatusCode'])

200
200
200
200
200
200
200
200
200
200


## Create a Table on Trino

Finally, we will create a table in our Trino database that uses the parquet files we uploaded in the previous section as the data source.

In [9]:
# use trino password env-var to hold token values
JWT_TOKEN = os.environ['TRINO_PASSWD']
conn = trino.dbapi.connect(
    host=os.environ['TRINO_HOST'],
    port=int(os.environ['TRINO_PORT']),  # env vars are strings; pass the port as an integer
    user=os.environ['TRINO_USER'],
    http_scheme='https',
    auth=trino.auth.JWTAuthentication(JWT_TOKEN),
)
cur = conn.cursor()

In [10]:
# generate a sql schema that will correspond to the data types
# of columns in the pandas DF
# to-do: add some mechanisms for overriding types, either here
# or on the pandas data-frame itself before we write it out
schema = generate_table_schema_pairs(preds_kpi)

tabledef = """create table if not exists osc_datacommons_dev.urgentem.infer_kpi(
{schema}
) with (
    format = 'parquet',
    external_location = 's3a://{s3_bucket}/{kpi_table_s3_prefix}/'
)""".format(
    schema=schema,
    s3_bucket=os.environ["S3_BUCKET"],
    kpi_table_s3_prefix=config.BASE_INFER_KPI_TABLE_S3_PREFIX,
)
# tables created externally may not show up immediately in cloud-beaver
cur.execute(tabledef)
cur.fetchall()

[[True]]

In [11]:
# check that the infer_kpi table exists and look at one of its rows
cur.execute("select * from osc_datacommons_dev.urgentem.infer_kpi LIMIT 5")
cur.fetchall()[1]

['sustainability-report-2019',
 'In which year was the annual report or the sustainability report published?',
 None,
 '2019',
 26,
 'Equinor Sustainability report 2019 High value — creating shared value',
 'Text',
 12.427505493164062,
 -9.680328369140623,
 -24.680328369140625]
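
Looking at a single row confirms that the table is readable, but not that every record arrived. A row-count comparison against `len_preds_kpi` (computed earlier but not used afterwards) is one possible extra check. The sketch below uses a hypothetical helper, `table_row_count`, that works with any DB-API cursor. It is demonstrated with an in-memory `sqlite3` database purely as a stand-in so that it runs without a Trino server.

```python
import sqlite3


def table_row_count(cursor, table_name):
    """Return the number of rows in `table_name` (hypothetical helper)."""
    # The table name is interpolated directly, so pass only trusted, hard-coded names.
    cursor.execute(f"select count(*) from {table_name}")
    return cursor.fetchone()[0]


# Stand-in database for illustration only; the notebook itself uses a Trino cursor.
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.execute("create table infer_kpi (pdf_name text, answer text)")
demo_cur.executemany(
    "insert into infer_kpi values (?, ?)",
    [("report_a", "2019"), ("report_b", "no_answer")],
)
print(table_row_count(demo_cur, "infer_kpi"))
```

With the Trino cursor from this notebook, the call would be `table_row_count(cur, "osc_datacommons_dev.urgentem.infer_kpi")`. The result should equal `len_preds_kpi` only if the table's S3 prefix contains nothing but the files uploaded above.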

# Conclusion

In this notebook, we read the KPI inference results for the 2019 sustainability reports, which follow the same format as the output of the KPI inference model in Demo 2. We then inferred the table schema from the data, preprocessed the data, and created a table in Trino that can be used for visualization in Apache Superset.