In [16]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import when, col, lit, current_timestamp,row_number, expr
from pyspark.sql.functions import split
import os
from pyspark.sql.functions import col, crc32
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType
from pyspark.sql.functions import col, concat, year, month, dayofmonth
import subprocess
from pyspark.sql.window import Window


In [31]:
spark = SparkSession\
    .builder\
    .master("local[4]")\
    .appName("fact")\
    .config("spark.eventLog.logBlockUpdates.enabled", True)\
    .enableHiveSupport()\
    .getOrCreate()

sc = spark.sparkContext

In [18]:
spark.conf.set("spark.sql.repl.eagerEval.maxColWidth", 1000)

In [19]:

def get_lateset_file(directory):
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(directory))
    files = [(file.getPath().toString(), file.getModificationTime()) for file in list_status if file.isFile()]
    files.sort(key=lambda x: x[1], reverse=True)
    latest_file = files[0][0] if files else None
    return latest_file


In [20]:
latest_file = get_lateset_file('/user/sales_transaction/')
print(latest_file)

hdfs://localhost:9000/user/sales_transaction/sales_transactions_SS_raw_2.csv


In [21]:

def write_checkpoint(checkpoint_path, latest_file):
    checkpoint_dir = os.path.dirname(checkpoint_path)
    result = subprocess.run(['hdfs', 'dfs', '-test', '-e', checkpoint_dir])
    if result.returncode != 0:
        subprocess.run(['hdfs', 'dfs', '-mkdir', '-p', checkpoint_dir])
    
    with open('/tmp/checkpoint_tmp.txt', 'w') as f:
        f.write(latest_file)
    
    subprocess.run(['hdfs', 'dfs', '-put', '-f', '/tmp/checkpoint_tmp.txt', checkpoint_path])
    os.remove('/tmp/checkpoint_tmp.txt')

In [22]:
def fetch_last_sur(table_name, column_name):
    
    query = f"SELECT max({column_name}) as last_sur_key FROM {table_name}"
    df = spark.sql(query)
    max_sur_key = df.select("last_sur_key").first()[0] if df.count() > 0 else 0
    
    return max_sur_key

In [23]:
def write_last_sur(table_name, column_name, last_sur):
    schema = StructType([StructField(column_name, LongType(), False)])
    data = [(last_sur,)]
    df = spark.createDataFrame(data, schema)
    df.write.mode("overwrite").saveAsTable(table_name)

