In [None]:
# Databricks notebook source
# COMMAND ----------
# %md
# # Data Processing with PySpark
# This notebook loads the regional order, quota, and returns tables from Unity Catalog, cleans and combines them with PySpark, and writes the combined orders/returns data to the Delta table `catalog.sales.superstore_sales`.

# COMMAND ----------
#
# Import necessary libraries
import logging

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    concat_ws,
    datediff,
    lit,
    make_date,
    regexp_replace,
    split,
    to_date,
    when,
    year,
)
from pyspark.sql.types import DoubleType

# COMMAND ----------
#
# Configure logging.
# logging.basicConfig() does nothing when the root logger already has handlers
# (possibly the case in a Databricks notebook - TODO confirm), so level=INFO would
# then never take effect. Setting the level on this notebook's own logger as well
# keeps the logger.info() progress messages below from being filtered out.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# COMMAND ----------
# %md
# ## Load Data
# Load data from Unity Catalog tables.

# COMMAND ----------
#
# Load data from Unity Catalog tables.
# Each table is read lazily via spark.table(); if any read fails, the error is
# logged and re-raised so the notebook run stops at this cell.
try:
    orders_central = spark.table("catalog.sales.orders_central")
    orders_west = spark.table("catalog.sales.orders_west")
    orders_east = spark.table("catalog.sales.orders_east")
    orders_south = spark.table("catalog.sales.orders_south_2015")
    quota = spark.table("catalog.sales.quota")
    returns = spark.table("catalog.sales.returns")
except Exception as load_error:
    logger.error(f"Error loading data from Unity Catalog: {load_error}")
    raise
else:
    logger.info("Data loaded successfully from Unity Catalog tables.")

# COMMAND ----------
# %md
# ## Transformations
# ### Fix Dates for Orders Central

# COMMAND ----------
#
# Transformation: Fix Dates for Orders Central
# Tag every row with its region, rebuild the order/ship dates from their separate
# year/month/day columns, drop the part columns, and align column names with the
# other order tables.
# make_date() returns NULL when any part is NULL. The previous
# to_date(concat_ws("-", ...)) silently skipped NULL parts, so e.g. a missing day
# produced "2015-3", which parses as 2015-03-01.
try:
    orders_central = (
        orders_central.withColumn("Region", lit("Central"))
        .withColumn("Order Date", make_date("Order Year", "Order Month", "Order Day"))
        .withColumn("Ship Date", make_date("Ship Year", "Ship Month", "Ship Day"))
        .drop("Order Year", "Order Month", "Order Day", "Ship Year", "Ship Month", "Ship Day")
        .withColumnRenamed("Discounts", "Discount")
        .withColumnRenamed("Product", "Product Name")
    )
    logger.info("Dates fixed for Orders Central.")
except Exception as e:
    logger.error(f"Error in fixing dates for Orders Central: {e}")
    raise

# COMMAND ----------
# %md
# ### Remove Rows with Null Order IDs

# COMMAND ----------
#
# Transformation: Remove Nulls
# Drop Orders Central rows whose "Order ID" is NULL.
# A Column expression is used instead of the SQL string "Order ID IS NOT NULL":
# in a SQL string, a column name containing a space must be backtick-quoted,
# otherwise the parser reads it as `Order AS ID` and fails.
try:
    orders_central = orders_central.filter(orders_central["Order ID"].isNotNull())
    logger.info("Null Order IDs removed from Orders Central.")
except Exception as e:
    logger.error(f"Error in removing null Order IDs: {e}")
    raise

# COMMAND ----------
# %md
# ### Fix Data Type

# COMMAND ----------
#
# Transformation: Fix Data Type
# Make "Discount" and "Sales" numeric (double).
# - "Discount" is cast to double, not string: a later step fills missing
#   discounts with the numeric 0, and string discounts defeat that.
# - "Sales" first has every character other than digits, '.' and '-' removed
#   (e.g. currency symbols or thousands separators). The '-' is kept so that
#   negative amounts are not silently turned positive.
# NOTE(review): unparseable values become NULL only when spark.sql.ansi.enabled
# is false; with ANSI mode on, the cast raises instead - confirm the runtime setting.
try:
    orders_central = (
        orders_central
        .withColumn("Discount", orders_central["Discount"].cast(DoubleType()))
        .withColumn(
            "Sales",
            regexp_replace(orders_central["Sales"], "[^0-9.-]", "").cast(DoubleType()),
        )
    )
    logger.info("Data types fixed for Orders Central.")
except Exception as e:
    logger.error(f"Error in fixing data types: {e}")
    raise

# COMMAND ----------
# %md
# ### Rename States

# COMMAND ----------
#
# Transformation: Rename States
# Swap full state names for postal abbreviations in the "State" column only.
# Values missing from the mapping pass through unchanged.
# NOTE(review): the mapping covers just two states - extend it if every state
# is meant to be abbreviated.
state_mapping = {"California": "CA", "New York": "NY"}  # Example mapping
try:
    orders_central = orders_central.replace(to_replace=state_mapping, subset=["State"])
