*Read data from bronze layer*

DROP TABLE IF EXISTS bara_slaes_project.bronze.dim_customers;
DROP TABLE IF EXISTS bara_slaes_project.bronze.dim_products;
DROP TABLE IF EXISTS bara_slaes_project.bronze.fact_sales;

In [0]:
# Load the bronze CRM customer table into a Spark DataFrame.
df=spark.table("bara_slaes_project.bronze.crm_cust_info")

In [0]:
%sql
-- Peek at two rows of the bronze CRM customer table.
SELECT *
FROM bara_slaes_project.bronze.crm_cust_info
LIMIT 2;

In [0]:
%sql
-- Customer identity columns, newest creation date first.
SELECT
  cst_id,
  cst_key,
  cst_firstname,
  cst_lastname,
  cst_create_date
FROM bara_slaes_project.bronze.crm_cust_info
ORDER BY cst_create_date DESC;

In [0]:
%sql
-- Full contents of the bronze CRM product table.
SELECT *
FROM bara_slaes_project.bronze.crm_prd_info;

In [0]:
%sql
-- Peek at five rows of the bronze sales details table.
SELECT *
FROM bara_slaes_project.bronze.sales_details
LIMIT 5;

----

*Data Transformation*

In [0]:
# Start the customer transformation from a fresh read of the bronze table
# and render it with the notebook's display helper.
df=spark.table("bara_slaes_project.bronze.crm_cust_info")
df.display()

---

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Step 1: pull the latest creation date back to the driver.
# A global aggregate yields exactly one row with one column.
aggregated_rows = df.agg(F.max('cst_create_date')).collect()
max_date = aggregated_rows[0][0]

# Step 2: show only the customers created on that date.
created_on_latest_date = F.col('cst_create_date') == max_date
df.filter(created_on_latest_date).show()


In [0]:
# *Rename Cols*
# Source column name -> readable column name, kept as ordered pairs.
rename_mapping = dict([
    ('cst_id', 'Customer_ID'),
    ('cst_key', 'Customer_Key'),
    ('cst_firstname', 'Customer_first_name'),
    ('cst_lastname', 'Customer_last_name'),
    ('cst_marital_status', 'marital_status'),
    ('cst_gndr', 'Gender'),
    ('cst_create_date', 'Customer_create_date'),
])

# Apply every rename in one call, then render the result.
df = df.withColumnsRenamed(rename_mapping)
df.display()

In [0]:
# Rows without a Customer_ID are dropped; missing descriptive text
# attributes are then filled with the placeholder "Unknown".
text_columns = ["Customer_first_name","Customer_last_name","marital_status","Gender"]
df = (
    df
    .dropna(subset=["Customer_ID"])
    .fillna(value="Unknown", subset=text_columns)
)


In [0]:
from pyspark.sql.functions import col

# Sanity check: rows where BOTH name parts are still null.
first_name_missing = col('Customer_first_name').isNull()
last_name_missing = col('Customer_last_name').isNull()
df.filter(first_name_missing & last_name_missing).show()

In [0]:
from pyspark.sql.functions import col

# Sanity check: rows where EITHER name part is still null.
first_name_missing = col('Customer_first_name').isNull()
last_name_missing = col('Customer_last_name').isNull()
df.filter(first_name_missing | last_name_missing).show()

In [0]:
import pandas as pd

# Collect the whole Spark frame to the driver for a quick pandas profile.
pandas_df = df.toPandas()
separator = "============"*5

print(separator)
print("data shape")
print(pandas_df.shape)

print(separator)
print("Null ?")
print(pandas_df.isnull().sum())

print(separator)
print("duplicated ?")
# Last expression of the cell: number of fully duplicated rows.
pandas_df.duplicated().sum()


In [0]:
# Rebuild the Spark DataFrame from the pandas copy.
# NOTE(review): no schema is passed, so Spark infers column types from the
# pandas data; presumably this can differ from the original Spark schema
# (e.g. for columns that contain nulls) — confirm with df.printSchema().
df = spark.createDataFrame(pandas_df)

In [0]:
# Print the column names and types of the current DataFrame.
df.printSchema()

*--->Null*

In [0]:
from pyspark.sql.functions import sum, col

# One aggregate per column: cast the isNull flag to int and add it up,
# giving a single-row table of null counts keyed by column name.
null_count_exprs = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df.columns
]
df.select(null_count_exprs).display()


*--->Duplicated*

In [0]:
## Duplicated
# Several independent duplicate checks; each one renders its own table.
from pyspark.sql.functions import col, countDistinct, lit, row_number, sum
from pyspark.sql.window import Window

# Count the rows once and reuse the number below: calling df.count() inside
# a comprehension launches a separate Spark job for every column.
row_count = df.count()

