In [0]:
import pyspark.sql.functions as F
import logging
from pyspark.sql.types import ArrayType, StringType, IntegerType, DoubleType, LongType, StructType, StructField

In [0]:
%run ./logger

In [0]:
# Build the notebook-wide logger used by the silver load below.
# NOTE(review): `Logger` is not imported in this notebook; presumably it is
# defined by the ./logger notebook executed via %run in the previous cell — confirm.
logger = Logger("silver_logger", logging.INFO).setup()
logger.info("Logging started for silver...")


In [0]:
# Source file: the landed CSV whose bronze rows this run promotes to silver.
base_path = "/Volumes/students_data/default/landing"
date = "09-01-2023"
filename = f"LMS_{date}.csv"
file_path = f"{base_path}/{filename}"

# Table coordinates, combined downstream as <catalog>.<schema>.<table>.
catalog_name = "students_data"
bronze_schema_name = "students_bronze"
bronze_table_name = "bronze_students_table"
silver_schema_name = "students_silver"
silver_table_name = "silver_students_table"

In [0]:
class Silver:
    """Bronze-to-silver load for the students LMS data.

    Reads the bronze rows that came from one source file, cleans and types
    them, and appends the result to the silver Delta table.

    The methods rely on names that live at notebook level rather than on the
    instance: ``spark``, ``logger`` and, as the default source file,
    ``file_path``.
    """

    def __init__(self, catalog_name, bronze_table_name, bronze_schema_name, silver_table_name, silver_schema_name):
        """Store the catalog, schema and table names used to address both tables."""
        self.bronze_table_name = bronze_table_name
        self.bronze_schema_name = bronze_schema_name
        self.silver_table_name = silver_table_name
        self.silver_schema_name = silver_schema_name
        self.catalog_name = catalog_name

    def _qualified_name(self, schema_name, table_name):
        """Return ``<catalog>.<schema>.<table>`` for a table in this catalog."""
        return f"{self.catalog_name}.{schema_name}.{table_name}"

    def read_bronze_table(self, source_file_path=None):
        """Return the bronze rows whose ``File_Name`` matches one source file.

        Args:
            source_file_path: Value compared against the bronze ``File_Name``
                column. When omitted, the notebook-level ``file_path`` is used,
                which is what the method always did before this parameter
                existed.

        Returns:
            DataFrame holding every bronze column for the matching rows.

        Raises:
            Exception: If no bronze row matches the file.
        """
        if source_file_path is None:
            source_file_path = file_path

        bronze_table = self._qualified_name(self.bronze_schema_name, self.bronze_table_name)
        # Filter through the DataFrame API instead of splicing the path into a
        # SQL string, so a quote character in the path cannot break the query.
        silver_df = spark.table(bronze_table).filter(F.col("File_Name") == source_file_path)
        logger.info("silver data read completed...")

        # head(1) only needs a single row to answer "is there any data?",
        # whereas count() has to count every matching row.
        if not silver_df.head(1):
            raise Exception("No data found in bronze table")

        return silver_df

    def transform(self, silver_df):
        """Clean and type the bronze rows for the silver table.

        Steps, in order:
          * keep rows with ``Age`` > 0;
          * keep one row per (``Student_ID``, ``Course_ID``);
          * split ``Name`` on single spaces into ``First_Name`` (first token)
            and ``Last_Name`` (second token; any further tokens are not kept);
          * drop ``Name`` and ``Load_Time``;
          * map ``Gender`` M/F (case-insensitive) to Male/Female, else Unknown;
          * parse ``Enrollment_Date`` and ``Completion_Date`` as ``M/d/yyyy``;
          * turn ``Assignment_Scores`` from bracketed, comma-separated text
            into an array of integers;
          * add ``Course_Duration`` as the number of days from enrollment to
            completion.

        Args:
            silver_df: DataFrame returned by ``read_bronze_table``.

        Returns:
            The transformed DataFrame.
        """
        name_parts = F.split(F.col("Name"), " ")
        gender_code = F.upper(F.col("Gender"))

        cleaned_df = (
            silver_df.filter(F.col("Age") > 0)
            .dropDuplicates(subset=["Student_ID", "Course_ID"])
            .withColumn("First_Name", name_parts.getItem(0))
            .withColumn("Last_Name", name_parts.getItem(1))
            # Plain column names work on every PySpark release; passing
            # several Column objects is rejected by older ones.
            .drop("Name", "Load_Time")
            .withColumn(
                "Gender",
                F.when(gender_code == "M", "Male")
                .when(gender_code == "F", "Female")
                .otherwise("Unknown"),
            )
        )

        # Strip brackets and whitespace, then split on commas. The raw string
        # keeps the regex backslashes without invalid-escape warnings.
        scores_text = F.regexp_replace(F.col("Assignment_Scores"), r"[\[\]\s]", "")

        transformed_df = (
            cleaned_df
            .withColumn("Enrollment_Date", F.to_date(F.col("Enrollment_Date"), "M/d/yyyy"))
            .withColumn("Completion_Date", F.to_date(F.col("Completion_Date"), "M/d/yyyy"))
            .withColumn("Assignment_Scores", F.split(scores_text, ",").cast(ArrayType(IntegerType())))
            # Reference the date columns by name so datediff sees the parsed
            # dates produced just above, not the pre-conversion string columns
            # of the input frame.
            .withColumn("Course_Duration", F.datediff(F.col("Completion_Date"), F.col("Enrollment_Date")))
        )

        return transformed_df

    def write_silver_table(self, transformed_df):
        """Append the transformed rows to the silver Delta table.

        The write uses ``append`` mode, so calling this again with the same
        rows adds them a second time.

        Args:
            transformed_df: DataFrame returned by ``transform``.
        """
        silver_table = self._qualified_name(self.silver_schema_name, self.silver_table_name)
        transformed_df.write.format("delta").mode("append").saveAsTable(silver_table)
        logger.info("Loaded silver table...")


# Run the bronze -> silver load end to end: read, transform, then append.
obj = Silver(
    catalog_name=catalog_name,
    bronze_table_name=bronze_table_name,
    bronze_schema_name=bronze_schema_name,
    silver_table_name=silver_table_name,
    silver_schema_name=silver_schema_name,
)
silver_df = obj.read_bronze_table()
clean_silver_df = obj.transform(silver_df)
obj.write_silver_table(clean_silver_df)

        