# Data Pipeline

## Overview

“A simplified ETL process for extracting, transforming, and loading data from AWS S3 to Snowflake, utilizing Amazon RDS, Apache Spark, Databricks, and Delta tables.”



# Execute Utilities Notebook

- This notebook (Utilities) contains utility functions that manage logging and handle metadata for the tables used in the data pipeline.


In [0]:
%run /Projects/ETL/Utilities

# Execute Secrets Notebook

- This notebook (Secrets) contains the credentials required for accessing the various services.


In [0]:
%run /Projects/ETL/Secrets

# Define the Logger

- The logger is configured to capture and store logs in AWS CloudWatch.
- Log messages include important runtime information, error details, and execution traces to aid in debugging and system analysis.


In [0]:
# Pipeline-wide loggers (`createLogger` is presumably defined by the
# Utilities notebook executed above -- confirm there).
logger = createLogger("Data-PipeLine", "general-logger")
func_logger = createLogger("Function-Logger", "function-logger")

# Source metadata, unpacked in the order the helper returns it:
# SQL table definitions, table names and S3 file keys.
(table_definitions, table_names, s3_file_keys) = get_source_meta_data()

# `func` provides the `@func.logger` decorator used on the functions below
# and accumulates the per-function logs that are dumped at the end of the run.
func = Logger()


In [0]:
## import necessary libraries
import os
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
import boto3
from io import StringIO 
import json 

#import pyspark methods for etl process 
from pyspark.sql.functions import (
     col , concat ,instr ,length ,substring, split, trim, when, lower ,
     avg , round ,count , format_number,date_format ,
     year , month , weekday,udf ,max , min , sum 
)
from pyspark.sql.types import FloatType ,DecimalType


# DatabaseManager: 
## Manage Database Connections and Data Loading Operations in AWS RDS


In [0]:

class DatabaseManager:
    """
    Manage the connection between Databricks and the AWS RDS (PostgreSQL) database.

    It provides methods to:
    - Create and manage database connections
    - Load data from S3 into RDS tables
    - Create multiple tables in RDS
    - Handle the lifecycle of the database connection
    """

    # S3 bucket that holds the source CSV files.
    S3_BUCKET = "ecommerce-data-source"

    def __init__(self):
        """Open the RDS connection and create the cursor used for SQL statements.

        Raises:
            psycopg2.DatabaseError: If the connection cannot be established.
        """
        # Initialised up front so `load_data_from_s3` can report an empty
        # source instead of failing with AttributeError when `set_source`
        # has not been called yet.
        self.tables = ()
        self.s3_file_keys = ()
        self._conn, self._conn_str = self.create_connection()
        self._cursor = self._conn.cursor()

    @func.logger
    def create_connection(self):
        """Create a psycopg2 connection to RDS from environment variables.

        Reads HOST, PORT, DBASE, USER and PASSWORD from the environment.

        Returns:
            tuple: ``(connection, connection_string)`` -- the open psycopg2
            connection and the matching ``postgresql://`` URL used to build
            the SQLAlchemy engine.

        Raises:
            psycopg2.DatabaseError: If the connection attempt fails. The
                error is logged and re-raised so the constructor never tries
                to unpack a missing result.
        """
        from urllib.parse import quote

        conn_params = {
            "host": os.getenv("HOST"),
            "port": int(os.getenv("PORT")),
            "database": os.getenv("DBASE"),
            "user": os.getenv("USER"),
            "password": os.getenv("PASSWORD"),
        }
        # Percent-encode the credentials: characters such as `@`, `/` or `:`
        # in a password would otherwise corrupt the URL.
        conn_str = "postgresql://{0}:{1}@{2}:{3}/{4}".format(
            quote(str(conn_params["user"]), safe=""),
            quote(str(conn_params["password"]), safe=""),
            conn_params["host"],
            conn_params["port"],
            conn_params["database"],
        )
        try:
            conn = psycopg2.connect(**conn_params)
        except psycopg2.DatabaseError as ex:
            logger.exception(str(ex), exc_info=True)
            raise
        return conn, conn_str

    @func.logger
    def set_source(self, tables: tuple, s3_file_keys: tuple) -> None:
        """Set the target table names and the S3 file key that feeds each one.

        Args:
            tables: Names of the RDS tables to load.
            s3_file_keys: S3 object keys, positionally matched with ``tables``.

        Raises:
            ValueError: If the two sequences differ in length.
        """
        if len(tables) != len(s3_file_keys):
            raise ValueError(
                "tables and s3_file_keys  must have the same number of elements."
            )

        self.tables = tables
        self.s3_file_keys = s3_file_keys

    @func.logger
    def get_db_cursor(self):
        """Return the current cursor, creating a new one if it is missing or closed."""
        if self._cursor is None or self._cursor.closed:
            self._cursor = self._conn.cursor()
        return self._cursor

    @func.logger
    def load_data_from_s3(self):
        """Load each configured S3 CSV file into its RDS table.

        All tables are appended inside a single transaction: either every
        file is loaded or, on any error, nothing is committed.

        Raises:
            Exception: Any error raised while reading from S3 or writing to
                the database is logged, printed and re-raised.
        """
        if not (self.tables and self.s3_file_keys):
            print("TableNames or S3 URI is Empty")
            return

        engine = None
        try:
            s3_client = boto3.client("s3")
            engine = create_engine(self._conn_str)
            # `engine.begin()` commits when the block exits normally and
            # rolls back if it raises.
            with engine.begin() as connection:
                for tname, s3_file in zip(self.tables, self.s3_file_keys):
                    # Read the S3 object (CSV file) into a pandas DataFrame.
                    s3_object = s3_client.get_object(
                        Bucket=self.S3_BUCKET, Key=s3_file
                    )
                    data = s3_object["Body"].read().decode("utf-8")
                    df = pd.read_csv(StringIO(data))

                    # Write through the transactional connection (not the
                    # engine) so the rows take part in the transaction above.
                    df.to_sql(tname, connection, if_exists="append", index=False)

                    message = "Data Loaded in `{0}` Table, source:{1}".format(
                        tname, s3_file
                    )
                    logger.info(message)
                    print(message)
        except Exception as ex:
            logger.exception(str(ex), exc_info=True)
            print(str(ex))
            raise
        finally:
            if engine is not None:
                engine.dispose()  # Release the pooled connections after use

    @func.logger
    def create_multiple_tables(self, table_definitions: list) -> None:
        """Execute every SQL table definition and commit them together.

        Args:
            table_definitions: SQL statements, one per table to create.

        Raises:
            Exception: Any execution error; the transaction is rolled back
                and the error is logged before being re-raised.
        """
        try:
            cursor = self.get_db_cursor()
            for table_def in table_definitions:
                cursor.execute(table_def)
                # Best-effort table name for the log line: the last word
                # before the first "(" of the statement.
                name_parts = table_def.split("(")[0].split()
                table_name = name_parts[-1] if name_parts else ""
                logger.debug(f"Table `{table_name}` Created.")
                print(f"Table `{table_name}` Created ")
            self._conn.commit()
        except Exception as ex:
            self._conn.rollback()  # Undo every statement of this batch
            logger.exception(str(ex), exc_info=True)
            raise

    @func.logger
    def close_connection(self):
        """Close the database cursor and the connection."""
        if self._cursor:
            self._cursor.close()
        if self._conn:
            self._conn.close()

# Manage RDS Database Table Creation and Ingest Data from S3 to PostgreSQL


