### loading data ###


In [0]:
%sql
use catalog service_center

In [0]:
import dlt
from pyspark.sql.functions import *

In [0]:
@dlt.table(
    comment="customer data",
    table_properties={"quality": "bronze"},
)
def bronze_customer():
    """Bronze layer: expose the raw customer table unchanged.

    Returns:
        The DataFrame read from ``service_center.bronze_schema.customer_data``.
    """
    return spark.read.table("service_center.bronze_schema.customer_data")


Name,Type
customer_id,string
Model,string
Registration_Number,string
Chassis_Number,string
Branch_id,string
Invoice_Date,date
Customer_Name,string
Contact_Number,bigint
Location,string
Pincode,int


In [0]:
@dlt.table(
    comment="sales data",
    table_properties={"quality": "bronze"},
)
def bronze_sales():
    """Bronze layer: expose the raw sales table unchanged.

    Returns:
        The DataFrame read from ``service_center.bronze_schema.sales_data``.
    """
    return spark.read.table("service_center.bronze_schema.sales_data")

Name,Type
Dealer_Name,string
Branch_id,string
Vehicle_ID,string
Model,string
Chassis_Number,string
Temporary_Registration_Number,string
Invoice_Date,date
Base_Price,int
Base_Price_Tax,int
Discount_Price,int


In [0]:
@dlt.table(
    comment="branch conveniance score",
    table_properties={"quality": "bronze"},
)
def bronze_branch_conveniance_score():
    """Bronze layer: expose the raw branch convenience score table unchanged.

    Returns:
        The DataFrame read from
        ``service_center.bronze_schema.branch_conveniance_score``.
    """
    return spark.read.table("service_center.bronze_schema.branch_conveniance_score")

Name,Type
Branch_Name,string
Branch_ID,string
Branch_Type,string
Established_Year,int
Address,string
City,string
District,string
State,string
Pincode,int
Branch_Landline,string


In [0]:
@dlt.table(
    comment="service center6 raw data",
    table_properties={"quality": "bronze"},
)
def bronze_service_center():
    """Bronze layer: expose the raw service centre table unchanged.

    Returns:
        The DataFrame read from ``service_center.bronze_schema.service_centre``.
    """
    return spark.read.table("service_center.bronze_schema.service_centre")

Name,Type
WorkOrderID,string
customer_id,string
Chassis_Number,string
Model,string
Branch_ID,string
Number_of_Visits,bigint
Start_Date,timestamp
Expected_Date,timestamp
Actual_Date,timestamp
Delay_Days,int


In [0]:
@dlt.table(
    comment="sucess score",
    table_properties={"quality": "bronze"},
)
def bronze_sucess_score():
    """Bronze layer: expose the raw success score table unchanged.

    Returns:
        The DataFrame read from ``service_center.bronze_schema.success_score``.
    """
    return spark.read.table("service_center.bronze_schema.success_score")

Name,Type
Dealer_Name,string
Model,string
Branch_ID,string
Start_Date,timestamp
Expected_Date,timestamp
Actual_Date,timestamp
Delay_Days,int
Average_Delays,double
Normalized_Average_Delays,double
Number_of_Visits,int


In [0]:
@dlt.table(
    comment="vehcile data",
    table_properties={"quality": "bronze"},
)
def bronze_vehicle():
    """Bronze layer: expose the raw vehicle table unchanged.

    Returns:
        The DataFrame read from ``service_center.bronze_schema.vehicle_data``.
    """
    return spark.read.table("service_center.bronze_schema.vehicle_data")

Name,Type
Vehicle_ID,string
Make,string
Model,string
price,string
Year,int
Body_Type,string
Engine_Type,string
Color,string
Mileage,double
Fuel_Type,string


In [0]:
@dlt.table(
    comment="customer data",
    table_properties={"quality": "silver"},
)
@dlt.expect_or_drop("customer_id is not null", "customer_id IS NOT NULL")
def silver_customer():
    """Silver layer: customers from ``bronze_customer``.

    The ``expect_or_drop`` expectation on this table drops rows whose
    ``customer_id`` is NULL; no other transformation is applied.
    """
    return dlt.read("bronze_customer")

Name,Type
customer_id,string
Model,string
Registration_Number,string
Chassis_Number,string
Branch_id,string
Invoice_Date,date
Customer_Name,string
Contact_Number,bigint
Location,string
Pincode,int


