In [49]:
import os
from datetime import datetime
from urllib.parse import quote

import awswrangler as wr
import boto3
import pandas as pd
from sqlalchemy import create_engine, exc
# Plan: for each S3 key under "validated", split it into 4 parts: the data type, "validated", the month, and the file name.
# Then read the parquet file, validate its schema again (TODO: the code currently only flags it as 'Validated'), and append it to SQL.
# Each data type gets its own table, aws_<data_type>_source (e.g. aws_adv_stats_source), in the s3_ingestion_prac DB schema.
# Make sure the SQL tables are created in pgAdmin first.

In [52]:
def sql_connection(rds_schema: str):
    """
    Create a SQLAlchemy engine for my PostgreSQL DB with rds_schema as the default search_path.
    Credentials are read from the environment variables RDS_USER, RDS_PW, IP and RDS_DB.
    Args:
        rds_schema: Schema to place on the connection's search_path (e.g. "nba_source")
    Returns:
        SQLAlchemy Engine on success, or the caught SQLAlchemyError instance if engine creation fails
    """
    RDS_USER = os.environ.get("RDS_USER")
    RDS_PW = os.environ.get("RDS_PW")
    # NOTE(review): the host is read from "IP", not "RDS_IP" like the other settings - confirm intended.
    RDS_IP = os.environ.get("IP")
    RDS_DB = os.environ.get("RDS_DB")
    # Percent-encode the credentials so characters such as '@', ':' or '/' in the password
    # cannot be mistaken for URL delimiters. safe="" encodes '/' too.
    user = quote(RDS_USER or "", safe="")
    password = quote(RDS_PW or "", safe="")
    try:
        connection = create_engine(
            f"postgresql+psycopg2://{user}:{password}@{RDS_IP}:5432/{RDS_DB}",
            # defining schema to connect to
            connect_args={"options": f"-csearch_path={rds_schema}"},
            echo=False,
        )
        # create_engine is lazy: this confirms the engine was built, not that the DB is reachable.
        print(f"SQL Connection to schema: {rds_schema} Successful")
        return connection
    except exc.SQLAlchemyError as e:
        print(f"SQL Connection to schema: {rds_schema} Failed, Error: {e}")
        return e

def _schema_flag(df: pd.DataFrame):
    """Return df's schema-validation flag (df.attrs["schema"], else a plain df.schema attribute) if it is a str, else None."""
    flag = df.attrs.get("schema")
    if flag is None:
        # Legacy callers set a plain attribute (df.schema = "Validated"). getattr returns a Series
        # when the frame has a column named "schema", which the isinstance check below rejects.
        flag = getattr(df, "schema", None)
    return flag if isinstance(flag, str) else None


def write_to_sql(con, table_name: str, df: pd.DataFrame, table_type: str):
    """
    Write a DataFrame to the SQL table aws_{table_name}_source, only if it is flagged as validated.
    A frame counts as validated when df.attrs["schema"] (or a plain df.schema attribute) == "Validated".
    Args:
        con: Connection/engine accepted by DataFrame.to_sql (e.g. the engine from sql_connection)
        table_name: Data type name; the target table is aws_{table_name}_source
        df: The Pandas DataFrame to store in SQL
        table_type: Passed to DataFrame.to_sql's if_exists (e.g. "append" or "replace")
    Returns:
        None when the frame is written or skipped; the caught exception if the write fails.
    """
    try:
        if len(df) == 0:
            print(f"{table_name} is empty, not writing to SQL")
        elif _schema_flag(df) == "Validated":
            df.to_sql(
                con=con,
                name=f"aws_{table_name}_source",
                index=False,
                if_exists=table_type,
            )
            print(f"Writing {len(df)} {table_name} rows to aws_{table_name}_source to SQL")
        else:
            # Covers both an explicit non-"Validated" flag and a frame that was never flagged.
            print(f"{table_name} Schema Invalidated, not writing to SQL")
    except Exception as error:
        # Exception (not BaseException) so KeyboardInterrupt/SystemExit still stop the notebook.
        print(f"SQL Write Script Failed, {error}")
        return error

In [56]:
# Engine whose search_path is the s3_ingestion_prac schema; reused by the re-ingestion cells below.
aws_conn = sql_connection(rds_schema="s3_ingestion_prac")

SQL Connection to schema: s3_ingestion_prac Successful


In [45]:
# S3 bucket holding the parquet files; read by get_s3_files and execute_reingestion.
bucket_name = "jacobsbucket97"

def get_s3_files(bucket: str, prefix: str = ""):
    """
    List the object keys in an S3 bucket.
    Args:
        bucket: Name of the S3 bucket to list
        prefix: Optional key prefix to restrict the listing (e.g. "adv_stats/validated/");
            the default "" lists every object in the bucket
    Returns:
        List of S3 object keys (str)
    Raises:
        Re-raises any exception from boto3 after printing it
    """
    try:
        # Distinct name for the resource so the `bucket` argument keeps meaning "bucket name".
        s3_bucket = boto3.resource("s3").Bucket(name=bucket)
        objects = s3_bucket.objects.filter(Prefix=prefix) if prefix else s3_bucket.objects.all()
        s3_keys = [s3_object.key for s3_object in objects]
        print(f"Returning {len(s3_keys)} s3 files")
        return s3_keys
    except Exception as e:
        print(f"Error Occurred, {e}")
        # Bare raise re-raises the original exception with its traceback unchanged.
        raise

