# Intro to ETL & ELT Data Pipelines (*and SQLAlchemy ORM*)

### ETL: Extract --> Transform --> Load
* Traditional data pipeline process
* Strict schema
* Typically loaded into data warehouses and/or relational databases after transformation
* Schema-on-write technique

### ELT: Extract --> Load --> Transform
* Newer data pipeline process, particularly suited to big data
* Flexible schema, shaped by each end user's specific needs
* Typically loaded into a data lake, then transformed by end users to fit their needs
* Schema-on-read technique
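
To make the ordering difference concrete, here is a minimal sketch that uses Python's built-in `sqlite3` module as a stand-in for both the warehouse and the lake. The rows and the table and view names (`clean_customers`, `raw_customers`, `customers_view`) are hypothetical. In the ETL path the rows are cleaned in Python before they are written into a fixed schema. In the ELT path the raw rows are written first, and the cleanup happens afterwards, in SQL, inside the target.

```python
import sqlite3

# Hypothetical source rows: (customer_id, name, gender); the first is an empty placeholder
SOURCE_ROWS = [(0, None, None), (3001, "Kelly Key", "M"), (3301, "Alika Rivas", "F")]


def etl(conn: sqlite3.Connection) -> None:
    # Transform in Python first: drop the empty placeholder row
    cleaned = [row for row in SOURCE_ROWS if row[1] is not None]
    # Then load only the cleaned rows into a typed table (schema-on-write)
    conn.execute("CREATE TABLE clean_customers (customer_id INTEGER, name TEXT, gender TEXT)")
    conn.executemany("INSERT INTO clean_customers VALUES (?, ?, ?)", cleaned)


def elt(conn: sqlite3.Connection) -> None:
    # Load the raw rows untouched into an untyped table
    conn.execute("CREATE TABLE raw_customers (customer_id, name, gender)")
    conn.executemany("INSERT INTO raw_customers VALUES (?, ?, ?)", SOURCE_ROWS)
    # Transform later, inside the target, with a view that shapes the data on read
    conn.execute(
        "CREATE VIEW customers_view AS "
        "SELECT customer_id, name, gender FROM raw_customers WHERE name IS NOT NULL"
    )


conn = sqlite3.connect(":memory:")
etl(conn)
elt(conn)
print(conn.execute("SELECT COUNT(*) FROM clean_customers").fetchone()[0])  # 2
print(conn.execute("SELECT COUNT(*) FROM raw_customers").fetchone()[0])    # 3
print(conn.execute("SELECT COUNT(*) FROM customers_view").fetchone()[0])   # 2
```

The raw table in the ELT path still holds all three rows, so a different end user could define a different view over the same data.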

In [18]:
import os
from sqlalchemy import create_engine
import pandas as pd
import logging
import datetime as dt

*Read the username, password, and other connection details from a separate file*

In [2]:
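secrets_dict = dict()

# Build the path portably instead of hard-coding Windows backslashes
secrets_path = os.path.join(os.getcwd(), os.pardir, "SECRETS.txt")

with open(secrets_path, "r") as f:
    secrets = f.readlines()

for secret in secrets:
    secret_no_newline_char = secret.strip()
    if not secret_no_newline_char:
        continue  # skip blank lines

    # Split on the first " = " only, so a value may itself contain " = "
    key, value = secret_no_newline_char.split(" = ", 1)
    secrets_dict[key] = value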

*Globals*

In [3]:
POSTGRES_USERNAME = secrets_dict["PostgreSQL Username"]
POSTGRES_PASSWORD = secrets_dict["PostgreSQL Password"]
POSTGRES_HOST = secrets_dict["PostgreSQL Host"]
POSTGRES_PORT = secrets_dict["PostgreSQL Port"]
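
A plain-text secrets file works, but it is easy to commit by accident. One common alternative is to read the same values from environment variables and fail loudly if any are missing. The sketch below assumes the hypothetical variable names `POSTGRES_USERNAME`, `POSTGRES_PASSWORD`, `POSTGRES_HOST`, and `POSTGRES_PORT`; `load_postgres_settings` is likewise a hypothetical helper.

```python
import os

# Hypothetical environment variable names; adjust them to your own setup
REQUIRED_VARS = ["POSTGRES_USERNAME", "POSTGRES_PASSWORD", "POSTGRES_HOST", "POSTGRES_PORT"]


def load_postgres_settings(environ=os.environ) -> dict:
    """Return the connection settings, raising if any variable is unset or empty."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in REQUIRED_VARS}


# Passing an explicit mapping is handy for testing; call load_postgres_settings() for the real environment
settings = load_postgres_settings({
    "POSTGRES_USERNAME": "etl_user",
    "POSTGRES_PASSWORD": "s3cret",
    "POSTGRES_HOST": "localhost",
    "POSTGRES_PORT": "5432",
})
print(settings["POSTGRES_HOST"])  # localhost
```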

In [19]:
def logger(log_level: str, msg):
    current_datetimestamp = dt.datetime.now().strftime("%m/%d/%Y %H:%M:%S")
    timestamped_message = f"{current_datetimestamp} -- {msg}"

    # Map each accepted level name to the matching logging function
    log_functions = {
        "info": logging.info,
        "debug": logging.debug,
        "warn": logging.warning,
        "warning": logging.warning,
        "error": logging.error,
        "critical": logging.critical,
    }

    log_function = log_functions.get(log_level.lower())
    if log_function is None:
        print("INVALID LOG TYPE. Choose from the following: info, debug, warn, warning, error, critical")
        return

    log_function(timestamped_message)
    print(timestamped_message)  # for Jupyter notebook output
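
The `print` calls are needed because Python's root logger has a default level of `WARNING`, so `logging.info` and `logging.debug` messages are dropped until logging is configured. A minimal sketch of configuring the standard `logging` module once, so that it adds the timestamp itself and honors the chosen level, might look like this (`configure_logging` and the logger name `"etl"` are hypothetical):

```python
import io
import logging


def configure_logging(level=logging.INFO, stream=None) -> logging.Logger:
    # force=True (Python 3.8+) replaces any handlers already attached to the root logger
    logging.basicConfig(
        level=level,
        format="%(asctime)s -- %(levelname)s -- %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",  # same timestamp layout as the logger() helper
        stream=stream,  # None means the default, sys.stderr
        force=True,
    )
    return logging.getLogger("etl")


# Send log lines to an in-memory buffer so the result can be inspected
buffer = io.StringIO()
log = configure_logging(stream=buffer)
log.info("Starting extraction.")
log.debug("Not shown: below the INFO threshold")
print(buffer.getvalue())
```

In a notebook, output written to `sys.stderr` is displayed under the cell, so the extra `print` would no longer be needed.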

*Get PostgreSQL database connection object*

In [20]:
def get_postgres_db_connection(username, password, host, port, db_name):
    connection_url = f'postgresql://{username}:{password}@{host}:{port}/{db_name}'

    try:

        engine = create_engine(connection_url)
        logger("info", "Create database engine")
        connection = engine.connect() # Connect to the database
        logger("info", "DB Connection was successful.")
    except Exception as e:
        logger("error", f"DB Connection was unsuccessful.\nEXCEPTION: {e}")
        return None

    return connection
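
One caveat with building the connection URL by string formatting: if the password contains characters such as `@`, `:` or `/`, the URL is parsed incorrectly. SQLAlchemy's documentation recommends URL-escaping such values, for example with `urllib.parse.quote_plus` (SQLAlchemy 1.4+ also offers `URL.create()` for building the URL from parts). A minimal sketch with a hypothetical helper name:

```python
from urllib.parse import quote_plus


def build_postgres_url(username, password, host, port, db_name) -> str:
    # Escape the credentials so characters like "@", ":" and "/" cannot break the URL
    return f"postgresql://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}/{db_name}"


url = build_postgres_url("etl_user", "p@ss:word/1", "localhost", 5432, "Books")
print(url)  # postgresql://etl_user:p%40ss%3Aword%2F1@localhost:5432/Books
```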

## Extraction

In [21]:
logger("info", "Starting extraction.")
books_db_name = "Books"
books_conn = get_postgres_db_connection(POSTGRES_USERNAME, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, books_db_name)
authors_df = pd.read_sql("SELECT * FROM myauthors", books_conn)

logger("info", "Extraction completed.")
authors_df.head()

10/11/2024 12:37:13 -- Starting extraction.
10/11/2024 12:37:13 -- Create database engine
10/11/2024 12:37:13 -- DB Connection was successful.
10/11/2024 12:37:13 -- Extraction completed.


| | author_id | first_name | middle_name | last_name |
|---|---|---|---|---|
| 0 | 2 | Linda | | Mul |
| 1 | 1 | Merrit | | Eric |
| 2 | 3 | Alecos | | Papadatos |
| 3 | 4 | Paul | C.van | Oorschot |
| 4 | 5 | David | | Cronin |


In [22]:
books_conn.close()
logger("info", "Closed DB connection.")

10/11/2024 12:37:15 -- Closed DB connection.


In [15]:
coffee_db_name = "COFFEE_Final_Project"
coffee_conn = get_postgres_db_connection(POSTGRES_USERNAME, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, coffee_db_name)
products_df = pd.read_sql("SELECT * FROM products", coffee_conn)

products_df.head()

Connection successful: <sqlalchemy.engine.base.Connection object at 0x0000026DBAF94730>


| | product_id | product_name | description | price | product_type_id |
|---|---|---|---|---|---|
| 0 | 1 | Brazilian - Organic | It's like Carnival in a cup. Clean and smooth. | 18.0 | 1 |
| 1 | 2 | Our Old Time Diner Blend | Our packed blend of beans that is reminiscent ... | 18.0 | 2 |
| 2 | 3 | Espresso Roast | Our house blend for a good espresso shot. | 14.75 | 3 |
| 3 | 4 | Primo Espresso Roast | Our premium single source of hand roasted beans. | 20.45 | 3 |
| 4 | 5 | Columbian Medium Roast | A smooth cup of coffee any time of day. | 15.0 | 4 |


In [16]:
expensive_products_df = pd.read_sql("SELECT * FROM products WHERE price > 20", coffee_conn)

expensive_products_df.head()

| | product_id | product_name | description | price | product_type_id |
|---|---|---|---|---|---|
| 0 | 4 | Primo Espresso Roast | Our premium single source of hand roasted beans. | 20.45 | 3 |
| 1 | 6 | Ethiopia | From the home of coffee. | 21.0 | 4 |
| 2 | 8 | Civet Cat | The most expensive coffee in the world, the ca... | 45.0 | 5 |
| 3 | 9 | Organic Decaf Blend | Our blend of hand picked organic beans that ha... | 28.0 | 6 |
| 4 | 80 | I Need My Bean! Toque | keep your head bean warm | 23.0 | 31 |
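
The price threshold above is hard-coded in the SQL string. When such a value comes from a variable or user input, it is safer to pass it as a bound parameter than to format it into the query. Below is a sketch using `pd.read_sql(..., params=...)` with an in-memory SQLite table standing in for the coffee database (the rows are a small made-up sample). The placeholder style depends on the driver: SQLite uses `?`, psycopg2 uses `%(name)s`, and SQLAlchemy's `text()` uses `:name`.

```python
import sqlite3

import pandas as pd

# In-memory SQLite table standing in for the PostgreSQL "products" table (sample rows)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (product_id INTEGER, product_name TEXT, price REAL)")
conn.executemany(
    "INSERT INTO products VALUES (?, ?, ?)",
    [(3, "Espresso Roast", 14.75), (4, "Primo Espresso Roast", 20.45), (6, "Ethiopia", 21.0)],
)

min_price = 20
# The value is sent separately from the SQL text, so it is never spliced into the query string
expensive_df = pd.read_sql(
    "SELECT * FROM products WHERE price > ? ORDER BY product_id", conn, params=(min_price,)
)
print(expensive_df["product_name"].tolist())  # ['Primo Espresso Roast', 'Ethiopia']
conn.close()
```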


In [17]:
coffee_conn.close()

*Reusable, modular extract function*

In [18]:
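def extract_data_to_df(db_name, sql_query):
    conn = get_postgres_db_connection(POSTGRES_USERNAME, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, db_name)
    if conn is None:
        # Connection failed; the error has already been logged
        return None

    try:
        df = pd.read_sql(sql_query, conn)
    except Exception as e:
        logger("error", f"Query failed.\nEXCEPTION: {e}")
        return None
    finally:
        # Always release the connection, whether or not the query succeeded
        conn.close()

    return df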
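
Because `extract_data_to_df` opens its own PostgreSQL connection, it is hard to exercise without a live server. One variation, sketched here with a hypothetical `extract_with` function, takes a `connect` callable that returns a connection and uses `contextlib.closing`, so the connection is closed even if the query raises. SQLite is used only so the example runs anywhere; in the notebook, `connect` could be something like `lambda: get_postgres_db_connection(...)`, since a SQLAlchemy `Connection` also has a `close()` method.

```python
import sqlite3
from contextlib import closing

import pandas as pd


def extract_with(connect, sql_query):
    """Run sql_query on a fresh connection from connect() and return a DataFrame."""
    # closing() calls conn.close() when the block exits, even on an exception
    with closing(connect()) as conn:
        return pd.read_sql(sql_query, conn)


def make_demo_db():
    # Hypothetical stand-in database with one tiny table
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE customers (customer_id INTEGER, customer_name TEXT)")
    conn.execute("INSERT INTO customers VALUES (3001, 'Kelly Key')")
    return conn


df = extract_with(make_demo_db, "SELECT * FROM customers")
print(df.shape)  # (1, 2)
```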

In [19]:
customers_sql_query = "SELECT * FROM customers"
customers_df = extract_data_to_df(coffee_db_name, customers_sql_query)
customers_df.head()

Connection successful: <sqlalchemy.engine.base.Connection object at 0x0000026DBB0A4700>


| | customer_id | customer_name | email | reg_date | card_number | date_of_birth | gender |
|---|---|---|---|---|---|---|---|
| 0 | 0 | | | | | | |
| 1 | 3001 | Kelly Key | Venus@adipiscing.edu | 2017-01-04 | 908-424-2890 | 1950-05-29 | M |
| 2 | 3002 | Clark Schroeder | Nora@fames.gov | 2017-01-07 | 032-732-6308 | 1950-07-30 | M |
| 3 | 3003 | Elvis Cardenas | Brianna@tellus.edu | 2017-01-10 | 459-375-9187 | 1950-09-30 | M |
| 4 | 3004 | Rafael Estes | Ina@non.gov | 2017-01-13 | 576-640-9226 | 1950-12-01 | M |


## Transformation

*Remove the empty placeholder first row (index 0) from the customers DataFrame*

In [20]:
customers_df = customers_df.drop(0)
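
`drop(0)` removes the row whose index label is 0, which only works as long as the empty placeholder always lands at that label. A content-based alternative is to drop rows in which every column other than `customer_id` is missing. The sketch below assumes the blanks are SQL NULLs (read by pandas as `None`/`NaN`) rather than empty strings, and uses a small sample frame mirroring the rows shown above.

```python
import pandas as pd

# Sample frame mirroring the first rows of customers_df
customers_df = pd.DataFrame({
    "customer_id": [0, 3001, 3002],
    "customer_name": [None, "Kelly Key", "Clark Schroeder"],
    "email": [None, "Venus@adipiscing.edu", "Nora@fames.gov"],
    "gender": [None, "M", "M"],
})

# Keep a row unless all of its descriptive columns are empty
value_columns = [col for col in customers_df.columns if col != "customer_id"]
customers_df = customers_df.dropna(subset=value_columns, how="all")
print(customers_df["customer_id"].tolist())  # [3001, 3002]
```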

In [21]:
customers_df.head()

| | customer_id | customer_name | email | reg_date | card_number | date_of_birth | gender |
|---|---|---|---|---|---|---|---|
| 1 | 3001 | Kelly Key | Venus@adipiscing.edu | 2017-01-04 | 908-424-2890 | 1950-05-29 | M |
| 2 | 3002 | Clark Schroeder | Nora@fames.gov | 2017-01-07 | 032-732-6308 | 1950-07-30 | M |
| 3 | 3003 | Elvis Cardenas | Brianna@tellus.edu | 2017-01-10 | 459-375-9187 | 1950-09-30 | M |
| 4 | 3004 | Rafael Estes | Ina@non.gov | 2017-01-13 | 576-640-9226 | 1950-12-01 | M |
| 5 | 3005 | Colin Lynn | Dale@Integer.com | 2017-01-15 | 344-674-6569 | 1951-02-01 | M |


*Filter for female customers*

In [23]:
female_customers_df = customers_df.loc[customers_df["gender"] == "F", :]
female_customers_df.head()

| | customer_id | customer_name | email | reg_date | card_number | date_of_birth | gender |
|---|---|---|---|---|---|---|---|
| 301 | 3301 | Alika Rivas | Yoshi@aliquam.us | 2017-01-04 | 021-443-0682 | 1950-05-13 | F |
| 302 | 3302 | Sacha Wall | Yvette@erat.org | 2017-01-06 | 433-102-9277 | 1950-06-29 | F |
| 303 | 3303 | Raya Hampton | Martina@malesuada.com | 2017-01-08 | 478-762-8984 | 1950-08-14 | F |
| 304 | 3304 | Belle Reyes | Sierra@sodales.net | 2017-01-10 | 930-000-5889 | 1950-09-30 | F |
| 305 | 3305 | Brooke Munoz | William@montes.com | 2017-01-12 | 573-552-0754 | 1950-11-16 | F |


*Sort female customer names in alphabetical order*

In [24]:
female_customers_sorted_df = female_customers_df.sort_values(by="customer_name", ascending=True)
female_customers_sorted_df.head()

| | customer_id | customer_name | email | reg_date | card_number | date_of_birth | gender |
|---|---|---|---|---|---|---|---|
| 1318 | 5518 | Abra | Denise@bibendum.edu | 2017-02-05 | 069-289-4592 | 1952-05-12 | F |
| 347 | 3347 | Abra Dotson | Penelope@taciti.org | 2017-04-09 | 659-894-5363 | 1956-03-26 | F |
| 660 | 3660 | Abra Schwartz | Orson@feugiat.com | 2019-01-16 | 703-569-1850 | 1996-03-02 | F |
| 1886 | 8141 | Adara | Emily@non.net | 2018-10-02 | 524-204-1696 | 1990-02-25 | F |
| 1241 | 5441 | Adara | Jaime@egestas.edu | 2018-07-23 | 504-533-2969 | 1985-04-02 | F |
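
Two details of `sort_values` matter here. The default comparison is case-sensitive, so uppercase letters sort before lowercase. Also, rows with equal names (such as the two "Adara" rows above) have no guaranteed relative order under the default, non-stable sort. The sketch below sorts case-insensitively by name and breaks ties on `customer_id`. The `key` argument requires pandas 1.1 or later, and the sample rows (including the lowercase "abby") are made up for illustration.

```python
import pandas as pd

# Made-up sample rows, including a duplicate name and a lowercase name
df = pd.DataFrame({
    "customer_id": [8141, 5441, 3347, 9000],
    "customer_name": ["Adara", "Adara", "Abra Dotson", "abby"],
})

# key= is applied to each sort column; lowercase the names and leave the ids unchanged
sorted_df = df.sort_values(
    by=["customer_name", "customer_id"],
    key=lambda col: col.str.lower() if col.name == "customer_name" else col,
)
print(sorted_df["customer_id"].tolist())  # [9000, 3347, 5441, 8141]
```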


## Loading

*Load transformed data to a CSV file*

Parameters:
* `header`: whether to write the column names as the first row *(default value is True)*
* `index`: whether to write the row index (row labels) as the first column *(default value is True)*
* `sep`: field delimiter *(default value is ",")*

In [26]:
os.makedirs("./datasets", exist_ok=True)  # create the output folder if it is missing
female_customers_sorted_df.to_csv("./datasets/female_customers.csv", header=True, index=False, sep=",")

In [27]:
file_exists = os.path.exists("./datasets/female_customers.csv")
print(file_exists)

True
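
`os.path.exists` only confirms that a file was written. Reading it back and comparing it with the DataFrame is a somewhat stronger check. The sketch below uses a temporary directory and a small sample frame (both assumptions, so the example runs anywhere). Note that date columns come back from `read_csv` as strings unless `parse_dates` is given.

```python
import os
import tempfile

import pandas as pd

# Small sample frame standing in for female_customers_sorted_df
df = pd.DataFrame({"customer_id": [3301, 3302], "customer_name": ["Alika Rivas", "Sacha Wall"]})

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "female_customers.csv")
    df.to_csv(path, header=True, index=False, sep=",")

    # Read the file back and check that it round-trips to the same data
    round_trip_df = pd.read_csv(path)
    pd.testing.assert_frame_equal(round_trip_df, df)
    print(len(round_trip_df))  # 2
```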


*Load transformed data to a JSON file*
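
One way to do this step is `DataFrame.to_json`. The sketch below assumes a list-of-objects layout (`orient="records"`) is wanted; the file name and sample rows are hypothetical, and in the notebook the path would sit under `./datasets/`. If the frame has `datetime64` columns, passing `date_format="iso"` writes readable ISO strings instead of epoch milliseconds.

```python
import json
import os
import tempfile

import pandas as pd

# Small sample frame standing in for female_customers_sorted_df
df = pd.DataFrame({"customer_id": [3301, 3302], "customer_name": ["Alika Rivas", "Sacha Wall"]})

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "female_customers.json")
    # orient="records" writes one JSON object per row; indent (pandas 1.0+) pretty-prints
    df.to_json(path, orient="records", indent=2)

    with open(path) as f:
        records = json.load(f)

print(records[0])  # {'customer_id': 3301, 'customer_name': 'Alika Rivas'}
```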

*Load transformed data to a PostgreSQL database*
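
`DataFrame.to_sql` writes a frame to a database table. With PostgreSQL you would pass a SQLAlchemy engine or connection (for example, one built with `create_engine` as above). The sketch below instead uses an in-memory SQLite connection, which pandas also accepts directly, so it runs without a server. The table name `female_customers` and the sample rows are hypothetical.

```python
import sqlite3

import pandas as pd

# Small sample frame standing in for female_customers_sorted_df
df = pd.DataFrame({"customer_id": [3301, 3302], "customer_name": ["Alika Rivas", "Sacha Wall"]})

conn = sqlite3.connect(":memory:")
# if_exists="replace" drops and recreates the table; index=False skips the DataFrame index
df.to_sql("female_customers", conn, if_exists="replace", index=False)

row_count = conn.execute("SELECT COUNT(*) FROM female_customers").fetchone()[0]
print(row_count)  # 2
conn.close()
```

The default, `if_exists="fail"`, raises if the table already exists; `"append"` suits incremental loads that add new rows to an existing table.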