In [0]:
@dlt.table(
    comment="sales data",
    table_properties={"quality": "silver"},
)
@dlt.expect_or_drop("valid_chassis", "Chassis_Number IS NOT NULL")
def silver_sales():
    """Silver layer: sales from ``bronze_sales``.

    The ``valid_chassis`` expectation drops rows whose ``Chassis_Number`` is
    NULL; no other transformation is applied.
    """
    return dlt.read("bronze_sales")

Name,Type
Dealer_Name,string
Branch_id,string
Vehicle_ID,string
Model,string
Chassis_Number,string
Temporary_Registration_Number,string
Invoice_Date,date
Base_Price,int
Base_Price_Tax,int
Discount_Price,int


In [0]:
@dlt.table(
    comment="silver service center",
    table_properties={"quality": "silver"},
)
# NOTE(review): the expectation name below is spelled "cutomer_id"; it is a
# runtime string (it names the expectation), so it is kept as-is — confirm
# nothing depends on it before renaming.
@dlt.expect_or_drop("cutomer_id is not null", "customer_id IS NOT NULL")
def service_center_silver():
    """Silver layer: service-centre records from ``bronze_service_center``.

    The expectation on this table drops rows whose ``customer_id`` is NULL;
    no other transformation is applied.
    """
    return dlt.read("bronze_service_center")

Name,Type
WorkOrderID,string
customer_id,string
Chassis_Number,string
Model,string
Branch_ID,string
Number_of_Visits,bigint
Start_Date,timestamp
Expected_Date,timestamp
Actual_Date,timestamp
Delay_Days,int


In [0]:
import dlt
@dlt.view(
    comment="customer sales"
)
def customer_sales():
    """View joining silver sales with each customer's location.

    Inner-joins ``silver_sales`` to ``silver_customer`` on ``Chassis_Number``
    and returns the selected sales columns followed by the customer's
    ``Location``.
    """
    sales_columns = [
        "Chassis_Number",
        "Branch_id",
        "Vehicle_ID",
        "Model",
        "Temporary_Registration_Number",
        "Invoice_Date",
        "Tags",
        "Final_Price",
        "customer_id",
    ]
    customer_locations = dlt.read("silver_customer").select("Chassis_Number", "Location")
    sales = dlt.read("silver_sales").select(*sales_columns)

    same_chassis = customer_locations["Chassis_Number"] == sales["Chassis_Number"]
    matched = sales.join(customer_locations, same_chassis, "inner")

    # Take every sales column from the sales side (this also disambiguates
    # Chassis_Number, which exists on both sides) plus the customer Location.
    return matched.select(
        *[sales[name] for name in sales_columns],
        customer_locations["Location"],
    )


Name,Type
Chassis_Number,string
Branch_id,string
Vehicle_ID,string
Model,string
Temporary_Registration_Number,string
Invoice_Date,date
Tags,string
Final_Price,double
customer_id,string
Location,string


In [0]:
from pyspark.sql.functions import sum

In [0]:
@dlt.table(
    comment="sales agrregated branch wise",
    table_properties={"quality": "gold"},
)
def gold_sales_branch_wise():
    """Gold layer: total ``Final_Price`` per ``Branch_id``.

    Reads the ``customer_sales`` view and returns one row per branch with the
    summed price in ``total_sales_branchwise``.
    """
    # `sum` here is the pyspark.sql.functions aggregate imported in an earlier cell.
    total_sales = sum("Final_Price").alias("total_sales_branchwise")
    return dlt.read("customer_sales").groupBy("Branch_id").agg(total_sales)

Name,Type
Branch_id,string
total_sales_branchwise,double


In [0]:
from pyspark.sql.functions import col
import dlt
@dlt.view(
    comment="not visited service center"
)
def customer_not_visited():
    """View of customers whose chassis has no service-centre record.

    Left-anti-joins ``silver_customer`` against ``service_center_silver`` on
    ``Chassis_Number`` and returns ``Chassis_Number``, ``customer_id`` and
    ``Branch_id`` of the unmatched customers.
    """
    customers = dlt.read("silver_customer").select(
        "Chassis_Number", "customer_id", "Branch_id"
    )
    serviced = dlt.read("service_center_silver").select("Chassis_Number")

    same_chassis = customers["Chassis_Number"] == serviced["Chassis_Number"]
    unserviced = customers.join(serviced, same_chassis, "left_anti")

    return unserviced.select(
        customers["Chassis_Number"],
        customers["customer_id"],
        customers["Branch_id"],
    )


