In [1]:
import re
from pyspark.sql.types import *
import pyspark.sql.functions as sf
from pyspark.sql import Window
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, max
from datetime import date, datetime
import subprocess
import hashlib
import logging

##### Logging

In [2]:
# NOTE(review): disabled cell - every line below is commented out.
# If re-enabled: Logger.setLevel() returns None, so `pyspark_log` and
# `py4j_logger` would be bound to None; call setLevel without assigning.
# # Separate pyspark loggers
# pyspark_log = logging.getLogger('pyspark').setLevel(logging.ERROR)
# py4j_logger = logging.getLogger("py4j").setLevel(logging.ERROR)

In [3]:
# NOTE(review): disabled cell - every line below is commented out.
# If re-enabled:
#   - `os` is used below but is not imported in the imports cell; add `import os`.
#   - only Logs/DataLogs/<date> is created here; Logs/SparkLogs must already
#     exist, otherwise logging.FileHandler cannot open its file.
# # First logger for logging the spark job
# spark_logger = logging.getLogger('SparkJobs')
# spark_logger.setLevel(logging.DEBUG)
# today_date = date.today().isoformat()
# logger_file_name = f'Logs/SparkLogs/SparkJobsLogs-{today_date}.log'
# fh1 = logging.FileHandler(logger_file_name)
# fh1.setLevel(logging.DEBUG)
# fh1.setFormatter(logging.Formatter('%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S'))
# spark_logger.addHandler(fh1)

# # Second logger for reporting some information about the files
# data_logger = logging.getLogger('DataLogs')
# current_time = datetime.now()
# data_logger.setLevel(logging.DEBUG)
# if not os.path.exists(f'Logs/DataLogs/{today_date}'):
#     os.makedirs(f'Logs/DataLogs/{today_date}')
# data_logs_file_name = f'Logs/DataLogs/{today_date}/DataInfo-H-{current_time.hour}.log'
# fh2 = logging.FileHandler(data_logs_file_name)
# fh2.setLevel(logging.DEBUG)
# fh2.setFormatter(logging.Formatter('%(message)s'))
# data_logger.addHandler(fh2)

#### Starting the Session and Setting up the PySpark Environment

In [4]:
# Local Spark session (4 worker threads) with Hive support.
# NOTE(review): the gold tables below are written to explicit HDFS paths, not
# to this warehouse dir - confirm which location is intended.
# Disabled option kept for reference:
#     .config("spark.sql.hive.convertMetastoreOrc", "false")
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("DataTransformation")
    .config("spark.eventLog.logBlockUpdates.enabled", True)
    .config("spark.sql.warehouse.dir", "/storage_layer/gold/")
    .enableHiveSupport()
    .getOrCreate()
)

sc = spark.sparkContext

In [5]:
# Run timestamps: ISO date string (YYYY-MM-DD) and the full current datetime.
today_date, current_time = date.today().isoformat(), datetime.now()

#### Some Useful Functions

In [6]:
def add_surrogate_key(df, sk_name, table_id, current_max_sk=0):
    """Return ``df`` with a sequential surrogate-key column added.

    Rows are numbered ``current_max_sk + 1, current_max_sk + 2, ...`` in
    ascending order of ``table_id``.

    Args:
        df: Spark DataFrame to extend.
        sk_name: name of the surrogate-key column to add.
        table_id: column that defines the numbering order.
        current_max_sk: highest key already in use. ``None`` (what ``max()``
            returns over zero rows) is treated as 0 instead of silently
            producing NULL keys.

    Returns:
        A new DataFrame with the ``sk_name`` column added.
    """
    offset = 0 if current_max_sk is None else current_max_sk
    # NOTE(review): a window with orderBy but no partitionBy is evaluated on a
    # single partition - fine for small dimension tables, not for large ones.
    numbering = Window.orderBy(table_id)
    return df.withColumn(sk_name, sf.row_number().over(numbering) + offset)

In [7]:
def is_directory_empty(dir):
    """Return True when the HDFS directory ``dir`` contains no data files.

    The directory is created first (``hdfs dfs -mkdir -p``) so that the Spark
    listing below has a path to read. Files are counted recursively with
    Spark's ``binaryFile`` source. Spark's file listing normally skips names
    starting with ``_`` or ``.``, such as ``_SUCCESS``.

    Args:
        dir: HDFS path of the table directory.

    Returns:
        True if no files are found under ``dir``, else False.
    """
    # Argument list, no shell: spaces or shell metacharacters in `dir` can no
    # longer break or alter the command.
    try:
        mkdir = subprocess.run(
            ["hdfs", "dfs", "-mkdir", "-p", dir],
            capture_output=True,
            text=True,
        )
    except OSError as exc:  # e.g. `hdfs` not on PATH; stay best-effort
        logging.getLogger(__name__).warning("could not run hdfs mkdir for %s: %s", dir, exc)
    else:
        if mkdir.returncode != 0:
            # Previously discarded silently; surface it but keep going.
            logging.getLogger(__name__).warning(
                "hdfs mkdir failed for %s: %s", dir, mkdir.stderr.strip()
            )
    files_df = spark.read.format("binaryFile").option("recursiveFileLookup", "true").load(dir)
    return files_df.count() == 0

In [8]:
def concat_and_hash(*args):
    """Return a SHA-256 hex digest identifying the ordered values ``args``.

    The values are serialised as a JSON array before hashing. Plain
    comma-joining is ambiguous:
      - ``("a,b", "c")`` and ``("a", "b,c")`` would collide;
      - ``None`` (SQL NULL) would hash the same as the string ``"None"``.
    Either collision would make the hash comparison treat a new row as
    already loaded.

    Values that JSON cannot encode fall back to ``str``.

    Args:
        *args: column values of one row, in column order.

    Returns:
        64-character lowercase hexadecimal SHA-256 digest.
    """
    import json  # local import keeps the function self-contained for the UDF

    serialised = json.dumps(list(args), default=str, ensure_ascii=False)
    return hashlib.sha256(serialised.encode("utf-8")).hexdigest()
    
# Expose concat_and_hash to DataFrame expressions as a UDF producing a string column.
concat_and_hash_udf = sf.udf(f=concat_and_hash, returnType=StringType())

In [9]:
"""
steps:
1- define the schema
2- read the file
3- create the dir if not exists
4.0- check if the dir is empty
    - if yes:
        a. add sk
        b. reorder the columns for the sk to be first
        4.3- write the final df to the dir
    - if no: # that means that there are files in the dir
        4.1- read the files in the dir as a df
        4.2- get max sk
        4.3- drop the sk column
        4.4- add row level hash to both dataframes 
        4.5- hash compare them and get the new data 
        4.6 - get the max sk from the current data
        4.7- add sk to the new data and reorder the columns
        4.8- append the new data to the files in gold

"""

