In [1]:
# import required libraries
import pandas as pd
import json
import psycopg2
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError
import urllib.parse
from datetime import datetime
import os

In [2]:
# Versions
# Python => 3.11.7 # Pandas => 2.1.4 # hdbcli => 2.20.15 # psycopg => 3.1.18 # sqlalchemy.2.0.25 # sqlalchemy-hdbcli

# Content

<ul>
        <li>1. Database Connection Functions
            <ul>
                <li>1.1. Function - SAP Connection</li>
                <li>1.2. Function - PostgreSQL Connection</li>
            </ul>
        </li>
        <li>2. Extract Data from SAP
            <ul>
                <li>2.1. Function - Extract from SAP</li>
                <li>2.2. Set up Staging File Paths</li>
                <li>2.3. Define SQL Queries to Extract Data from SAP</li>
                <li>2.4. Run Extract SQL Queries to Extract Data from SAP</li>
            </ul>
        </li>
        <li>3. Load Data into PostgreSQL
            <ul>
                <li>3.1. Function - Insert Dimension Tables into PostgreSQL</li>
                <li>3.2. Function - Insert Fact Table into PostgreSQL</li>
                <li>3.3. Create a Composite Key Column to Detect Duplicate Rows in AR</li>
                <li>3.4. Load Dimension Tables and Fact Table into PostgreSQL</li>
            </ul>
        </li>
    </ul>

## 1. Database Connection Functions ##

### 1.1. Function - SAP connection ###

In [3]:
def connect_to_sap():
    """Used to connect to SAP Database"""
    
    # Read SAP connection parameters from text file
    with open("D:\\Data Warehousing\\AR HWM\\Credentials\\sap_credentials.txt", "r") as file:
        lines_sap = [line.strip() for line in file.readlines()]

    host, port, user, password = lines_sap[0], int(lines_sap[1]), lines_sap[2], lines_sap[3]
    escaped_password = urllib.parse.quote_plus(password) # Escape special characters such as @, #, $ in the password
    
    # Create connection string to use with sqlalchemy engine
    sap_connection_string = "hana+hdbcli://{}:{}@{}:{}/?encrypt=encrypt=true&sslValidateCertificate=false".format(user, escaped_password, host, port)

    # Create SQLAlchemy engine as connection
    connection = create_engine(sap_connection_string)
    # Check if the connection is successful
    try:
        connection.connect()
        print("SAP Connection successful!")
    except OperationalError as e:
        print("Error connecting to SAP database:", e)

    return connection

### 1.2. Function - PostgreSQL Connection ###

In [4]:
def connect_to_pgsql():
    """Used to connect to PostgreSQL database"""
    
    # Read PostgreSQL connection parameters from text
    with open("D:\\Data Warehousing\\AR HWM\\Credentials\\pgsql_credentials.txt", "r") as file:
        lines_pgsql = [line.strip() for line in file.readlines()]

    host, port, database, username, password = lines_pgsql[0], lines_pgsql[1], lines_pgsql[2], lines_pgsql[3], lines_pgsql[4]

    # Establish PostgreSQL connection
    try:
        connection = psycopg2.connect(database=database,
                                     host=host,
                                     port=port,
                                     user=username,
                                     password=password)
        print("PostgreSQL Connection successful!")
    except psycopg2.Error as e:
        print("PostgreSQL Connection failed!", e)

    return connection

## 2. Extract data from SAP ##

#### 2.1. Function - Extract from SAP