Name,Type
Chassis_Number,string
customer_id,string
Branch_id,string


In [0]:
import dlt
from pyspark.sql.functions import col
@dlt.view(
    comment="Customer and service center merged data"
)
def customer_service_center():
    """View merging service-centre records with customer details.

    Inner-joins ``service_center_silver`` to ``silver_customer`` on
    ``Chassis_Number``. Service columns come from the service side;
    ``Invoice_Date``, ``Location``, ``Customer_Name`` and ``Email_ID`` come
    from the customer side.
    """
    customers = dlt.read("silver_customer").select(
        "Chassis_Number", "Invoice_Date", "Location", "Customer_Name", "Email_ID"
    )
    services = dlt.read("service_center_silver").select(
        "Chassis_Number",
        "customer_id",
        "Branch_ID",
        "Model",
        "Speciality",
        "Parts_Added",
        "Total_Price",
        "Previous_Service_Date",
    )

    same_chassis = services["Chassis_Number"] == customers["Chassis_Number"]
    merged = services.join(customers, same_chassis, "inner")

    # Output column order is part of the view's schema; keep it exactly.
    output_columns = [
        services["Chassis_Number"],
        services["customer_id"],
        services["Branch_ID"],
        services["Model"],
        services["Speciality"],
        services["Parts_Added"],
        services["Total_Price"],
        customers["Invoice_Date"],
        services["Previous_Service_Date"],
        customers["Location"],
        customers["Customer_Name"],
        customers["Email_ID"],
    ]
    return merged.select(*output_columns)

Name,Type
Chassis_Number,string
customer_id,string
Branch_ID,string
Model,string
Speciality,string
Parts_Added,string
Total_Price,double
Invoice_Date,date
Previous_Service_Date,timestamp
Location,string


In [0]:
# NOTE(review): dead code. This is a disabled draft of not_visited_120_days;
# an active function with the same name is defined later in this notebook.
# Kept for reference only — consider deleting.
# from pyspark.sql.functions import col, datediff, current_timestamp
# @dlt.table(
#     comment="Customers who have not visited the service center in the last 120 days"
# )
# def not_visited_120_days():
#     df = dlt.read("customer_service_center")
#     df = df.withColumn("Previous_Service_Date", col("Previous_Service_Date").cast("timestamp")) \
#            .select("Previous_Service_Date", "Customer_Name", "Email_ID", "Branch_ID", "customer_id")
#     df_filtered = df.filter(
#         datediff(current_timestamp(), col("Previous_Service_Date")) > 120
#     )
#     df_result = df_filtered.dropna().distinct()
#     return df_result


Name,Type
Branch_ID,string
customer_id,string


In [0]:
from pyspark.sql.functions import col, datediff, current_date, max as spark_max
@dlt.table(
    comment="Customers who have visited the service center in the last 120 days"
)
def visited_120_days():
    """One row per customer whose latest service date is at most 120 days old.

    Reads the ``customer_service_center`` view, computes each customer's most
    recent ``Previous_Service_Date`` (cast to date) as ``Last_Service_Date``
    and keeps customers for whom
    ``datediff(current_date(), Last_Service_Date) <= 120``.

    The descriptive columns (``Customer_Name``, ``Email_ID``, ``Branch_ID``)
    are taken from a row of that latest visit. Previously they came from an
    arbitrary visit row chosen by ``dropDuplicates``, so they could belong to
    an older visit and change between runs.

    Returns:
        DataFrame with columns ``customer_id``, ``Customer_Name``,
        ``Email_ID``, ``Branch_ID``, ``Last_Service_Date``.
    """
    visits = (
        dlt.read("customer_service_center")
        .withColumn("Previous_Service_Date", col("Previous_Service_Date").cast("date"))
        .select("customer_id", "Customer_Name", "Email_ID", "Branch_ID", "Previous_Service_Date")
    )

    # max() ignores NULL dates; a customer with only NULL dates gets a NULL
    # Last_Service_Date and is removed by the filter below.
    latest_service = visits.groupBy("customer_id").agg(
        spark_max("Previous_Service_Date").alias("Last_Service_Date")
    )
    recent_customers = latest_service.filter(
        datediff(current_date(), col("Last_Service_Date")) <= 120
    )

    return (
        recent_customers.join(visits, on="customer_id", how="left")
        # Keep only rows of the latest visit so the details match
        # Last_Service_Date. At least one such row exists per customer.
        .filter(col("Previous_Service_Date") == col("Last_Service_Date"))
        .select("customer_id", "Customer_Name", "Email_ID", "Branch_ID", "Last_Service_Date")
        # Several rows can share the latest date; collapse to one per customer.
        .dropDuplicates(["customer_id"])
    )


