In [None]:
import hashlib
import numbers
import os
import traceback
import urllib
import urllib.parse

import pandas as pd
import pyodbc
from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError

In [None]:
# Shared SQLAlchemy engine for the `demo` database; every cell below reads and writes through `conn`.
# The ODBC string is a plain raw string (not an f-string), so {SQL}, {UID} and {PWD}
# are passed through literally.
# NOTE(review): these look like template placeholders to be substituted before running - confirm.
params = urllib.parse.quote_plus(
    r'Driver={ODBC Driver 17 for SQL Server};'
    r'Server=tcp:{SQL},1433;'
    r'Database=demo;'
    r'UID={UID};'
    r'PWD={PWD};'
    r'Encrypt=yes;'
    r'TrustServerCertificate=no;'
    r'Connection Timeout=30;'
)
conn_str = f'mssql+pyodbc:///?odbc_connect={params}'
# echo=True makes SQLAlchemy log every statement it emits.
conn = create_engine(conn_str, echo=True)

def log_load (table_name,file_name,row_count,elt_load_timestamp) :
    """Append one audit row describing a completed file load to ``aux.elt_load_table``.

    Every value is stored in its string form, exactly as received.

    Args:
        table_name: Name of the table the file was loaded into.
        file_name: Name of the source file.
        row_count: Number of rows loaded.
        elt_load_timestamp: Timestamp of the load.
    """
    audit_values = {
        'table_name': table_name,
        'file_name': file_name,
        'row_count': row_count,
        'elt_load_timestamp': elt_load_timestamp,
    }
    # One-row frame: each column holds the stringified value.
    audit_row = pd.DataFrame({column: [str(value)] for column, value in audit_values.items()})
    audit_row.to_sql('elt_load_table', conn, schema='aux', if_exists='append', index=False)

def data_profiling(df, filename):
    """Profile every column of ``df`` and append the result to ``aux.data_profiling``.

    For each column one row is written containing the pandas dtype, the number
    of distinct values, the number of nulls, and a ``::``-joined list of the
    values whose Python type differs from the type of the first non-null value.

    Args:
        df: DataFrame to profile.
        filename: Source file name, stored alongside each profiling row.
    """
    profiling_info = []

    for col in df.columns:
        series = df[col]
        non_null = series.dropna()

        if non_null.empty:
            # An all-null (or empty) column has no reference type to compare
            # against; previously this raised IndexError on `.iloc[0]`.
            inconsistent_values = []
        else:
            first_value_type = non_null.iloc[0].__class__
            # Nulls are included in the scan, so a NaN in a string column is
            # reported here as well as in Missing_Values.
            inconsistent_values = [
                str(value) for value in series if not isinstance(value, first_value_type)
            ]

        profiling_info.append({
            'filename': filename,
            'Column': col,
            'Data_Type': str(series.dtype),
            'Unique_Values': series.nunique(),
            'Missing_Values': series.isnull().sum(),
            'Inconsistent_Value': '::'.join(inconsistent_values),
        })

    pd.DataFrame(profiling_info).to_sql('data_profiling', conn, if_exists='append', index=False, schema='aux')


def insert_records(df,table_name,conn,schema):
    """Append ``df`` to ``schema.table_name``; on failure, log the rows instead of raising.

    If the insert raises, every row of ``df`` is written to
    ``aux.<table_name>_error_log`` with the error text in a leading
    ``error_message`` column, and a short notice is printed. The original
    exception is not re-raised.

    Args:
        df: Rows to insert.
        table_name: Target table name.
        conn: SQLAlchemy engine/connection (or other connection accepted by
            ``DataFrame.to_sql``).
        schema: Target schema name.
    """
    try:
        df.to_sql(table_name, conn, index=False, if_exists='append', schema= schema)
    except Exception as e:
        error_message = str(e)
        print(f"Insert into {schema}.{table_name} failed for {len(df)} row(s); see aux.{table_name}_error_log")
        # Stamp the message on every failed row. The previous column-wise concat
        # aligned on the index, so only the row at index 0 received the message
        # and frames with a non-zero-based index produced a stray extra row.
        error_df = df.reset_index(drop=True)
        error_df.insert(0, 'error_message', error_message)
        error_df.to_sql(table_name + '_error_log', conn, index=False, if_exists='append', schema='aux')


