## ETL

### Table public.sales_date

+     transaction_id integer,
+    transactional_date timestamp,
+    product_id character varying,
+    customer_id integer,
+    payment character varying,
+    credit_card bigint,
+    loyalty_card character varying,
+    cost character varying,
+    quantity integer,
+    price numeric,
+    PRIMARY KEY (transaction_id)

## Plan of Attack

1. Connect to database
2. Set the last-load-date variable
3. Staging Layer: Sales Fact Table
4. Staging Layer: Product Dimension Table
5. Core Layer: Product Dimension Table
6. Staging Layer: Payment Dimension Table
7. Core Layer: Payment Dimension Table
8. Core Layer: Sales Fact Table

In [None]:
########################################################################################

# libraries

import pandas as pd
import re
import sqlalchemy
import pandas.io.sql as sqlio
import os
import psycopg2
import numpy as np
import psycopg2.extras as extras
from io import StringIO


########################################################################################
"""
CONNECTION TO DATABASE
"""
# Connection settings; connect() forwards them as keyword arguments to psycopg2.connect().
# Change the database, username & password according to your own values.
# NOTE(review): credentials are hard-coded here - consider loading them from the
# environment or from a config file that is not committed.
param_dic = {
    "host"      : "localhost",
    "database"  : "DataWarehouseX",
    "user"      : "user",
    "password"  : "password"
}

# Function to connect to the database
def connect(params_dic):
    """
    Connect to the PostgreSQL database server.

    params_dic: dict of keyword arguments forwarded to psycopg2.connect()
                (host, database, user, password).
    Returns the open connection.
    Raises SystemExit (exit code 1) after printing the error when the
    connection cannot be established.
    """
    # Imported locally because the module-level imports do not provide sys;
    # without it the error path raised NameError instead of exiting.
    import sys

    conn = None
    try:
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(**params_dic)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1)
    print("Connection successful")
    return conn
# Open a module-level connection using the settings in param_dic.
conn = connect(param_dic)

####################################################################################################################

"""
SET VARIABLE

The first step is to set a variable. In this case I've decided to use the transactional_date column,
as it is a foreign key from the source system. The data type of transactional_date is timestamp.
I extract the maximum transactional date and use it as the last date that was loaded into the core layer.
"""
def get_LastLoadDate(conn):
    """
    Return the last load date as a one-row, one-column DataFrame.

    The full-load query below yields the constant '1970-01-01 00:00:00'
    aliased as LastLoadDate. For a delta load, swap in the commented
    query, which takes MAX(transactional_date) from core.sales.

    conn: open DB-API connection.
    Returns the DataFrame on success; on any error prints it, rolls the
    transaction back and returns 1.
    """
    # query_delta_load = 'SELECT MAX(transactional_date) as LastLoadDate FROM core.sales'
    query_full_load  = "SELECT '1970-01-01 00:00:00' as LastLoadDate"
    try:
        # read_sql_query executes the statement itself, so no separate
        # cursor (and no second execution of the same query) is needed.
        data = sqlio.read_sql_query(query_full_load, conn)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        return 1
    print("Select max transactional_date done")
    return data
    
# get_LastLoadDate(conn)
# df_lastloaddate = get_LastLoadDate(conn)
# lastloaddate = df_lastloaddate.lastloaddate[0]




########################################################################################
"""
STAGING LAYER: SALES FACT TABLE

Note the incremental condition in the stage_sales() function: we only insert into the Staging Sales table those
transactions whose transactional date is greater than the variable lastloaddate that we set previously at
the beginning of this ETL process. This condition makes the "UPSERT".
"""
# A - First Step: Truncate the Staging Sales table
# truncate(conn, '"Staging".sales')

# B - Second Step: Update the Staging Sales table
def stage_sales(conn):
    """
    Copy new rows from public.sales into "Staging".sales.

    Only rows whose transactional_date is greater than the module-level
    variable `lastloaddate` are inserted; `cost` is cast to NUMERIC on the way.

    conn: open psycopg2 connection.
    Returns None on success; on any error prints it, rolls the transaction
    back and returns 1.
    """
    # The cut-off date is bound as a query parameter (%s) instead of being
    # formatted into the SQL text, so the driver takes care of quoting.
    query = """
    INSERT INTO "Staging".sales(
        transaction_id,
        transactional_date,
        product_id,
        customer_id,
        payment,
        credit_card,
        loyalty_card,
        cost,
        quantity,
        price)
    SELECT
        transaction_id,
        transactional_date,
        product_id,
        customer_id,
        payment,
        credit_card,
        loyalty_card,
        cost::NUMERIC,
        quantity,
        price
    FROM
        public.sales
    WHERE
        transactional_date > %s;
    """
    # Read the global before opening the cursor so a missing variable still
    # surfaces as a NameError rather than being reported as a database error.
    last_loaded = lastloaddate
    cursor = conn.cursor()
    try:
        cursor.execute(query, (last_loaded,))
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        return 1
    finally:
        # Runs on both the success and the error path.
        cursor.close()
    print("stage_sales() done")
    
