## Import Libraries and Frameworks

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, year
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType

## Table creation
Create the silver-layer table `workspace.expense_analytics_silver.expenses_clean` if it does not already exist.

In [0]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.appName("Silverapp").getOrCreate()

# Manual reset helper (kept disabled): uncomment to drop and recreate the table.
# spark.sql("DROP TABLE IF EXISTS expense_analytics_silver.expenses_clean")

# Single fully-qualified name shared by the existence check and the creation,
# so both always refer to the same table regardless of the session's current
# catalog/schema.
silver_table = "workspace.expense_analytics_silver.expenses_clean"

check = spark.catalog.tableExists(silver_table)

if not check:
    # `month` and `year` are included because the write step later in this
    # notebook adds those columns to the DataFrame before saving it to this
    # table; month()/year() produce integer columns.
    schema = StructType([
        StructField("date", DateType(), True),
        StructField("category", StringType(), True),
        StructField("description", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("payment_mode", StringType(), True),
        StructField("month", IntegerType(), True),
        StructField("year", IntegerType(), True),
    ])

    spark.catalog.createTable(silver_table, schema=schema, source="delta")

In [0]:


# Input of this stage: the raw (bronze) expense records.
bronze_table = "workspace.expense_analytics_bronze.expenses_raw"
df = spark.read.table(bronze_table)

# One-off maintenance snippet (kept disabled): re-cast `amount` in the
# existing silver table to double and overwrite the table with the new schema.
# silver_df = spark.read.table("workspace.expense_analytics_silver.expenses_clean")
# silver_df = silver_df.withColumn("amount", col("amount").cast("double"))
# silver_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("workspace.expense_analytics_silver.expenses_clean")

# Preview the rows that were read.
df.show()

### Data Cleaning 

In [0]:
# Replace `date` and `amount` with typed versions of themselves:
# `date` is cast to a date and `amount` to a double.
df = (
    df
    .withColumn("date", col("date").cast("date"))
    .withColumn("amount", col("amount").cast("double"))
)

# Preview the result of the casts.
df.show()

In [0]:
# Derive `month` and `year` from the `date` column so later steps can group by them.
df = (
    df
    .withColumn("month", month("date"))
    .withColumn("year", year("date"))
)

# Persist the DataFrame as the silver table; "overwrite" replaces its current contents.
(
    df.write
    .mode("overwrite")
    .saveAsTable("workspace.expense_analytics_silver.expenses_clean")
)

# Preview what was written.
df.show()

In [0]:
# Total amount per (year, month, category).
# GroupedData.sum names its output column "sum(amount)"; it is renamed to
# "total_spent" for readability.
monthly_expense = (
    df
    .groupBy("year", "month", "category")
    .sum("amount")
    .withColumnRenamed("sum(amount)", "total_spent")
)

monthly_expense.show()