# 2. Transform

## 1. Silver class

In [0]:
%python

# Import packages
import pyspark.sql.functions as F

class Silver:
    """Bronze -> silver cleaning step for the tables of one schema.

    For each dataset, the pipeline runs these steps in order:

    1. Read ``<bronze>.<schema>.<dataset>_<bronze>``.
    2. Drop exact duplicate rows.
    3. ``try_cast`` column ``Aika`` to int and ``val`` to a numeric type.
    4. Drop rows whose ``val`` is null after the cast.
    5. Rename ``val`` to a dataset-specific name and ``Aika`` to ``Vuosi``.
    6. Assert that no nulls remain.
    7. Overwrite ``<silver>.<schema>.<dataset>_<silver>`` as a Delta table.

    Relies on the notebook-global ``spark`` session.
    """

    def __init__(self, silver_catalog, bronze_catalog, schema):
        """Store the catalog and schema names used to build table names.

        Args:
            silver_catalog: catalog the cleaned tables are written to; also
                used as the table-name suffix on the silver side.
            bronze_catalog: catalog the raw tables are read from; also used
                as the table-name suffix on the bronze side.
            schema: schema name shared by both catalogs.
        """
        self.silver_catalog = silver_catalog
        self.bronze_catalog = bronze_catalog
        self.schema = schema

    def _table_name(self, catalog, dataset_name):
        """Return the qualified name ``<catalog>.<schema>.<dataset_name>_<catalog>``."""
        return f"{catalog}.{self.schema}.{dataset_name}_{catalog}"

    def assert_missing(self, df):
        """Check that no column of ``df`` contains null values.

        All columns are checked in one aggregation pass (a single Spark job)
        rather than one ``count()`` job per column.

        Args:
            df: Spark DataFrame to check.

        Returns:
            ``df`` unchanged, so the call can be chained.

        Raises:
            AssertionError: if any column has at least one null. The message
                lists every offending column together with its null count.
        """
        print("Asserting no missing values...")
        columns = list(df.columns)
        if not columns:
            return df

        # F.count skips nulls, so counting `when(isNull, 1)` yields the
        # number of null cells in each column.
        null_counts = df.select(
            [F.count(F.when(F.col(c).isNull(), 1)) for c in columns]
        ).first()

        # Pair by position (not by name) so duplicate column names are safe.
        offenders = [(c, n) for c, n in zip(columns, null_counts) if n]
        if offenders:
            details = ", ".join(f"{c} ({n} missing)" for c, n in offenders)
            # Explicit raise instead of `assert` so the check is not removed
            # under `python -O`; AssertionError is kept for existing callers.
            raise AssertionError(f"Columns with missing values: {details}")
        return df

    def print_dtypes(self, old_df, new_df):
        """Print, column by column, whether the data type changed.

        Columns are paired by position; the old column name is reported
        because the new DataFrame may have renamed it.

        Args:
            old_df: DataFrame before cleaning.
            new_df: DataFrame after cleaning, with the same column order.
        """
        for (var, old_type), (_, new_type) in zip(old_df.dtypes, new_df.dtypes):
            if old_type != new_type:
                print(f"The type of {var} was changed: {old_type} > {new_type}.")
            else:
                print(f"The type of {var} was not changed: {old_type}.")

    def _clean_and_save(self, dataset_name, value_column, value_type):
        """Clean one bronze dataset and overwrite its silver Delta table.

        Args:
            dataset_name: base table name, e.g. ``"visits"``.
            value_column: new name for the ``val`` column.
            value_type: Spark SQL type that ``val`` is ``try_cast`` to.
                This is interpolated into a SQL expression, so only
                internal constants may be passed here.
        """
        raw = spark.table(self._table_name(self.bronze_catalog, dataset_name))

        print("Cleaning data...")
        clean = (
            raw
            .dropDuplicates()
            # try_cast yields null instead of failing on unparseable values.
            .withColumn("Aika", F.expr("try_cast(Aika as int)"))
            .withColumn("val", F.expr(f"try_cast(val as {value_type})"))
            # Drops rows whose value was missing or could not be cast.
            .filter(F.col("val").isNotNull())
            .withColumnRenamed("val", value_column)
            .withColumnRenamed("Aika", "Vuosi")
        )
        self.print_dtypes(raw, clean)
        self.assert_missing(clean)

        print("Saving data...")
        (clean.write
            .format("delta")
            .mode("overwrite")
            .saveAsTable(self._table_name(self.silver_catalog, dataset_name)))

    def clean_and_save_visits(self):
        """Clean ``visits``; ``val`` is cast to int and renamed ``Käyntimäärä``."""
        self._clean_and_save("visits", "Käyntimäärä", "int")

    def clean_and_save_customers(self):
        """Clean ``customers``; ``val`` is cast to int and renamed ``Asiakasmäärä``."""
        self._clean_and_save("customers", "Asiakasmäärä", "int")

    def clean_and_save_visits_customers(self):
        """Clean ``visits_customers``; ``val`` is cast to float and renamed ``Käynnit_per_asiakas``."""
        self._clean_and_save("visits_customers", "Käynnit_per_asiakas", "float")

    def execute_silver_pipeline(self):
        """Run the clean-and-save step for all three datasets, in order."""
        print("Processing dataset Visits...")
        self.clean_and_save_visits()
        print("Done!")
        print("Processing dataset Customers...")
        self.clean_and_save_customers()
        print("Done!")
        print("Processing dataset Visits per Customer...")
        self.clean_and_save_visits_customers()
        print("Done!")


## 2. Transform & clean the data

In [0]:
# Build the bronze -> silver transformer for the "avohilmo" schema and run all datasets.
silver = Silver(
    silver_catalog="silver",
    bronze_catalog="bronze",
    schema="avohilmo",
)
silver.execute_silver_pipeline()