#cleaning columns and generating surrogate_keys
def preprocess_string(string_or_int):
    """Normalise a business key before hashing.

    Integers are converted to their decimal string form; the result is then
    lower-cased, stripped of surrounding whitespace, and has every ``-`` removed.

    Args:
        string_or_int: A string or an integer. Any ``numbers.Integral`` value
            is accepted, which covers the numpy integer scalars pandas returns
            for integer columns (these are not instances of ``int``).

    Returns:
        The normalised string.
    """
    if isinstance(string_or_int, numbers.Integral):
        string_or_int = str(string_or_int)
    return string_or_int.lower().strip().replace('-', '')

def consistent_hash(string_or_int, limit):
    """Map a business key to a repeatable integer in ``[0, limit)``.

    The key is normalised with ``preprocess_string``, hashed with SHA-256, and
    the digest (read as a base-16 integer) is reduced modulo ``limit``.

    Args:
        string_or_int: Key to hash.
        limit: Modulus applied to the digest.

    Returns:
        The digest value modulo ``limit``.
    """
    normalised = preprocess_string(string_or_int)
    digest_hex = hashlib.sha256(normalised.encode()).hexdigest()
    return int(digest_hex, 16) % limit

In [None]:
#Load CSV files to bronze layer
def load_csv_files(csv_dir):
    """Load every not-yet-loaded ``.csv`` file in ``csv_dir`` into ``bronze.orders``.

    A file is skipped when its name already appears in ``aux.elt_load_table``.
    Each remaining file is read as a ``|``-delimited cp1252 CSV, profiled via
    ``data_profiling``, given three audit columns, appended to
    ``bronze.orders`` and recorded with ``log_load``.

    File names must look like ``<YYYYMM>_..._Orders_<YYYY_MM_DD_HH_MM_SS>.csv``:
    the leading token supplies the month-end ``elt_file_date`` and the trailing
    token supplies ``elt_file_timestamp``.

    Load failures are reported with ``print`` and do not stop the loop. When
    the driver reports a data error whose message mentions truncation or a
    type mismatch, the whole frame is also written to ``aux.error_table``.

    Args:
        csv_dir: Directory to scan for CSV files.
    """
    log_table_query = "SELECT DISTINCT file_name FROM aux.elt_load_table"
    loaded_files = set(pd.read_sql(log_table_query, conn)['file_name'])

    # Iterate through CSV files in the directory
    for filename in os.listdir(csv_dir):
        if not filename.endswith('.csv') or filename in loaded_files:
            continue

        file_path = os.path.join(csv_dir, filename)
        df = pd.read_csv(file_path,encoding='cp1252', delimiter='|')
        data_profiling(df,filename)

        # Extract date from the file name and make it as monthend
        date_str = filename.split('_')[0]
        end_of_month_date = pd.to_datetime(date_str, format='%Y%m') + pd.offsets.MonthEnd(0)
        datetime_str = filename.split('_Orders_')[1].split('.csv')[0]
        filecreationtimestamp = pd.to_datetime(datetime_str, format='%Y_%m_%d_%H_%M_%S')
        time_now = pd.Timestamp.utcnow().strftime('%Y-%m-%d %H:%M:%S')

        # Normalise the column names, then add the audit columns
        df.columns = df.columns.str.lower().str.replace(' ', '_').str.replace('-', '_')
        df['elt_file_date'] = end_of_month_date.date()
        df['elt_file_loading_timestamp'] = time_now
        df['elt_file_timestamp'] = filecreationtimestamp

        # Load DataFrame into the bronze layer table
        try:
            df.to_sql('orders', conn, index=False, if_exists='append',schema='bronze')
            log_load('orders',filename,str(len(df)),time_now)
        except Exception as e:
            # SQLAlchemy re-raises driver errors as its own exception types and
            # keeps the original driver exception on `.orig`, so a bare
            # `except pyodbc.DataError` never matched. Unwrap before classifying.
            driver_error = getattr(e, 'orig', None) or e
            if isinstance(driver_error, pyodbc.DataError):
                error_message = str(driver_error)
                # NOTE(review): confirm these substrings match the messages the
                # SQL Server ODBC driver actually produces.
                if 'data truncation' in error_message.lower() or 'data type mismatch' in error_message.lower():
                    print(f"Data truncation or data type mismatch error occurred in file: {filename}")
                    print(f"Error message: {error_message}")
                    print(f"Number of rows causing the error: {len(df)}")
                    # Insert the failing records into the error table
                    error_df = df.copy()
                    error_df['error_message'] = error_message
                    error_df['elt_file_name'] = filename
                    error_df.to_sql('error_table', conn, index=False, if_exists='append', schema='aux')
                    print("Failing records inserted into error table.")
                else:
                    # Handle other data-related errors
                    print(f"Data-related error occurred in file: {filename}")
                    print(f"Error message: {error_message}")
            else:
                # Handle other exceptions
                print(f"Error occurred while processing file: {filename}")
                print(f"Error message: {str(e)}")

            