In [5]:
def extract_from_sap(connection, extract_query, output_folder, output_filename):
    """
        Extracts data from SAP using a specified SQL query and saves the result as a CSV file.
    """

    # Default folder path for sql files
    sql_folder_path = "D:\\Data Warehousing\\AR HWM\\SQL\\"
    
    # Define the completed sql file path
    sql_file_path = os.path.join(sql_folder_path, extract_query)
    
    # Read query and extract data
    with open(sql_file_path, "r") as file:
        extract_item_query = file.read()
    df = pd.read_sql_query(extract_item_query, connection)

    # Solve FNRC code missing issues in Customer
    if 'u_fnrc' in df.columns:
        df['u_fnrc'] = df['u_fnrc'].fillna(0).astype(int) # fnrc code missing issue in customer table

    # Check ARCM query and transform data
    if extract_query == "Extract_F_ARCM_HWM.sql" or extract_query == "Extract_F_ARLM_HWM.sql":
        
        df = df[["CustomerCode", "SellingPriceList", "SeriesName", "Document Entry", "Document Number", 
                 "LineNum", "Sale Employee Code", "PostingDate", "DueDate", "DocumentDate", "InvoiceType", 
                 "ItemCode", "Quantity", "Status", "Price", "LineTotal", "Document Discount", "DiscountSum", 
                 "RowDiscountPercentage", "Document Total", "Warehouse Code", "BPBranch", "Branch", "Department", 
                 "AgentCode", "AgentCommissionAmount", "Project", "Currency", "Gross Profit", "Line Discount total", 
                 "GrossTotal", "Stock Price", "Stock Value", "Item Last Sale Price", "Gross Profit(Item Cost)", 
                 "TotalBasePrice(LPP)", "TotalGrossProfit(LPP)", "GLAccount Code", "CogsAccount Code"]] ## Get required columns
    
        # Rename Branch as SaleType
        df = df.rename(columns={"Branch":"SaleType"}) # Mismatched column name in ARCM
    
    # Define output file path
    output_file_path = os.path.join(output_folder, output_filename)

    # Export as CSV file
    df.to_csv(output_file_path, index=False)

    print(f"Data extracted successfully and saved to: {output_file_path} at {datetime.now()}")

#### 2.2. Set up Staging file paths ####

In [6]:
# File Paths for Initial Stage (csv files output)
dim_folder_path = "D:\\Data Warehousing\\AR HWM\\Dim\\"
fact_folder_path = "D:\\Data Warehousing\\AR HWM\\Fact\\"
#sql_folder_path = "D:\\Data Warehousing\\AR HWM\\SQL\\"

#### 2.3. Define SQL queries to extract data from SAP 

In [7]:
# Data Extract Queries (SAP)
# Dim
extract_agent_query = "Extract_D_Agent_HWM.sql"
extract_branch_query = "Extract_D_Branch_HWM.sql"
extract_cogsaccount_query = "Extract_D_COGSAccount_HWM.sql"
extract_customer_query = "Extract_D_Customer_HWM.sql"
extract_department_query = "Extract_D_Department_HWM.sql"
extract_glaccount_query = "Extract_D_GLAccount_HWM.sql"
extract_item_query = "Extract_D_Item_HWM.sql"
extract_saleemployee_query = "Extract_D_SaleEmployee_HWM.sql"
extract_saletype_query = "Extract_D_SaleType_HWM.sql"
extract_warehouse_query = "Extract_D_Warehouse_HWM.sql"
# Fact
extract_arcm_query = "Extract_F_ARCM_HWM.sql"
extract_arlm_query = "Extract_F_ARLM_HWM.sql"

#### 2.4. Run extract SQL quereis to extract data from SAP

In [8]:
# Establish SAP Database connection
conn_sap = connect_to_sap()
try:
    # Example calls to extract_from_sap function
    # Dim Tables
    extract_from_sap(conn_sap, extract_agent_query, dim_folder_path, "D_Agent.csv")
    extract_from_sap(conn_sap, extract_branch_query, dim_folder_path, "D_Branch.csv")
    extract_from_sap(conn_sap, extract_cogsaccount_query, dim_folder_path, "D_COGSAccount.csv")
    extract_from_sap(conn_sap, extract_customer_query, dim_folder_path, "D_Customer.csv")
    extract_from_sap(conn_sap, extract_department_query, dim_folder_path, "D_Department.csv")
    extract_from_sap(conn_sap, extract_glaccount_query, dim_folder_path, "D_GLAccount.csv")
    extract_from_sap(conn_sap, extract_item_query, dim_folder_path, "D_Item.csv")
    extract_from_sap(conn_sap, extract_saletype_query, dim_folder_path, "D_SaleType.csv")
    extract_from_sap(conn_sap, extract_warehouse_query, dim_folder_path, "D_Warehouse.csv")
    # Fact Table (Current Month, Last Month)
    extract_from_sap(conn_sap, extract_arcm_query, fact_folder_path, "F_AR_CM.csv")
    extract_from_sap(conn_sap, extract_arlm_query, fact_folder_path, "F_AR_LM.csv")