'\nsteps:\n1- define the schema\n2- read the file\n3- create the dir if not exists\n4.0- check if the dir is empty\n    - if yes:\n        a. add sk\n        b. reorder the columns for the sk to be first\n        4.3- write the final df to the dir\n    - if no: # that means that there are files in the dir\n        4.1- read the files in the dir as a df\n        4.2- get max sk\n        4.3- drop the sk column\n        4.4- add row level hash to both dataframes \n        4.5- hash compare them and get the new data \n        4.6 - get the max sk from the current data\n        4.7- add sk to the new data and reorder the columns\n        4.8- append the new data to the files in gold\n\n'

### The Transformation Algorithm:

1. Define the schema
2. Read the file
3. Create the directory if not exists
4. Check if the directory is empty
   - Yes:
     1. Add SK
     2. Reorder the columns for the SK to be first
     3. Write the final DataFrame to the directory
   - No:
     1. Read the files in the directory as a DataFrame
     2. Get max SK
     3. Drop the SK column
     4. Add row level hash to both DataFrames
     5. Compare hashes to get new data
     6. Get the max SK from the current data
     7. Add SK to the new data and reorder the columns
     8. Append the new data to the files in 'gold'


## The Actual Transformation:

### branch File:

In [10]:
# Per-table settings for the branch dimension, read by the cells below.
db, table, table_name = "qcompany", "branch", "branch_dim"
# derived identifiers (`schema` is replaced by the StructType in the next cell)
schema, table_key, table_id = f"{table}_schema", f"{table}_key", f"{table}_id"
table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/{table_name}"



In [11]:
# 1- define the schema of the cleaned branch extract
# NOTE(review): Spark's file readers may relax nullable=False to nullable - verify if non-null enforcement matters.
schema = (
    StructType()
    .add("branch_id", IntegerType(), nullable=False)
    .add("branch_location", StringType(), nullable=False)
    .add("branch_establish_date", DateType(), nullable=False)
    .add("branch_class", StringType(), nullable=False)
)

In [12]:
# 2- read the file: the hour-17 extract is the batch loaded by the next cell
# NOTE(review): `df2` (hour-18) is read but not used in the visible cells.
new_data_path = '/user/itversity/q-retail-company/silver/2024-07-10/hour-17/branches_cleaned.csv'
df2_path = '/user/itversity/q-retail-company/silver/2024-07-10/hour-18/branches_cleaned.csv'

new_data_df = spark.read.schema(schema).option("header", True).csv(new_data_path)
df2 = spark.read.schema(schema).option("header", True).csv(df2_path)



In [13]:
# Load `new_data_df` into the dimension stored as parquet under `table_dir`.
#   - empty target: number every row (ordered by `table_id`) from 1 and write it;
#   - otherwise:    hash each row over its non-key columns, keep only incoming
#                   rows whose hash is not stored yet, number them after the
#                   current max surrogate key and append them.
# `new_data_df` itself is not modified, so re-running this cell is safe.
if is_directory_empty(table_dir):
    print("Empty dir, let's fill it!")
    print("----------------------------------------------------------------------------------")
    print("adding the surrogate key")
    df1sk = add_surrogate_key(new_data_df, table_key, table_id)
    print("----------------------------------------------------------------------------------")
    print("preparing the data to be written")
    # surrogate key first, remaining columns in their original order
    df1sk = df1sk.select(table_key, *[c for c in df1sk.columns if c != table_key])
    print("----------------------------------------------------------------------------------")
    print("writing the df as a parquet to hdfs")
    df1sk.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Done")
else:
    print("Dir is not empty let's hash compare them!")
    # parquet carries its own schema; header/inferSchema are CSV-only options
    current_df_sk = spark.read.parquet(table_dir)
    print(f"reading the data in the current {table_name} data")
    print("----------------------------------------------------------------------------------")
    print("getting the max surrogate key to use later on when appening the new data... ")
    # max() is NULL when the stored files hold no rows; number from 0 in that case
    max_sk = current_df_sk.agg(sf.max(table_key)).collect()[0][0] or 0
    print(f"current max {table_key} is {max_sk}, the next {table_key} will be {max_sk+1}")
    print("----------------------------------------------------------------------------------")
    current_df = current_df_sk.drop(table_key)
    # derived frame instead of overwriting `new_data_df`; dropping absent columns is a no-op
    incoming_df = new_data_df.drop(table_key, "hash")
    print(f"hashing current & new data to compare them")
    # NOTE: rows are hashed over their columns in frame order, so the incoming
    # columns must be in the same order as the stored non-key columns
    current_df = current_df.withColumn("hash", concat_and_hash_udf(*[sf.col(c).cast("string") for c in current_df.columns]))
    incoming_df = incoming_df.withColumn("hash", concat_and_hash_udf(*[sf.col(c).cast("string") for c in incoming_df.columns]))
    print("----------------------------------------------------------------------------------")
    # keep only the incoming rows whose hash is not already stored
    new_rows_df = incoming_df.join(current_df, on="hash", how="left_anti")
    # another method: new_rows_df = incoming_df.exceptAll(current_df)
    print("comparing hashes and removing the duplicates")
    print("----------------------------------------------------------------------------------")
    print("preparing the net new data to append to the current data ")
    new_rows_df = new_rows_df.drop("hash")
    new_rows_df = add_surrogate_key(new_rows_df, table_key, table_id, max_sk)
    new_rows_df = new_rows_df.select(table_key, *[c for c in new_rows_df.columns if c != table_key])
    print("----------------------------------------------------------------------------------")
    print("writing (Appending) the new data to HDFS")
    new_rows_df.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Mission Passed... Respect+")

Empty dir, let's fill it!
----------------------------------------------------------------------------------
adding the surrogate key
----------------------------------------------------------------------------------
preparing the data to be written
----------------------------------------------------------------------------------
writing the df as a parquet to hdfs
----------------------------------------------------------------------------------

 Done


In [14]:
# for testing purposes
# Invalidate any cached data for this path before re-reading it.
# (`spark.sql("REFRESH TABLES")` does not do this: Spark parses it as a
# REFRESH of the literal resource path "TABLES".)
spark.catalog.refreshByPath(table_dir)
# parquet carries its own schema; header/inferSchema are CSV-only options
spark.read.parquet(table_dir).show()

+----------+---------+---------------+---------------------+------------+
|branch_key|branch_id|branch_location|branch_establish_date|branch_class|
+----------+---------+---------------+---------------------+------------+
|         1|        1|       New York|           2017-01-15|           A|
|         2|        2|    Los Angeles|           2016-07-28|           B|
|         3|        3|        Chicago|           2015-03-10|           A|
|         4|        4|        Houston|           2016-11-05|           D|
|         5|        5|        Phoenix|           2017-09-20|           C|
+----------+---------+---------------+---------------------+------------+



