In [1]:
import findspark

# Local Spark 3.5.0 installation used by this notebook (machine-specific path).
SPARK_HOME_DIR = "/home/thanhphat/BigData/spark-3.5.0-bin-hadoop3"

# Point the Python process at that Spark install so `pyspark` becomes importable.
findspark.init(spark_home=SPARK_HOME_DIR)

In [2]:
import traceback

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t

# Project identifier; it is interpolated into the Hive warehouse directory and
# the HDFS lakehouse / log paths built in later cells.
project_name = "Global_Electronics_Retailer"

In [3]:
# Spark session for the Bronze -> Silver job.
#   * master "local[4]": driver and executor share one local JVM running 4 task
#     threads, so the spark.executor.* entries below (2 executors x 2 cores x 2g)
#     only take effect if the master is switched to a cluster manager.
#   * Hive catalog support plus the Delta Lake SQL extension and catalog.
# NOTE(review): "spark.sql.parquet.vorder.enabled" looks like a Microsoft
# Fabric-specific key; open-source Spark 3.5 probably ignores it - confirm.
spark_session_options = {
    "spark.sql.warehouse.dir": f"hdfs://localhost:9000/lakehouse/warehouse/LH_{project_name}",
    "spark.sql.catalogImplementation": "hive",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.parquet.vorder.enabled": "true",
    "spark.sql.shuffle.partitions": 100,
    "spark.driver.memory": "2g",
    "spark.executor.instances": "2",
    "spark.executor.cores": "2",
    "spark.executor.memory": "2g",
}

# Apply the options in declaration order, then enable Hive support (which sets
# spark.sql.catalogImplementation to the same "hive" value).
spark_builder = SparkSession.builder.master("local[4]").appName("Bronze_to_Silver")
for option_name, option_value in spark_session_options.items():
    spark_builder = spark_builder.config(option_name, option_value)
spark = spark_builder.enableHiveSupport().getOrCreate()

# Display the effective configuration as this cell's output
spark.sparkContext.getConf().getAll()

24/06/29 10:38:18 WARN Utils: Your hostname, thanhphat-inspiron-5406-2n1 resolves to a loopback address: 127.0.1.1; using 192.168.1.8 instead (on interface wlp0s20f3)
24/06/29 10:38:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/06/29 10:38:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


[('spark.app.submitTime', '1719632299646'),
 ('spark.master', 'local[4]'),
 ('spark.driver.host', '192.168.1.8'),
 ('spark.app.id', 'local-1719632301116'),
 ('spark.sql.warehouse.dir',
  'hdfs://localhost:9000/lakehouse/warehouse/LH_Global_Electronics_Retailer'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNA

In [None]:
import sys

# Make the project's `modules` package importable (absolute, machine-specific path).
project_source_dir = "/home/thanhphat/PersonalProject/Global_Electronics_Retailer/source"
sys.path.append(project_source_dir)

from modules.Extraction import *
from modules.HDFSUtils import *
from modules.LogUtils import *
from modules.Metadata import *
from modules.Load import *


# Helper instances used by the cells below
extraction = Extraction()
hdfsUtils = HDFSUtils()
logUtils = LogUtils()
metadata = Metadata()
loadHive = Load()

# Hive database name plus the HDFS locations of this project's lakehouse:
# Delta tables live under <root>/Tables, job logs under <root>/Files/log.
lakewarehouse_db = f"LH_{project_name}"
lakehouse_root = f"hdfs://localhost:9000/lakehouse/LH_{project_name}"
lakehouse_table_path = f"{lakehouse_root}/Tables"
log_path = f"{lakehouse_root}/Files/log"

In [None]:
# Run date as reported by Spark's CURRENT_DATE(), rendered as 'YYYY-MM-DD'
executionDate = str(spark.sql("SELECT CURRENT_DATE()").first()[0])

# Partition Execution Date: the three 'YYYY-MM-DD' components, kept as strings
year, month, day = executionDate.split("-")

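## Metadata Table Action

Each task listed in the Airflow Variable `metadata_action` is processed in turn. The task reads the parquet data at the path returned by `hdfsUtils.get_new_version`, applies light per-source transformations (column renames / casts), and writes the result to its target table under the lakehouse `Tables` path. If a Delta table already exists there, the task runs a Delta MERGE; otherwise it does an initial write via `loadHive.writeInit`. One log row is appended per task.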

In [None]:
# Read metadata action: the list of tasks to run. Each entry is a dict providing
# the keys read in the loop below (task_id, task_name, source_connection,
# target_database, source_folder, target_table, phase).
#
# Alternative source (disabled): read the actions from the metadata database.
# NOTE(review): this call hard-codes credentials ("admin", "admin"); move them to
# an Airflow Connection / environment variable before re-enabling it.
# metadata_action = metadata.read_metadata_action("admin", "admin", "metadata", "config_table", \
#                                                 "CusDB -> Bronze")

from airflow.models import Variable
from delta.tables import DeltaTable

metadata_action = Variable.get(key = "metadata_action", deserialize_json = True, default_var = None)
if metadata_action is None:
    # Fail with a clear message instead of "'NoneType' object is not iterable".
    raise ValueError("Airflow Variable 'metadata_action' is not set; nothing to load")


# Column renames applied to each source before loading it (source column names
# contain spaces). Sources not listed here keep their column names.
BRONZE_COLUMN_RENAMES = {
    # Task 6, Customers
    "customers": {"State Code": "StateCode", "Zip Code": "ZipCode"},
    # Task 7, Stores
    "stores": {"Square Meters": "SquareMeters", "Open Date": "OpenDate"},
    # Task 8, Products
    "products": {
        "Product Name": "ProductName",
        "Unit Cost USD": "Unit_Cost_USD",
        "Unit Price USD": "Unit_Price_USD",
    },
    # Task 9, Sales
    "sales": {
        "Order Number": "OrderNumber",
        "Line Item": "LineItem",
        "Order Date": "OrderDate",
        "Delivery Date": "DeliveryDate",
        "Currency Code": "CurrencyCode",
    },
}


def _current_timestamp_str():
    """Return Spark's CURRENT_TIMESTAMP() formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return spark.sql(''' SELECT CURRENT_TIMESTAMP() as current_time ''') \
                .collect()[0]["current_time"].strftime('%Y-%m-%d %H:%M:%S')


def _transform_bronze(df, source_folder):
    """Apply the per-source transformation (column renames / casts) and return the new DataFrame."""
    for old_name, new_name in BRONZE_COLUMN_RENAMES.get(source_folder, {}).items():
        df = df.withColumnRenamed(old_name, new_name)
    if source_folder == "exchange_rates":
        # Task 10, Exchange_Rates
        df = df.withColumn("Exchange", f.col("Exchange").cast("Float"))
    return df


def _null_safe_match_condition(columns):
    """Build a MERGE condition that matches rows whose values are equal in every column.

    Uses Spark's null-safe equality `<=>`: with plain `=`, a row holding a NULL in
    any column can never match its existing copy, so it would be inserted again on
    every run. Column names are backtick-quoted (embedded backticks doubled).
    """
    def quote(name):
        return "`" + name.replace("`", "``") + "`"
    return " AND ".join(f"target.{quote(c)} <=> source.{quote(c)}" for c in columns)


# Define for log job
batch_run = hdfsUtils.check_batch_run(project_name, executionDate) - 1
start_time = ""
end_time = ""
error = ""
status = ""
source_row_read = 0
numInserted = 0
numUpdated = 0


# `task_metadata` (not `metadata`) so the Metadata() helper instance is not shadowed.
for task_metadata in metadata_action:

    # Reset per-task state so a failed task never logs the previous task's counts.
    df = None
    source_row_read = 0
    numInserted = 0
    numUpdated = 0

    task_id = task_metadata["task_id"]
    task_name = task_metadata["task_name"]
    source_connection = task_metadata["source_connection"]
    target_database = task_metadata["target_database"]
    source_folder = task_metadata["source_folder"].lower()
    target_table = task_metadata["target_table"].lower()
    phase = task_metadata["phase"]

    # Start time for check
    start_time = _current_timestamp_str()

    try:
        # New df path
        new_path_version = hdfsUtils.get_new_version(executionDate, project_name, source_folder)
        df = _transform_bronze(spark.read.format("parquet").load(new_path_version), source_folder)

        # Count the source once; reused for the log row below.
        source_row_read = df.count()

        deltaTablePath = f"{lakehouse_table_path}/{target_table}"

        if DeltaTable.isDeltaTable(spark, deltaTablePath):
            deltaTable = DeltaTable.forPath(spark, deltaTablePath)

            # Insert only rows that are not already present. Every column is part
            # of the match condition, so a matched target row is identical to its
            # source row: an update clause could only rewrite unchanged data, and
            # it would fail if duplicate source rows matched the same target row.
            deltaTable.alias("target").merge(
                df.alias("source"),
                _null_safe_match_condition(df.columns)
            ).whenNotMatchedInsertAll().execute()

            # Metrics of the MERGE just committed (latest history entry)
            operationMetrics = deltaTable.history(1).collect()[0]["operationMetrics"]
            numInserted = int(operationMetrics.get("numTargetRowsInserted", 0))
            numUpdated = int(operationMetrics.get("numTargetRowsUpdated", 0))
        else:
            loadHive.writeInit(df, spark, lakehouse_table_path, lakewarehouse_db, target_table)

            numInserted = source_row_read
            numUpdated = 0

    except Exception:
        # Deliberately best-effort: record the failure in the log and continue
        # with the next task.
        error = traceback.format_exc()
        status = "Failed"
    else:
        error = ""
        status = "Success"

    print("Task ", task_id, " ", status)

    # End time for transformation
    end_time = _current_timestamp_str()

    df_log = logUtils.log_data(batch_run, task_name, source_connection, target_database, f"parquet_{source_folder}",
                 target_table, start_time, end_time, source_row_read, numInserted, numUpdated, "",
                 "", error, status, phase, t, spark)

    df_log.write.mode("append").format("parquet").save(f"{log_path}/{executionDate}/batch_{batch_run}/")