# 1) Rows that are identical across every column.
df.groupBy(df.columns) \
  .count() \
  .filter(col("count") > 1) \
  .display()

########
# 2) All rows whose Customer_ID occurs more than once.
#    The column is referenced with its exact renamed casing so the lookup
#    does not depend on case-insensitive column resolution.
repeated_ids = (
    df.groupBy("Customer_ID")
      .count()
      .filter(col("count") > 1)
)
df.join(repeated_ids, on="Customer_ID", how="inner").display()

#############
# 3) Per column: total rows minus distinct values.
#    countDistinct skips nulls, so null rows are included in this figure.
df.select([
    (row_count - countDistinct(col(c))).alias(f"{c}_duplicate_count")
    for c in df.columns
]).display()

###########
# 4) Flag every row after the first within each Customer_ID group.
#    The ordering key equals the partition key, so which row of a group
#    counts as "first" is arbitrary.
w = Window.partitionBy("Customer_ID").orderBy("Customer_ID")

df.withColumn("is_duplicate", row_number().over(w) > 1).display()

######
#df.groupBy("gender").count().filter(col("count") > 1)
#####

# 5) Data-quality report: one row per column with null and duplicate counts.
dq_report = None

for c in df.columns:
    stats = df.select(
        lit(c).alias("column_name"),
        sum(col(c).isNull().cast("int")).alias("null_count"),
        (row_count - countDistinct(col(c))).alias("duplicate_count")
    )
    dq_report = stats if dq_report is None else dq_report.union(stats)

dq_report.display()




*--->describe*

In [0]:
# Render summary statistics for the current DataFrame.
df.describe().display()
#df.select("country","gender").describe().display()

In [0]:
# Scratch cell for inspecting the DataFrame; only the tail(1) call is active.
# Column and upper are imported here but not used in this cell.
from pyspark.sql import Column
from pyspark.sql.functions import upper
# type(df)
#df.columns
# type(df.columns)
# df.collect()
# df.take(1)
# Last expression of the cell: the final row, returned to the driver as a list.
df.tail(1)

In [0]:
# Print the first rows of the current DataFrame as plain text.
df.show()

In [0]:
## Trim
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import *

# Collect the string-typed column names first, then strip leading and
# trailing whitespace from each of them.
string_columns = [
    schema_field.name
    for schema_field in df.schema.fields
    if isinstance(schema_field.dataType, StringType)
]
for column_name in string_columns:
    df = df.withColumn(column_name, trim(col(column_name)))

In [0]:
#Replace Values
# Expand the single-letter codes into readable labels, one column at a time.
Gender_replacements = dict([
    ("F", "Female"),
    ("M", "Male"),
    ("U", "Unknown"),
])
marital_status_replacements = dict([
    ("S", "Single"),
    ("M", "Married"),
    ("Unknown", "Unknown"),
])

replacement_plan = (
    ("Gender", Gender_replacements),
    ("marital_status", marital_status_replacements),
)
for target_column, code_labels in replacement_plan:
    df = df.replace(code_labels, subset=[target_column])

In [0]:
# concatenate columns
from pyspark.sql.functions import concat, concat_ws, col

# Join first and last name with a single space into Customer_Full_Name.
# (Earlier experiments used concat / concat_ws with explicit col(...) calls
# and only showed the result without assigning it.)
column_list = ["Customer_first_name", "Customer_last_name"]
full_name = concat_ws(" ", *column_list)
df = df.withColumn("Customer_Full_Name", full_name)


In [0]:
# Number of distinct values in the Gender column.
df.select("Gender").distinct().count()

In [0]:
# List the distinct values in the Gender column.
df.select("Gender").distinct().show()

---
### *Cleaning crm_prd_info Table*

In [0]:
%sql
-- Peek at five rows of the bronze CRM product table.
SELECT *
FROM bara_slaes_project.bronze.crm_prd_info
LIMIT 5;

In [0]:
# Point df at the bronze CRM product table; from here on df no longer
# refers to the customer data.
df=spark.table("bara_slaes_project.bronze.crm_prd_info")
# df.display()
# df.printSchema()

In [0]:
# Clean the bronze product table: trim strings, rename columns, fill
# missing values and expand the product-line codes.
df=spark.table("bara_slaes_project.bronze.crm_prd_info")
## Trim
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import *
for field in df.schema.fields:
    if isinstance(field.dataType,StringType):
        df=df.withColumn(field.name,trim(col(field.name)))

# *Rename Cols*
rename_mapping = {
    'prd_id':'Product_ID',
    'prd_key':'Product_Key',
    'prd_nm':'Product_name',
    'prd_cost':'Product_Cost',
    'prd_line':'Product_Line',
    'prd_start_dt':'Product_Start_Date',
    'prd_end_dt':'Product_End_Date'
}

