In [1]:
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType, FloatType
import os

### Instantiating a Spark session and reading csv

In [2]:
# Reuse the active Spark session if there is one; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()
csv = "Sleep_health_and_lifestyle_dataset.csv"
# First row is the header; let Spark infer the column types from the data.
data = spark.read.options(header=True, inferSchema=True).csv(csv)
data.printSchema()

root
 |-- Person ID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Sleep Duration: double (nullable = true)
 |-- Quality of Sleep: integer (nullable = true)
 |-- Physical Activity Level: integer (nullable = true)
 |-- Stress Level: integer (nullable = true)
 |-- BMI Category: string (nullable = true)
 |-- Blood Pressure: string (nullable = true)
 |-- Heart Rate: integer (nullable = true)
 |-- Daily Steps: integer (nullable = true)
 |-- Sleep Disorder: string (nullable = true)



### Defining the functions that perform data cleaning

In [3]:
def assign_row_numbers(df, column, distinct=True):
    """Add a zero-based numeric id column derived from the values of ``column``.

    The new column is named ``"ID_" + column``. Rows with equal values in
    ``column`` receive the same id, and ids follow the sort order of the
    values without gaps (dense rank).

    Args:
        df: DataFrame that contains ``column``.
        column: Name of the column to number.
        distinct: When True, fully duplicated rows are removed from the result.

    Returns:
        ``df`` with the extra id column, de-duplicated if ``distinct`` is True.
    """
    id_name = "ID_" + column
    # The window orders by `column` and has no partitionBy, so ranks are global.
    ordering = Window.orderBy(column)
    # dense_rank() starts at 1; subtract 1 so ids start at 0.
    numbered = df.withColumn(id_name, F.dense_rank().over(ordering) - 1)
    return numbered.distinct() if distinct else numbered

def check_duplicates(data, column):
    """Report and remove rows that repeat a value of ``column``.

    Args:
        data: DataFrame to inspect.
        column: Name of the column whose values are expected to be unique.

    Returns:
        ``data`` unchanged when every value of ``column`` occurs once;
        otherwise ``data`` with one row kept per value of ``column``.
    """
    duplicates = data.groupBy(column).count().filter(F.col("count") > 1)
    # Count once: every count() call triggers a separate Spark job.
    n_duplicates = duplicates.count()
    if n_duplicates != 0:
        print("Hay duplicados")
        print(f"Nº de duplicados: {n_duplicates}")
        # De-duplicate on the same key used for detection. Without a subset,
        # dropDuplicates() only removes rows that are identical in every column.
        # Which row survives for a repeated key is not specified.
        data = data.dropDuplicates([column])
    return data

def cleaning(data, id_column, columnas_float, columnas_union, columnas_num): 
    """Run the cleaning steps on the raw DataFrame.

    Steps, in order:
      1. Cast 'Sleep Duration' to float.
      2. Replace 'Normal' with 'Normal Weight' in ``columnas_union``.
      3. Split ``columnas_float`` on '/' into the float columns
         'Sistolic Pressure' and 'Diastolic Pressure'.
      4. Add an ``ID_<column>`` numeric id for every column in ``columnas_num``.
      5. Run ``check_duplicates`` on ``id_column``.
      6. Replace spaces with underscores in every column name.

    Args:
        data: Raw DataFrame.
        id_column: Name of the identifier column passed to ``check_duplicates``.
        columnas_float: Name of the '/'-separated string column to split.
        columnas_union: Name of the column whose 'Normal' label is unified.
        columnas_num: Iterable of column names that get an ``ID_`` column.

    Returns:
        The cleaned DataFrame.
    """
    cleaned = data.withColumn('Sleep Duration', F.col('Sleep Duration').cast('float'))

    # Both spellings of the normal-weight label map to 'Normal Weight';
    # every other value is kept as is.
    label = F.col(columnas_union)
    unified_label = F.when(label.isin(['Normal', 'Normal Weight']), 'Normal Weight').otherwise(label)
    cleaned = cleaned.withColumn(columnas_union, unified_label)

    # The source column is split on '/': item 0 and item 1 become float columns.
    pressure_parts = F.split(F.col(columnas_float), '/')
    cleaned = cleaned.withColumn('Sistolic Pressure', pressure_parts.getItem(0).cast('float'))
    cleaned = cleaned.withColumn('Diastolic Pressure', pressure_parts.getItem(1).cast('float'))

    for name in columnas_num:
        cleaned = assign_row_numbers(cleaned, name, distinct=False)

    cleaned = check_duplicates(cleaned, id_column)

    renames = {name: name.replace(' ', '_') for name in cleaned.columns if ' ' in name}
    return cleaned.withColumnsRenamed(renames)