# Directory containing CSV files.
# NOTE(review): '{Source_Location}' is a literal string (raw, not an f-string);
# it looks like a placeholder to be replaced with the real path - confirm.
csv_dir = r'{Source_Location}'
# The assignment and the call were fused onto one line, which is a SyntaxError.
load_csv_files(csv_dir)

In [None]:
import great_expectations as ge
import json

# Read the rows to validate into a DataFrame.
# NOTE(review): this reads `bronze.orders_bkp`, while the load cells use
# `bronze.orders` - confirm the table is intended.
query = "SELECT * FROM demo.bronze.orders_bkp;"
df = pd.read_sql(sql=query, con=conn)

In [None]:
# Wrap the loaded frame so expectations can be evaluated (and recorded) on it.
df_ge = ge.dataset.PandasDataset(df)


def _complete_result_format():
    """Return a fresh copy of the result-format options shared by every expectation below."""
    return {
        "result_format": "COMPLETE",
        "unexpected_index_column_names": ["row_id"],
        "return_unexpected_index_query": True,
    }


# (row_id, elt_file_timestamp) must identify a row uniquely.
uniqueness = df_ge.expect_compound_columns_to_be_unique(
    ["row_id", "elt_file_timestamp"],
    result_format=_complete_result_format(),
)

# row_id must always be present.
value_not_null = df_ge.expect_column_values_to_not_be_null(
    'row_id',
    result_format=_complete_result_format(),
)

# Date columns must match NN-NN-NNNN.
# NOTE(review): unlike the customer_id pattern, this pattern has no ^...$ anchors.
order_date = df_ge.expect_column_values_to_match_regex(
    column='order_date',
    regex=r'\d{2}-\d{2}-\d{4}',
    result_format=_complete_result_format(),
)
ship_date = df_ge.expect_column_values_to_match_regex(
    column='ship_date',
    regex=r'\d{2}-\d{2}-\d{4}',
    result_format=_complete_result_format(),
)

# customer_id must be two letters, a dash, then five digits.
customer_id = df_ge.expect_column_values_to_match_regex(
    column='customer_id',
    regex=r'^[A-Za-z]{2}-\d{5}$',
    result_format=_complete_result_format(),
)

In [None]:
# NOTE(review): ValidationResultIdentifier is imported but not used in this cell.
from great_expectations.data_context.types.resource_identifiers import ValidationResultIdentifier

# Open the Great Expectations data context.
context = ge.data_context.DataContext()
# Collect the expectations recorded on `df_ge` into a suite and save it in the context.
expectation_suite = df_ge.get_expectation_suite()
context.save_expectation_suite(expectation_suite)
# Run the recorded expectations against the frame.
# NOTE(review): `validation_results` is not referenced again in this cell.
validation_results = df_ge.validate()

# Build the data docs and print the URLs of the docs sites.
context.build_data_docs()
print(context.get_docs_sites_urls())

In [None]:
import pandas as pd
from datetime import datetime
from sqlalchemy import text