# stage_sales(conn)

####################################################################################################################

"""
STAGING LAYER: PRODUCT DIMENSION TABLE
"""

# A - First step: TRUNCATE
def truncate(conn, table):
    """
    Empty a table with TRUNCATE TABLE and commit.

    conn:  open database connection.
    table: table name, inserted verbatim into the statement
           (callers pass it already quoted, e.g. '"Staging".sales').
    Returns None on success; on any error prints it, rolls back and returns 1.
    """
    statement = "TRUNCATE TABLE %s" % table
    cur = conn.cursor()
    try:
        cur.execute(statement)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as exc:
        print("Error: %s" % exc)
        conn.rollback()
        cur.close()
        return 1
    else:
        print("truncate done")
        cur.close()

# truncate(conn, '"Staging".dim_product')

# B - Second step: stage the product dimension
def stage_dim_prod(conn):
    """
    Copy every row of the products table into "Staging".dim_product
    with a single INSERT ... SELECT, then commit.

    conn: open database connection.
    Returns None on success; on any error prints it, rolls back and returns 1.
    """
    sql = 'INSERT INTO "Staging".dim_product(product_id, product_name, category, subcategory) SELECT products.* FROM products;'
    failed = False
    cur = conn.cursor()
    try:
        cur.execute(sql)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as exc:
        print("Error: %s" % exc)
        conn.rollback()
        failed = True
    if not failed:
        print("stage() done")
    cur.close()
    return 1 if failed else None
    
# stage_dim_prod(conn)


###########################################################################################
"""
CORE LAYER: PRODUCT DIMENSION TABLE

This includes a transformation to create the product dimension from the info extracted from the source system.
Considerations:
> The product name field extracted from the source system includes the brand in brackets.
> The product name field includes white spaces and other symbols that we have to eliminate.
"""

# C - Third step: get the stage of the product dimension
def get_stage_dim_prod(conn):
    """
    Read every row of "Staging".dim_product into a DataFrame.

    conn: open DB-API connection.
    Returns the DataFrame on success; on any error prints it, rolls the
    transaction back and returns 1.
    """
    query = 'SELECT * FROM "Staging".dim_product'
    try:
        # read_sql_query runs the statement itself; the previous extra
        # cursor.execute() ran the same query a second time for nothing.
        data = sqlio.read_sql_query(query, conn)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        return 1
    print("get_stage_dim_prod done")
    return data
    
# get_stage(conn)
# dim_product = get_stage(conn)
# dim_product_00 = dim_product.copy()

# C.1 - Eliminate white spaces in product name
# dim_product_00['product_name'] = dim_product_00.apply(lambda row : row[2].strip('\t'), axis=1)
# dim_product_00['product_name'] = dim_product_00.apply(lambda row : row[2].replace('\t', ''), axis=1)


# C.2 - Define split_brand to extract the brand from the product name field in the source system
def split_brand(x):
    """
    Return the brand written in brackets inside the raw product name,
    e.g. 'Cola (Pepsi)' -> 'Pepsi'.

    x: a row (DataFrame row, tuple, list, ...) whose element at position 2
       is the raw product name.
    Returns 'none' when position 2 is missing, is not a string, or has no
    opening bracket.
    """
    try:
        # For pandas rows use .iloc so the lookup stays positional; plain
        # x[2] on a label-indexed Series is deprecated integer-key access.
        raw_name = x.iloc[2] if hasattr(x, 'iloc') else x[2]
        return raw_name.split('(', 1)[1].split(')')[0]
    except (AttributeError, IndexError, KeyError, TypeError):
        # Only the expected "no brand available" failures are swallowed; a bare
        # except would also hide KeyboardInterrupt/SystemExit.
        return 'none'

    
# C.3 - Define split_product_name to extract the name from the product name field in the source system
def split_product_name(x):
    """
    Return the part of the raw product name that precedes the first '(',
    e.g. 'Cola (Pepsi)' -> 'Cola ' (trailing whitespace is kept as-is).

    x: a row (DataFrame row, tuple, list, ...) whose element at position 2
       is the raw product name.
    Returns the whole name when it has no bracket, and 'none' when
    position 2 is missing or is not a string.
    """
    try:
        # For pandas rows use .iloc so the lookup stays positional; plain
        # x[2] on a label-indexed Series is deprecated integer-key access.
        raw_name = x.iloc[2] if hasattr(x, 'iloc') else x[2]
        return re.split(r'[(]', raw_name)[0]
    except (AttributeError, IndexError, KeyError, TypeError):
        # Only the expected lookup/type failures are swallowed; a bare except
        # would also hide KeyboardInterrupt/SystemExit.
        return 'none'
    