### Defining the functions that perform the following data-quality checks:
- Uniqueness
- Integrity
- Consistency
- Validity
- Completeness

In [8]:
def validate_type(df):
    """Check that the cleaned DataFrame has the expected column types.

    Every mismatch (or missing column) is printed.

    Args:
        df: Cleaned DataFrame to validate.

    Returns:
        True when every expected column exists with the required type,
        False otherwise.
    """
    required_types = {
        'Person_ID': IntegerType(),
        'Gender': StringType(),
        'Age': IntegerType(),
        'Occupation': StringType(),
        'Sleep_Duration': FloatType(),
        'Quality_of_Sleep': IntegerType(),
        'Physical_Activity_Level': IntegerType(),
        'Stress_Level': IntegerType(),
        'BMI_Category': StringType(),
        'Blood_Pressure': StringType(),
        'Heart_Rate': IntegerType(),
        'Daily_Steps': IntegerType(),
        'Sleep_Disorder': StringType(),
        'Sistolic_Pressure': FloatType(),
        'Diastolic_Pressure': FloatType(),
        'ID_Sleep_Disorder': IntegerType(),
        'ID_Gender': IntegerType(),
        'ID_Occupation': IntegerType(),
        'ID_BMI_Category': IntegerType(),
    }
    is_valid = True
    for columna, expected in required_types.items():
        # Report a missing column instead of failing with KeyError on the schema lookup.
        if columna not in df.columns:
            print(f'La columna {columna} no existe en el DataFrame')
            is_valid = False
            continue
        actual = df.schema[columna].dataType
        if actual != expected:
            # Message reads "<actual> instead of <expected>".
            print(f'La columna {columna} tiene tipado incorrecto: {actual} en lugar de {expected}')
            is_valid = False
    return is_valid

def verify_records_number(df, table_bd):
    """Print whether ``df`` and a database table hold the same number of rows.

    Reads the table over JDBC using the notebook-level ``spark``, ``url`` and
    ``properties`` objects, so those must be defined before this is called.

    Args:
        df: DataFrame whose rows are counted.
        table_bd: Name of the database table to compare against.
    """
    df_records = df.count()
    sleep_table = (
        spark.read.format("jdbc")
        .option("url", url)
        .option("dbtable", table_bd)
        .options(**properties)
        .load()
    )
    bd_records = sleep_table.count()

    if df_records != bd_records:
        print(f'It does not coincide the nº of records in df ({df_records}) with the nº of records in db ({bd_records})')
    else:
        print(f'Nº of records in df ({df_records}) coincides with the nº of records in bd ({bd_records})')

def verify_nulls(df):
    """Print the number of nulls found in each column of ``df``.

    One line is printed per column that contains nulls. If no column contains
    any, a single 'There are not nulls' line is printed instead.

    Args:
        df: DataFrame to inspect.
    """
    hay_nulos = False
    # Iterate the columns of the DataFrame that was passed in, not a notebook global.
    for column in df.columns:
        # Count once per column: each count() is a separate Spark job.
        null_count = df.filter(F.col(column).isNull()).count()
        if null_count > 0:
            print(f"There are {null_count} nulls in column {column}")
            hay_nulos = True
    if not hay_nulos:
        print('There are not nulls')

def verify_duplicates(df):
    """Return True when ``df`` has no fully identical rows, False otherwise."""
    unique_rows = df.dropDuplicates().count()
    total_rows = df.count()
    # Equal counts mean that removing duplicates dropped nothing.
    return unique_rows == total_rows

In [5]:
# Column roles expected by cleaning().
id_column = "Person ID"
float_columns = "Blood Pressure"
union_columns = "BMI Category"
num_columns = ["Sleep Disorder", "Gender", "Occupation", "BMI Category"]

# NOTE: this rebinds `data`, and cleaning() renames the columns, so the cell
# cannot be re-run without first re-reading the CSV.
data = cleaning(
    data,
    id_column=id_column,
    columnas_float=float_columns,
    columnas_union=union_columns,
    columnas_num=num_columns,
)
data.printSchema()