# Load Customer Data (type-2 history in silver.customer)
#
# For each customer row taken from bronze.orders:
#   * unknown customer_id   -> insert a current ('C') row;
#   * known, name or segment changed -> close the current row ('H', valid_to =
#     file date) and insert a new current row;
#   * known, unchanged      -> nothing to do.
#
# NOTE(review): the window is partitioned by (customer_id, customer_name), so a
# customer_id that appears under two names yields two rows here, and the
# in-memory snapshot of silver.customer is not refreshed inside the loop.
# Confirm whether the partition should be customer_id only.
query = """
    WITH CUST AS (
        SELECT DISTINCT 
            customer_id,
            customer_name,
            segment,
            elt_file_date,
            ROW_NUMBER () OVER (PARTITION BY TRIM(customer_id), TRIM(customer_name) ORDER BY elt_file_date DESC) RK 
        FROM demo.bronze.orders
    ) 
    SELECT 
        TRIM(customer_id) customer_id,
        TRIM(customer_name) customer_name,
        TRIM(segment) segment,
        elt_file_date 
    FROM CUST 
    WHERE RK = 1
"""
customers = pd.read_sql(query, conn)

query = "SELECT * FROM silver.customer"
existing_customers = pd.read_sql(query, conn)
existing_customers_filtered = existing_customers.copy()

_CUSTOMER_KEY_SPACE = 10 ** 9          # modulus used for every surrogate key
_CUSTOMER_OPEN_VALID_TO = '9999-01-01'  # valid_to of the current version

_close_customer_stmt = text("""
    UPDATE silver.customer
    SET valid_to = :valid_to, his_flag = :his_flag
    WHERE customer_id = :customer_id AND his_flag = 'C'
""")


def _build_customer_row(customer):
    """Return the silver.customer payload for a new current version of ``customer``."""
    return {
        'sk_customer_id': consistent_hash(customer['customer_id'], _CUSTOMER_KEY_SPACE),
        'segment_key': consistent_hash(customer['segment'], _CUSTOMER_KEY_SPACE),
        'customer_id': customer['customer_id'],
        'customer_name': customer['customer_name'],
        'valid_from': customer['elt_file_date'],
        'valid_to': _CUSTOMER_OPEN_VALID_TO,
        'his_flag': 'C',
        'elt_load_time': pd.Timestamp.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
    }


# Iterate through each customer
for index, customer in customers.iterrows():
    matches = existing_customers_filtered[
        existing_customers_filtered['customer_id'] == customer['customer_id']
    ]

    if len(matches) == 0:
        # Unknown customer: insert a new current record.
        new_customer_df = pd.DataFrame([_build_customer_row(customer)])
        insert_records(new_customer_df, 'customer', conn,'silver')
        continue

    # Compare against the current version when there is one; taking the first
    # match blindly could compare against an already-closed historical row.
    current_rows = matches[matches['his_flag'] == 'C']
    existing_customer = (current_rows if len(current_rows) else matches).iloc[0]

    name_changed = customer['customer_name'].lower() != existing_customer['customer_name'].lower()
    # segment_key is stored via consistent_hash, so the comparison must use it
    # too. The builtin hash() never matched, and is randomised per process for
    # strings, so every customer was treated as changed on every run.
    segment_changed = (
        consistent_hash(customer['segment'], _CUSTOMER_KEY_SPACE) != existing_customer['segment_key']
    )
    if not (name_changed or segment_changed):
        continue

    # Changes detected: end-date the current record. Values are bound as
    # parameters instead of being interpolated into the SQL text.
    # Engine.begin() commits on success and rolls back on error.
    with conn.begin() as connection:
        connection.execute(
            _close_customer_stmt,
            {
                'valid_to': str(customer['elt_file_date']),
                'his_flag': 'H',
                'customer_id': customer['customer_id'],
            },
        )

    # ...then insert the new current record.
    new_customer_df = pd.DataFrame([_build_customer_row(customer)])
    insert_records(new_customer_df, 'customer', conn,'silver')

In [None]:
# Load segments (insert-only): add every segment seen in bronze.orders whose
# surrogate key is not yet present in silver.segment.