In [0]:
@func.logger
def migrate_raw_data_to_rds():
    """
    Create the RDS tables and ingest the raw data from S3 into PostgreSQL (RDS).

    It performs the following steps:
    1. Initializes the DatabaseManager object.
    2. Creates the required tables in PostgreSQL.
    3. Configures the source tables and S3 file keys.
    4. Loads data from S3 to PostgreSQL.
    5. Closes the database connection (also when an earlier step fails).

    Errors are logged and printed, not propagated to the caller.
    """
    logger.debug("`migrate_raw_data_to_rds()` function execution starting..")
    postgres = None
    try:
        # Instantiate DatabaseManager to manage the PostgreSQL connection
        postgres = DatabaseManager()

        # Create the required tables from the predefined SQL definitions
        postgres.create_multiple_tables(table_definitions)
        logger.debug("Required tables are created in PostgresSQL Database.")

        # Pair the table names with the S3 file keys that feed them
        postgres.set_source(table_names, s3_file_keys)
        logger.debug("TableNames and S3 Resource URI's are Configured.")

        # Load the data from S3 into the PostgreSQL database (RDS)
        postgres.load_data_from_s3()
        logger.debug("Data Loaded into PostgresSQL Database.")

    except Exception as ex:
        logger.exception(str(ex), exc_info=True)
        print(str(ex))
    finally:
        # Close in `finally` so a failed step does not leak the connection.
        if postgres is not None:
            try:
                postgres.close_connection()
                logger.debug("Database connection has been closed")
            except Exception as ex:
                logger.exception(str(ex), exc_info=True)
        logger.debug("`migrate_raw_data_to_rds()` function execution ending..")
       

# Run the S3 -> RDS migration when this cell executes as the main program.
if __name__ == "__main__":
    migrate_raw_data_to_rds()

In [0]:
%sql  
-- Create Database
CREATE DATABASE IF NOT EXISTS ecommerce_db 
LOCATION 'dbfs:/dbfs/ECOMMERCE_DB';


In [0]:

# Distinct (product_id, price) pairs read from the products Delta table.
product_prices_df = spark.sql('SELECT DISTINCT product_id, price FROM delta.`dbfs:/dbfs/ECOMMERCE_DB/products_v1`')

# Collect the rows on the driver and build a product_id -> price lookup.
product_prices = dict(
    (price_row['product_id'], price_row['price'])
    for price_row in product_prices_df.collect()
)

# Broadcast the lookup so it can be read through `broadcasted_prices.value`.
broadcasted_prices = spark.sparkContext.broadcast(product_prices)

### Custom user defined functions 
@udf(FloatType())
def getTotalCostUdf(product_id: int, qty: int):
    """
    Compute the gross cost of an order line as unit price times quantity.

    Args:
        product_id (int): The ID of the product.
        qty (int): The quantity of the product purchased.

    Returns:
        float: ``price * qty``; 0.0 when the quantity is zero/missing, and
        0.0 as well when the product is absent from the broadcast price
        lookup (its price then defaults to 0.0).
    """
    # Unit price from the broadcast lookup; unknown products cost 0.0.
    unit_price = float(broadcasted_prices.value.get(product_id, 0.0))

    # A zero or missing quantity contributes nothing.
    if not qty:
        return 0.0
    return unit_price * qty



@udf(FloatType())
def getTotalDiscountUdf(product_id: int, qty: int, discount) -> float:
    """
    Calculate the total discount amount for an order line.

    The result is ``price * discount * qty``; ``discount`` is multiplied with
    the price as-is, i.e. it is treated as a fraction of the price.

    Args:
        product_id (int): The ID of the product.
        qty (int): The quantity of the product purchased.
        discount (float or str): The discount rate; converted with ``float()``.

    Returns:
        float: The total discount amount, or 0.0 when the discount or the
        quantity is zero/missing, or when the product is absent from the
        broadcast price lookup.
    """
    # Nothing to discount without a rate or a quantity. The quantity guard
    # mirrors getTotalCostUdf: a NULL quantity must not reach float().
    if not discount or not qty:
        return 0.0

    # Unit price from the broadcast lookup; unknown products cost 0.0.
    price = float(broadcasted_prices.value.get(product_id, 0.0))

    # Discount granted on a single unit.
    discount_price = price * float(discount)

    return discount_price * float(qty) if discount_price else 0.0
 