### agent File:

In [15]:
# Per-table settings for the agent dimension, read by the cells below.
db, table, table_name = "qcompany", "agent", "agent_dim"
# derived identifiers (`schema` is replaced by the StructType in the next cell)
schema, table_key, table_id = f"{table}_schema", f"{table}_key", f"{table}_id"
table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/{table_name}"

In [16]:
# 1- define the schema of the cleaned agent extract
# NOTE(review): Spark's file readers may relax nullable=False to nullable - verify if non-null enforcement matters.
schema = (
    StructType()
    .add("agent_id", IntegerType(), nullable=False)
    .add("agent_name", StringType(), nullable=False)
    .add("agent_hire_date", DateType(), nullable=False)
)

In [17]:
# 2- read the file: the hour-17 extract is the batch loaded by the next cell
# NOTE(review): `df2` (hour-18) is read but not used in the visible cells.
new_data_path = '/user/itversity/q-retail-company/silver/2024-07-10/hour-17/agent_cleaned.csv'
df2_path = '/user/itversity/q-retail-company/silver/2024-07-10/hour-18/agent_cleaned.csv'

new_data_df = spark.read.schema(schema).option("header", True).csv(new_data_path)
df2 = spark.read.schema(schema).option("header", True).csv(df2_path)

new_data_df.show()


+--------+------------------+---------------+
|agent_id|        agent_name|agent_hire_date|
+--------+------------------+---------------+
|       1|          John Doe|     2020-06-03|
|       2|        Jane Smith|     2018-05-13|
|       3|   Michael Johnson|     2021-10-03|
|       4|       Emily Brown|     2020-10-25|
|       5|      David Wilson|     2021-04-08|
|       6|       Emma Taylor|     2019-03-28|
|       7|Christopher Miller|     2020-01-11|
|       8|      Olivia Davis|     2021-10-24|
|       9|   Daniel Martinez|     2018-10-08|
|      10|      Sophia Moore|     2019-05-25|
+--------+------------------+---------------+



In [18]:
# Load `new_data_df` into the dimension stored as parquet under `table_dir`.
#   - empty target: number every row (ordered by `table_id`) from 1 and write it;
#   - otherwise:    hash each row over its non-key columns, keep only incoming
#                   rows whose hash is not stored yet, number them after the
#                   current max surrogate key and append them.
# `new_data_df` itself is not modified, so re-running this cell is safe.
if is_directory_empty(table_dir):
    print("Empty dir, let's fill it!")
    print("----------------------------------------------------------------------------------")
    print("adding the surrogate key")
    df1sk = add_surrogate_key(new_data_df, table_key, table_id)
    print("----------------------------------------------------------------------------------")
    print("preparing the data to be written")
    # surrogate key first, remaining columns in their original order
    df1sk = df1sk.select(table_key, *[c for c in df1sk.columns if c != table_key])
    print("----------------------------------------------------------------------------------")
    print("writing the df as a parquet to hdfs")
    df1sk.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Done")
else:
    print("Dir is not empty let's hash compare them!")
    # parquet carries its own schema; header/inferSchema are CSV-only options
    current_df_sk = spark.read.parquet(table_dir)
    print(f"reading the data in the current {table_name} data")
    print("----------------------------------------------------------------------------------")
    print("getting the max surrogate key to use later on when appening the new data... ")
    # max() is NULL when the stored files hold no rows; number from 0 in that case
    max_sk = current_df_sk.agg(sf.max(table_key)).collect()[0][0] or 0
    print(f"current max {table_key} is {max_sk}, the next {table_key} will be {max_sk+1}")
    print("----------------------------------------------------------------------------------")
    current_df = current_df_sk.drop(table_key)
    # derived frame instead of overwriting `new_data_df`; dropping absent columns is a no-op
    incoming_df = new_data_df.drop(table_key, "hash")
    print(f"hashing current & new data to compare them")
    # NOTE: rows are hashed over their columns in frame order, so the incoming
    # columns must be in the same order as the stored non-key columns
    current_df = current_df.withColumn("hash", concat_and_hash_udf(*[sf.col(c).cast("string") for c in current_df.columns]))
    incoming_df = incoming_df.withColumn("hash", concat_and_hash_udf(*[sf.col(c).cast("string") for c in incoming_df.columns]))
    print("----------------------------------------------------------------------------------")
    # keep only the incoming rows whose hash is not already stored
    new_rows_df = incoming_df.join(current_df, on="hash", how="left_anti")
    # another method: new_rows_df = incoming_df.exceptAll(current_df)
    print("comparing hashes and removing the duplicates")
    print("----------------------------------------------------------------------------------")
    print("preparing the net new data to append to the current data ")
    new_rows_df = new_rows_df.drop("hash")
    new_rows_df = add_surrogate_key(new_rows_df, table_key, table_id, max_sk)
    new_rows_df = new_rows_df.select(table_key, *[c for c in new_rows_df.columns if c != table_key])
    print("----------------------------------------------------------------------------------")
    print("writing (Appending) the new data to HDFS")
    new_rows_df.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Mission Passed... Respect+")

Empty dir, let's fill it!
----------------------------------------------------------------------------------
adding the surrogate key
----------------------------------------------------------------------------------
preparing the data to be written
----------------------------------------------------------------------------------
writing the df as a parquet to hdfs
----------------------------------------------------------------------------------

 Done


In [19]:
# for testing purposes
# Invalidate any cached data for this path before re-reading it.
# (`spark.sql("REFRESH TABLES")` does not do this: Spark parses it as a
# REFRESH of the literal resource path "TABLES".)
spark.catalog.refreshByPath(table_dir)
# parquet carries its own schema; header/inferSchema are CSV-only options
spark.read.parquet(table_dir).show()

+---------+--------+------------------+---------------+
|agent_key|agent_id|        agent_name|agent_hire_date|
+---------+--------+------------------+---------------+
|        1|       1|          John Doe|     2020-06-03|
|        2|       2|        Jane Smith|     2018-05-13|
|        3|       3|   Michael Johnson|     2021-10-03|
|        4|       4|       Emily Brown|     2020-10-25|
|        5|       5|      David Wilson|     2021-04-08|
|        6|       6|       Emma Taylor|     2019-03-28|
|        7|       7|Christopher Miller|     2020-01-11|
|        8|       8|      Olivia Davis|     2021-10-24|
|        9|       9|   Daniel Martinez|     2018-10-08|
|       10|      10|      Sophia Moore|     2019-05-25|
+---------+--------+------------------+---------------+



### transactions File:

In [20]:
# defining the expected schema of the cleaned transaction extract,
# as (column name, type, nullable) triples in column order
main_transactions_schema = StructType([
    StructField(name, dtype, nullable=nullable)
    for name, dtype, nullable in [
        ("transaction_date", DateType(), False),
        ("transaction_id", StringType(), False),
        ("customer_id", IntegerType(), False),
        ("customer_fname", StringType(), False),
        ("customer_lname", StringType(), False),
        ("customer_email", StringType(), False),
        ("agent_id", IntegerType(), False),
        ("branch_id", IntegerType(), False),
        ("product_id", IntegerType(), False),
        ("product_name", StringType(), False),
        ("product_category", StringType(), False),
        ("offer_1", BooleanType(), True),
        ("offer_2", BooleanType(), True),
        ("offer_3", BooleanType(), True),
        ("offer_4", BooleanType(), True),
        ("offer_5", BooleanType(), True),
        ("units", IntegerType(), False),
        ("unit_price", FloatType(), False),
        ("is_online", StringType(), False),
        ("payment_method", StringType(), False),
        ("shipping_address", StringType(), True),
    ]
])

In [21]:
# 2- read the file: the hour-17 transaction extract feeds the dimensions and facts below
# NOTE(review): `df2_path` (hour-18) is defined but not read in the visible cells.
new_data_path = '/user/itversity/q-retail-company/silver/2024-07-10/hour-17/transaction_cleaned.csv'
df2_path = '/user/itversity/q-retail-company/silver/2024-07-10/hour-18/transaction_cleaned.csv'

new_main_data_df = spark.read.schema(main_transactions_schema).option("header", True).csv(new_data_path)



#### customer dim:

In [22]:
# Per-table settings for the customer dimension, read by the cells below.
db, table, table_name = "qcompany", "customer", "customer_dim"
# derived identifiers
schema, table_key, table_id = f"{table}_schema", f"{table}_key", f"{table}_id"
table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/{table_name}"

In [23]:
# 1- define the customer schema
# NOTE(review): the customer frame below is projected from the transactions
# data, so this schema is not applied anywhere in the visible cells.
customer_schema = (
    StructType()
    .add("customer_id", IntegerType(), nullable=False)
    .add("customer_fname", StringType(), nullable=False)
    .add("customer_lname", StringType(), nullable=False)
    .add("customer_email", StringType(), nullable=False)
)

In [24]:
# 2- get the new customer df: distinct customer attributes from the new transaction file
new_data_df = (
    new_main_data_df
    .select("customer_id", "customer_fname", "customer_lname", "customer_email")
    .distinct()
)
new_data_df.show()


+-----------+--------------+--------------+--------------------+
|customer_id|customer_fname|customer_lname|      customer_email|
+-----------+--------------+--------------+--------------------+
|      85526|        Sophia|         Smith|sophia.smith@hotm...|
|      85524|     Alexander|        Miller|alexander.miller@...|
|      85502|       William|         Jones|william.jones@gma...|
|      85481|     Alexander|      Williams|alexander.william...|
|      85558|         James|         Smith|james.smith@outlo...|
|      85517|        Sophia|         Davis|sophia.davis@hotm...|
|      85555|          John|       Johnson|john.johnson@hotm...|
|      85541|        Olivia|        Wilson|olivia.wilson@out...|
|      85545|          John|        Wilson|john.wilson@gmail...|
|      85494|       William|         Davis|william.davis@gma...|
|      85488|        Olivia|        Wilson|olivia.wilson@yah...|
|      85554|       William|        Wilson|william.wilson@ya...|
|      85508|           A

In [25]:
# Load `new_data_df` into the dimension stored as parquet under `table_dir`.
#   - empty target: number every row (ordered by `table_id`) from 1 and write it;
#   - otherwise:    hash each row over its non-key columns, keep only incoming
#                   rows whose hash is not stored yet, number them after the
#                   current max surrogate key and append them.
# `new_data_df` itself is not modified, so re-running this cell is safe.
if is_directory_empty(table_dir):
    print("Empty dir, let's fill it!")
    print("----------------------------------------------------------------------------------")
    print("adding the surrogate key")
    df1sk = add_surrogate_key(new_data_df, table_key, table_id)
    print("----------------------------------------------------------------------------------")
    print("preparing the data to be written")
    # surrogate key first, remaining columns in their original order
    df1sk = df1sk.select(table_key, *[c for c in df1sk.columns if c != table_key])
    print("----------------------------------------------------------------------------------")
    print("writing the df as a parquet to hdfs")
    df1sk.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Done")
else:
    print("Dir is not empty let's hash compare them!")
    # parquet carries its own schema; header/inferSchema are CSV-only options
    current_df_sk = spark.read.parquet(table_dir)
    print(f"reading the data in the current {table_name} data")
    print("----------------------------------------------------------------------------------")
    print("getting the max surrogate key to use later on when appening the new data... ")
    # max() is NULL when the stored files hold no rows; number from 0 in that case
    max_sk = current_df_sk.agg(sf.max(table_key)).collect()[0][0] or 0
    print(f"current max {table_key} is {max_sk}, the next {table_key} will be {max_sk+1}")
    print("----------------------------------------------------------------------------------")
    current_df = current_df_sk.drop(table_key)
    # derived frame instead of overwriting `new_data_df`; dropping absent columns is a no-op
    incoming_df = new_data_df.drop(table_key, "hash")
    print(f"hashing current & new data to compare them")
    # NOTE: rows are hashed over their columns in frame order, so the incoming
    # columns must be in the same order as the stored non-key columns
    current_df = current_df.withColumn("hash", concat_and_hash_udf(*[sf.col(c).cast("string") for c in current_df.columns]))
    incoming_df = incoming_df.withColumn("hash", concat_and_hash_udf(*[sf.col(c).cast("string") for c in incoming_df.columns]))
    print("----------------------------------------------------------------------------------")
    # keep only the incoming rows whose hash is not already stored
    new_rows_df = incoming_df.join(current_df, on="hash", how="left_anti")
    # another method: new_rows_df = incoming_df.exceptAll(current_df)
    print("comparing hashes and removing the duplicates")
    print("----------------------------------------------------------------------------------")
    print("preparing the net new data to append to the current data ")
    new_rows_df = new_rows_df.drop("hash")
    new_rows_df = add_surrogate_key(new_rows_df, table_key, table_id, max_sk)
    new_rows_df = new_rows_df.select(table_key, *[c for c in new_rows_df.columns if c != table_key])
    print("----------------------------------------------------------------------------------")
    print("writing (Appending) the new data to HDFS")
    new_rows_df.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Mission Passed... Respect+")

Empty dir, let's fill it!
----------------------------------------------------------------------------------
adding the surrogate key
----------------------------------------------------------------------------------
preparing the data to be written
----------------------------------------------------------------------------------
writing the df as a parquet to hdfs
----------------------------------------------------------------------------------

 Done