df = df.withColumnsRenamed(rename_mapping)

# *Replace Null Values*
# approxQuantile returns an empty list when the column has no non-null
# values, so indexing [0] unconditionally would raise IndexError. In that
# case the cost column is left as-is and only Product_Line is filled.
cost_quantiles = df.approxQuantile("Product_Cost", [0.5], 0.01)
median_Product_Cost = cost_quantiles[0] if cost_quantiles else None

Replae_na_values = {"Product_Line": "Unknown"}
if median_Product_Cost is not None:
    Replae_na_values["Product_Cost"] = median_Product_Cost
df=df.fillna(value=Replae_na_values)


# *Rename Values*
Product_Line_replacements = {
    "M":"Mountain",
    "R": "Road",
    "S": "Other Sales",
    "T": "Touring",
}
df=df.replace(Product_Line_replacements,subset=["Product_Line"])

df.display()

%md
---
### *Cleaning sales_details Table*

In [0]:
%sql
-- Sample of five order lines with a quantity of exactly 1.
SELECT *
FROM bara_slaes_project.bronze.sales_details
WHERE sls_quantity = 1
LIMIT 5;

In [0]:
%sql
-- Sample of five order lines with a quantity of exactly 2.
SELECT *
FROM bara_slaes_project.bronze.sales_details
WHERE sls_quantity = 2
LIMIT 5;

In [0]:
%sql
-- Full contents of the bronze CRM product table.
SELECT *
FROM bara_slaes_project.bronze.crm_prd_info;

In [0]:
%sql
-- Full contents of the bronze sales details table.
SELECT *
FROM bara_slaes_project.bronze.sales_details;

In [0]:
%sql
-- Sales details, largest quantities first.
SELECT *
FROM bara_slaes_project.bronze.sales_details
ORDER BY sls_quantity DESC;

In [0]:
# Point df at the bronze sales details table.
df = spark.table("bara_slaes_project.bronze.sales_details")
## Trim
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import *

# Strip leading and trailing whitespace from every string-typed column.
string_columns = [
    schema_field.name
    for schema_field in df.schema.fields
    if isinstance(schema_field.dataType, StringType)
]
for column_name in string_columns:
    df = df.withColumn(column_name, trim(col(column_name)))

In [0]:
# List the column names of the current DataFrame.
df.schema.names

In [0]:
# *Rename Cols*
# Source column name -> readable column name, kept as ordered pairs.
rename_mapping = dict([
    ('sls_ord_num', 'Order_Number'),
    ('sls_prd_key', 'Product_Key'),
    ('sls_cust_id', 'Customer_ID'),
    ('sls_order_dt', 'Order_Date'),
    ('sls_ship_dt', 'Ship_Date'),
    ('sls_due_dt', 'Due_Date'),
    ('sls_sales', 'Sales'),
    ('sls_quantity', 'Quantity'),
    ('sls_price', 'Price'),
])

# Apply every rename in one call.
df = df.withColumnsRenamed(rename_mapping)

In [0]:
from pyspark.sql.functions import sum, col

# One aggregate per column: cast the isNull flag to int and add it up,
# printing a single-row table of null counts keyed by column name.
null_count_exprs = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df.columns
]
df.select(null_count_exprs).show()

In [0]:
# Back-fill missing Price and Sales from each other using
# Sales = Price * Quantity.
#
# The previous version used .otherwise("None"), which writes the text
# "None" (a string literal, not SQL NULL) into these numeric columns.
# A when() without otherwise() yields a real NULL instead, and coalesce()
# keeps the existing value whenever it is present.
#
# When Quantity is 0 or NULL nothing can be derived and the value stays NULL.
# Sales is evaluated after Price, so it sees the freshly back-filled price.
quantity_is_usable = F.col("Quantity") != 0

df = (
    df
    .withColumn(
        "Price",
        F.coalesce(
            F.col("Price"),
            F.when(quantity_is_usable, F.col("Sales") / F.col("Quantity")),
        ),
    )
    .withColumn(
        "Sales",
        F.coalesce(
            F.col("Sales"),
            F.when(quantity_is_usable, F.col("Price") * F.col("Quantity")),
        ),
    )
)

In [0]:
from pyspark.sql.functions import col, to_date
from pyspark.sql import DataFrame