# Every object key currently in the bucket (a list of str).
s3_keys2 = get_s3_files(bucket=bucket_name)

In [14]:
# Keep s3_keys as the full list of keys: the cells below slice it (s3_keys[:1]), loop over it
# and pass it to execute_reingestion. Rebinding it to a single key string made those cells
# operate on characters after a fresh Restart & Run All.
s3_keys = s3_keys2
first_key = s3_keys[0]
# Expected layout: <data_type>/<valid_type>/<month>/<file_name>; unpacking fails loudly otherwise.
data_type, valid_type, month, file_name = first_key.split('/')

In [38]:
# Slice rather than index, so the result keeps s3_keys' type
# (a one-element list when s3_keys is the list of keys).
s3_keys_1 = s3_keys[0:1]

In [69]:
def _parse_validated_key(key: str):
    """Return (data_type, file_name) for a '<data_type>/validated/<month>/<file_name>' key, else None."""
    parts = key.split('/')
    if len(parts) != 4:  # the key must have exactly 4 parts, otherwise don't ingest
        return None
    data_type, status, _month, file_name = parts
    # Exact segment match: a substring test would also accept '.../invalidated/...' keys.
    # An empty file name means a folder-marker key (e.g. 'adv_stats/validated/02/'), and
    # reading that path would pull in every parquet file under the prefix.
    if status != 'validated' or not data_type or not file_name:
        return None
    return data_type, file_name


def execute_reingestion(conn, s3_files, bucket=None):
    """
    Re-ingest validated parquet files from S3 into SQL.
    Only keys shaped like <data_type>/validated/<month>/<file_name> are read. Each file gets
    file_name and upload_time columns and is appended to aws_<data_type>_source via write_to_sql.
    Note: rows are always appended, so re-running this duplicates rows that were already loaded.
    Args:
        conn: SQL connection/engine passed through to write_to_sql
        s3_files: Iterable of S3 object keys (e.g. the output of get_s3_files)
        bucket: Bucket to read from; defaults to the notebook-level bucket_name
    Returns:
        None
    Raises:
        Re-raises (after printing) any exception that escapes the loop, e.g. from read_parquet
    """
    try:
        for key in s3_files:
            parsed = _parse_validated_key(key)
            if parsed is None:
                continue
            data_type, file_name = parsed
            source_bucket = bucket if bucket is not None else bucket_name
            df = wr.s3.read_parquet(path=f's3://{source_bucket}/{key}')
            df['file_name'] = file_name
            df['upload_time'] = datetime.now()
            # attrs is pandas' supported metadata slot; the plain attribute is kept too
            # because write_to_sql compares df.schema against 'Validated'.
            df.attrs['schema'] = 'Validated'
            df.schema = 'Validated'
            write_to_sql(conn, data_type, df, "append")
    except Exception as e:
        print(f"Error Occurred, {e}")
        raise

In [70]:
# Append every validated parquet file listed in s3_keys to its aws_<data_type>_source table.
execute_reingestion(conn=aws_conn, s3_files=s3_keys)

Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aws_adv_stats_source to SQL
Writing 30 adv_stats rows to aw

In [68]:
# Preview the keys matching the <data_type>/validated/<month>/<file_name> layout.
# Compare the second segment exactly ('invalidated' contains 'validated' as a substring)
# and skip folder-marker keys whose file-name segment is empty.
for key in s3_keys:
    parts = key.split('/')
    if len(parts) == 4 and parts[0] and parts[1] == 'validated' and parts[3]:
        print(key)

adv_stats/validated/02/adv_stats-2022-02-26.parquet
adv_stats/validated/02/adv_stats-2022-02-27.parquet
adv_stats/validated/02/adv_stats-2022-02-28.parquet
adv_stats/validated/02/adv_stats-2022-03-01.parquet
adv_stats/validated/03/adv_stats-2022-03-02.parquet
adv_stats/validated/03/adv_stats-2022-03-03.parquet
adv_stats/validated/03/adv_stats-2022-03-04.parquet
adv_stats/validated/03/adv_stats-2022-03-05.parquet
adv_stats/validated/03/adv_stats-2022-03-06.parquet
adv_stats/validated/03/adv_stats-2022-03-07.parquet
adv_stats/validated/03/adv_stats-2022-03-08.parquet
adv_stats/validated/03/adv_stats-2022-03-09.parquet
adv_stats/validated/03/adv_stats-2022-03-10.parquet
adv_stats/validated/03/adv_stats-2022-03-11.parquet
adv_stats/validated/03/adv_stats-2022-03-12.parquet
adv_stats/validated/03/adv_stats-2022-03-13.parquet
adv_stats/validated/03/adv_stats-2022-03-14.parquet
adv_stats/validated/03/adv_stats-2022-03-15.parquet
adv_stats/validated/03/adv_stats-2022-03-16.parquet
adv_stats/va

In [65]:
# Number of '/'-separated segments in the first key (4 for <data_type>/validated/<month>/<file>).
s3_keys_1[0].count('/') + 1

4