In [26]:
# for testing purposes
# Invalidate any cached data for this path before re-reading it.
# (`spark.sql("REFRESH TABLES")` does not do this: Spark parses it as a
# REFRESH of the literal resource path "TABLES".)
spark.catalog.refreshByPath(table_dir)
# parquet carries its own schema; header/inferSchema are CSV-only options
test_df = spark.read.parquet(table_dir)
print(test_df.count())
test_df.show()

101
+------------+-----------+--------------+--------------+--------------------+
|customer_key|customer_id|customer_fname|customer_lname|      customer_email|
+------------+-----------+--------------+--------------+--------------------+
|           1|      85462|          John|         Smith|john.smith@yahoo.com|
|           2|      85463|        Olivia|         Smith|olivia.smith@outl...|
|           3|      85464|       Michael|        Miller|michael.miller@ou...|
|           4|      85465|        Sophia|        Miller|sophia.miller@out...|
|           5|      85466|          Emma|         Brown|emma.brown@yahoo.com|
|           6|      85467|          Emma|        Miller|emma.miller@yahoo...|
|           7|      85468|          Emma|        Wilson|emma.wilson@outlo...|
|           8|      85469|       Michael|      Williams|michael.williams@...|
|           9|      85470|          John|        Taylor|john.taylor@yahoo...|
|          10|      85471|     Alexander|        Miller|alex

#### product dim:

In [27]:
# Per-table settings for the product dimension, read by the cells below.
db, table, table_name = "qcompany", "product", "product_dim"
# derived identifiers
schema, table_key, table_id = f"{table}_schema", f"{table}_key", f"{table}_id"
table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/{table_name}"

print(table_dir)

/user/itversity/q-retail-company/gold/qcompany.db/product_dim


In [28]:
# 1- define the product schema
# NOTE(review): not applied in the visible cells. The product frame below is
# projected from the transactions data and carries `unit_price`, not `product_price`.
product_schema = (
    StructType()
    .add("product_id", IntegerType(), nullable=False)
    .add("product_name", StringType(), nullable=False)
    .add("product_price", DoubleType(), nullable=False)
    .add("product_category", StringType(), nullable=False)
)

In [29]:
# 2- get the new product df: distinct product attributes from the new transaction file
new_data_df = (
    new_main_data_df
    .select("product_id", "product_name", "unit_price", "product_category")
    .distinct()
    .orderBy("product_id")
)

new_data_df.show()


+----------+------------+----------+----------------+
|product_id|product_name|unit_price|product_category|
+----------+------------+----------+----------------+
|         1|      Laptop|    999.99|     Electronics|
|         2|  Smartphone|    699.99|     Electronics|
|         3|      Tablet|    299.99|     Electronics|
|         4|  Headphones|     99.99|     Electronics|
|         5|     T-Shirt|     19.99|        Clothing|
|         6|       Jeans|     49.99|        Clothing|
|         7|       Dress|     59.99|        Clothing|
|         8|    Sneakers|     79.99|        Footwear|
|         9|       Boots|    129.99|        Footwear|
|        10|     Sandals|     39.99|        Footwear|
|        11|          TV|    899.99|     Electronics|
|        12|     Monitor|    299.99|     Electronics|
|        13|     Printer|    149.99|     Electronics|
|        14|      Camera|    399.99|     Electronics|
|        15|      Hoodie|     29.99|        Clothing|
|        16|       Skirt|   

In [30]:
# Load `new_data_df` into the dimension stored as parquet under `table_dir`.
#   - empty target: number every row (ordered by `table_id`) from 1 and write it;
#   - otherwise:    hash each row over its non-key columns, keep only incoming
#                   rows whose hash is not stored yet, number them after the
#                   current max surrogate key and append them.
# `new_data_df` itself is not modified, so re-running this cell is safe.
if is_directory_empty(table_dir):
    print("Empty dir, let's fill it!")
    print("----------------------------------------------------------------------------------")
    print("adding the surrogate key")
    df1sk = add_surrogate_key(new_data_df, table_key, table_id)
    print("----------------------------------------------------------------------------------")
    print("preparing the data to be written")
    # surrogate key first, remaining columns in their original order
    df1sk = df1sk.select(table_key, *[c for c in df1sk.columns if c != table_key])
    print("----------------------------------------------------------------------------------")
    print("writing the df as a parquet to hdfs")
    df1sk.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Done")
else:
    print("Dir is not empty let's hash compare them!")
    # parquet carries its own schema; header/inferSchema are CSV-only options
    current_df_sk = spark.read.parquet(table_dir)
    print(f"reading the data in the current {table_name} data")
    print("----------------------------------------------------------------------------------")
    print("getting the max surrogate key to use later on when appening the new data... ")
    # max() is NULL when the stored files hold no rows; number from 0 in that case
    max_sk = current_df_sk.agg(sf.max(table_key)).collect()[0][0] or 0
    print(f"current max {table_key} is {max_sk}, the next {table_key} will be {max_sk+1}")
    print("----------------------------------------------------------------------------------")
    current_df = current_df_sk.drop(table_key)
    # derived frame instead of overwriting `new_data_df`; dropping absent columns is a no-op
    incoming_df = new_data_df.drop(table_key, "hash")
    print(f"hashing current & new data to compare them")
    # NOTE: rows are hashed over their columns in frame order, so the incoming
    # columns must be in the same order as the stored non-key columns
    current_df = current_df.withColumn("hash", concat_and_hash_udf(*[sf.col(c).cast("string") for c in current_df.columns]))
    incoming_df = incoming_df.withColumn("hash", concat_and_hash_udf(*[sf.col(c).cast("string") for c in incoming_df.columns]))
    print("----------------------------------------------------------------------------------")
    # keep only the incoming rows whose hash is not already stored
    new_rows_df = incoming_df.join(current_df, on="hash", how="left_anti")
    # another method: new_rows_df = incoming_df.exceptAll(current_df)
    print("comparing hashes and removing the duplicates")
    print("----------------------------------------------------------------------------------")
    print("preparing the net new data to append to the current data ")
    new_rows_df = new_rows_df.drop("hash")
    new_rows_df = add_surrogate_key(new_rows_df, table_key, table_id, max_sk)
    new_rows_df = new_rows_df.select(table_key, *[c for c in new_rows_df.columns if c != table_key])
    print("----------------------------------------------------------------------------------")
    print("writing (Appending) the new data to HDFS")
    new_rows_df.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Mission Passed... Respect+")

Empty dir, let's fill it!
----------------------------------------------------------------------------------
adding the surrogate key
----------------------------------------------------------------------------------
preparing the data to be written
----------------------------------------------------------------------------------
writing the df as a parquet to hdfs
----------------------------------------------------------------------------------

 Done