query = 'SELECT DISTINCT Segment FROM bronze.orders'
segments = pd.read_sql(query, conn)
# The result column may come back under the table's own casing (`segment`)
# rather than the casing typed in the query; accept either.
segments = segments.rename(columns={'segment': 'Segment'})

silver_query = 'SELECT * FROM silver.segment'
silver_segments = pd.read_sql(silver_query, conn)

segments['sk_segment_id'] = segments['Segment'].apply(lambda x: consistent_hash(str(x), 10 ** 9))
# consistent_hash normalises its input (case, surrounding whitespace, dashes),
# so raw values that differ only in those respects share one key. Keep the
# first of each so a single batch never contains the same key twice.
segments = segments.drop_duplicates(subset='sk_segment_id')

# Find new segment keys that don't exist in the silver table
new_segment_keys = segments[~segments['sk_segment_id'].isin(silver_segments['sk_segment_id'])]

# Prepare DataFrame for new segment keys
new_segments_df = pd.DataFrame({
    'sk_segment_id': new_segment_keys['sk_segment_id'],
    'segment': new_segment_keys['Segment'],
    'elt_load_time': pd.Timestamp.utcnow().strftime('%Y-%m-%d %H:%M:%S')
})

# Insert new segment keys into silver.segment
if not new_segments_df.empty:
    insert_records(new_segments_df, 'segment', conn,'silver')
    


In [None]:
# Load categories (insert-only): add every category seen in bronze.orders whose
# surrogate key is not yet present in silver.category.

query = 'SELECT DISTINCT category FROM bronze.orders'
category = pd.read_sql(query, conn)

# Fetch the categories already in silver.category
silver_query_category = 'SELECT * FROM silver.category'
silver_category = pd.read_sql(silver_query_category, conn)

# Compute the surrogate key for each category value
category['sk_category_id'] = category['category'].apply(lambda x: consistent_hash(str(x), 10 ** 9))
# consistent_hash normalises its input (case, surrounding whitespace, dashes),
# so raw values that differ only in those respects share one key. Keep the
# first of each so a single batch never contains the same key twice.
category = category.drop_duplicates(subset='sk_category_id')

# Find new keys that don't exist in the silver table
new_category_keys = category[~category['sk_category_id'].isin(silver_category['sk_category_id'])]

# Prepare DataFrame for new keys
new_category_df = pd.DataFrame({
    'sk_category_id': new_category_keys['sk_category_id'],
    'category_name': new_category_keys['category'],
    'elt_load_time': pd.Timestamp.utcnow().strftime('%Y-%m-%d %H:%M:%S')
})

# Insert new keys into silver.category
if not new_category_df.empty:
    insert_records(new_category_df, 'category', conn, 'silver')


In [None]:
# Load sub-categories (insert-only): add every sub-category seen in
# bronze.orders whose surrogate key is not yet present in silver.sub_category.

querysubcategory = 'SELECT DISTINCT sub_category FROM bronze.orders'
Subcategory = pd.read_sql(querysubcategory, conn)

# Fetch the sub-categories already in silver.sub_category
silver_query_Subcategory = 'SELECT * FROM silver.sub_category'
silver_Subcategory = pd.read_sql(silver_query_Subcategory, conn)

# Compute the surrogate key for each sub-category value
Subcategory['sk_sub_category_id'] = Subcategory['sub_category'].apply(lambda x: consistent_hash(str(x), 10 ** 9))
# consistent_hash normalises its input (case, surrounding whitespace, dashes),
# so raw values that differ only in those respects share one key. Keep the
# first of each so a single batch never contains the same key twice.
Subcategory = Subcategory.drop_duplicates(subset='sk_sub_category_id')
new_Subcategory_keys = Subcategory[~Subcategory['sk_sub_category_id'].isin(silver_Subcategory['sk_sub_category_id'])]

# Prepare DataFrame for new keys
new_Subcategory_df = pd.DataFrame({
    'sk_sub_category_id': new_Subcategory_keys['sk_sub_category_id'],
    'sub_category_name': new_Subcategory_keys['sub_category'],
    'elt_load_time': pd.Timestamp.utcnow().strftime('%Y-%m-%d %H:%M:%S')
})