Name,Type
Previous_Service_Date,timestamp
Customer_Name,string
Email_ID,string
Branch_ID,string
customer_id,string


In [0]:
from pyspark.sql.functions import col, datediff, current_date, max as spark_max
@dlt.table(
    comment="Customers who have not visited the service center in the last 120 days"
)
def not_visited_120_days():
    """One row per customer whose latest service date is more than 120 days old.

    Reads the ``customer_service_center`` view, computes each customer's most
    recent ``Previous_Service_Date`` (cast to date) as ``Last_Service_Date``
    and keeps customers for whom
    ``datediff(current_date(), Last_Service_Date) > 120``.

    The descriptive columns (``Customer_Name``, ``Email_ID``, ``Branch_ID``,
    ``Model``) are taken from a row of that latest visit. Previously they came
    from an arbitrary visit row chosen by ``dropDuplicates``, so they could
    belong to an older visit and change between runs.

    Returns:
        DataFrame with columns ``customer_id``, ``Customer_Name``,
        ``Email_ID``, ``Branch_ID``, ``Last_Service_Date``, ``Model``.
    """
    visits = (
        dlt.read("customer_service_center")
        .withColumn("Previous_Service_Date", col("Previous_Service_Date").cast("date"))
        .select(
            "customer_id", "Customer_Name", "Email_ID", "Branch_ID",
            "Previous_Service_Date", "Model",
        )
    )

    # max() ignores NULL dates; a customer with only NULL dates gets a NULL
    # Last_Service_Date and is removed by the filter below.
    latest_service = visits.groupBy("customer_id").agg(
        spark_max("Previous_Service_Date").alias("Last_Service_Date")
    )
    inactive_customers = latest_service.filter(
        datediff(current_date(), col("Last_Service_Date")) > 120
    )

    return (
        inactive_customers.join(visits, on="customer_id", how="left")
        # Keep only rows of the latest visit so the details match
        # Last_Service_Date. At least one such row exists per customer.
        .filter(col("Previous_Service_Date") == col("Last_Service_Date"))
        .select(
            "customer_id", "Customer_Name", "Email_ID", "Branch_ID",
            "Last_Service_Date", "Model",
        )
        # Several rows can share the latest date; collapse to one per customer.
        .dropDuplicates(["customer_id"])
    )


In [0]:
# NOTE(review): dead code. This is a disabled draft of not_visited_120_days;
# an active function with the same name is defined earlier in this notebook.
# Kept for reference only — consider deleting.
# from pyspark.sql.functions import col, datediff, current_timestamp
# @dlt.table(
#     comment="Customers who have not visited the service center in the last 120 days"
# )
# def not_visited_120_days():
#     df = dlt.read("customer_service_center")
#     df = df.withColumn("Previous_Service_Date", col("Previous_Service_Date").cast("timestamp")) \
#            .select("Previous_Service_Date", "Customer_Name", "Email_ID", "Branch_ID", "customer_id")
#     df_filtered = df.filter(
#         datediff(current_timestamp(), col("Previous_Service_Date")) > 120
#     )
#     df_result = df_filtered.dropna().distinct()
#     return df_result


In [0]:
# NOTE(review): dead code. Another disabled draft of not_visited_120_days (an
# active function with the same name is defined earlier in this notebook).
# The sum("Branch_ID") aggregate in this draft is discarded by the following
# select, so it only served to group rows. Consider deleting.
# @dlt.table(
#     comment="Customers who not visited within 120 days"
# )
# def not_visited_120_days():
#     df = dlt.read("customer_service_center")
#     df = df.withColumn("Previous_Service_Date", col("Previous_Service_Date").cast("timestamp"))
#     df_final = df.filter(
#         datediff(current_timestamp(), col("Previous_Service_Date")) > 120
#     )
#     df_final=df_final.groupBy("Branch_ID","customer_id").agg(sum("Branch_ID").alias("sum"))
#     df_final =df_final.select("Branch_ID", "customer_id").dropna()
#     return df_final

