# Data Processing

This notebook contains the flow of the data processing application. The goal of the app is to use Glue ETL and the Glue Data Catalog to load data from the Stack Overflow survey into a PostgreSQL database. All required resources can be created using the `infra` defined in this repository. There is no external orchestration of the flow, but this notebook can be treated as a guideline on how to trigger the required steps via the AWS Console or Python.

*Note*: Resources are linked in markdown cells where needed. A detailed description of the infrastructure set-up is given in the README.

In [None]:
import awswrangler as wr
import boto3
import pandas as pd

from db.client import DBClient

# Raw Data

The first step is to upload the CSV file to S3. The AWS CLI can be used for this, which avoids loading the file into the notebook. The `pwr` profile is used for all AWS communication. For a reference on how to use the AWS credentials file, go to https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-files.html.

In [None]:
!aws s3 cp --profile pwr ../../data/stack-overflow-developer-survey-2023/survey_results_public_2023.csv s3://dps-glue-data/raw/2023/
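The same upload can also be done from Python. The sketch below is one possible way, assuming an S3 client created from a `boto3.Session` that uses the `pwr` profile; the helper name `upload_raw_csv` is hypothetical and not part of this repository.

```python
from pathlib import Path


def upload_raw_csv(s3_client, local_path, bucket, prefix):
    """Upload a local file to s3://<bucket>/<prefix>/<file name> and return the key.

    Hypothetical helper, shown only as an alternative to the AWS CLI call.
    """
    path = Path(local_path)
    key = f"{prefix.strip('/')}/{path.name}"
    # upload_file reads the file from disk in chunks, so the CSV is not
    # loaded into memory as a whole.
    s3_client.upload_file(Filename=str(path), Bucket=bucket, Key=key)
    return key


# Usage in the notebook (not executed here):
# s3 = boto3.Session(profile_name="pwr", region_name="us-east-1").client("s3")
# upload_raw_csv(
#     s3,
#     "../../data/stack-overflow-developer-survey-2023/survey_results_public_2023.csv",
#     bucket="dps-glue-data",
#     prefix="raw/2023",
# )
```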

The learning lab uses the `us-east-1` region. To create a `boto3.client` with the given region and profile settings, a `boto3.Session` needs to be used.

In [None]:
session = boto3.Session(profile_name="pwr", region_name="us-east-1")
glue = session.client("glue")
athena = session.client("athena")
ssm = session.client("ssm")

After uploading the raw data, we want it to be registered in the Glue database. To do this, the crawler is started from boto3. <br> The only parameter is the crawler name; its input and output are configured in Terraform during resource creation.

In [None]:
glue.start_crawler(Name="raw-data-crawler")  # Exceptions will propagate to notebook, if they happen

To verify the run, check the AWS Console under the following path: `AWS Glue` -> `Data Catalog` -> `Crawlers` -> `raw-data-crawler` -> `Crawler runs`.
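The crawler state can also be checked from Python instead of the Console. The following is a minimal sketch, assuming the `glue` client from the session above; the helper name `wait_for_crawler` and the polling intervals are illustrative choices, not something defined in this repository.

```python
import time


def wait_for_crawler(glue_client, name, poll_seconds=15, timeout_seconds=900):
    """Block until the crawler is back in the READY state.

    Returns the `LastCrawl` section of the crawler description, which
    contains the status of the most recent run.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        crawler = glue_client.get_crawler(Name=name)["Crawler"]
        # State is READY, RUNNING or STOPPING; READY means no run is in progress.
        if crawler["State"] == "READY":
            return crawler.get("LastCrawl", {})
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Crawler {name} is still {crawler['State']} after {timeout_seconds}s"
            )
        time.sleep(poll_seconds)


# Usage in the notebook (not executed here):
# wait_for_crawler(glue, "raw-data-crawler")["Status"]
```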

Additionally, we can check how the table structure looks in Athena. The `QueryString` is the SQL query, which is executed by a serverless query engine (Athena). The engine reads data directly from S3, applies the schema (registered by the Glue crawler) and executes the query. Results are stored as CSV files on S3 and can also be viewed in the AWS Console SQL editor.

https://docs.aws.amazon.com/athena/latest/ug/getting-started.html

In [None]:
response = athena.start_query_execution(
    # survey_db.raw is the table created by Glue
    QueryString="SELECT * FROM raw LIMIT 10",
    QueryExecutionContext={"Database": "survey_db"},
    ResultConfiguration={"OutputLocation": "s3://dps-glue-data/athena/"}
)
query_execution_id = response["QueryExecutionId"]

In a Python script, the status check below could be run in a `while` loop until the execution is finished. In the notebook, if the query is still `"RUNNING"`, rerun the cell until the status is `"SUCCEEDED"`, or simply wait. The query should run quickly.

In [None]:
status = athena.get_query_execution(QueryExecutionId=query_execution_id)
status["QueryExecution"]["Status"]["State"]
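For use outside the notebook, the `while` loop mentioned above could look roughly like this. It is a sketch under the assumption that the `athena` client and `query_execution_id` from the previous cells are passed in; the helper name `wait_for_query` and the default intervals are hypothetical.

```python
import time


def wait_for_query(athena_client, query_execution_id, poll_seconds=1.0, timeout_seconds=300):
    """Poll Athena until the query succeeds; raise if it fails, is cancelled or times out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        execution = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        status = execution["QueryExecution"]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            return state
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "no reason given")
            raise RuntimeError(f"Query {query_execution_id} ended as {state}: {reason}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Query {query_execution_id} is still {state}")
        # QUEUED or RUNNING: wait a moment and check again.
        time.sleep(poll_seconds)


# Usage in the notebook (not executed here):
# wait_for_query(athena, query_execution_id)
```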

In [None]:
# boto3_session needs to be passed, since non-default profile is used
raw_data = wr.athena.get_query_results(query_execution_id=query_execution_id, boto3_session=session)
raw_data.head(5)

# Transform Data

After registering the data in Glue, Athena can be used to analyse the CSV file. Additionally, Glue supports ETL jobs using PySpark, which can read from and write to the Glue Data Catalog. Two such jobs are defined in `src/glue`: one for ingestion and one for normalizing the data so it can be inserted into RDS. The first job can be triggered from Python, passing the required arguments.

*Note*: Arguments are defined in Terraform with default values; these can be overridden using the boto3 client.

In [None]:
# only default arguments are used, so just the job name is passed.
response = glue.start_job_run(JobName="dps-ingest")
response

In [None]:
status = glue.get_job_run(JobName="dps-ingest", RunId=response["JobRunId"])
status["JobRun"]["JobRunState"]
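Starting a job, optionally overriding its default arguments, and waiting for it to finish can be combined in one helper. The sketch below assumes the `glue` client from the session above; the helper name `run_job_and_wait` is hypothetical, and the argument name shown in the usage comment is a placeholder, since the real argument names are defined in the Terraform config.

```python
import time

# Job run states after which the run will not change any more.
FINISHED_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def run_job_and_wait(glue_client, job_name, arguments=None, poll_seconds=30):
    """Start a Glue job run and poll until it reaches a finished state.

    `arguments` is an optional dict that overrides the job's default
    arguments for this run only. Returns (run_id, final_state).
    """
    kwargs = {"JobName": job_name}
    if arguments:
        kwargs["Arguments"] = arguments
    run_id = glue_client.start_job_run(**kwargs)["JobRunId"]
    while True:
        run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        if run["JobRunState"] in FINISHED_STATES:
            return run_id, run["JobRunState"]
        time.sleep(poll_seconds)


# Usage in the notebook (not executed here); "--some_argument" is a placeholder:
# run_job_and_wait(glue, "dps-ingest", arguments={"--some_argument": "value"})
```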

In [None]:
response = athena.start_query_execution(
    # survey_db.processed is the table produced by the Glue ingest job
    QueryString="SELECT * FROM processed LIMIT 10",
    QueryExecutionContext={"Database": "survey_db"},
    ResultConfiguration={"OutputLocation": "s3://dps-glue-data/athena/"}
)
query_execution_id = response["QueryExecutionId"]

In [None]:
status = athena.get_query_execution(QueryExecutionId=query_execution_id)
status["QueryExecution"]["Status"]["State"]

In [None]:
# boto3_session needs to be passed, since non-default profile is used
processed_data = wr.athena.get_query_results(query_execution_id=query_execution_id, boto3_session=session)
processed_data.head(5)

# Normalize

The second ETL transform converts the `processed` schema into normalized SQL tables: 3 tables plus bridge tables.

*Note*: Spark is not typically used to convert such data into an SQL schema; this is meant more as a demonstration of RDS. The SQL schema used in RDS does not follow typical analytical best practices.
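To illustrate what normalizing into tables plus a bridge table means, here is a small pure-Python sketch. It is not the Glue job from `src/glue`: the column names (`country`, `languages`), the `;` separator and the `languages` / `answer_languages` table names are assumptions made for the example only.

```python
def normalize(rows):
    """Split flat rows into an entity table, a lookup table and a bridge table.

    Each input row holds a multi-valued field as a ';'-separated string
    (assumed layout). The bridge table links answers to lookup entries.
    """
    language_ids = {}  # language name -> surrogate id
    answers, answer_languages = [], []
    for answer_id, row in enumerate(rows, start=1):
        answers.append({"id": answer_id, "country": row["country"]})
        for part in row["languages"].split(";"):
            name = part.strip()
            if not name:
                continue
            # Reuse the id of a known language, otherwise assign the next one.
            language_id = language_ids.setdefault(name, len(language_ids) + 1)
            answer_languages.append({"answer_id": answer_id, "language_id": language_id})
    languages = [{"id": i, "name": n} for n, i in language_ids.items()]
    return answers, languages, answer_languages


rows = [
    {"country": "Poland", "languages": "Python;SQL"},
    {"country": "Germany", "languages": "SQL;Rust"},
]
answers, languages, answer_languages = normalize(rows)
```

With the two sample rows, `SQL` is stored once in the lookup table and referenced from both answers through the bridge table, which is the point of the normalized layout.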

In [None]:
# only default arguments are used, so just the job name is passed.
response = glue.start_job_run(JobName="dps-transform")
response

In [None]:
status = glue.get_job_run(JobName="dps-transform", RunId=response["JobRunId"])
status["JobRun"]["JobRunState"]

# Setup SQL

Connections to RDS are allowed only from the IP given by `vpn_ip` in the Terraform config. It is passed via an environment variable and is a secret variable, along with the database password. This is a safety feature that implements IP whitelisting using the EC2 Security Group. More details: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-security-groups.html.

*Note*: The IP given in the config is behind a VPN for extra safety.

In [None]:
db_client = DBClient.from_ssm(prefix="db", client=ssm)

In [None]:
db_client.list_tables()

In [None]:
db_client.select(table="answers", limit=10)

Initially, the fresh database is empty. The schema can be created by executing the DDL SQL script from the file.

In [None]:
with open("db/sql/schema.sql") as f:
    query = f.read()

db_client.execute(query)

In [None]:
db_client.list_tables()

After adding the schema, we can use a PySpark script to load the S3 data into the SQL database. The data is briefly kept in the memory of the computer this runs on, which means it is transferred out of AWS and back in.

*Note*: PySpark requires additional drivers to run the SQL operations.

In [None]:
!python db/insert.py --profile pwr --region us-east-1 --s3-dir dps-glue-data/normalized

In [None]:
db_client.select(table="countries", limit=10)

In [None]:
db_client.select(table="answers", limit=10)