### Extract

Goals:
- Connect to the Chinook database
- Read the data from the `customers` table

#### Connection to Database

In [None]:
from sqlalchemy import create_engine

# Connect to the local Chinook SQLite database file.
# SQLite is file-based and takes no username/password, so no credentials are
# passed here; for a server database, keep credentials out of notebook source
# (e.g. read them from environment variables).
db_connection_string = 'sqlite:///chinook.db'
engine = create_engine(url=db_connection_string)
# Open a connection that the next cell uses to read the customers table.
conn = engine.connect()

#### Retrieve Data

In [None]:
import pandas as pd

# Load the full `customers` table into a DataFrame over the open connection
customers_df = pd.read_sql_table('customers', conn)
# Preview the last five rows
customers_df.tail(n=5)

### Transformation (Optional)

In practice, transformation usually happens at a much later stage of the pipeline.

Goals:
- Keep only customers located in the USA

In [None]:
# Keep only the rows whose Country column is exactly 'USA'
usa_customers_df = customers_df.loc[customers_df['Country'].eq('USA')]
usa_customers_df.tail(n=5)

### Load

Upload the USA customer data to an AWS S3 bucket.

In [None]:
import os

# Read AWS credentials from the standard AWS environment variables so secrets
# are never typed into (and committed with) the notebook. The placeholders
# remain as a visible reminder when the variables are not set.
ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID', '<ENTER ACCESS KEY>')
SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY', '<ENTER ACCESS SECRET>')

In [None]:
# Base name (without extension) shared by every object uploaded below
filename = 'usa_customers'

# Destination prefix: s3://<bucket>/<folder>/<subfolder>
bucket_name = 'cm-aws-s3-dev'
folder = "chinook"
subfolder = 'khanh98'
s3_path = f"s3://{bucket_name}/{folder}/{subfolder}"

#### Option 1: Pandas + s3fs

In [None]:
import s3fs

# Authenticate an s3fs filesystem with the keys defined earlier, pinned to
# the ap-southeast-2 region.
s3 = s3fs.S3FileSystem(
    key=ACCESS_KEY,
    secret=SECRET_ACCESS_KEY,
    client_kwargs=dict(region_name='ap-southeast-2'),
)

# Stream the filtered DataFrame as CSV (without the index column) straight
# into the target S3 object.
with s3.open(f'{s3_path}/{filename}.csv', 'w') as csv_f:
    usa_customers_df.to_csv(csv_f, index=False)

#### Option 2: boto3 + awswrangler

In [None]:
import boto3
import awswrangler as wr

# Build a boto3 Session carrying the credentials; awswrangler uses it for its
# S3 calls. The region is pinned to match the s3fs option, which targets the
# same bucket, so the upload doesn't depend on ambient AWS configuration.
session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_ACCESS_KEY,
    region_name='ap-southeast-2',
)

# Upload the filtered DataFrame to S3 as a parquet file
wr.s3.to_parquet(
    df=usa_customers_df,
    path=f'{s3_path}/{filename}.parquet',
    boto3_session=session,
)

## ==================================================
### Best Practice

In [None]:
from sqlalchemy.engine.base import Engine
from pandas.core.frame import DataFrame
import boto3
import awswrangler as wr
import pandas as pd
from datetime import datetime
import pytz

def extract(
    db_engine: Engine,
    table_name: str
) -> DataFrame:
    """Read an entire database table into a pandas DataFrame.

    Args:
        db_engine: SQLAlchemy engine used to open a short-lived connection.
        table_name: Name of the table to read.

    Returns:
        A DataFrame holding the rows of ``table_name``.
    """
    # The context manager closes the connection even if the read raises;
    # a bare connect()/close() pair would leak it on error.
    with db_engine.connect() as conn:
        return pd.read_sql_table(table_name=table_name, con=conn)

def load(
    df: DataFrame,
    table_name: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
    s3_path: dict
) -> bool:
    
    # create s3 client using boto3 + credentials
    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )

    # upload result to s3 as parquet file
    today = datetime.now(tz=pytz.timezone('Australia/Adelaide'))
    year, month, day = today.strftime('%Y'), today.strftime('%m'), today.strftime('%d')

    # Upload file to folder
    file_path = f"{s3_path}/year={year}/month={month}/day={day}/{table_name}.parquet"

    wr.s3.to_parquet(
        df=df,
        path=file_path,
        boto3_session=session
    )
    print(f'Data successfully loaded into: {file_path}\n')

#### Modular & Reusable Pipeline

In [None]:
from sqlalchemy import create_engine

# SQLAlchemy engine for the local Chinook SQLite file; extract() opens and
# closes its own connection from it.
db_connection_string = 'sqlite:///chinook.db'
chinook_engine = create_engine(db_connection_string)

In [None]:
# Pull the customers table through the reusable extract() helper and preview it
customer_df = extract(chinook_engine, 'customers')
customer_df.tail(n=5)

#### Config-Driven Pipeline

In [None]:
import yaml
import pprint

config_file = 'config.yml'

# safe_load builds only plain Python objects (no arbitrary constructors).
# The encoding is explicit so parsing doesn't depend on the platform default.
with open(config_file, encoding='utf-8') as f:
    conf = yaml.safe_load(f)

# An empty file parses to None, and a scalar/list top level would break the
# conf.get(...) calls in later cells; fail here with a clear message instead.
if not isinstance(conf, dict):
    raise ValueError(
        f"{config_file} must contain a YAML mapping at the top level, "
        f"got {type(conf).__name__}"
    )

pp = pprint.PrettyPrinter()
pp.pprint(conf)

In [None]:
# Both sections are required by the pipeline loop; fail fast here rather than
# binding None and hitting an AttributeError on `.get(...)` later.
missing_sections = [key for key in ('source', 'target') if not conf.get(key)]
if missing_sections:
    raise KeyError(f"config is missing required section(s): {missing_sections}")

source_conf = conf['source']
target_conf = conf['target']

In [None]:
# Target authentication & location are identical for every table, so look
# them up once instead of on each iteration.
s3_credentials = target_conf.get('credentials')
s3_location = target_conf.get('location')

# source.table is iterated as a list of table names; a missing/empty entry
# would otherwise raise "TypeError: 'NoneType' object is not iterable".
table_names = source_conf.get('table') or []
if not table_names:
    print("No tables listed under source.table in the config; nothing to load.")

for table_name in table_names:
    print(f"{'='*20}  {table_name}  {'='*20}")

    # extract data
    df = extract(db_engine=chinook_engine, table_name=table_name)

    # load to AWS S3
    load(
        df=df,
        table_name=table_name,
        aws_access_key_id=s3_credentials.get('aws_access_key_id'),
        aws_secret_access_key=s3_credentials.get('aws_secret_access_key'),
        s3_path=s3_location.get('path'),
    )