except Exception as e:
    print("An error occurred during extraction:", e)

finally:
    # Close database connection
    conn_sap.dispose()

SAP Connection successful!
Data extracted successfully and saved to: D:\Data Warehousing\AR HWM\Dim\D_Agent.csv at 2024-06-06 10:28:38.718963
Data extracted successfully and saved to: D:\Data Warehousing\AR HWM\Dim\D_Branch.csv at 2024-06-06 10:28:38.732973
Data extracted successfully and saved to: D:\Data Warehousing\AR HWM\Dim\D_COGSAccount.csv at 2024-06-06 10:28:38.749965
Data extracted successfully and saved to: D:\Data Warehousing\AR HWM\Dim\D_Customer.csv at 2024-06-06 10:28:38.820289
Data extracted successfully and saved to: D:\Data Warehousing\AR HWM\Dim\D_Department.csv at 2024-06-06 10:28:38.835916
Data extracted successfully and saved to: D:\Data Warehousing\AR HWM\Dim\D_GLAccount.csv at 2024-06-06 10:28:38.861717
Data extracted successfully and saved to: D:\Data Warehousing\AR HWM\Dim\D_Item.csv at 2024-06-06 10:28:39.103053
Data extracted successfully and saved to: D:\Data Warehousing\AR HWM\Dim\D_SaleType.csv at 2024-06-06 10:28:39.119053
Data extracted successfully and 

## 3. Load data into PostgreSQL ##

#### 3.1. Function - Insert Dimension Tables into PostgreSQL

In [9]:
def insert_dim_into_pgsql(table_name):
    """Insert dimension data from csv files into PostgreSQL"""

    # Define the DELETE command to delete old data in dimensional tables
    delete_data_sql = f'DELETE FROM "AR_HWM"."{table_name}";'

    # Define the COPY command to insert new data from csv files into dimensional tables
    copy_sql = f'COPY "AR_HWM"."{table_name}" FROM stdin WITH CSV HEADER;'
    
    # Define the CSV filepath and filename to get data from this file
    file_path = "D:\\Data Warehousing\\AR HWM\\Dim\\" + table_name + ".csv"
    
    with open(file_path, 'r', encoding='utf-8') as f:
        # Execute the DELETE command to delete old data
        cursor_pgsql.execute(delete_data_sql)
        # Execute the COPY command to insert new data
        cursor_pgsql.copy_expert(sql=copy_sql, file=f)
        # Execute UPDATE Time query to record the updated time of dimensional tables
        cursor_pgsql.execute(f'UPDATE "AR_HWM"."{table_name}" SET "UpdatedAt" = CURRENT_TIMESTAMP;')

    print(f"{table_name} Data is inserted successfully into postgresql at {datetime.now()}")

#### 3.2. Function - Insert Fact Table into PostgreSQL

