## Data Ingestion and Preparation
In this notebook, we cover several patterns for ingesting data into native AWS services and making it accessible through SQL queries and pandas DataFrames. We will use the following tools and services:
- **Amazon S3:** A scalable object storage service that can be used to store and retrieve data.
- **AWS Glue:** Provides a data catalog that can be used to discover and search available data sets.
- **Amazon Athena:** An interactive query service that can be used to query data stored in S3 using SQL.
- [AWS SDK for Pandas (awswrangler):](https://aws-sdk-pandas.readthedocs.io/en/stable/) A library that can be used to query data stored in various AWS data sources and return the results as pandas dataframes.

In [None]:
# Install the latest version of awswrangler
%pip install -Uqq awswrangler

### Obtaining the Data
We will use a sample synthetic dataset. First, we will download the data and store it in our notebook instance. We will then upload the data to an S3 bucket.

In [None]:
import pandas as pd
import sagemaker  # AWS SageMaker Python SDK makes it easier to work with various SageMaker APIs
import awswrangler as wr
import os

import json

wr.engine.set("python")
wr.memory_format.set("pandas")

In [10]:
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
prefix = "fico_ml_workshop"

In [4]:
! aws s3 rm --recursive s3://{bucket}/{prefix}/ > /dev/null

In [None]:
# upload local csv file to S3
s3_csv_data = sagemaker_session.upload_data("data/ln_large.csv", bucket, prefix + "/data/csv")
print("Data uploaded to " + s3_csv_data)

### Data Ingestion
In this section we will explore two approaches for ingesting data into Amazon Athena:
1. **Using the awswrangler library:** This is the simpler approach. We read the raw CSV data from S3, clean it up a bit using pandas, convert it to Parquet format, and make it accessible via Athena.
2. **Using Athena DDL:** This is a more complex approach where we create a table in Athena using DDL statements and then query the data. The advantage of this approach is that all of the compute is done on the Athena side, so we can query the data without having to download it to our notebook instance.

#### Ingest data into Athena using awswrangler

With this approach, we'll use the awswrangler library to create a new database. We'll then read the raw CSV data from S3, clean it up a bit using pandas, and write it back to S3 in Parquet format.

**Advantages**
- Simple to use
- No need to write DDL statements
- Uses pandas for data manipulation

**Disadvantages**
- Data is downloaded to the notebook instance
- Larger datasets may not fit in memory and would require larger instances or running on a cluster
- Could be slower for large datasets

In [None]:
database_name = "workshop"
wrangler_parquet_table_name = "loan_data_parquet_wrangler"

# create the database; exist_ok=True makes this a no-op if it already exists
wr.catalog.create_database(name=database_name, exist_ok=True)

# read the CSV file from S3
data = wr.s3.read_csv(path=[s3_csv_data])

For best performance, partition the data on columns that are frequently used in query filters. This allows Athena to skip reading unnecessary data when executing queries. We'll partition by the year and month in which the account was opened. We could partition on a more granular column, such as the exact date the account was opened, but this risks creating too many partitions with very small files, which would hurt performance.
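
To make the granularity trade-off concrete, the sketch below counts how many Hive-style partition prefixes a range of dates would produce at monthly versus daily granularity. The date range, the `daily_dates` helper, and the `year=`/`month=`/`date=` prefix names are hypothetical and are not taken from the dataset.

```python
from datetime import date, timedelta


def daily_dates(start: date, end: date):
    """Yield every calendar day from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


# Hypothetical range of account-open dates (an assumption, not from the dataset)
start, end = date(2020, 1, 1), date(2022, 12, 31)

# One prefix per distinct partition value
monthly_partitions = {f"year={d.year}/month={d.month}/" for d in daily_dates(start, end)}
daily_partitions = {f"date={d.isoformat()}/" for d in daily_dates(start, end)}

print(f"Monthly partitions: {len(monthly_partitions)}")
print(f"Daily partitions:   {len(daily_partitions)}")
```

For this three-year range, the monthly scheme yields 36 partitions while the daily scheme yields 1,096, so the same data would be split into roughly thirty times as many (and correspondingly smaller) files.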

In [None]:
# convert the columns containing the word "DATE" to datetime
date_cols = [col for col in data.columns if "DATE" in col]
print(f"Converting {date_cols} to datetime")
for col in date_cols:
    data[col] = pd.to_datetime(data[col], errors="coerce", format="%Y-%m-%d")

# create additional columns for year and month the account opened 
# We will use these columns to partition the data in the next step
data["TI_LN_DATE_OPEN_YEAR"] = data["TI_LN_DATE_OPEN"].dt.year
data["TI_LN_DATE_OPEN_MONTH"] = data["TI_LN_DATE_OPEN"].dt.month
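
As a small illustration of what `errors="coerce"` does, the sketch below parses a few made-up values (not rows from the dataset). Strings that do not match the format become `NaT`, so the year derived from them is missing as well.

```python
import pandas as pd

# Made-up sample values (assumption): one valid date, one malformed, one missing
raw = pd.Series(["2021-03-15", "15/03/2021", None])

# Values that do not match the format are turned into NaT instead of raising
parsed = pd.to_datetime(raw, errors="coerce", format="%Y-%m-%d")
years = parsed.dt.year

print(parsed)
print(years)
print(f"Unparseable or missing rows: {parsed.isna().sum()}")
```

Before partitioning on derived columns like these, it can be worth checking how many rows ended up with a missing date.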

In [None]:
# Write the data to S3 in Parquet format and create a table in the Glue Data Catalog
s3_output_path = f"s3://{bucket}/{prefix}/data/wrangler/parquet/"
wr.s3.to_parquet(
    df=data,
    path=s3_output_path,
    dataset=True,
    mode="overwrite",
    database=database_name,
    table=wrangler_parquet_table_name,
    partition_cols=["TI_LN_DATE_OPEN_YEAR", "TI_LN_DATE_OPEN_MONTH"],
)

In [None]:
# we can now validate that the table was created
tables = [tbl["Name"] for tbl in wr.catalog.get_tables(database=database_name)]
print(f"Tables in database {database_name}: {tables}")


In [None]:
# query the data
wr.athena.read_sql_query(f"SELECT * FROM {wrangler_parquet_table_name} LIMIT 5", database=database_name)

#### Ingest data into Athena using the Athena API and SQL
In this approach we will create a table on top of the existing CSV file in S3 using Athena DDL statements. We will then use Athena to convert the data into a parquet format, and query the data.

**Advantages**
- No need to download the data to the notebook instance
- No need to load the data into memory
- We can use tiny instances even for large datasets as all work is done on the Athena side
- Use SQL to wrangle and query the data

**Disadvantages**
- Requires knowledge of Athena SQL 
- May fail if the raw csv has data quality issues
- Wrangling with SQL can be more complex and less flexible than using pandas

To automatically construct the SQL DDL statement, we will read a small sample of the CSV using pandas and infer the schema. We will then use this schema to create the table in Athena. 
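
To illustrate the idea, here is a minimal, self-contained sketch that infers column types from a handful of CSV rows and builds a `CREATE EXTERNAL TABLE` statement by hand. The helper names, sample rows, table name, and S3 location are all hypothetical, and the type inference is deliberately crude; the cells in this notebook rely on awswrangler instead.

```python
import csv
import io


def infer_athena_type(values):
    """Return 'bigint', 'double', or 'string' based on the non-empty sample values."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"  # nothing to infer from, fall back to string
    for athena_type, caster in (("bigint", int), ("double", float)):
        try:
            for v in non_empty:
                caster(v)
        except ValueError:
            continue  # at least one value did not fit, try the next wider type
        return athena_type
    return "string"


def build_csv_ddl(table, location, columns_types):
    """Build a CREATE EXTERNAL TABLE statement for a comma-delimited file with a header row."""
    cols = ",\n".join(f"  `{name}` {typ}" for name, typ in columns_types.items())
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n{cols}\n)\n"
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
        f"LOCATION '{location}'\n"
        "TBLPROPERTIES ('skip.header.line.count'='1')"
    )


# Made-up sample rows (assumption), standing in for the first lines of a CSV file
sample_csv = """account_id,balance,date_open
1001,2500.50,2021-03-15
1002,,2020-11-02
"""
rows = list(csv.DictReader(io.StringIO(sample_csv)))
columns_types = {
    name: infer_athena_type([row[name] for row in rows]) for name in rows[0]
}

ddl = build_csv_ddl("loan_data_example", "s3://example-bucket/example-prefix/", columns_types)
print(ddl)
```

Note that the date column falls through to `string` in this sketch, because nothing here attempts to recognize date formats.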

In [None]:
sample_df = pd.read_csv(s3_csv_data, nrows=1000)

In [None]:
sample_df.head()

In the cells below we will use a few functions from awswrangler to generate a schema for our dataset and create a table directly over the CSV file in S3.

In [125]:
# awswrangler provides a utility function to convert pandas data types to Athena data types
schema = wr.catalog.extract_athena_types(sample_df)[0]

In [34]:
csv_table_name = "loan_data_csv"

# we have to pass in the folder containing the CSV files rather than the file itself
s3_csv_folder = os.path.dirname(s3_csv_data)

wr.catalog.create_csv_table(
    database=database_name,
    table=csv_table_name,
    path=s3_csv_folder,
    columns_types=schema,
    skip_header_line_count=1,  # the CSV has a header row; without this it would be read as data
)

In [None]:
# validate that the table was created
tables = [tbl["Name"] for tbl in wr.catalog.get_tables(database=database_name)]
print(f"Tables in database {database_name}: {tables}")

In [None]:
# query the csv table
csv_table_sample = wr.athena.read_sql_query(f"SELECT * FROM {csv_table_name} LIMIT 5", database=database_name)
csv_table_sample.head()

The CSV table has an issue: all of the date columns are stored as strings, as the data types below show. Let's fix this by writing a SQL query that converts the date columns to the date type. We will then materialize the output of the query into a new, optimized Parquet-based table in Athena.

In [None]:
# check the data types of the date columns
# We will convert the string to dates but ignore the Int64 ti_ln_write_off_date column for now
csv_table_sample.dtypes.filter(like="date")

In [127]:
def generate_cleanup_sql_query(table_name, database_name, columns, partition_col):
    """Helper function that builds a query to TRY_CAST the string date columns to DATE."""

    query = "select\n"
    for col in columns:
        if "date" in col and col != "ti_ln_write_off_date":
            query += f"     TRY_CAST({col} AS DATE) AS {col},\n"
        else:
            query += f"     {col},\n"
    
    # add partition columns
    query += f"     YEAR(TRY_CAST({partition_col} AS DATE)) AS {partition_col}_year,\n"
    query += f"     MONTH(TRY_CAST({partition_col} AS DATE)) AS {partition_col}_month\n"
    
    query += f"from {table_name}\n"
    
    return query
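
The helper above wraps the date columns in `TRY_CAST`, which behaves like `CAST` but returns `NULL` when a conversion fails instead of aborting the query. This is similar in spirit to `errors="coerce"` in pandas. The Python function below is only an emulation of that behavior for ISO-formatted date strings, using made-up inputs; it is not how Athena implements the cast.

```python
from datetime import date
from typing import Optional


def try_cast_date(value: Optional[str]) -> Optional[date]:
    """Return a date for a valid 'YYYY-MM-DD' string, otherwise None (mirroring NULL)."""
    if value is None:
        return None
    try:
        return date.fromisoformat(value)
    except ValueError:
        return None


# Made-up inputs (assumption): a valid date, an impossible date, free text, and a missing value
for raw in ["2021-03-15", "2021-02-30", "not a date", None]:
    print(repr(raw), "->", try_cast_date(raw))
```

Because failed casts turn into `NULL` silently, it can be useful to count the `NULL` values in the converted columns and compare them against the source.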

In [None]:
cleanup_sql_query = generate_cleanup_sql_query(
    csv_table_name, database_name, csv_table_sample.columns, "ti_ln_date_open"
)

print(cleanup_sql_query)

In [None]:
# let's validate that the query works
wr.athena.read_sql_query(cleanup_sql_query + "\n limit 10", database_name)

We can use the `create_ctas_table` function from awswrangler to create a new table in Athena using a SQL query. **CTAS** stands for Create Table As Select. This function will execute the query and store the results in a new table in Athena.
The newly added `ti_ln_date_open_year` and `ti_ln_date_open_month` columns will be used to partition the data by the year and month the account was opened. This allows Athena to skip reading unnecessary data when executing queries.
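
For reference, a CTAS statement written by hand has roughly the shape produced by the sketch below. This shows the general form of the Athena SQL and is not necessarily the exact statement awswrangler generates; the `build_ctas_statement` helper and the example arguments are hypothetical.

```python
def build_ctas_statement(database, table, select_sql, s3_output, partition_cols):
    """Build an Athena CTAS statement that writes partitioned Parquet files to s3_output.

    Athena expects the partition columns to be the last columns of the SELECT list.
    """
    partitions = ", ".join(f"'{col}'" for col in partition_cols)
    return (
        f"CREATE TABLE {database}.{table}\n"
        "WITH (\n"
        "    format = 'PARQUET',\n"
        f"    external_location = '{s3_output}',\n"
        f"    partitioned_by = ARRAY[{partitions}]\n"
        ")\n"
        f"AS\n{select_sql}"
    )


# Hypothetical names and locations, for illustration only
statement = build_ctas_statement(
    database="example_db",
    table="example_parquet",
    select_sql="SELECT id, amount, YEAR(opened) AS open_year FROM example_csv",
    s3_output="s3://example-bucket/example-prefix/",
    partition_cols=["open_year"],
)
print(statement)
```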

In [None]:
sql_parquet_table_name = "loan_data_parquet_sql"
sql_parquet_output_path = f"s3://{bucket}/{prefix}/data/sql/parquet/"

!aws s3 rm --recursive {sql_parquet_output_path}
wr.catalog.delete_table_if_exists(database=database_name, table=sql_parquet_table_name)

ctas_query = wr.athena.create_ctas_table(
    sql=cleanup_sql_query,
    database=database_name,
    ctas_table=sql_parquet_table_name,
    s3_output=sql_parquet_output_path,
    storage_format="PARQUET",
    partitioning_info=["ti_ln_date_open_year", "ti_ln_date_open_month"],
)

wr.athena.wait_query(query_execution_id=ctas_query["ctas_query_id"])

In [None]:
tables = [tbl["Name"] for tbl in wr.catalog.get_tables(database=database_name)]
print(f"Tables in database {database_name}: {tables}")

In [None]:
# query the data
wr.athena.read_sql_query(f"SELECT * FROM {sql_parquet_table_name} LIMIT 5", database_name)

We can do a quick benchmark to compare the performance of a query on the raw CSV table and the two parquet tables. You likely won't see a significant difference given the small amount of data and the large amount of time it takes to send the query results back to the notebook instance. However, for larger datasets, the difference in performance can be significant.
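
The benchmark cells use `%%timeit -n 1 -r 3`, which runs the cell once per repetition and repeats it three times. Outside of IPython, a comparable measurement can be sketched with `time.perf_counter`. The `time_call` helper and the stand-in workload below are hypothetical; in this notebook the callable would wrap the Athena query.

```python
import statistics
import time


def time_call(func, repeats=3):
    """Call func `repeats` times and return the wall-clock duration of each call in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        durations.append(time.perf_counter() - start)
    return durations


# Stand-in workload (assumption); replace with a function that runs the query
durations = time_call(lambda: sum(range(100_000)))
print(
    f"mean {statistics.mean(durations):.4f}s, "
    f"best {min(durations):.4f}s over {len(durations)} runs"
)
```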

In [None]:
%%timeit -n 1 -r 3
wr.athena.read_sql_query(f"""SELECT ti_cu_customer_id, 
                         ti_ln_account_id, 
                         count(*) as count 
                         FROM {csv_table_name} 
                         where ti_ln_original_term = ti_ln_remaining_term 
                         GROUP BY 1,2""", 
                         database=database_name)

In [None]:
%%timeit -n 1 -r 3
wr.athena.read_sql_query(f"""SELECT ti_cu_customer_id, 
                         ti_ln_account_id, 
                         count(*) as count 
                         FROM {wrangler_parquet_table_name} 
                         where ti_ln_original_term = ti_ln_remaining_term 
                         GROUP BY 1,2""", 
                         database=database_name)

In [None]:
%%timeit -n 1 -r 3
wr.athena.read_sql_query(f"""SELECT ti_cu_customer_id, 
                         ti_ln_account_id, 
                         count(*) as count 
                         FROM {sql_parquet_table_name} 
                         where ti_ln_original_term = ti_ln_remaining_term 
                         GROUP BY 1,2""", 
                         database=database_name)

We can also compare the file sizes of the raw CSV table and the two parquet tables. You should see a significant reduction in file size for the parquet tables. Since Athena charges based on the amount of data scanned, this can result in significant cost savings.
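
As a back-of-the-envelope illustration of the cost effect, the sketch below converts bytes scanned into an estimated query cost. The price per terabyte and the example sizes are assumptions chosen for illustration; check the current Athena pricing for your region, and note that the estimate ignores any per-query minimum.

```python
PRICE_PER_TB_USD = 5.0  # assumption: illustrative on-demand price per TB scanned
BYTES_PER_TB = 1024 ** 4


def estimate_scan_cost(bytes_scanned, price_per_tb=PRICE_PER_TB_USD):
    """Estimate the cost in USD of a query that scans `bytes_scanned` bytes."""
    return bytes_scanned / BYTES_PER_TB * price_per_tb


# Hypothetical sizes: a 100 GiB CSV table versus a 20 GiB Parquet copy of it
csv_cost = estimate_scan_cost(100 * 1024 ** 3)
parquet_cost = estimate_scan_cost(20 * 1024 ** 3)

print(f"CSV:     ${csv_cost:.4f} per full scan")
print(f"Parquet: ${parquet_cost:.4f} per full scan")
```

Because Parquet is a columnar format, a query that selects only a few columns can scan less than the full file size, so the real difference may be larger than a comparison of file sizes suggests.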

In [None]:
# get size of csv data in MB
!aws s3api list-objects-v2 --bucket $bucket --prefix $prefix/data/csv --query "Contents[].Size" --output json | jq '. | add / (1024 * 1024)'

In [None]:
# get size of parquet data created with pandas in MB
!aws s3api list-objects-v2 --bucket $bucket --prefix $prefix/data/wrangler --query "Contents[].Size" --output json | jq '. | add / (1024 * 1024)'

In [None]:
# get size of parquet data created with sql in MB
!aws s3api list-objects-v2 --bucket $bucket --prefix $prefix/data/sql --query "Contents[].Size" --output json | jq '. | add / (1024 * 1024)'

In [137]:
# save the lab values to a json file for use in the next notebook
with open("lab_values.json", "w") as f:
    json.dump({
        "database_name": database_name,
        "s3_csv_folder": s3_csv_folder,
        "csv_table_name": csv_table_name,
        "wrangler_parquet_table_name": wrangler_parquet_table_name,
        "sql_parquet_table_name": sql_parquet_table_name,
        "parquet_output_path": s3_output_path,
    }, f)
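
In the next notebook, these values can be read back with `json.load`. The sketch below first writes a small stand-in file to a temporary directory so that it runs on its own; it contains only a subset of the keys saved above.

```python
import json
import tempfile
from pathlib import Path

# A subset of the values saved above, used here as stand-in data
lab_values = {"database_name": "workshop", "csv_table_name": "loan_data_csv"}

with tempfile.TemporaryDirectory() as tmp_dir:
    path = Path(tmp_dir) / "lab_values.json"
    path.write_text(json.dumps(lab_values))

    # Read the file back, as a follow-up notebook would
    with open(path) as f:
        loaded = json.load(f)

print(loaded["database_name"], loaded["csv_table_name"])
```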

### Conclusion
In this notebook we covered two approaches for ingesting data into Athena. The first approach used the awswrangler library to read the raw CSV data from S3, clean it up using pandas, and write it back in Parquet format. The second approach used Athena DDL statements to create a table on top of the existing CSV file in S3 and then used a CTAS query to convert the data to Parquet format. We then compared the query performance and file sizes of the raw CSV table and the two Parquet tables.