# AH ASSESSMENT | DATA ENGINEERING | TRANSFORM

In [1]:
import findspark
findspark.init()

In [2]:
import os
import shutil
import logging
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,lit,to_date,regexp_replace,regexp_extract, expr
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType,DateType
from datetime import datetime
import numpy as np

In [3]:
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

### INITIALISE SPARK SESSION

In [4]:
spark=SparkSession.builder.appName('AH_ASSESSMENT_TRANSFORM').config("spark.jars.packages","com.crealytics:spark-excel_2.12:0.13.7").getOrCreate()

:: loading settings :: url = jar:file:/Users/nncube1/Library/Python/3.10/lib/python/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/nncube1/.ivy2/cache
The jars for the packages stored in: /Users/nncube1/.ivy2/jars
com.crealytics#spark-excel_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4680e424-cbf4-4493-97a5-145dcfc56b81;1.0
	confs: [default]
	found com.crealytics#spark-excel_2.12;0.13.7 in central
	found org.apache.poi#poi;4.1.2 in central
	found commons-codec#commons-codec;1.13 in central
	found org.apache.commons#commons-collections4;4.4 in central
	found org.apache.commons#commons-math3;3.6.1 in central
	found com.zaxxer#SparseBitSet;1.2 in central
	found org.apache.poi#poi-ooxml;4.1.2 in central
	found org.apache.poi#poi-ooxml-schemas;4.1.2 in central
	found org.apache.xmlbeans#xmlbeans;3.1.0 in central
	found com.github.virtuald#curvesapi;1.06 in central
	found com.norbitltd#spoiwo_2.12;1.8.0 in central
	found org.scala-lang.modules#scala-xml_2.12;1.3.0 in central
	found com.github.pjfanning#excel-streaming-reader;2.3.6 in centr

In [5]:
spark

In [6]:
#PYSPARK
spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")  # Equivalent to pd.set_option('mode.chained_assignment', None)
spark.conf.set("spark.sql.repl.eagerEval.enabled", "true")  # Equivalent to pd.set_option('display.max_rows', None)
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 100)  # Equivalent to pd.set_option('display.max_columns', None)
spark.conf.set("spark.sql.repl.eagerEval.truncate", 0)
spark.conf.set("zeppelin.spark.printREPLOutput", "true")# Equivalent to pd.set_option('display.expand_frame_repr', False)

#PANDAS
pd.set_option('mode.chained_assignment', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)

### INGESTING THE FILE FROM THE RAW DATA ZONE FOR TRANSFORMATION

In [7]:
#RUN THE 'GET FILES' MODULE.THIS IS SIMILAR TO IMPORT
%run "UTILS/files.ipynb"

In [8]:
#DEFINING LOCATIONS FOR FILE
source='1.RAW_DATA_LAYER'
destination='2.TRANSFORMATION_LAYER'
artefact_file_location='4.ARTEFACTS'

In [9]:
df,file_name=get_files(source)

INFO:__main__:Successfully read modelling (1).xlsx from 1.RAW_DATA_LAYER/modelling (1).xlsx.


### DATA CLEANING

##### Sorting out dates | visit_start_date,visit_end_date,pet_dob,dt

In [10]:
df=df.withColumn("visit_start_date", to_date(col("visit_start_date"), 'dd/MM/yyyy')) \
               .withColumn("visit_end_date", to_date(col("visit_end_date"), 'dd/MM/yyyy')) \
               .withColumn("pet_dob", to_date(col("pet_dob"), 'dd/MM/yyyy')) \
               .withColumn("dt",lit(datetime.now().strftime("%Y%m%d"))) # adding partition date column based on today

logger.info(f"DATES SUCCESSFULLY PROCESSED")


INFO:__main__:DATES SUCCESSFULLY PROCESSED


##### Sorting out currency | visit_cost

In [11]:
#Remove non-numeric characters from the visit_cost column and it two decimal places

df = df.withColumn("visit_cost_numeric", expr("CAST(regexp_replace(visit_cost, '[^0-9.]', '') AS DECIMAL(10,2))")) \
                  .withColumn("currency", regexp_extract(col("visit_cost"), r'[^\d.]+', 0))