# Insert new keys into silver.sub_category
if not new_Subcategory_df.empty:
    insert_records(new_Subcategory_df, 'sub_category', conn, 'silver')


In [None]:
# Load shipment modes (insert-only): add every ship_mode seen in bronze.orders
# whose surrogate key is not yet present in silver.shipment_mode.

queryshipmode = 'SELECT DISTINCT ship_mode FROM bronze.orders'
ShipmentMode = pd.read_sql(queryshipmode, conn)

# Fetch the shipment modes already in silver.shipment_mode.
# (The name first holds the query text and is then rebound to the result frame.)
silver_ShipmentMode = 'SELECT * FROM silver.shipment_mode'
silver_ShipmentMode = pd.read_sql(silver_ShipmentMode, conn)

# Compute the surrogate key for each ship_mode value
ShipmentMode['sk_shipment_mode_id'] = ShipmentMode['ship_mode'].apply(lambda x: consistent_hash(str(x), 10 ** 9))
# consistent_hash normalises its input (case, surrounding whitespace, dashes),
# so raw values that differ only in those respects share one key. Keep the
# first of each so a single batch never contains the same key twice.
ShipmentMode = ShipmentMode.drop_duplicates(subset='sk_shipment_mode_id')
new_ShipmentMode_keys = ShipmentMode[~ShipmentMode['sk_shipment_mode_id'].isin(silver_ShipmentMode['sk_shipment_mode_id'])]

# Prepare DataFrame for new keys
new_ShipmentMode_df = pd.DataFrame({
    'sk_shipment_mode_id': new_ShipmentMode_keys['sk_shipment_mode_id'],
    'shipment_type': new_ShipmentMode_keys['ship_mode'],
    'elt_load_time': pd.Timestamp.utcnow().strftime('%Y-%m-%d %H:%M:%S')
})

# Insert new keys into silver.shipment_mode
if not new_ShipmentMode_df.empty:
    insert_records(new_ShipmentMode_df, 'shipment_mode', conn, 'silver')

In [None]:
# Load locations (insert-only): add every location seen in bronze.orders whose
# surrogate key is not yet present in silver.location.

querylocation = 'SELECT DISTINCT country, city,state,postal_code,region FROM bronze.orders'
location = pd.read_sql(querylocation, conn)

# Fetch the locations already in silver.location.
# (The name first holds the query text and is then rebound to the result frame.)
silver_location = 'SELECT * FROM silver.location'
silver_location = pd.read_sql(silver_location, conn)

# The surrogate key is derived from postal_code alone.
location['sk_location_id'] = location['postal_code'].apply(lambda x: consistent_hash(str(x), 10 ** 9))
# The query is DISTINCT over five columns but the key uses only postal_code, so
# one postal code listed with two cities/states/regions would produce the same
# key twice in one batch. Keep the first row per key.
# NOTE(review): confirm "first row wins" is acceptable for such postal codes.
location = location.drop_duplicates(subset='sk_location_id')
new_location_keys = location[~location['sk_location_id'].isin(silver_location['sk_location_id'])]

# Prepare DataFrame for new keys
new_location_df = pd.DataFrame({
    'sk_location_id': new_location_keys['sk_location_id'],
    'postal_code': new_location_keys['postal_code'],
    'country': new_location_keys['country'],
    'city': new_location_keys['city'],
    'state': new_location_keys['state'],
    'region': new_location_keys['region'],
    'elt_load_time': pd.Timestamp.utcnow().strftime('%Y-%m-%d %H:%M:%S')
})

# Insert new keys into silver.location
if not new_location_df.empty:
    insert_records(new_location_df, 'location', conn, 'silver')


In [None]:
# Load Product (type-2 history in silver.product)
#
# For each product row from the most recent file in bronze.orders:
#   * unknown product_id       -> insert a current ('C') row;
#   * known, name changed      -> close the open row ('H', valid_to = file
#     date) and insert a new current row;
#   * known, name unchanged    -> nothing to do.
# NOTE(review): only product_name is compared; a category or sub_category
# change alone does not create a new version - confirm that is intended.