# ETL Extraction: Read Data from RDS and Store as Delta Tables in Databricks


In [0]:
## Gets the data from RDS database 
@func.logger
def etl_extraction():
    """Copy every RDS table named in `table_names` into a Delta table in `ecommerce_db`."""

    def ingest_data_from_rds(tables):
        """Read each table over JDBC and overwrite the Delta table of the same name."""
        # JDBC settings come from the environment; a missing variable raises
        # KeyError here, before any table is read.
        jdbc_options = dict(
            url=f'{os.environ["END_POINT"]}',
            user=f'{os.environ["USER"]}',
            password=f'{os.environ["PASSWORD"]}',
            driver='org.postgresql.Driver',
        )
        try:
            for table in tables:
                # Source side: one JDBC read per RDS table.
                jdbc_reader = (
                    spark.read.format('jdbc')
                    .options(**jdbc_options)
                    .option('dbtable', table)
                )
                source_df = jdbc_reader.load()

                # Sink side: replace the Delta table with the fresh copy.
                delta_writer = source_df.write.mode('overwrite').format('delta')
                delta_writer.saveAsTable('ecommerce_db.' + table)

                created_message = f'Delta Table {table} Created.'
                logger.debug(created_message)
                print(created_message)
        except Exception as err:
            logger.exception(str(err), exc_info=True)
            raise  # let the caller see the failure after it has been logged

    ingest_data_from_rds(table_names)


# ETL Transformation: Reading Data from Delta Tables and Processing It