In [0]:
# NOTE(review): dead code. Disabled gold table built on visited_120_days.
# The sum("Branch_ID") aggregate is dropped by the following select, so the
# result is just the (Branch_ID, customer_id) pairs. Consider deleting.
# @dlt.table(
#     comment="Visited_120days_Customer_Details"
# )
# def gold_Visited_120days_Customer_Details():
#     df=dlt.read("visited_120_days")
#     final_df=df.groupBy("Branch_ID","customer_id").agg(sum("Branch_ID").alias("sum"))
#     final_df = final_df.select("Branch_ID", "customer_id").dropna()
#     return final_df

In [0]:
# NOTE(review): dead code. Disabled gold table built on not_visited_120_days.
# The sum("Branch_ID") aggregate is dropped by the following select, so the
# result is just the (Branch_ID, customer_id) pairs. Consider deleting.
# @dlt.table(
#     comment="not_visited_120_days_customer_details"
# )
# def gold_not_visited_120_days_customer_details():
#     df=dlt.read("not_visited_120_days")
#     final_df=df.groupBy("Branch_ID","customer_id").agg(sum("Branch_ID").alias("sum"))
#     final_df = final_df.select("Branch_ID", "customer_id").dropna()
#     return final_df

In [0]:
@dlt.table(
    comment="silver_success_score",
    table_properties={"quality": "silver"},
)
def silver_success_score():
    """Silver layer: pass ``bronze_sucess_score`` through unchanged."""
    return dlt.read("bronze_sucess_score")

Name,Type
Dealer_Name,string
Model,string
Branch_ID,string
Start_Date,timestamp
Expected_Date,timestamp
Actual_Date,timestamp
Delay_Days,int
Average_Delays,double
Normalized_Average_Delays,double
Number_of_Visits,int


In [0]:
@dlt.table(
    comment="gold_success_score",
    table_properties={"quality": "gold"},
)
def gold_success_score():
    """Gold layer: summed ``Branch_Convenience_Score`` per group.

    Groups ``silver_success_score`` by ``Branch_ID``, ``Model`` and
    ``Branch_Convenience_Score`` and sums ``Branch_Convenience_Score`` into
    ``overall_score``.
    """
    # NOTE(review): the summed column is also a grouping key, so each group's
    # overall_score is that score multiplied by the group's row count.
    # Confirm this is intended; changing it would alter the output schema.
    group_keys = ["Branch_ID", "Model", "Branch_Convenience_Score"]
    overall_score = sum("Branch_Convenience_Score").alias("overall_score")
    return dlt.read("silver_success_score").groupBy(group_keys).agg(overall_score)

Name,Type
Branch_ID,string
Model,string
Branch_Convenience_Score,double
overall_score,double


In [0]:
@dlt.table(
    comment="silver_brach_convenience_score",
    table_properties={"quality": "silver"},
)
def silver_branch_convenience_score():
    """Silver layer: pass ``bronze_branch_conveniance_score`` through unchanged."""
    return dlt.read("bronze_branch_conveniance_score")

Name,Type
Branch_Name,string
Branch_ID,string
Branch_Type,string
Established_Year,int
Address,string
City,string
District,string
State,string
Pincode,int
Branch_Landline,string


In [0]:
from pyspark.sql.functions import sum
import dlt
@dlt.table(
    comment="Gold level: branch-wise overall convenience score"
)
def gold_branch_convenience_score():
    """Gold layer: summed convenience score per ``Branch_ID``.

    Reads ``silver_branch_convenience_score`` and returns one row per branch
    with the sum in ``overall_score``.
    """
    # NOTE(review): the column name "convience_score" is kept exactly as in the
    # original; verify it matches the silver table's schema.
    overall_score = sum("convience_score").alias("overall_score")
    branch_scores = dlt.read("silver_branch_convenience_score")
    return branch_scores.groupBy("Branch_ID").agg(overall_score)


Name,Type
Branch_ID,string
overall_score,double


In [0]:
@dlt.table(
    comment="Location_Wise_Sales"
)
def gold_location_wise_sales():
    """Gold layer: total ``Final_Price`` per customer ``Location``.

    Reads the ``customer_sales`` view and returns one row per location with
    the summed price in ``Total_Price``.
    """
    total_price = sum("Final_Price").alias("Total_Price")
    return dlt.read("customer_sales").groupBy("Location").agg(total_price)