def convert_int_dates(df: DataFrame, date_cols: list, fmt: str = "yyyyMMdd") -> DataFrame:
    """Convert columns holding dates encoded as numbers/text into date columns.

    Each listed column is cast to string and parsed with ``to_date`` using
    ``fmt``; the parsed value replaces the column under the same name.

    Args:
        df: DataFrame containing the columns to convert.
        date_cols: Names of the columns to convert.
        fmt: Datetime pattern passed to ``to_date``. Defaults to
            ``"yyyyMMdd"``, the pattern this function always used before.

    Returns:
        A DataFrame with the listed columns replaced by their parsed values.
    """
    for column_name in date_cols:
        as_text = col(column_name).cast("string")
        df = df.withColumn(column_name, to_date(as_text, fmt))
    return df


date_columns = ["Order_Date", "Ship_Date", "Due_Date"]

df = convert_int_dates(df, date_columns)

In [0]:
# Render the current DataFrame.
df.display()


*Never loop with filter().count() in production.*

for i in df.schema.names:  
    print(i,df.filter(col(i).isNull()).count())
    

from pyspark.sql.types import DateType, TimestampType
from pyspark.sql.functions import col

for f in df.schema.fields:
    if not isinstance(f.dataType, (DateType, TimestampType)):
        print(f.name, df.filter(col(f.name).isNull()).count())

In [0]:



# NOTE(review): this cell repeats the product-table cleaning, but on a
# top-to-bottom run df was last loaded from sales_details, whose renamed
# columns do not include Product_Cost or Product_Line. Presumably this cell
# only works when df still holds the product table — confirm, or reload it.
# *Replace Null Values*
median_Product_Cost = df.approxQuantile("Product_Cost", [0.5], 0.01)[0]
Replae_na_values={
    "Product_Cost":median_Product_Cost,
    "Product_Line":"Unknown"}
df=df.fillna(value=Replae_na_values)


# *Rename Values*
Product_Line_replacements = {
    "M":"Mountain",
    "R": "Road",
    "S": "Other Sales",
    "T": "Touring",
}
df=df.replace(Product_Line_replacements,subset=["Product_Line"])

df.display()

In [0]:
import pandas as pd

# Collect the whole Spark frame to the driver for a quick pandas profile.
pandas_df = df.toPandas()
separator = "============"*5

print(separator)
print("data shape")
print(pandas_df.shape)

print(separator)
print("Null ?")
print(pandas_df.isnull().sum())

print(separator)
print("duplicated ?")
# Last expression of the cell: number of fully duplicated rows.
pandas_df.duplicated().sum()


In [0]:
# Rebuild the Spark DataFrame from the pandas copy.
# NOTE(review): no schema is passed, so Spark infers column types from the
# pandas data; presumably this can differ from the original Spark schema
# (e.g. for columns that contain nulls) — confirm with df.printSchema().
df = spark.createDataFrame(pandas_df)

In [0]:
%sql
-- Peek at five rows of the bronze CRM product table.
SELECT *
FROM bara_slaes_project.bronze.crm_prd_info
LIMIT 5;

In [0]:
# Fill missing Product_Cost with the approximate median (1% relative error)
# and missing Product_Line with "Unknown".
# NOTE(review): requires df to contain Product_Cost / Product_Line; on a
# top-to-bottom run df was last derived from sales_details — confirm which
# table this cell is meant to operate on.
median_Product_Cost = df.approxQuantile("Product_Cost", [0.5], 0.01)[0]
Replae_na_values={
    "Product_Cost":median_Product_Cost,
    "Product_Line":"Unknown"}
df=df.fillna(value=Replae_na_values)


In [0]:
from pyspark.sql.functions import when

# NOTE(review): requires df to contain Product_End_Date; on a top-to-bottom
# run df was last derived from sales_details — confirm the intended table.

# The count used to be a bare expression in the middle of the cell, so the
# notebook never displayed it; print it explicitly.
missing_end_dates = df.filter(col("Product_End_Date").isNull()).count()
print("rows with missing Product_End_Date:", missing_end_dates)

# Preview the rows with a 0/1 flag marking a missing end date.
df.withColumn(
    "is_date_missing",
    when(col("Product_End_Date").isNull(), 1).otherwise(0)
).show()


In [0]:
# Expand the single-letter product-line codes into readable labels.
# NOTE(review): requires df to contain Product_Line; on a top-to-bottom run
# df was last derived from sales_details — confirm the intended table.
Product_Line_replacements = {
    "M":"Mountain",
    "R": "Road",
    "S": "Other Sales",
    "T": "Touring",
}
df=df.replace(Product_Line_replacements,subset=["Product_Line"])

In [0]:
# Render the current DataFrame.
df.display()

In [0]:
# Number of distinct values in the Product_Line column.
# NOTE(review): requires df to contain Product_Line — confirm df holds the
# product table at this point.
df.select("Product_Line").distinct().count()

*Write into Silver layer*