In [24]:
schema = StructType([
    StructField("transaction_date", TimestampType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("customer_fname", StringType(), True),
    StructField("cusomter_lname", StringType(), True),
    StructField("cusomter_email", StringType(), True),
    StructField("sales_agent_id", IntegerType(), True),
    StructField("branch_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("product_category", StringType(), True),
    StructField("offer_1", BooleanType(), True),
    StructField("offer_2", BooleanType(), True),
    StructField("offer_3", BooleanType(), True),
    StructField("offer_4", BooleanType(), True),
    StructField("offer_5", BooleanType(), True),
    StructField("units", IntegerType(), True),
    StructField("unit_price", DoubleType(), True),
    StructField("is_online", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("shipping_address", StringType(), True)
])


In [25]:
create_audit_query = """
CREATE TABLE IF NOT EXISTS audit_dim (
    audit_sur BIGINT
)
"""
spark.sql(create_audit_query)

In [26]:
def get_hash(df, column_name):
    if column_name:
        return df.withColumn("customer_sur", crc32(col(column_name)))
    else:
        return df.withColumn("customer_sur", crc32(lit(1)))

In [27]:
def create_audit_dimension(df_transactions, audit_dim, column_id):
    last_audit_id = fetch_last_sur("audit_dim", "audit_sur")
    if last_audit_id is None:
        last_audit_id = 0  
    
    windowSpec = Window.orderBy("transaction_date")
    df_transactions = df_transactions.withColumn("row_num", row_number().over(windowSpec))
    df_transactions = df_transactions.withColumn("audit_sur", last_audit_id + col("row_num"))
    
    df_transactions = df_transactions.withColumn("source_file", lit(latest_file))
    df_transactions = df_transactions.withColumn("created_at", current_timestamp())
    df_transactions = df_transactions.withColumn("created_by", lit("Eslam Fayez"))
    df_transactions = df_transactions.withColumn("is_valid_email", expr("customer_email like '%@%.%'"))
    df_transactions = df_transactions.withColumn("is_positive_units", col("units") > 0)
    df_transactions = df_transactions.withColumn("is_positive_unit_price", col("unit_price") > 0)
    df_transactions = df_transactions.withColumn("is_valid_transaction", col("is_valid_email") & col("is_positive_units") & col("is_positive_unit_price"))
    
    return df_transactions

In [28]:
def concat_date(df, date_col):
    return df.withColumn("date_sur", 
                         concat(year(col(date_col)).cast("string"), 
                                month(col(date_col)).cast("string"), 
                                dayofmonth(col(date_col)).cast("string")))

In [29]:
checkpoint_path = "/user/checkpoint/checkpoint_transactions.txt"
result = subprocess.run(['hdfs', 'dfs', '-test', '-e', checkpoint_path])
if result.returncode != 0:
    latest_processed_file = ""
else:
    rdd = sc.textFile("/user/checkpoint/checkpoint_transactions.txt")
    latest_processed_file = rdd.take(rdd.count())[-1]
    print(latest_processed_file)

hdfs://localhost:9000/user/sales_transaction/sales_transactions_SS_raw_1.csv


In [30]:
processed_path = "/user/silver/sales_transaction/sales_transactions.parquet"
processed_dir = "/user/silver/sales_transaction"
checkpoint_path = "/user/checkpoint/checkpoint_transactions.txt"

latest_file = get_lateset_file('/user/sales_transaction/')



if latest_processed_file == latest_file:
    print(f"File already processed before: {latest_processed_file}")
    sc.stop()
else:
    df_transactions = spark.read.csv(latest_file, header=True, schema=schema)    
    
    df_transactions = df_transactions.withColumn("customer_email", F.regexp_replace("cusomter_email", r"\.com.*", ".com"))
    df_transactions = df_transactions.withColumnRenamed("cusomter_lname", "customer_lname")
    df_transactions = df_transactions.withColumn(
        "offers",
        when(col("offer_1") == "TRUE", 5)
        .when(col("offer_2") == "TRUE", 10)
        .when(col("offer_3") == "TRUE", 15)
        .when(col("offer_4") == "TRUE", 20)
        .when(col("offer_5") == "TRUE", 25)
        .otherwise("0")
    )
    df_transactions = df_transactions.withColumn("total_price", col("units") * col("unit_price"))
    df_transactions = df_transactions.withColumn('address_split', split(df_transactions['shipping_address'], '/'))
    df_transactions = df_transactions.withColumn('shipping_address', df_transactions['address_split'].getItem(0)) \
    .withColumn('city', df_transactions['address_split'].getItem(1)) \
    .withColumn('state', df_transactions['address_split'].getItem(2)) \
    .withColumn('postal_code', df_transactions['address_split'].getItem(3))
    df_transactions = df_transactions.drop('address_split',"offer_1","offer_2","offer_3","offer_4","offer_5","row_num","cusomter_email")
    df_transactions = create_audit_dimension(df_transactions, "audit_dim", "audit_sur")
    df_transactions = get_hash(df_transactions,"customer_email")
    df_transactions = concat_date(df_transactions,"transaction_date")
    #df_transactions = df_transactions.drop('address_split',"offer_1","offer_2","offer_3","offer_4","offer_5","customer_email","row_num")
    write_checkpoint(checkpoint_path, latest_file)
    
    last_audit_id = df_transactions.select(F.max("audit_sur")).first()[0]
    write_last_sur("audit_dim", "audit_sur", last_audit_id)
    
    result = subprocess.run(['hdfs', 'dfs', '-test', '-e', processed_path])

    if result.returncode == 0:
        write_checkpoint(checkpoint_path, latest_file)
        print(f"File already exists in HDFS: {processed_path}")
        df_transactions.write.parquet(processed_path, mode="append")
    else:
        write_checkpoint(checkpoint_path, latest_file)
        subprocess.run(['hdfs', 'dfs', '-mkdir', '-p', processed_dir])
        df_transactions.write.parquet(processed_path)

File already exists in HDFS: /user/silver/sales_transaction/sales_transactions.parquet


In [28]:
print(f'Data types of all the columns is : {df_transactions.dtypes}')
df_transactions.select('offers')

Data types of all the columns is : [('transaction_date', 'timestamp'), ('transaction_id', 'string'), ('customer_id', 'int'), ('customer_fname', 'string'), ('customer_lname', 'string'), ('sales_agent_id', 'int'), ('branch_id', 'int'), ('product_id', 'int'), ('product_name', 'string'), ('product_category', 'string'), ('units', 'int'), ('unit_price', 'double'), ('is_online', 'string'), ('payment_method', 'string'), ('shipping_address', 'string'), ('customer_email', 'string'), ('offers', 'string'), ('total_price', 'double'), ('city', 'string'), ('state', 'string'), ('postal_code', 'string'), ('row_num', 'int'), ('audit_sur', 'int'), ('source_file', 'string'), ('created_at', 'timestamp'), ('created_by', 'string'), ('is_valid_email', 'boolean'), ('is_positive_units', 'boolean'), ('is_positive_unit_price', 'boolean'), ('is_valid_transaction', 'boolean')]


offers
0
10
15
0
5
20
0
0
0
0


In [20]:
df_transactions.select('date')

date
2023520
20221025
202225
20231020
20221117
2022927
2022421
2023428
202338
2023617


In [15]:
sc.stop()