from sqlalchemy import text
queryproduct = """SELECT * FROM 
                 (SELECT product_id,
                product_name,
                category, 
                sub_category,
                elt_file_date, 
                ROW_NUMBER() OVER (PARTITION BY product_id, product_name ORDER BY elt_file_timestamp DESC) 
                RK FROM demo.bronze.orders WHERE elt_file_timestamp = (SELECT MAX(elt_file_timestamp) FROM demo.bronze.orders)) X WHERE RK = 1 
                """
products = pd.read_sql(queryproduct, conn)

silverquery_product = "SELECT * FROM silver.product"
existing_products = pd.read_sql(silverquery_product, conn)
existing_products_filtered = existing_products.copy()

_PRODUCT_KEY_SPACE = 10 ** 9          # modulus used for every surrogate key
_PRODUCT_OPEN_VALID_TO = '9999-01-01'  # valid_to of the current version

_close_product_stmt = text("""
    UPDATE silver.product
    SET valid_to = :valid_to, his_flag = :his_flag
    WHERE product_id = :product_id AND valid_to = :open_valid_to
""")


def _build_product_row(product):
    """Return the silver.product payload for a new current version of ``product``."""
    # consistent_hash requires the modulus as its second argument; calling it
    # with one argument (and applying `%` afterwards) raised TypeError.
    return {
        'sk_product_id': consistent_hash(product['product_id'], _PRODUCT_KEY_SPACE),
        'category_id': consistent_hash(product['category'], _PRODUCT_KEY_SPACE),
        'product_id': product['product_id'],
        'sub_category_id': consistent_hash(product['sub_category'], _PRODUCT_KEY_SPACE),
        'product_name': product['product_name'],
        'valid_from': product['elt_file_date'],
        'valid_to': _PRODUCT_OPEN_VALID_TO,
        'his_flag': 'C',
        'elt_load_time': pd.Timestamp.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
    }


for index, product in products.iterrows():
    # Check if the product exists in the silver.product table
    matches = existing_products_filtered[
        existing_products_filtered['product_id'] == product['product_id']
    ]

    if len(matches) == 0:
        new_product_df = pd.DataFrame([_build_product_row(product)])
        insert_records(new_product_df, 'product', conn, 'silver')
        continue

    # Compare against the current version when there is one; taking the first
    # match blindly could compare against an already-closed historical row.
    current_rows = matches[matches['his_flag'] == 'C']
    existing_product = (current_rows if len(current_rows) else matches).iloc[0]

    if product['product_name'].lower() == existing_product['product_name'].lower():
        continue

    # Name changed: close the open version. Values are bound as parameters
    # instead of being interpolated into the SQL text.
    # Engine.begin() commits on success and rolls back on error.
    with conn.begin() as connection:
        connection.execute(
            _close_product_stmt,
            {
                'valid_to': str(product['elt_file_date']),
                'his_flag': 'H',
                'product_id': product['product_id'],
                'open_valid_to': _PRODUCT_OPEN_VALID_TO,
            },
        )

    # Insert a new record for the updated product
    new_product_df = pd.DataFrame([_build_product_row(product)])
    insert_records(new_product_df, 'product', conn, 'silver')

In [None]:
# Load Fact Table (versioned rows in silver.fact_order_transaction)
#
# For each order line from the most recent file in bronze.orders:
#   * no row for (order_id, transaction_month, product key) -> insert it;
#   * rows exist but none was loaded from this file timestamp -> mark the
#     open version(s) 'D' with valid_to = file timestamp, then insert the new
#     version;
#   * a row from this file timestamp already exists -> nothing to do, so
#     re-running the cell for the same file does not add duplicates.
#
# Foreign keys are computed with consistent_hash, the same function the
# dimension cells use. The builtin hash() is randomised per process for
# strings, so keys built with it never matched the dimension tables or a
# previous run.
# NOTE(review): fact rows written earlier with hash()-based keys will not match
# these keys - confirm whether existing rows need to be reloaded.

import dateutil.parser
from sqlalchemy import text