In [12]:
# See distribution of currency
count_per_currency = df.groupBy("currency").count()
count_per_currency

                                                                                

currency,count
$,11
£,989


In [13]:
''' QUESTION FOR BUSINESS,WAS THIS PAID WITH FOREIGN CURRENCY OR IS IT A MISTAKE?

    IF FOREIGN CURRENCY:
        WE NEED TO CONVERT IT FROM $ TO £
    IF MISTAKE,WE NEED TO REPLACE $ WITH £ 
    
    Since cities seem made up and $ make up 1.1% of the data,I cannot verify this
    For the sake of this assessment,I am just going to assume $ were a mistake'''



' QUESTION FOR BUSINESS,WAS THIS PAID WITH FOREIGN CURRENCY OR IS IT A MISTAKE?\n\n    IF FOREIGN CURRENCY:\n        WE NEED TO CONVERT IT FROM $ TO £\n    IF MISTAKE,WE NEED TO REPLACE $ WITH £ \n    \n    Since cities seem made up and $ make up 1.1% of the data,I cannot verify this\n    For the sake of this assessment,I am just going to assume $ were a mistake'

In [14]:
# Replace $ with £
df = df.withColumn("currency", regexp_replace(col("currency"), "\\$", "£"))

In [15]:
#Confirm removal of $
count_per_currency = df.groupBy("currency").count()
count_per_currency

currency,count
£,1000


In [16]:
# Drop visit_cost to replace it with the version without currency symbols
df=df.drop("visit_cost")
df = df.withColumnRenamed("visit_cost_numeric", "visit_cost")

logger.info(f"VISIT_COST SUCCESSFULLY PROCESSED")


INFO:__main__:VISIT_COST SUCCESSFULLY PROCESSED


### DEFINE DESIRED SCHEMA