root
 |-- Person_ID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Sleep_Duration: float (nullable = true)
 |-- Quality_of_Sleep: integer (nullable = true)
 |-- Physical_Activity_Level: integer (nullable = true)
 |-- Stress_Level: integer (nullable = true)
 |-- BMI_Category: string (nullable = true)
 |-- Blood_Pressure: string (nullable = true)
 |-- Heart_Rate: integer (nullable = true)
 |-- Daily_Steps: integer (nullable = true)
 |-- Sleep_Disorder: string (nullable = true)
 |-- Sistolic_Pressure: float (nullable = true)
 |-- Diastolic_Pressure: float (nullable = true)
 |-- ID_Sleep_Disorder: integer (nullable = false)
 |-- ID_Gender: integer (nullable = false)
 |-- ID_Occupation: integer (nullable = false)
 |-- ID_BMI_Category: integer (nullable = false)



In [6]:
# Completeness check: report null counts per column of the cleaned data.
verify_nulls(df=data)

No hay nulos


In [9]:
# Validity check: compare the cleaned schema with the expected column types.
validate_type(df=data)

True

In [10]:
# Uniqueness check: True means no fully identical rows remain.
verify_duplicates(df=data)

True

In [11]:
# Shift Person_ID down by one (the next cell checks that the minimum is then 0).
# NOTE(review): this rebinds `data`, so re-running the cell shifts the ids again.
data = data.withColumn("Person_ID", data["Person_ID"] - 1)

In [12]:
# Smallest Person_ID after the shift; the aggregate yields exactly one row.
data.agg(F.min("Person_ID")).first()[0]

0

In [13]:
# Gender dimension: one row per distinct Gender value with its ID_Gender.
selected_column = data.select("Gender")
df_gender = assign_row_numbers(selected_column, "Gender")
df_gender.show()

+------+---------+
|Gender|ID_Gender|
+------+---------+
|Female|        0|
|  Male|        1|
+------+---------+



In [14]:
# Sleep-disorder dimension: one row per distinct Sleep_Disorder value with its ID_Sleep_Disorder.
selected_column = data.select("Sleep_Disorder")
df_sleep_disorder = assign_row_numbers(selected_column, "Sleep_Disorder")
df_sleep_disorder.show()

+--------------+-----------------+
|Sleep_Disorder|ID_Sleep_Disorder|
+--------------+-----------------+
|      Insomnia|                0|
|          None|                1|
|   Sleep Apnea|                2|
+--------------+-----------------+



In [15]:
# BMI dimension: one row per distinct BMI_Category value with its ID_BMI_Category.
# The column name uses the same casing as the cleaned schema and the facts
# table, so the generated key column is named ID_BMI_Category in both places.
selected_column = data.select("BMI_Category")
df_bmi_category = assign_row_numbers(selected_column, "BMI_Category")
df_bmi_category.show()

+-------------+---------------+
| BMI_category|ID_BMI_category|
+-------------+---------------+
|Normal Weight|              0|
|        Obese|              1|
|   Overweight|              2|
+-------------+---------------+



In [16]:
# Occupation dimension: one row per distinct Occupation value with its ID_Occupation.
selected_column = data.select("Occupation")
df_occupation = assign_row_numbers(selected_column, "Occupation")
df_occupation.show()

+--------------------+-------------+
|          Occupation|ID_Occupation|
+--------------------+-------------+
|          Accountant|            0|
|              Doctor|            1|
|            Engineer|            2|
|              Lawyer|            3|
|             Manager|            4|
|               Nurse|            5|
|Sales Representative|            6|
|         Salesperson|            7|
|           Scientist|            8|
|   Software Engineer|            9|
|             Teacher|           10|
+--------------------+-------------+



In [17]:
# Database table names: one facts table plus one table per dimension.
facts_table         = 'facts'
dim_occupation_table = 'occupation'
dim_gender_table     = 'gender'
dim_bmi_table        = 'bmi'
dim_disorder_table   = 'disorder'

In [20]:
# JDBC connection settings. Host, database name and credentials are read from
# environment variables; a missing variable raises KeyError here.
url = f"jdbc:mysql://{os.environ['DB_HOST']}:3306/{os.environ['DB_NAME']}"
# NOTE(review): `com.mysql.jdbc.Driver` looks like the legacy Connector/J class
# name (newer releases use `com.mysql.cj.jdbc.Driver`) — confirm against the
# connector jar on the Spark classpath.
properties = {
    "driver"   : "com.mysql.jdbc.Driver",
    "user"     : os.environ["DB_USER"],
    "password" : os.environ["DB_PASSWORD"]
    }