# C.4 - Apply the functions defined previously to the dataframe
# dim_product_00['brand'] = dim_product_00.apply(lambda row : split_brand(row), axis=1)
# dim_product_00['product_name'] = dim_product_00.apply(lambda row : split_product_name(row), axis=1)

# D - Last step: INSERT/UPDATE the core dimension product table.
def update_core_product(conn, df, table):
    """
    Insert the rows of a DataFrame into the core product dimension table
    with cursor.executemany(); rows that hit a conflict are skipped
    (ON CONFLICT DO NOTHING).

    conn:  open psycopg2 connection.
    df:    DataFrame whose column names are used as the target column list.
    table: target table name, inserted verbatim into the statement.
    Returns None on success; on any error prints it, rolls back and returns 1.
    """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    # One %s per column, so the statement always matches the DataFrame width
    # (the placeholder list used to be hard-coded to six columns).
    placeholders = ','.join(['%s'] * len(df.columns))
    # SQL query to execute
    query = "INSERT INTO %s(%s) VALUES(%s) ON CONFLICT DO NOTHING" % (table, cols, placeholders)
    cursor = conn.cursor()
    try:
        cursor.executemany(query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        return 1
    finally:
        # Runs on both the success and the error path.
        cursor.close()
    print("update_core_product() done")
    
# update_core_product(conn, dim_product_00, 'core.dim_product')

########################################################################################
"""
STAGING LAYER: PAYMENT DIMENSION TABLE

Note that we do not have a Staging Payments table as we can get the core directly from the Staging Sales table.
"""

def get_stage_payment(conn):
    """
    Read the distinct (payment, loyalty_card) combinations from
    "Staging".sales into a DataFrame; a NULL payment is reported as 'cash'.

    conn: open DB-API connection.
    Returns the DataFrame on success; on any error prints it, rolls the
    transaction back and returns 1.
    """
    query = """
    SELECT DISTINCT
        COALESCE(payment, 'cash') AS payment,
        loyalty_card
    FROM
        "Staging".sales;
    """
    try:
        # read_sql_query runs the statement itself; the previous extra
        # cursor.execute() ran the same query a second time for nothing.
        data = sqlio.read_sql_query(query, conn)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        return 1
    print("Stage payment done")
    return data

# get_stage_payment = get_stage_payment(conn)
# dim_payment = get_stage_payment.copy()

"""
CORE LAYER: PAYMENT DIMENSION TABLE
"""

def update_core_dimpayment(conn, df, table):
    """
    Insert the rows of a DataFrame into the core payment dimension table
    with cursor.executemany(); rows that hit a conflict are skipped
    (ON CONFLICT DO NOTHING).

    conn:  open psycopg2 connection.
    df:    DataFrame whose column names are used as the target column list.
    table: target table name, inserted verbatim into the statement.
    Returns None on success; on any error prints it, rolls back and
    returns the string "Error".
    """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    # One %s per column, so the statement always matches the DataFrame width
    # (the placeholder list used to be hard-coded to two columns).
    placeholders = ','.join(['%s'] * len(df.columns))
    # SQL query to execute
    query = "INSERT INTO %s(%s) VALUES(%s) ON CONFLICT DO NOTHING" % (table, cols, placeholders)
    cursor = conn.cursor()
    try:
        cursor.executemany(query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        return "Error"
    finally:
        # Runs on both the success and the error path.
        cursor.close()
    print("update_core_dimpayment() done")


# update_core_dimpayment(conn, dim_payment, 'core.dim_payment')


########################################################################################

"""
STAGING LAYER: SALES FACT TABLE
"""
# A - First Step: Transform the data to get total price, total cost and profit columns.
def get_stage_sales(conn):
    """
    Read "Staging".sales into a DataFrame shaped for the core fact table.

    The query adds a yyyymmdd date key (transactional_date_fk), looks up
    product_FK in core.dim_product and payment_FK in core.dim_payment via
    LEFT JOINs, and computes total_price, total_cost and profit.

    conn: open DB-API connection.
    Returns the DataFrame on success; on any error prints it, rolls the
    transaction back and returns 1.
    """
    # ORDER BY sits on the final SELECT: an ORDER BY inside the CTE does not
    # guarantee the order of the outer result.
    # NOTE(review): d.loyalty_card = f.loyalty_card never matches when
    # loyalty_card is NULL, so such rows get a NULL payment_FK - confirm
    # whether a NULL-safe comparison (IS NOT DISTINCT FROM) is wanted.
    query = """
    WITH getsales AS (
        SELECT
            transaction_id,
            transactional_date,
            EXTRACT(year FROM transactional_date) * 10000
                + EXTRACT('month' FROM transactional_date) * 100
                + EXTRACT('day' FROM transactional_date) AS transactional_date_fk,
            f.product_id,
            p.product_PK AS product_FK,
            payment_PK AS payment_FK,
            customer_id,
            credit_card,
            cost,
            quantity,
            price,
            quantity * price AS total_price,
            quantity * cost AS total_cost
        FROM "Staging".sales f
        LEFT JOIN core.dim_payment d
            ON d.payment = COALESCE(f.payment, 'cash')
           AND d.loyalty_card = f.loyalty_card
        LEFT JOIN core.dim_product p
            ON p.product_id = f.product_id
    )
    SELECT
        getsales.*,
        total_price - total_cost AS profit
    FROM getsales
    ORDER BY transaction_id;
    """
    try:
        # read_sql_query runs the statement itself; the previous extra
        # cursor.execute() ran the same query a second time for nothing.
        data = sqlio.read_sql_query(query, conn)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        return 1
    print("Stage sales done")
    return data


# get_stage_sales = get_stage_sales(conn)
# staging_sales = get_stage_sales.copy()

"""
CORE LAYER: SALES FACT TABLE
"""

# A - First Step: Update the core table
def update_core_sales(conn, df, table):
    """
    Insert the rows of a DataFrame into the core sales fact table with
    cursor.executemany(); rows that hit a conflict are skipped
    (ON CONFLICT DO NOTHING).

    conn:  open psycopg2 connection.
    df:    DataFrame whose column names are used as the target column list.
    table: target table name, inserted verbatim into the statement.
    Returns None on success; on any error prints it, rolls back and
    returns the string "Error".
    """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    # One %s per column, so the statement always matches the DataFrame width
    # (the placeholder list used to be hard-coded to fourteen columns).
    placeholders = ','.join(['%s'] * len(df.columns))
    # SQL query to execute
    query = "INSERT INTO %s(%s) VALUES(%s) ON CONFLICT DO NOTHING" % (table, cols, placeholders)
    cursor = conn.cursor()
    try:
        cursor.executemany(query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        return "Error"
    finally:
        # Runs on both the success and the error path.
        cursor.close()
    print("execute_many() done")


# update_core_sales(conn, staging_sales, 'core.sales')

In [None]:
# Driver cell: runs the whole ETL once, in dependency order
# (product dimension -> staging sales -> payment dimension -> sales fact).
conn = connect(param_dic)

try:
    # Last load date. It must stay a module-level name because stage_sales()
    # reads the global `lastloaddate`.
    df_lastloaddate = get_LastLoadDate(conn)
    lastloaddate = df_lastloaddate.lastloaddate[0]

    # ---- Product dimension: staging ----
    truncate(conn, '"Staging".dim_product')
    stage_dim_prod(conn)

    dim_product = get_stage_dim_prod(conn)
    dim_product_00 = dim_product.copy()

    # C.1 - Eliminate tab characters in product name.
    # Removing every tab also covers leading/trailing ones, so the separate
    # strip('\t') pass is not needed. .iloc keeps the column lookup positional.
    dim_product_00['product_name'] = dim_product_00.apply(lambda row: row.iloc[2].replace('\t', ''), axis=1)

    # C.4 - Apply the functions defined previously to the dataframe.
    # The row is handed over as a plain tuple so that the helpers' x[2]
    # is an ordinary positional index.
    dim_product_00['brand'] = dim_product_00.apply(lambda row: split_brand(tuple(row)), axis=1)
    dim_product_00['product_name'] = dim_product_00.apply(lambda row: split_product_name(tuple(row)), axis=1)

    # ---- Product dimension: core ----
    update_core_product(conn, dim_product_00, 'core.dim_product')

    # ---- Sales: staging ----
    # A - First Step: Truncate the Staging Sales table
    truncate(conn, '"Staging".sales')

    # B - Second Step: Update the Staging Sales table
    stage_sales(conn)

    # ---- Payment dimension ----
    # The result gets its own name: assigning it to `get_stage_payment`
    # replaced the function and made this cell fail when run a second time.
    stage_payment_df = get_stage_payment(conn)
    dim_payment = stage_payment_df.copy()

    update_core_dimpayment(conn, dim_payment, 'core.dim_payment')

    # ---- Sales: core ----
    # Same reason as above: do not overwrite the get_stage_sales function.
    stage_sales_df = get_stage_sales(conn)
    staging_sales = stage_sales_df.copy()

    update_core_sales(conn, staging_sales, 'core.sales')
finally:
    # Release the connection even when one of the steps above raises.
    conn.close()