Name,Type
Location,string
Total_Price,double


In [0]:
@dlt.table(
    comment="Branch_Wise_Revenue_Generated"
)
def gold_branch_wise_revenue():
    """Gold layer: total ``Final_Price`` per branch as ``Total_Revenue``.

    Reads the ``customer_sales`` view and groups it by ``Branch_ID``.
    """
    # NOTE(review): gold_sales_branch_wise computes the same sum over the same
    # view, only with different output column names — confirm both are needed.
    total_revenue = sum("Final_Price").alias("Total_Revenue")
    return dlt.read("customer_sales").groupBy("Branch_ID").agg(total_revenue)

Name,Type
Branch_ID,string
Total_Revenue,double


In [0]:
@dlt.table(
    comment="Tags_Wise_Revenue_Generated"
)
def gold_tags_wise_revenue():
    """Gold layer: total ``Final_Price`` per branch, model and tag.

    Reads the ``customer_sales`` view, groups by ``Branch_ID``, ``Model`` and
    ``Tags`` and returns the summed price in ``Total_Revenue``.
    """
    group_keys = ("Branch_ID", "Model", "Tags")
    total_revenue = sum("Final_Price").alias("Total_Revenue")
    return dlt.read("customer_sales").groupBy(*group_keys).agg(total_revenue)

Name,Type
Branch_ID,string
Model,string
Tags,string
Total_Revenue,double


In [0]:
@dlt.table(
    comment="Service_Branch_revenue"
)
def gold_service_branch_revenue():
    """Gold layer: total service ``Total_Price`` per ``Branch_ID``.

    Reads the ``customer_service_center`` view and returns one row per branch
    with the summed price in ``Total_Revenue``.
    """
    total_revenue = sum("Total_Price").alias("Total_Revenue")
    return dlt.read("customer_service_center").groupBy("Branch_ID").agg(total_revenue)

Name,Type
Branch_ID,string
Total_Revenue,double


In [0]:
@dlt.table(
    comment="Branch_Wise_Customers"
)
def gold_Branch_Wise_Customers():
    """Gold layer: number of distinct customers per ``Branch_ID``.

    Reads the ``customer_service_center`` view, which joins service-centre
    records to customers on chassis number, so one customer can appear on
    several rows. A plain ``count("customer_id")`` therefore counted rows
    rather than customers; a distinct count is used so ``Total_Customers``
    matches its name.

    Returns:
        DataFrame with columns ``Branch_ID`` and ``Total_Customers``.
    """
    # Local import keeps this cell independent of which names earlier cells
    # pulled in from pyspark.sql.functions.
    from pyspark.sql.functions import countDistinct

    total_customers = countDistinct("customer_id").alias("Total_Customers")
    return dlt.read("customer_service_center").groupBy("Branch_ID").agg(total_customers)

Name,Type
Branch_ID,string
Total_Customers,bigint


In [0]:
@dlt.table(
    comment="Location_wise_Customers"
)
def gold_Location_wise_Customers():
    """Gold layer: number of distinct customers per ``Location``.

    Reads the ``customer_service_center`` view, which joins service-centre
    records to customers on chassis number, so one customer can appear on
    several rows. A plain ``count("customer_id")`` therefore counted rows
    rather than customers; a distinct count is used so ``Total_Customers``
    matches its name.

    Returns:
        DataFrame with columns ``Location`` and ``Total_Customers``.
    """
    # Local import keeps this cell independent of which names earlier cells
    # pulled in from pyspark.sql.functions.
    from pyspark.sql.functions import countDistinct

    total_customers = countDistinct("customer_id").alias("Total_Customers")
    return dlt.read("customer_service_center").groupBy("Location").agg(total_customers)

Name,Type
Location,string
Total_Customers,bigint


In [0]:
@dlt.table(
    comment="Branch_wise_Vehicles"
)
def gold_Branch_wise_Vehicles():
    """Gold layer: number of service rows per branch and vehicle model.

    Reads the ``customer_service_center`` view, groups by ``Branch_ID`` and
    ``Model`` and counts non-null ``Model`` values into ``total_visits``.
    """
    total_visits = count("Model").alias("total_visits")
    service_rows = dlt.read("customer_service_center")
    return service_rows.groupBy("Branch_ID", "Model").agg(total_visits)

Name,Type
Branch_ID,string
Model,string
total_visits,bigint