except Exception as rename_error:
    logger.error(f"Error in renaming states: {rename_error}")
    raise
else:
    logger.info("State names mapped to abbreviations.")

# COMMAND ----------
# %md
# ### Pivot Quotas

# COMMAND ----------
#
# Transformation: Pivot Quotas
# Unpivot the per-year quota columns `2014`..`2017` into long form, giving one
# (Region, Year, Quota) row per region and year. Year holds the string
# literals '2014'..'2017'.
quota_unpivot_expr = "stack(4, '2014', `2014`, '2015', `2015`, '2016', `2016`, '2017', `2017`) as (Year, Quota)"
try:
    quota = quota.selectExpr("Region", quota_unpivot_expr)
except Exception as pivot_error:
    logger.error(f"Error in pivoting quotas: {pivot_error}")
    raise
else:
    logger.info("Quotas pivoted successfully.")

# COMMAND ----------
# %md
# ### Split Notes into Return Notes and Approver

# COMMAND ----------
#
# Transformation: Clean Notes and Approver
# Split "Notes" on single spaces: the first token becomes "Return Notes" and the
# second becomes "Approver", then the original "Notes" column is dropped.
# NOTE(review): only the first word of the note is kept in "Return Notes", and a
# one-word note yields a NULL Approver (or an error under ANSI mode) - confirm
# this matches the actual Notes format.
try:
    notes_words = split(returns["Notes"], " ")
    returns = (
        returns
        .withColumn("Return Notes", notes_words[0])
        .withColumn("Approver", notes_words[1])
        .drop("Notes")
    )
except Exception as notes_error:
    logger.error(f"Error in cleaning Notes and Approver: {notes_error}")
    raise
else:
    logger.info("Notes and Approver cleaned.")

# COMMAND ----------
# %md
# ## Union Regional Orders and Join Returns

# COMMAND ----------
#
# Union and Join Operations
# 1. Stack the four regional order tables. unionByName() matches columns by name;
#    plain union() is positional, and orders_central's column order was changed
#    above (date parts dropped; Region, Order Date and Ship Date appended), so a
#    positional union would misalign values. unionByName() raises if the column
#    names do not match, rather than silently mixing columns.
# 2. Attach return information to every order. Orders are the left side so that
#    orders without a return are kept (with NULL return columns); with returns on
#    the left, only returned orders survived and the downstream "Returned" flag
#    could never distinguish returned from non-returned orders.
try:
    all_orders = (
        orders_central
        .unionByName(orders_west)
        .unionByName(orders_east)
        .unionByName(orders_south)
    )
    orders_returns = all_orders.join(returns, ["Order ID", "Product ID"], "left")
    logger.info("Union and join operations completed.")
except Exception as e:
    logger.error(f"Error in union and join operations: {e}")
    raise

# COMMAND ----------
# %md
# ## Custom Calculations

# COMMAND ----------
#
# Custom Calculations
# Derive reporting columns on the joined orders/returns data:
#   Returned     - 1 when "Return Reason" is not NULL, else 0
#   Days to Ship - datediff(Ship Date, Order Date), i.e. Ship Date minus Order Date in days
#   Discount     - NULL discounts replaced with 0 (existing column overwritten in place)
#   Year of Sale - calendar year of Order Date
# All expressions reference the joined DataFrame as it was before this cell.
try:
    is_returned = returns["Return Reason"].isNotNull()
    derived_columns = [
        ("Returned", when(is_returned, 1).otherwise(0)),
        ("Days to Ship", datediff(orders_returns["Ship Date"], orders_returns["Order Date"])),
        ("Discount", when(orders_returns["Discount"].isNull(), 0).otherwise(orders_returns["Discount"])),
        ("Year of Sale", year(orders_returns["Order Date"])),
    ]
    for new_column, expression in derived_columns:
        orders_returns = orders_returns.withColumn(new_column, expression)
except Exception as calc_error:
    logger.error(f"Error in custom calculations: {calc_error}")
    raise
else:
    logger.info("Custom calculations added.")

# COMMAND ----------
# %md
# ## Output: Save to Unity Catalog

# COMMAND ----------
#
# Output: Save to Unity Catalog
# Replace the contents of catalog.sales.superstore_sales with a single Delta
# overwrite instead of DROP TABLE followed by a write. With the drop, a failed
# write left no table at all and the table's history was discarded. An overwrite
# keeps the existing table object, and with it its Delta history and
# permissions. overwriteSchema=true lets a changed schema replace the old one,
# which is what the DROP previously made possible.
try:
    (
        orders_returns.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable("catalog.sales.superstore_sales")
    )
    logger.info("Data written to Unity Catalog table 'superstore_sales'.")
except Exception as e:
    logger.error(f"Error in writing data to Unity Catalog: {e}")
    raise