In [0]:
## data transformation
@func.logger
def etl_transform_v1():
    """
    Read the E-Commerce Delta tables and apply the first round of transformations.

    Transformed tables:
    1. categories   -> categories_df and subcategories_df (normalized)
    2. customers    -> customers_df (deduplicated, full_name, email domain)
    3. customers_ratings -> products_ratings_df (cleaned review, avg rating, review count)
    4. products_v1  -> products_df (category, avg price per subcategory, price range)
    5. orders       -> orders_df (rounded amount, date parts)
    6. order_items  -> order_items_df (gross, discount and net totals)

    Tables returned as read, without transformation:
    returned_products, supplier, payment_methods.

    Returns:
        tuple: (categories_df, subcategories_df, customers_df, products_df,
        products_ratings_df, orders_df, order_items_df, returned_products_df,
        suppliers_df, payment_methods_df)
    """
    # Imported locally: `concat_ws` is not part of the notebook's shared
    # pyspark import cell.
    from pyspark.sql.functions import concat_ws

    def read_delta(table_name):
        """Load one Delta table of the E-Commerce database by its storage path."""
        return spark.sql(
            f"SELECT * FROM delta. `dbfs:/dbfs/ECOMMERCE_DB/{table_name}`;"
        )

    ### 1. Normalize `categories` table

    categories_df = read_delta("categories")  # denormalized table

    # create separate table for subcategory to avoid redundancy
    subcategories_df = categories_df.select(
        "sub_category_id", "sub_category_name", "category_id"
    )
    # remove unnecessary columns from categories table
    categories_df = (
        categories_df.drop("sub_category_id", "sub_category_name")
        .distinct()
        .orderBy("category_id")
    )

    ### 2. transform customers table

    # remove duplicate records, build full_name, extract the domain from email
    customers_df = (
        read_delta("customers")
        .distinct()
        # `concat_ws` puts a space between the two names (plain `concat`
        # produced "JohnSmith") and skips a NULL part instead of turning the
        # whole name into NULL.
        .withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name")))
        .withColumn("domain", split("email", "@")[1])
        .drop("first_name", "last_name")
    )

    ### 3. transform customer_products_ratings table

    # trim whitespace and standardize review column
    products_ratings_df = read_delta("customers_ratings").withColumn(
        "review", lower(trim("review"))
    )

    # calculate average rating and number of reviews per product
    avg_ratings_df = products_ratings_df.groupBy("product_id").agg(
        round(avg("ratings"), 2).alias("avg_rating"),
        count("product_id").alias("reviews_count"),
    )
    # attach the aggregates to every rating row; default avg_rating to 0
    products_ratings_df = products_ratings_df.join(
        other=avg_ratings_df, on="product_id", how="left"
    ).fillna({"avg_rating": 0})

    ### 4. transform products table

    products_df = read_delta("products_v1")
    # calculate average price per subcategory
    avg_price_df = products_df.groupBy("subcategory_id").agg(
        round(avg("price"), 2).alias("avg_price")
    )
    products_df = (
        products_df.distinct()
        .withColumn("product_category", trim(split("name", "-")[0]))
        .withColumn("description", trim(col("description")))
        .join(avg_price_df, "subcategory_id", "left")
        # classify each price relative to its subcategory average
        .withColumn(
            "price_range",
            when(col("price") > col("avg_price"), "High")
            .when(col("price") == col("avg_price"), "Medium")
            .otherwise("Low"),
        )
    )

    ### 5. transform orders table
    orders_df = (
        read_delta("orders")
        # Remove unnecessary columns and remove duplicates
        .drop("order_id_surrogate", "campaign_id")
        .distinct()
        # Round 'amount' to 2 decimal places
        .withColumn("amount", round("amount", 2))
        # change order_date format to yyyy-MM-dd
        .withColumn("order_date", date_format("order_date", "yyyy-MM-dd"))
        # Extract year, month, weekday from 'order_date'
        .withColumn("year", year("order_date"))
        .withColumn("month", month("order_date"))
        .withColumn("week_day", weekday("order_date"))
    )

    ### 6. transform order_items table
    order_items_df = (
        read_delta("order_items")
        .withColumn("subtotal", round("subtotal", 2))
        .withColumn("discount", round("discount", 2))
        .withColumn(
            "gross_total", round(getTotalCostUdf(col("product_id"), col("quantity")), 2)
        )
        .withColumn(
            "discount_amount",
            round(
                getTotalDiscountUdf(
                    col("product_id"), col("quantity"), col("discount")
                ),
                2,
            ),
        )
        .withColumn("net_total", round(col("gross_total") - col("discount_amount"), 2))
    )

    ### 7. tables passed through unchanged
    returned_products_df = read_delta("returned_products")
    suppliers_df = read_delta("supplier")
    payment_methods_df = read_delta("payment_methods")

    return (
        categories_df, subcategories_df, customers_df, products_df,
        products_ratings_df, orders_df, order_items_df, returned_products_df,
        suppliers_df, payment_methods_df,
    )



# Create a Fact Table Using Transformed DataFrames


