# In [1]:
import pandas as pd
import MySQLdb
import pygrametl
from pygrametl.datasources import CSVSource, SQLSource, PandasSource
from pygrametl.tables import Dimension, FactTable, SlowlyChangingDimension, TypeOneSlowlyChangingDimension, AccumulatingSnapshotFactTable
import duckdb

# In [2]:
def computelag(row, namemapping, updated):
    """Compute the day-lag measures of a fact row from its date columns.

    Intended to be used as the ``factexpander`` of the accumulating snapshot
    fact table. The row is modified in place:

    * ``shipment_lag`` = days from ``order_date`` to ``ship_date``
    * ``delivery_lag`` = days from ``ship_date`` to ``delivery_date``

    A lag is set to ``None`` when either of the two dates it depends on is
    missing or ``None`` (the event has not happened yet), instead of raising
    a ``TypeError`` on the subtraction.

    Args:
        row: dict-like fact row holding the date columns; updated in place.
        namemapping: accepted to match the factexpander signature; not used.
        updated: collection of attribute names whose values have changed.

    Returns:
        None. The lags are written into ``row``.
    """
    def days_between(later, earlier):
        # A lag is only defined once both events have a date.
        if later is None or earlier is None:
            return None
        return (later - earlier).days

    order_date = row.get('order_date')
    ship_date = row.get('ship_date')
    delivery_date = row.get('delivery_date')

    ship_changed = 'ship_date' in updated
    if ship_changed:
        row['shipment_lag'] = days_between(ship_date, order_date)
    # delivery_lag is measured from ship_date, so it must also be refreshed
    # when only ship_date changed; otherwise it would keep a stale value.
    if ship_changed or 'delivery_date' in updated:
        row['delivery_lag'] = days_between(delivery_date, ship_date)


def fact_orders_acc(sourceDatabase, dw_conn_wrapper):
    """Load ``sales_order`` rows into the ``fact_sales_acc`` fact table.

    Every row returned by the source query is passed to
    ``AccumulatingSnapshotFactTable.ensure``. After all rows are processed
    the warehouse connection wrapper is committed. The wrapper is closed in
    all cases, including when the load fails part-way, so the warehouse
    connection is not leaked on error (in that case nothing is committed
    here and the exception propagates to the caller).

    Args:
        sourceDatabase: open connection to the source database; it is only
            read from and is not closed by this function.
        dw_conn_wrapper: pygrametl connection wrapper for the data
            warehouse; committed on success and always closed.
    """
    # Query that produces the input dataset. Quantities and amounts are cast
    # to DECIMAL(18, 2) and the event timestamps to plain dates.
    sql = (
        "select order_num, employee_id,customer_id,product_id,sales_channel_id,currency_code,"
        "cast(order_quantity as DECIMAL(18, 2)) as order_quantity,"
        "cast(total_cost as DECIMAL(18, 2)) as total_cost,"
        "cast(total_price as DECIMAL(18, 2)) as total_price,"
        "cast(order_date as date) as order_date,"
        "cast(ship_date as date) as ship_date,"
        "cast(delivery_date as date) as delivery_date,"
        "cast(procure_date as date) as procure_date from sales_order"
    )
    # Attribute names given to the columns of each result row, in the same
    # order as the select list above.
    # NOTE(review): currency_code is selected but is not listed in keyrefs,
    # otherrefs or measures below, so it does not appear to be written to the
    # fact table - confirm this is intended.
    column_names = (
        'order_num', 'employee_id', 'customer_id', 'product_id',
        'sales_channel_id', 'currency_code', 'order_quantity', 'total_cost',
        'total_price', 'order_date', 'ship_date', 'delivery_date',
        'procure_date',
    )

    try:
        source = SQLSource(connection=sourceDatabase, query=sql, names=column_names)

        asft = AccumulatingSnapshotFactTable(
            name='fact_sales_acc',  # name of the fact table in the data warehouse
            # Attributes that identify a fact row (references to the
            # dimension tables plus the order number).
            keyrefs=['order_num', 'employee_id', 'customer_id', 'product_id', 'sales_channel_id'],
            # Date columns that may be filled in or updated over time.
            otherrefs=['order_date', 'ship_date', 'delivery_date', 'procure_date'],
            # Measures; the two lag measures are derived by computelag.
            measures=['order_quantity', 'total_cost', 'total_price', 'shipment_lag', 'delivery_lag'],
            # Called to compute the lag measures before a found row is updated.
            factexpander=computelag)

        # ensure() looks up the given row. If that fails, it inserts it. If
        # found, it checks whether values for attributes in otherrefs or
        # measures have changed and updates the found row if necessary
        # (values for attributes in keyrefs are not allowed to change). If an
        # update is necessary and a factexpander is defined, the row is first
        # completed with any missing otherrefs/measures and the factexpander
        # is run on it.
        for row in source:
            asft.ensure(row)

        dw_conn_wrapper.commit()
    finally:
        # Always release the warehouse connection, even if the load failed.
        dw_conn_wrapper.close()


def main():
    """Run the fact_sales_acc load with the module-level connections.

    Relies on the globals ``sourceDatabase`` and ``dw_conn_wrapper``, which
    this file creates under the ``if __name__ == '__main__'`` guard.
    """
    fact_orders_acc(
        sourceDatabase=sourceDatabase,
        dw_conn_wrapper=dw_conn_wrapper,
    )


if __name__ == '__main__':
    # Connect to salesdb (OLTP source) and salesdwh (OLAP data warehouse).
    sourceDatabase = MySQLdb.connect(database = 'salesdb', user = 'user', password = 'password', port = 42333)
    try:
        # Change the path if you have your sales DuckDB file somewhere else.
        destDatabase = duckdb.connect(r'C:\Users\katep\OneDrive\Desktop\DEV-modeling\assets_scripts\salesdwh.duckdb')
        # Wrap the warehouse connection so pygrametl can use it.
        dw_conn_wrapper = pygrametl.ConnectionWrapper(connection = destDatabase)
        main()
    finally:
        # main() hands the warehouse wrapper to fact_orders_acc, which closes
        # it; the source connection is not closed anywhere else, so release
        # it here whether or not the load succeeded.
        sourceDatabase.close()