Ordersquery = """
    SELECT 
        order_id,
        order_date,
        ship_date,
        ship_mode,
        customer_id,
        segment,
        postal_code,
        product_id,
        sales,
        quantity,
        discount,
        profit,
        elt_file_date,
        elt_file_timestamp
    FROM 
        bronze.orders
    WHERE 
        elt_file_timestamp = (SELECT MAX(elt_file_timestamp) FROM bronze.orders)"""

orders = pd.read_sql(Ordersquery, conn)
silverquery = "SELECT * FROM silver.fact_order_transaction"
existing_orders = pd.read_sql(silverquery, conn)
existing_orders_filtered = existing_orders.copy()

_FACT_KEY_SPACE = 10 ** 9                    # modulus used for every surrogate key
_FACT_OPEN_VALID_TO = '9999-01-01 00:00:00'  # valid_to of the current version

# Only versions not already marked 'D' are closed, so the valid_to of older
# versions is not overwritten again.
_close_order_stmt = text("""
    UPDATE silver.fact_order_transaction
    SET valid_to = :valid_to, status = :status
    WHERE order_id = :order_id
      AND product_id = :product_id
      AND transaction_month = :transaction_month
      AND (status IS NULL OR status <> 'D')
""")


def _build_fact_row(order):
    """Return the silver.fact_order_transaction payload for a new current version of ``order``."""
    return {
        'order_id': order['order_id'],
        # ship_mode and postal_code are stringified first, as in the dimension cells.
        'ship_mode_id': consistent_hash(str(order['ship_mode']), _FACT_KEY_SPACE),
        'location_id': consistent_hash(str(order['postal_code']), _FACT_KEY_SPACE),
        'product_id': consistent_hash(order['product_id'], _FACT_KEY_SPACE),
        'customer_id': consistent_hash(order['customer_id'], _FACT_KEY_SPACE),
        # NOTE(review): dateutil guesses day/month order for ambiguous dates - confirm the source format.
        'order_date': dateutil.parser.parse(order['order_date']).strftime("%Y-%m-%d"),
        'ship_date': dateutil.parser.parse(order['ship_date']).strftime("%Y-%m-%d"),
        'sales': order['sales'],
        'quantity': order['quantity'],
        'discount': order['discount'],
        'profit': order['profit'],
        'transaction_month': order['elt_file_date'],
        'valid_from': order['elt_file_timestamp'],
        'valid_to': _FACT_OPEN_VALID_TO,
        'status': '',
        'elt_load_time': pd.Timestamp.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
    }


for index, order in orders.iterrows():
    product_key = consistent_hash(order['product_id'], _FACT_KEY_SPACE)
    existing_order = existing_orders_filtered[
        (existing_orders_filtered['order_id'] == order['order_id']) &
        (existing_orders_filtered['transaction_month'] == order['elt_file_date']) &
        (existing_orders_filtered['product_id'] == product_key)
    ]

    if len(existing_order) == 0:
        new_order_df = pd.DataFrame([_build_fact_row(order)])
        insert_records(new_order_df, 'fact_order_transaction', conn, 'silver')
        continue

    # Look at every stored version, not just the first one: comparing only the
    # first match re-closed and re-inserted the row on each rerun once a
    # second version existed.
    if (existing_order['valid_from'] == order['elt_file_timestamp']).any():
        continue

    # Mirror the status change on the in-memory snapshot for the matched rows.
    existing_orders_filtered.loc[existing_order.index, 'status'] = 'D'

    # Close the open version(s) in the database. Values are bound as
    # parameters instead of being interpolated into the SQL text.
    # Engine.begin() commits on success and rolls back on error.
    with conn.begin() as connection:
        connection.execute(
            _close_order_stmt,
            {
                'valid_to': str(order['elt_file_timestamp']),
                'status': 'D',
                'order_id': order['order_id'],
                'product_id': product_key,
                'transaction_month': str(order['elt_file_date']),
            },
        )

    # Insert the new version
    new_order_df = pd.DataFrame([_build_fact_row(order)])
    insert_records(new_order_df, 'fact_order_transaction', conn, 'silver')