In [0]:
@func.logger
def etl_transform_v2():
    """
    Build the per-product fact table from the transformed DataFrames.

    Reads these notebook-level globals (they must be assigned before the call):
    1. products_df
    2. products_ratings_df
    3. order_items_df
    4. returned_products_df

    Returns:
        DataFrame: rating statistics, review counts, sales totals and the
        number of returns keyed by ``product_id``; nulls left by the left
        joins are filled with 0.
    """
    # Rating statistics per product (products without ratings are kept).
    # NOTE(review): "totol_reviews" is misspelt, but it is part of the output
    # schema written downstream, so it is kept as-is here -- rename it
    # together with its consumers.
    fact_v1 = (
        products_df.alias("p_df")
        .join(products_ratings_df.alias("cpr_df"), "product_id", "left")
        .groupBy("p_df.product_id")
        .agg(
            round(avg("cpr_df.ratings"), 2).alias("average_rating"),
            max("cpr_df.ratings").alias("highest_rating"),
            min("cpr_df.ratings").alias("lowest_rating"),
            count("cpr_df.sentiment").alias("totol_reviews"),
            sum(when(col("cpr_df.sentiment") == "good", 1).otherwise(0)).alias(
                "positive_reviews_count"
            ),
            sum(when(col("cpr_df.sentiment") == "bad", 1).otherwise(0)).alias(
                "negative_reviews_count"
            ),
        )
    )
    # Sales totals per product, aggregated from the order items.
    temp_v1 = (
        fact_v1.alias("p_df")
        .join(order_items_df.alias("oi_df"), "product_id", "left")
        .groupBy("p_df.product_id")
        .agg(
            sum("quantity").alias("total_quantities_sold"),
            round(sum("discount_amount"), 3).alias("total_discount_amount"),
            round(sum("gross_total"), 3).alias("gross_total_amount"),
            round(sum("net_total"), 3).alias("net_total_amount"),
        )
    )
    # Number of returns per product. No ordering is applied: the join below
    # does not preserve row order, so a sort here would only cost a shuffle.
    temp_v2 = (
        returned_products_df.groupBy("product_id")
        .count()
        .withColumnRenamed("count", "no_of_returns")
    )
    # Final fact table: ratings + sales + returns.
    fact_v2 = (
        fact_v1.join(temp_v1, "product_id", "left")
        .join(temp_v2, "product_id", "left")
        .fillna(0)  # Fill nulls with 0 for review and sales counts
    )
    return fact_v2

# Validating DataFrames: Handling Nulls and Duplicates


In [0]:
@func.logger
def etl_validation():
    """
    Validate the transformed DataFrames in place.

    For each DataFrame the nulls are replaced with defaults (``fillna(0)``
    followed by ``fillna(<placeholder text>)``) and duplicate rows are
    dropped. The results are written back to the notebook-level globals, so
    later steps see the validated frames.
    """
    # Rebind the notebook-level DataFrames produced by the transformation step.
    global categories_df, subcategories_df, customers_df, products_df, products_ratings_df, orders_df, order_items_df

    customers_df = customers_df.fillna(0).fillna('Unknown').distinct()
    categories_df = categories_df.fillna(0).fillna('Not Applicable').distinct()
    subcategories_df = subcategories_df.fillna(0).fillna('Not Applicable').distinct()
    products_ratings_df = products_ratings_df.fillna(0).fillna('Not Provided').distinct()
    products_df = products_df.fillna(0).fillna('Not Applicable').distinct()
    orders_df = orders_df.fillna(0).fillna('Not Applicable').distinct()
    # Assign to the declared global `order_items_df`; the previous misspelt
    # target (`orders_items_df`) was a throwaway local, so the validated
    # order items were silently discarded.
    order_items_df = order_items_df.fillna(0).fillna('Not Applicable').distinct()


# ETL Data Loading: Loading DataFrames into Snowflake


In [0]:
@func.logger
def etl_load_to_snowflake():
    """
    Write the dimension DataFrames and the fact table to Snowflake.

    Each notebook-level DataFrame is written in ``overwrite`` mode to its
    target table using the connection options in ``_credentials``
    (presumably provided by the Secrets notebook -- confirm there).

    Raises:
        Exception: Any failure is logged, printed and re-raised so the
            caller does not report a successful load after an error.
    """
    try:
        # Target table and DataFrame are kept side by side so a name can
        # never drift out of step with the frame it belongs to.
        targets = (
            ('dim_categories', categories_df),
            ('dim_subcategories', subcategories_df),
            ('dim_orders', orders_df),
            ('dim_order_items', order_items_df),
            ('dim_products', products_df),
            ('dim_customers', customers_df),
            ('dim_product_ratings', products_ratings_df),
            ('dim_returned_products', returned_products_df),
            ('dim_payment_methods', payment_methods_df),
            ('dim_suppliers', suppliers_df),
            ('fact_products_summary', fact_products_summary),
        )
        for table_name, df in targets:
            (df
             .write.format("snowflake")
             .mode("overwrite")
             .options(**_credentials)
             .option('dbtable', table_name)
             .save()
            )
            logger.debug(f'Table `{table_name}` Created in Snowflake.')
    except Exception as ex:
        logger.exception(str(ex), exc_info=True)
        print(ex)
        raise