In [21]:
# Columns stored in the facts table: measures plus the ID_ keys of each dimension.
cols = [
    'Person_ID', 'ID_Gender', 'Age', 'ID_Occupation', 'Sleep_Duration',
    'Quality_of_Sleep', 'Physical_Activity_Level', 'Stress_Level', 'Heart_Rate',
    'Daily_Steps', 'ID_Sleep_Disorder', 'Sistolic_Pressure', 'Diastolic_Pressure',
    'ID_BMI_Category',
]

# No save mode is set, so the writer's default mode applies.
(
    data.select(*cols)
    .write.format("jdbc")
    .option("url", url)
    .option("dbtable", facts_table)
    .options(**properties)
    .save()
)

In [22]:
# Map each dimension table name to the DataFrame that fills it.
dict_tb = {
    dim_gender_table: df_gender,
    dim_disorder_table: df_sleep_disorder,
    dim_bmi_table: df_bmi_category,
    # The constant is `dim_occupation_table`; the previous plural spelling was
    # an undefined name and raised NameError before any table was written.
    dim_occupation_table: df_occupation,
}
for tb, df in dict_tb.items():
    # Each dimension frame is written as is, with the same JDBC settings as the facts table.
    (
        df.write.format("jdbc")
        .option("url", url)
        .option("dbtable", tb)
        .options(**properties)
        .save()
    )

In [23]:
def verify_relationships(table_pk, pk, table_fk, fk):
    """Check that every ``fk`` value stored in the facts table exists in a dimension table.

    Both tables are read over JDBC using the notebook-level ``spark``, ``url``
    and ``properties`` objects. The JDBC partition bounds are taken from the
    notebook-level ``data`` DataFrame.

    Args:
        table_pk: Name of the facts table in the database.
        pk: Primary-key column of the facts table; used as the JDBC partition
            column when reading that table.
        table_fk: Name of the dimension table in the database.
        fk: Key column that links the two tables; it must exist under this
            name in both the facts table and the dimension table.

    Returns:
        A message string stating whether the keys are consistent. When they
        are not, the message includes how many facts rows are orphaned and a
        sample of the offending key values.
    """
    # One aggregation per column yields both bounds in a single Spark job.
    lowerbound_dim, upperbound_dim = data.agg(F.min(fk), F.max(fk)).first()
    lowerbound_hechos, upperbound_hechos = data.agg(F.min(pk), F.max(pk)).first()

    facts_id = (
        spark.read.format("jdbc")
        .option("url", url)
        .option("dbtable", table_pk)
        .option("partitionColumn", pk)
        .option("lowerBound", lowerbound_hechos)
        .option("upperBound", upperbound_hechos)
        .option("numPartitions", 1)
        .options(**properties)
        .load()
    )
    dim_id = (
        spark.read.format("jdbc")
        .option("url", url)
        .option("dbtable", table_fk)
        .option("partitionColumn", fk)
        .option("lowerBound", lowerbound_dim)
        .option("upperBound", upperbound_dim)
        .option("numPartitions", 1)
        .options(**properties)
        .load()
    )

    # Facts rows whose foreign key has no match in the dimension table.
    # The comparison is fk-to-fk: matching the dimension key against the facts
    # primary key would compare unrelated id spaces.
    orphans = facts_id.join(dim_id, facts_id[fk] == dim_id[fk], "leftanti")
    orphan_count = orphans.count()

    if orphan_count > 0:
        # Collect a few offending key values so the message is actionable.
        sample = [row[0] for row in orphans.select(fk).distinct().limit(10).collect()]
        return (
            f"There is not consistency between keys: {orphan_count} rows in "
            f"{table_pk} have a {fk} missing from {table_fk} (sample: {sample})"
        )
    return "There is consistency between keys"

In [24]:
# Consistency check between the facts table and the sleep-disorder dimension.
verify_relationships(
    table_pk=facts_table,
    pk="Person_ID",
    table_fk=dim_disorder_table,
    fk="ID_Sleep_Disorder",
)

'Hay consistencia entre las claves'

In [25]:
# Integrity check: the facts table must hold as many rows as the cleaned DataFrame.
verify_records_number(data, facts_table)

Coinciden el nº de registros en df 374 con registros en bd 374