In [31]:
# for testing purposes
# Invalidate any cached data for this path before re-reading it.
# (`spark.sql("REFRESH TABLES")` does not do this: Spark parses it as a
# REFRESH of the literal resource path "TABLES".)
spark.catalog.refreshByPath(table_dir)
# parquet carries its own schema; header/inferSchema are CSV-only options
test_df = spark.read.parquet(table_dir)
print(test_df.count())
test_df.show()

30
+-----------+----------+------------+----------+----------------+
|product_key|product_id|product_name|unit_price|product_category|
+-----------+----------+------------+----------+----------------+
|          1|         1|      Laptop|    999.99|     Electronics|
|          2|         2|  Smartphone|    699.99|     Electronics|
|          3|         3|      Tablet|    299.99|     Electronics|
|          4|         4|  Headphones|     99.99|     Electronics|
|          5|         5|     T-Shirt|     19.99|        Clothing|
|          6|         6|       Jeans|     49.99|        Clothing|
|          7|         7|       Dress|     59.99|        Clothing|
|          8|         8|    Sneakers|     79.99|        Footwear|
|          9|         9|       Boots|    129.99|        Footwear|
|         10|        10|     Sandals|     39.99|        Footwear|
|         11|        11|          TV|    899.99|     Electronics|
|         12|        12|     Monitor|    299.99|     Electronics|
|      

### Transaction Facts:

##### Feature Engineering 


In [32]:
# separating the transactions data: drop the descriptive customer/product
# columns, which are loaded into customer_dim / product_dim above.
# `unit_price` is kept on purpose: total_price is computed from it below.
# (The schema has no `product_price` column, so it is not listed here.)
main_transactions = new_main_data_df.drop("customer_fname", "customer_lname", "customer_email", "product_name", "product_category")

Offers

In [33]:
# adding the redeemed offer column: the first offer flag (1..5) that is true
# wins; rows with no true flag (false or null) get "NA"
main_transactions = main_transactions.withColumn(
    "offer_redeemed",
    sf.when(sf.col("offer_1"), "offer_1")
    .when(sf.col("offer_2"), "offer_2")
    .when(sf.col("offer_3"), "offer_3")
    .when(sf.col("offer_4"), "offer_4")
    .when(sf.col("offer_5"), "offer_5")
    .otherwise("NA"),
)

In [34]:

# Discount percentage granted by each offer; anything else (i.e. "NA") gets 0.
main_transactions = main_transactions.withColumn(
    "discount_pct",
    sf.coalesce(
        *[
            sf.when(sf.col("offer_redeemed") == offer, pct)
            for offer, pct in (
                ("offer_1", 5),
                ("offer_2", 10),
                ("offer_3", 15),
                ("offer_4", 20),
                ("offer_5", 25),
            )
        ],
        sf.lit(0),
    ),
)

In [35]:
# The individual offer flags are summarised by offer_redeemed/discount_pct now.
main_transactions = main_transactions.drop(*[f"offer_{n}" for n in range(1, 6)])

Price

In [36]:
# Gross line amount before any discount: unit price times number of units.
# NOTE(review): unit_price is a float column, so this amount inherits float
# rounding; changing the type would also change the stored fact schema.
main_transactions = main_transactions.withColumn("total_price", sf.expr("unit_price * units"))

In [37]:
# Net amount after the redeemed offer:
#   total_price * (1 - discount_pct / 100), rounded to 3 decimal places.
main_transactions = main_transactions.withColumn(
    "final_price",
    sf.round(
        sf.col("total_price") * (sf.lit(1) - sf.col("discount_pct") / sf.lit(100)),
        3,
    ),
)

Now, split the transactions into online and offline subsets

In [38]:
# Split the enriched transactions by sales channel.
# NOTE(review): rows whose is_online is null or neither "yes" nor "no" end up
# in neither frame — confirm upstream guarantees one of the two values.
online_transactions = main_transactions.where(sf.col("is_online") == "yes")
offline_transactions = main_transactions.where(sf.col("is_online") == "no")

#### Online Transactions

In [39]:
# branch_id / agent_id are only carried into the offline fact, so drop them here.
online_transactions = online_transactions.drop("branch_id").drop("agent_id")

Address

In [40]:
# shipping_address is stored as "street/city/state/postal_code"; expose each
# part as its own column (by position) and drop the combined string.
online_transactions = online_transactions.select(
    "*",
    *[
        sf.split(sf.col("shipping_address"), "/").getItem(position).alias(part_name)
        for position, part_name in enumerate(
            ["address", "shipping_city", "shipping_state", "shipping_postal_code"]
        )
    ],
).drop("shipping_address")


In [41]:
online_transactions.show(n=2)  # quick look at the reshaped online rows

+----------------+----------------+-----------+----------+-----+----------+---------+--------------+--------------+------------+-----------+-----------+------------------+-------------+--------------+--------------------+
|transaction_date|  transaction_id|customer_id|product_id|units|unit_price|is_online|payment_method|offer_redeemed|discount_pct|total_price|final_price|           address|shipping_city|shipping_state|shipping_postal_code|
+----------------+----------------+-----------+----------+-----+----------+---------+--------------+--------------+------------+-----------+-----------+------------------+-------------+--------------+--------------------+
|      2022-05-11|trx-254567854494|      85492|        27|   10|     29.99|      yes|        PayPal|            NA|           0|      299.9|      299.9|7002 Secrest Court|       Arvada|            CO|               80007|
|      2022-04-06|trx-585776417704|      85521|        28|    5|     19.99|      yes|        Stripe|            

In [42]:
# Replace the natural customer/product ids with the dimension surrogate keys.
# NOTE(review): `db` must already be set by an earlier config cell; it is only
# (re)assigned in the next cell.
customer_table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/customer_dim"
product_table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/product_dim"

# Parquet carries its own schema; CSV options (header/inferSchema) would be ignored.
current_customer_dim = spark.read.parquet(customer_table_dir)
current_product_dim = spark.read.parquet(product_table_dir)

# Joining on the column name keeps a single customer_id / product_id column
# instead of two identically named (ambiguous) ones.
# Left joins keep transactions without a matching dimension row (their key is null).
# NOTE(review): if a dimension holds several rows per id (e.g. history rows),
# these joins would duplicate transactions — confirm ids are unique there.
online_transactions_with_c_key = online_transactions.join(
    current_customer_dim.select("customer_id", "customer_key"), on="customer_id", how="left"
)
online_transactions_with_c_p_key = online_transactions_with_c_key.join(
    current_product_dim.select("product_id", "product_key"), on="product_id", how="left"
)

# Natural ids are replaced by surrogate keys; is_online is implied by the target table.
columns_to_exclude = ["customer_id", "product_id", "is_online"]
selected_cols = [c for c in online_transactions_with_c_p_key.columns if c not in columns_to_exclude]