# etl_load_to_snowflake() 

        

DEBUG:Data-PipeLine:Table `dim_categories` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_subcategories` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_orders` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_order_items` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_products` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_customers` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_product_ratings` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_returned_products` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_payment_methods` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_suppliers` Created in Snowflake.
DEBUG:Data-PipeLine:Table `fact_products_summary` Created in Snowflake.


# Main ETL Process: Extract, Transform, Validate, and Load Data into Snowflake


In [0]:
# Orchestrates the ETL run: transform -> fact table -> validation -> Snowflake load.
# The DataFrames assigned here are module-level names; etl_transform_v2,
# etl_validation and etl_load_to_snowflake read them as globals, so the
# order of the steps below matters.
if __name__ == '__main__':
    try: 
        # Main entry point for the ETL process
        logger.debug('Starting ETL process...')

        # 1. Extract the data
        logger.debug('Starting data extraction process...')
        # NOTE(review): the extraction call below is commented out, yet the
        # next line still logs a successful extraction -- confirm whether the
        # step is meant to be skipped.
        # etl_extraction() 
        logger.info('Data extraction completed successfully.')

        # 2. Perform transformations on the DataFrames
        #    (the tuple is unpacked in the order etl_transform_v1 returns it)
        logger.debug('Starting data transformation (v1)...')
        (categories_df, subcategories_df, customers_df, products_df, 
        products_ratings_df, orders_df, order_items_df, 
        returned_products_df, suppliers_df, payment_methods_df) = etl_transform_v1()
        
        logger.info('Data transformation (v1) completed successfully.')

        # 3. Create the fact table
        logger.debug('Starting fact table creation...')
        fact_products_summary = etl_transform_v2() 
        logger.info('Fact table created successfully.')

        # 4. Validate the processed DataFrames
        logger.debug('Starting data validation...')
        etl_validation()
        logger.info('Data validation completed successfully.')

        # 5. Load the processed data into Snowflake
        logger.debug('Starting data load into Snowflake...')
        etl_load_to_snowflake() 
        logger.info('Data loaded into Snowflake successfully.')

        logger.debug('ETL process completed successfully.')

    except Exception as ex: 
        # Any step failure ends the run here: it is logged and printed, not re-raised.
        logger.exception(str(ex),exc_info=True) 
        print(ex)
    finally:
        # Always dump the logs collected by `func` as JSON through the
        # function logger, then clear them.
        func_logger.debug(json.dumps(func.logs, indent=4))
        func.clear_logs() 



DEBUG:Data-PipeLine:Starting ETL process...
DEBUG:Data-PipeLine:Starting data extraction process...
INFO:Data-PipeLine:Data extraction completed successfully.
DEBUG:Data-PipeLine:Starting data transformation (v1)...
INFO:Data-PipeLine:Data transformation (v1) completed successfully.
DEBUG:Data-PipeLine:Starting fact table creation...
INFO:Data-PipeLine:Fact table created successfully.
DEBUG:Data-PipeLine:Starting data validation...
INFO:Data-PipeLine:Data validation completed successfully.
DEBUG:Data-PipeLine:Starting data load into Snowflake...
DEBUG:Data-PipeLine:Table `dim_categories` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_subcategories` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_orders` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_order_items` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_products` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_customers` Created in Snowflake.
DEBUG:Data-PipeLine:Table `dim_product_ratings` Created in Sn