In [10]:
def insert_fact_into_pgsql(table_name):
    """ Insert fact data (AR) from csv file into PostgreSQL"""

    # Create a temporary AR table to avoid duplicate rows insertion
    tem_table_query = """
        DROP TABLE IF EXISTS "temp_f_ar_hwm";
    
        CREATE TEMPORARY TABLE "temp_f_ar_hwm" (
        "CustomerCode" VARCHAR(50),
        "SellingPriceList" VARCHAR(50),
        "SeriesName" VARCHAR(50),
        "DocumentEntry" BIGINT,
        "DocumentNumber" BIGINT,
        "LineNum" INT,
        "SaleEmployeeCode" INT,
        "PostingDate" TIMESTAMP,
        "DueDate" TIMESTAMP,
        "DocumentDate" TIMESTAMP,
        "InvoiceType" VARCHAR(50),
        "ItemCode" VARCHAR(50),
        "Quantity" DECIMAL(10, 4),
        "Status" VARCHAR(50),
        "Price" DECIMAL(15, 4),  -- 15 digits total, 4 decimal places
        "LineTotal" DECIMAL(20, 4),  -- 20 digits total, 4 decimal places
        "DocumentDiscount" DECIMAL(15, 4),  -- 15 digits total, 4 decimal places
        "DiscountSum" DECIMAL(15, 4),  -- 15 digits total, 4 decimal places
        "RowDiscountPercentage" DECIMAL(7, 4),  -- 7 digits total, 4 decimal places
        "DocumentTotal" DECIMAL(25, 4),  -- 25 digits total, 4 decimal places
        "WarehouseCode" VARCHAR(50),
        "BPBranch" VARCHAR(50),
        "SaleType" VARCHAR(50),
        "Department" VARCHAR(50),
        "AgentCode" VARCHAR(50),
        "AgentCommissionAmount" DECIMAL(20, 4),  -- 20 digits total, 4 decimal places
        "Project" VARCHAR(50),
        "Currency" VARCHAR(10),
        "GrossProfit" DECIMAL(20, 4),  -- 20 digits total, 4 decimal places
        "LineDiscountTotal" DECIMAL(20, 4),  -- 20 digits total, 4 decimal places
        "GrossTotal" DECIMAL(25, 4),  -- 25 digits total, 4 decimal places
        "StockPrice" DECIMAL(15, 4),  -- 15 digits total, 4 decimal places
        "StockValue" DECIMAL(25, 4),  -- 25 digits total, 4 decimal places
        "ItemLastSalePrice" DECIMAL(15, 4),  -- 15 digits total, 4 decimal places
        "GrossProfitItemCost" DECIMAL(25, 4),  -- 25 digits total, 4 decimal places
        "TotalBasePriceLPP" DECIMAL(25, 4),  -- 25 digits total, 4 decimal places
        "TotalGrossProfitLPP" DECIMAL(25, 4),  -- 25 digits total, 4 decimal places
        "GLAccountCode" BIGINT,
        "CogsAccountCode" BIGINT,
        "Key" VARCHAR(500)
    );"""
    
      
    # Copy data command to the temporay table
    copy_to_tem_table_command = """COPY "temp_f_ar_hwm" FROM stdin WITH CSV HEADER;"""


    # Insert into main AR table
    insert_to_main_f_table_query = f"""
    INSERT INTO "AR_HWM"."F_AR" 
    SELECT
    *
    FROM "temp_f_ar_hwm"
    ON CONFLICT("Key") DO NOTHING;
    """
    
    # Open the CSV file
    file_path = "D:\\Data Warehousing\\AR HWM\\Fact\\" + table_name + ".csv"
    
    with open(file_path, 'r', encoding='utf-8') as f:
        # Execute the temptable command
        cursor_pgsql.execute(tem_table_query)
        # Execute the COPY command
        cursor_pgsql.copy_expert(sql=copy_to_tem_table_command, file=f)
        # Execute the Insert command (main fact table)
        cursor_pgsql.execute(insert_to_main_f_table_query)

    print(f"{table_name} Data is inserted successfully into postgresql at {datetime.now()}")

#### 3.3. Create a composite key column to detect duplicate rows in AR ####

In [11]:
def combine_columns(row):
    return f"{row['CustomerCode']} {row['SellingPriceList']} {row['SeriesName']} {row['Document Entry']} {row['Document Number']}{row['LineNum']}{row['ItemCode']}"