In [17]:
custom_schema = StructType([
    StructField("visit_id", IntegerType(), True),
    StructField("visit_start_date", DateType(), True),
    StructField("visit_end_date", DateType(), True),
    StructField("visit_cost", DoubleType(), True),
    StructField("currency", StringType(), True),
    StructField("procedure_code", StringType(), True),
    StructField("procedure_desc", StringType(), True),
    StructField("hospital_id", IntegerType(), True),
    StructField("hospital", StringType(), True),
    StructField("owner_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("postcode", StringType(), True),
    StructField("pet_id", IntegerType(), True),
    StructField("pet_name", StringType(), True),
    StructField("species", StringType(), True),
    StructField("breed", StringType(), True),
    StructField("pet_dob", DateType(), True),
    StructField("dt", StringType(), False)
])

In [18]:
# For each field,cast data type based on custom schema
for field in custom_schema.fields:
    df = df.withColumn(field.name, col(field.name).cast(field.dataType))

In [19]:
# Check if the DataFrame schema matches the predefined schema
if set(df.schema.fields) != set(custom_schema.fields):
    raise ValueError("DataFrame schema does not match the defined schema.")
    
    
logger.info(f"SCHEMA CHECK SUCCESSFULLY COMPLETED")

INFO:__main__:SCHEMA CHECK SUCCESSFULLY COMPLETED


In [20]:
#Select desired fields and enforce order

df = df.select("visit_id", "visit_start_date", "visit_end_date", "currency","visit_cost", "procedure_code", "procedure_desc", "hospital_id", "hospital", "owner_id", "first_name", "last_name", "email", "address", "city", "postcode", "pet_id", "pet_name", "species", "breed", "pet_dob","dt")


### DATA QUALITY CHECKS

##### 1. Duplicate Records 
- Removing any duplicates records in the data
- If found,save number of duplicates to text file for further analysis

In [21]:
duplicate_counts = df.groupBy(df.columns).count().filter('count > 1')
duplicates_removed=duplicate_counts.count()

if duplicates_removed != 0:
    duplicate_output_path = f"{artefact_file_location}/Duplicates_for_{file_name}.txt"
    print(duplicate_output_path)
    
    # Saving the count of duplicates to a text file for future reference
    try:
        with open(duplicate_output_path, 'w') as file:
            file.write(f"Number of Duplicates Removed: {duplicates_removed}")
            logger.info(f"Number of Duplicated removed saved to {duplicate_output_path}.")
    except Exception as e:
        logger.error(f"Error: {str(e)}")

    # Deleting duplicates from df
    df = df.dropDuplicates()
    logger.info(f"Duplicates removed")
else:
    logger.info(f"DUPLICATE CHECK SUCCESSFULLY COMPLETED")



INFO:__main__:DUPLICATE CHECK SUCCESSFULLY COMPLETED                            


##### 2. Invalid emails 
- Identify invalid emails
- If found,write records to a path in the artefacts folder
- save no of invalid emails found to a variable for later use 

In [22]:
df_invalid_emails = df.filter((df["email"].contains("@") == False) & (df["email"] != "")).distinct()
invalid_emails_path = f"{artefact_file_location}/Invalid_emails_for_{file_name}.csv"
if df_invalid_emails.count() !=0:
    try:
        df_invalid_emails.coalesce(1).write.mode("overwrite").csv(invalid_emails_path)
        no_of_invalid_emails=df_invalid_emails.count()
        logger.info(f"{no_of_invalid_emails} invalid emails found.See {invalid_emails_path} for records found")
    except Exception as e:
        logger.error(f"Error: {str(e)}")
else:
    no_of_invalid_emails=df_invalid_emails.count()
    logger.info(f"INVALID EMAILS CHECK SUCCESSFULLY COMPLETED")


INFO:__main__:INVALID EMAILS CHECK SUCCESSFULLY COMPLETED                       


#### 2. Summary stats for data quality 

- Work out no of missing values,min and max,data types and no of invalid emails per column
- Save as a csv table to visualise later.The viz will also calculate the data quality score per column and overall

In [23]:
pandas_df = df.toPandas()

                                                                                

In [24]:
# total_records = pandas_df.shape
# print(total_records)

In [25]:

# Calculate missing values minimum and maximum values for each column

#-----------------------MISSING VALUES-------------------------------------
missing_values = pandas_df.isnull().sum().reset_index()
missing_values.columns = ['field', 'missing_values']

#-----------------------MIN VALUES-------------------------------------

min_values = pandas_df.min().reset_index()
min_values.columns = ['field', 'min']

#-----------------------MAX VALUES-------------------------------------

max_values = pandas_df.max().reset_index()
max_values.columns = ['field', 'max']

#-----------------------DATA TYPES-------------------------------------


data_types = pandas_df.dtypes.reset_index()
data_types.columns = ['field', 'data_type']


# Merge the DataFrames on the 'field' column
result_df = pd.merge(data_types,missing_values, on='field')
result_df = pd.merge(result_df, min_values, on='field')
result_df = pd.merge(result_df, max_values, on='field')

#-----------------------INVALID EMAILS-------------------------------------
result_df["invalid"] = np.where(result_df["field"] == "email",no_of_invalid_emails, "")


quality_stats_path = f"{artefact_file_location}/quality_stats_for_{file_name}.csv"

try:
    result_df.to_csv(quality_stats_path, index=False,sep=',')
    logger.info(f"Successfully written Data Quality Summary Table to '{quality_stats_path}'.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
result_df

logger.info(f"DATA QUALITY CHECKS SUCCESSFULLY COMPLETED")

  min_values = pandas_df.min().reset_index()
  max_values = pandas_df.max().reset_index()
INFO:__main__:Successfully written Data Quality Summary Table to '4.ARTEFACTS/quality_stats_for_modelling (1).xlsx.csv'.
INFO:__main__:DATA QUALITY CHECKS SUCCESSFULLY COMPLETED


### DIMENSIONAL MODELLING 

#### 1. Create a dimensional model 

- Create dfs for each dimension based on the original df
- Create a fact table by joining the dimension tables with the original DataFrame

In [26]:

try:
    visits = df.select(
        "visit_id",
        col("visit_start_date").alias("visit_start_date_v"),  # Alias the column to avoid naming conflicts
        col("visit_end_date").alias("visit_end_date_v"), 
        col("currency"),
        col("visit_cost").alias("visit_cost_v"),
        col("procedure_code").alias("procedure_code_v"),
        col("hospital_id").alias("hospital_id_v"),
        col("owner_id").alias("owner_id_v"),
        col("pet_id").alias("pet_id_v"),
        col("dt").alias("dt_v")
    ).distinct()

    # Create dimension tables for Procedure, Hospital, Owner, and Pet. Alias for the column 'dt' to avoid naming conflicts
    procedures = df.select("procedure_code", "procedure_desc", col("dt").alias("dt_p")).distinct()
    hospitals = df.select("hospital_id", "hospital",col("dt").alias("dt_h")).distinct()
    owners = df.select("owner_id", "first_name", "last_name", "email", "address", "city", "postcode",col("dt").alias("dt_o")).distinct()
    pets = df.select("pet_id", "pet_name", "species", "breed", "pet_dob",col("dt").alias("dt_pt")).distinct()

    # Create fact table by joining the dimension tables with the original DataFrame
    fact_table = df.join(visits, "visit_id", "inner") \
        .join(procedures, "procedure_code", "inner") \
        .join(hospitals, "hospital_id", "inner") \
        .join(owners, "owner_id", "inner") \
        .join(pets, "pet_id", "inner") \
        .select(
            "visit_id",
            "visit_start_date",
            "visit_end_date",
            "visit_cost",
            "procedure_code",
            "hospital_id",
            "owner_id",
            "pet_id",
            "dt"
        )
    logger.info(f"DIMENSION MODEL CREATED'.")
    
except Exception as e:
    logger.error(f"Error: {str(e)}")

# Show the resulting DataFrames
print("Visit Dimension:")
fact_table.printSchema()

# print("Procedure Dimension:")
# procedures

# print("Hospital Dimension:")
# hospitals

# print("Owner Dimension:")
# owners

# print("Pet Dimension:")
# pets

# print("Fact Table:")
# fact_table



INFO:__main__:DIMENSION MODEL CREATED'.


Visit Dimension:
root
 |-- visit_id: integer (nullable = true)
 |-- visit_start_date: date (nullable = true)
 |-- visit_end_date: date (nullable = true)
 |-- visit_cost: double (nullable = true)
 |-- procedure_code: string (nullable = true)
 |-- hospital_id: integer (nullable = true)
 |-- owner_id: integer (nullable = true)
 |-- pet_id: integer (nullable = true)
 |-- dt: string (nullable = false)



#### 2. Rename the columns for cleaner tables


In [27]:

visits = df.select(
    "visit_id",
    col("visit_start_date").alias("visit_start_date"),  # Remove "_v"
    col("visit_end_date").alias("visit_end_date"), 
    col("currency"),
    col("visit_cost").alias("visit_cost"),
    col("procedure_code").alias("procedure_code"),
    col("hospital_id").alias("hospital_id"),
    col("owner_id").alias("owner_id"),
    col("pet_id").alias("pet_id"),
    col("dt").alias("dt")
).distinct()


procedures = procedures.withColumnRenamed("dt_p", "dt")
hospitals = hospitals.withColumnRenamed("dt_h", "dt")
pets = pets.withColumnRenamed("dt_pt", "dt")
owners = owners.withColumnRenamed("dt_o", "dt")

##### 3. Write into the transformation layer

In [28]:
tables = ['visits', 'procedures', 'hospitals', 'owners', 'pets', 'fact_table']

for table in tables:
    parquet_folder_path = f'{destination}/{table}'
    today_date = datetime.now().strftime("%Y%m%d")
    today_partition_path = f'{parquet_folder_path}/dt={today_date}'
    try:
        visits.write.mode("overwrite").parquet(today_partition_path)
        logger.info(f"Successfully written {table} to '{today_partition_path}'.")
    except Exception as e:
        logger.error(f"Error: {str(e)}")


INFO:__main__:Successfully written visits to '2.TRANSFORMATION_LAYER/visits/dt=20231011'.
INFO:__main__:Successfully written procedures to '2.TRANSFORMATION_LAYER/procedures/dt=20231011'.
INFO:__main__:Successfully written hospitals to '2.TRANSFORMATION_LAYER/hospitals/dt=20231011'.
INFO:__main__:Successfully written owners to '2.TRANSFORMATION_LAYER/owners/dt=20231011'.
INFO:__main__:Successfully written pets to '2.TRANSFORMATION_LAYER/pets/dt=20231011'.
INFO:__main__:Successfully written fact_table to '2.TRANSFORMATION_LAYER/fact_table/dt=20231011'.
