In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the session used by every cell below.
session_builder = SparkSession.builder.appName("Cardio_Data_CleanUp")
spark = session_builder.getOrCreate()

### Load Data into Spark DataFrame

In [None]:
# Display the current working directory of the kernel.
import os
os.getcwd()

In [None]:
from pyspark import SparkFiles

csv_file_path = "/content/Resources/Cardio_Data.csv"

# inferSchema=True makes Spark detect numeric column types. Without it every
# column is read as a string, so describe() reports lexicographic min/max and
# the range filters further down depend on implicit string-to-number casts.
# mode="DROPMALFORMED" discards rows that cannot be parsed.
cardio_df_original = spark.read.csv(
    csv_file_path,
    header=True,
    inferSchema=True,
    mode="DROPMALFORMED",
)
cardio_df_original.show()
cardio_df_original.printSchema()

### Analysis of DataFrame

In [None]:
# Summary statistics for every column of the raw data
column_summary = cardio_df_original.describe()
column_summary.show()

In [None]:
# Rename the ap_hi / ap_lo columns to systolic_bp / diastolic_bp
cardio_df = cardio_df_original
for old_name, new_name in (("ap_hi", "systolic_bp"), ("ap_lo", "diastolic_bp")):
    cardio_df = cardio_df.withColumnRenamed(old_name, new_name)


In [None]:
# Preview the DataFrame after the column rename.
cardio_df.show()

In [None]:
# Count null values per column.
# All counts are computed in one aggregation (a single pass over the data)
# instead of running a separate filter + count job for every column.
from pyspark.sql import functions as F

null_count_exprs = [
    # when() yields 1 for null cells and null otherwise; count() ignores nulls,
    # so this counts exactly the null cells (and gives 0 for an empty frame).
    F.count(F.when(cardio_df[name].isNull(), 1)).alias(name)
    for name in cardio_df.columns
]
Dict_Null = cardio_df.agg(*null_count_exprs).first().asDict()
Dict_Null


In [None]:
# Number of distinct rows; a value lower than the total row count
# means the data contains duplicate rows.
unique_rows = cardio_df.dropDuplicates()
unique_rows.count()

In [None]:
# Check the distinct values of the categorical variables
# https://stackoverflow.com/questions/64805788/get-distinct-values-of-multiple-columns
from pyspark.sql.functions import collect_set
from pyspark.sql.functions import col

In [None]:
# Categorical columns whose distinct values we want to inspect
columns = ["gender", "cholesterol", "gluc", "smoke", "alco", "active", "cardio"]
# One collect_set aggregate per column, each keeping the column's own name
columnExprs = [collect_set(col(name)).alias(name) for name in columns]
cardio_df.select(*columnExprs).show()

### Data Processing on continuous variables

In [None]:
# Keep only rows whose height lies in the range 135 - 215 cm (bounds included)
height_in_range = cardio_df["height"].between(135, 215)
cardio_df = cardio_df.filter(height_in_range)

In [None]:
# Shape (rows, columns) of the DataFrame after the height filter
n_rows = cardio_df.count()
n_cols = len(cardio_df.columns)
print((n_rows, n_cols))

In [None]:
# Keep only rows whose weight lies in the range 25 - 200 kg (bounds included)
weight_in_range = cardio_df["weight"].between(25, 200)
cardio_df = cardio_df.filter(weight_in_range)

In [None]:
# Shape (rows, columns) of the DataFrame after the weight filter
row_total = cardio_df.count()
column_total = len(cardio_df.columns)
print((row_total, column_total))

In [None]:
# Replace negative systolic and diastolic bp readings with their absolute value.
# Spark's abs is reached through the `F` namespace: importing it by name
# (`from pyspark.sql.functions import abs`) would shadow Python's builtin abs()
# for the rest of the notebook.
from pyspark.sql import functions as F

cardio_df = (
    cardio_df
    .withColumn("systolic_bp", F.abs(cardio_df["systolic_bp"]))
    .withColumn("diastolic_bp", F.abs(cardio_df["diastolic_bp"]))
)

In [None]:
# Sanity check: list any negative values left in the bp columns, with their counts
for bp_column in ("diastolic_bp", "systolic_bp"):
    negative_rows = cardio_df.filter(cardio_df[bp_column] < 0)
    negative_rows.groupby(bp_column).count().show()


In [None]:
# Keep only rows whose systolic_bp lies between 80 and 180 (bounds included)
systolic_in_range = cardio_df["systolic_bp"].between(80, 180)
cardio_df = cardio_df.filter(systolic_in_range)

In [None]:
# Keep only rows whose diastolic_bp lies between 40 and 120 (bounds included)
diastolic_in_range = cardio_df["diastolic_bp"].between(40, 120)
cardio_df = cardio_df.filter(diastolic_in_range)

In [None]:
# Shape (rows, columns) of the DataFrame after the systolic and diastolic bp filters
remaining_rows = cardio_df.count()
remaining_cols = len(cardio_df.columns)
print((remaining_rows, remaining_cols))

### Storing data into output csv file

In [None]:
# Write the cleaned data to an output csv file.
# First convert to a pandas DataFrame (collects all rows onto the driver).
cardio_cleaned_df = cardio_df.toPandas()

# Store result. index=False keeps the pandas row index out of the file;
# otherwise it is written as an extra, unnamed first column that is not
# part of the data.
cardio_cleaned_df.to_csv("/content/Resources/cardio_data_cleaned.csv", index=False)