In [12]:
def add_key(table_name):
    filepath = f"D:\\Data Warehousing\\AR HWM\\Fact\\{table_name}.csv"
    df = pd.read_csv(filepath)
    df['Key'] = df.apply(combine_columns, axis=1)
    df.to_csv(f"D:\\Data Warehousing\\AR HWM\\Fact\\{table_name}.csv", index=False)
    df.to_csv(f"D:\\Data Warehousing\\AR HWM\\Fact\\{table_name}added_key.csv", index=False)

#### 3.4. Load Dimension Tables and Fact Table into PostgreSQL

In [13]:
# Create pgsql connection and cursor
conn_pgsql = connect_to_pgsql()
cursor_pgsql = conn_pgsql.cursor()

# Run the COPY command to import data from CSV
try:
    #Dim
    for file in os.listdir("D:\\Data Warehousing\\AR HWM\\Dim"):
        table_name = file.split(".")[0]
        insert_dim_into_pgsql(table_name)
        # Commit the transaction
        conn_pgsql.commit()
    print("All Dimensional Data are imported into PostgreSQL database successfully!")


    # current date
    current_date = datetime.now()
    
    #Fact
    if current_date.day < 25:
        # Delete AR Data WHERE PostingDate = Current Month
        cursor_pgsql.execute("""DELETE FROM "AR_HWM"."F_AR" WHERE 
        "PostingDate" = date_trunc('month', current_date)""")
        
        table_name = "F_AR_CM"
        # Add Key column to detect duplicate rows
        add_key(table_name)
        insert_fact_into_pgsql(table_name)
        # Commit the transaction
        conn_pgsql.commit()
        print("Fact Data (Current Month) is imported.")
        print("All Fact Data are imported into PostgreSQL database successfully!")
    else:
        # Delete AR Data WHERE PostingDate=last month
        cursor_pgsql.execute("""DELETE FROM "AR_HWM"."F_AR" WHERE 
        "PostingDate" >= (date_trunc('month', current_date) - INTERVAL '1 month')
        AND "PostingDate" < date_trunc('month', current_date)""")

        # Load AR_LM into PgSQL
        table_name = "F_AR_LM"
        # Add Key column to detect duplicate rows
        add_key(table_name)
        insert_fact_into_pgsql(table_name)
        # Commit the transaction
        conn_pgsql.commit()

        # Load AR_CM into PgSQL
        table_name = "F_AR_CM"
        # Add Key column to detect duplicate rows
        add_key(table_name)
        insert_fact_into_pgsql(table_name)
        # Commit the transaction
        conn_pgsql.commit()

        print("Fact Data (Current Month and Last Month) are imported.")
        print("All Fact Data are imported into PostgreSQL database successfully!")

    
except psycopg2.Error as e:
    print("Error:", e)
finally:
    # Close cursor and connection
    cursor_pgsql.close()
    conn_pgsql.close()

PostgreSQL Connection successful!
D_Agent Data is inserted successfully into postgresql at 2024-06-06 10:28:41.903672
D_Branch Data is inserted successfully into postgresql at 2024-06-06 10:28:42.065433
D_COGSAccount Data is inserted successfully into postgresql at 2024-06-06 10:28:42.232642
D_Customer Data is inserted successfully into postgresql at 2024-06-06 10:28:42.532351
D_Department Data is inserted successfully into postgresql at 2024-06-06 10:28:42.703866
D_GLAccount Data is inserted successfully into postgresql at 2024-06-06 10:28:42.864533
D_Item Data is inserted successfully into postgresql at 2024-06-06 10:28:43.773757
D_SaleType Data is inserted successfully into postgresql at 2024-06-06 10:28:43.964835
D_Warehouse Data is inserted successfully into postgresql at 2024-06-06 10:28:44.126567
All Dimensional Data are imported into PostgreSQL database successfully!
F_AR_CM Data is inserted successfully into postgresql at 2024-06-06 10:28:44.509593
Fact Data (Current Month) is