final_new_online_transactions = (
    online_transactions_with_c_p_key.select(selected_cols)
    .withColumnRenamed("transaction_id", "online_transaction_id")
    .withColumnRenamed("units", "quantity")
    .withColumnRenamed("address", "shipping_address")
)

desired_columns_ordered = ['online_transaction_id', 'transaction_date', 'customer_key', 'product_key', 'unit_price', 'quantity', 'total_price', 'offer_redeemed', 'discount_pct', 'final_price', 'payment_method', 'shipping_address', 'shipping_city', 'shipping_state', 'shipping_postal_code']

final_new_online_transactions = final_new_online_transactions.select(desired_columns_ordered)

In [43]:
# Target-table settings for the online transactions fact; the write and
# read-back cells below use these names.
db = "qcompany"
table = "online_transaction"
table_name = f"{table}_fact"
table_key, table_id = f"{table}_key", f"{table}_id"
table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/{table_name}"

print(table_dir)

/user/itversity/q-retail-company/gold/qcompany.db/online_transaction_fact


In [44]:
# Inspect the fact schema before writing; the StructType is also kept in `schema`.
final_new_online_transactions.printSchema()
schema = final_new_online_transactions.schema

root
 |-- online_transaction_id: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- customer_key: integer (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- unit_price: float (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_price: float (nullable = true)
 |-- offer_redeemed: string (nullable = false)
 |-- discount_pct: integer (nullable = false)
 |-- final_price: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- shipping_address: string (nullable = true)
 |-- shipping_city: string (nullable = true)
 |-- shipping_state: string (nullable = true)
 |-- shipping_postal_code: string (nullable = true)



In [45]:
# Write the online transactions fact to HDFS, continuing the surrogate-key
# sequence from the rows already stored in `table_dir` (if any).
# NOTE(review): rows are appended without de-duplicating on `table_id`, so
# re-running this cell with the same input appends the same transactions again.
if is_directory_empty(table_dir):
    print("Empty dir, let's fill it!")
    print("----------------------------------------------------------------------------------")
    print("adding the surrogate key")
    df1sk = add_surrogate_key(final_new_online_transactions, table_key, table_id)
    print("----------------------------------------------------------------------------------")
    print("preparing the data to be written")
    # surrogate key first, remaining columns keep their current order
    df1sk = df1sk.select(table_key, *[c for c in df1sk.columns if c != table_key])
    print("----------------------------------------------------------------------------------")
    print("writing the df as a parquet to hdfs")
    df1sk.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Done")
else:
    print("Dir is not empty let's Append the new transactions!")
    # parquet carries its own schema; CSV options would be ignored here
    current_df_sk = spark.read.parquet(table_dir)
    print(f"reading the data in the current {table_name} data")
    print("----------------------------------------------------------------------------------")
    print("getting the max surrogate key to use later on when appending the new data... ")
    max_sk = current_df_sk.agg(sf.max(table_key)).first()[0]
    if max_sk is None:
        # The directory has parquet files but no rows: `max_sk + 1` would raise
        # TypeError, so number the rows the same way as for an empty directory.
        print(f"no existing {table_key} values found, numbering the new rows from scratch")
        final_new_online_transactions = add_surrogate_key(final_new_online_transactions, table_key, table_id)
    else:
        print(f"current max {table_key} is {max_sk}, the next {table_key} will be {max_sk+1}")
        final_new_online_transactions = add_surrogate_key(final_new_online_transactions, table_key, table_id, max_sk)
    print("----------------------------------------------------------------------------------")
    print("writing (Appending) the new data to HDFS")
    final_new_online_transactions = final_new_online_transactions.select(
        table_key, *[c for c in final_new_online_transactions.columns if c != table_key]
    )
    final_new_online_transactions.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Mission Passed... Respect+")

Empty dir, let's fill it!
----------------------------------------------------------------------------------
adding the surrogate key
----------------------------------------------------------------------------------
preparing the data to be written
----------------------------------------------------------------------------------
writing the df as a parquet to hdfs
----------------------------------------------------------------------------------

 Done


In [46]:
# for testing purposes: re-read the online fact that was just written.
# `REFRESH TABLES` is parsed as `REFRESH <resource_path>` with the literal path
# "TABLES" (a no-op here), so invalidate cached data for this table's path instead.
spark.catalog.refreshByPath(table_dir)
# `schema=True` is not a parquet reader option and was ignored; read with defaults.
test_df = spark.read.parquet(table_dir)
test_df.show()

+----------------------+---------------------+----------------+------------+-----------+----------+--------+-----------+--------------+------------+-----------+--------------+--------------------+-------------+--------------+--------------------+
|online_transaction_key|online_transaction_id|transaction_date|customer_key|product_key|unit_price|quantity|total_price|offer_redeemed|discount_pct|final_price|payment_method|    shipping_address|shipping_city|shipping_state|shipping_postal_code|
+----------------------+---------------------+----------------+------------+-----------+----------+--------+-----------+--------------+------------+-----------+--------------+--------------------+-------------+--------------+--------------------+
|                     1|     trx-003395056165|      2022-04-20|           4|         25|    499.99|       9|    4499.91|            NA|           0|    4499.91|        Stripe|7901 West 52nd Av...|       Arvada|            CO|               80002|
|           

#### Offline Transactions

In [47]:
# The offline fact has no shipping columns, so the combined address is dropped.
offline_transactions = offline_transactions.drop("shipping_address")
offline_transactions.show(n=2)

+----------------+----------------+-----------+--------+---------+----------+-----+----------+---------+--------------+--------------+------------+-----------+-----------+
|transaction_date|  transaction_id|customer_id|agent_id|branch_id|product_id|units|unit_price|is_online|payment_method|offer_redeemed|discount_pct|total_price|final_price|
+----------------+----------------+-----------+--------+---------+----------+-----+----------+---------+--------------+--------------+------------+-----------+-----------+
|      2023-10-25|trx-072037549384|      85550|       2|        3|         3|    7|    299.99|       no|          Cash|            NA|           0|    2099.93|    2099.93|
|      2022-05-08|trx-125208155197|      85512|       9|        5|        11|    3|    899.99|       no|   Credit Card|            NA|           0|    2699.97|    2699.97|
+----------------+----------------+-----------+--------+---------+----------+-----+----------+---------+--------------+--------------+------

In [48]:
# Replace the natural customer/product/agent/branch ids with the dimension
# surrogate keys.
customer_table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/customer_dim"
product_table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/product_dim"
agent_table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/agent_dim"
branch_table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/branch_dim"

# Parquet carries its own schema; CSV options (header/inferSchema) would be ignored.
current_customer_dim = spark.read.parquet(customer_table_dir)
current_product_dim = spark.read.parquet(product_table_dir)
current_agent_dim = spark.read.parquet(agent_table_dir)
current_branch_dim = spark.read.parquet(branch_table_dir)

# Joining on the column name keeps one copy of each id column instead of two
# identically named (ambiguous) ones. Left joins keep transactions without a
# matching dimension row (their key is null).
# NOTE(review): if a dimension holds several rows per id (e.g. history rows),
# these joins would duplicate transactions — confirm ids are unique there.
offline_transactions_with_c_key = offline_transactions.join(
    current_customer_dim.select("customer_id", "customer_key"), on="customer_id", how="left"
)
offline_transactions_with_c_p_key = offline_transactions_with_c_key.join(
    current_product_dim.select("product_id", "product_key"), on="product_id", how="left"
)
offline_transactions_with_c_p_a_key = offline_transactions_with_c_p_key.join(
    current_agent_dim.select("agent_id", "agent_key"), on="agent_id", how="left"
)
offline_transactions_with_c_p_a_b_key = offline_transactions_with_c_p_a_key.join(
    current_branch_dim.select("branch_id", "branch_key"), on="branch_id", how="left"
)

# Natural ids are replaced by surrogate keys; is_online is implied by the target table.
columns_to_exclude = ["customer_id", "product_id", "is_online", "agent_id", "branch_id"]
selected_cols = [c for c in offline_transactions_with_c_p_a_b_key.columns if c not in columns_to_exclude]

final_new_offline_transactions = (
    offline_transactions_with_c_p_a_b_key.select(selected_cols)
    .withColumnRenamed("transaction_id", "offline_transaction_id")
    .withColumnRenamed("units", "quantity")
)
desired_columns_ordered = ['offline_transaction_id', 'transaction_date', 'customer_key', 'product_key', 
                           'unit_price', 'quantity', 'total_price', 'offer_redeemed', 'discount_pct', 
                           'final_price', 'payment_method', 'branch_key', 'agent_key']

final_new_offline_transactions = final_new_offline_transactions.select(desired_columns_ordered)

final_new_offline_transactions.show(2)


+----------------------+----------------+------------+-----------+----------+--------+-----------+--------------+------------+-----------+--------------+----------+---------+
|offline_transaction_id|transaction_date|customer_key|product_key|unit_price|quantity|total_price|offer_redeemed|discount_pct|final_price|payment_method|branch_key|agent_key|
+----------------------+----------------+------------+-----------+----------+--------+-----------+--------------+------------+-----------+--------------+----------+---------+
|      trx-072037549384|      2023-10-25|          89|          3|    299.99|       7|    2099.93|            NA|           0|    2099.93|          Cash|         3|        2|
|      trx-125208155197|      2022-05-08|          51|         11|    899.99|       3|    2699.97|            NA|           0|    2699.97|   Credit Card|         5|        9|
+----------------------+----------------+------------+-----------+----------+--------+-----------+--------------+------------

In [49]:
# Target-table settings for the offline transactions fact; the write and
# read-back cells below use these names.
db = "qcompany"
table = "offline_transaction"
table_name = f"{table}_fact"
table_key, table_id = f"{table}_key", f"{table}_id"
table_dir = f"/user/itversity/q-retail-company/gold/{db}.db/{table_name}"

print(table_dir)

/user/itversity/q-retail-company/gold/qcompany.db/offline_transaction_fact


In [50]:
# Inspect the fact schema before writing; the StructType is also kept in `schema`.
final_new_offline_transactions.printSchema()
schema = final_new_offline_transactions.schema

root
 |-- offline_transaction_id: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- customer_key: integer (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- unit_price: float (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_price: float (nullable = true)
 |-- offer_redeemed: string (nullable = false)
 |-- discount_pct: integer (nullable = false)
 |-- final_price: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- branch_key: integer (nullable = true)
 |-- agent_key: integer (nullable = true)



In [51]:
# Write the offline transactions fact to HDFS, continuing the surrogate-key
# sequence from the rows already stored in `table_dir` (if any).
# NOTE(review): rows are appended without de-duplicating on `table_id`, so
# re-running this cell with the same input appends the same transactions again.
if is_directory_empty(table_dir):
    print("Empty dir, let's fill it!")
    print("----------------------------------------------------------------------------------")
    print("adding the surrogate key")
    df1sk = add_surrogate_key(final_new_offline_transactions, table_key, table_id)
    print("----------------------------------------------------------------------------------")
    print("preparing the data to be written")
    # surrogate key first, remaining columns keep their current order
    df1sk = df1sk.select(table_key, *[c for c in df1sk.columns if c != table_key])
    print("----------------------------------------------------------------------------------")
    print("writing the df as a parquet to hdfs")
    df1sk.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Done")
else:
    print("Dir is not empty let's Append the new transactions!")
    # parquet carries its own schema; CSV options would be ignored here
    current_df_sk = spark.read.parquet(table_dir)
    print(f"reading the data in the current {table_name} data")
    print("----------------------------------------------------------------------------------")
    print("getting the max surrogate key to use later on when appending the new data... ")
    max_sk = current_df_sk.agg(sf.max(table_key)).first()[0]
    if max_sk is None:
        # The directory has parquet files but no rows: `max_sk + 1` would raise
        # TypeError, so number the rows the same way as for an empty directory.
        print(f"no existing {table_key} values found, numbering the new rows from scratch")
        final_new_offline_transactions = add_surrogate_key(final_new_offline_transactions, table_key, table_id)
    else:
        print(f"current max {table_key} is {max_sk}, the next {table_key} will be {max_sk+1}")
        final_new_offline_transactions = add_surrogate_key(final_new_offline_transactions, table_key, table_id, max_sk)
    print("----------------------------------------------------------------------------------")
    print("writing (Appending) the new data to HDFS")
    final_new_offline_transactions = final_new_offline_transactions.select(
        table_key, *[c for c in final_new_offline_transactions.columns if c != table_key]
    )
    final_new_offline_transactions.write.parquet(table_dir, mode='append')
    print("----------------------------------------------------------------------------------")
    print("\n Mission Passed... Respect+")

Empty dir, let's fill it!
----------------------------------------------------------------------------------
adding the surrogate key
----------------------------------------------------------------------------------
preparing the data to be written
----------------------------------------------------------------------------------
writing the df as a parquet to hdfs
----------------------------------------------------------------------------------

 Done


In [52]:
# for testing purposes: re-read the offline fact that was just written and count it.
# `REFRESH TABLES` is parsed as `REFRESH <resource_path>` with the literal path
# "TABLES" (a no-op here), so invalidate cached data for this table's path instead.
spark.catalog.refreshByPath(table_dir)
# `schema=True` is not a parquet reader option and was ignored; read with defaults.
test_df = spark.read.parquet(table_dir)
test_df.count()

1000