# Spark DataFrame - Data Cleaning
There are three common options for dealing with missing data:
1. Change the invalid data to null
2. Drop the data point (or the entire row)
3. Fill it in with a different value

Which option to choose depends on your requirements.

Objective: Let's explore our options when it comes to cleaning a basic dataset.

In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import os

import findspark
# Prefer the SPARK_HOME environment variable so the notebook runs on other machines;
# fall back to the original hard-coded install path when it is not set.
findspark.init(os.environ.get('SPARK_HOME', '/home/ubuntu/spark-3.2.1-bin-hadoop2.7'))
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('missing').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/03 04:41:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/03 04:41:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [6]:
# Both CSVs have a header row; inferSchema asks Spark to detect each column's type.
csv_options = {"header": True, "inferSchema": True}
df = spark.read.csv('modify1.csv', **csv_options)
heart_data_situation = spark.read.csv('heart-data-situation.csv', **csv_options)

# Inner join on the shared 'id' key: only ids present in both files survive the merge.
merged_df = df.join(heart_data_situation, how="inner", on="id")

# Preview the first rows of the merged data.
merged_df.show()

+---+-----+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
| id|index|  age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|
+---+-----+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|  0|    0|18393|     2|   168|  62.0|  110|   80|          1|   1|    0|   0|     1|     0|
|  1|    1|20228|     1|   156|  85.0|  140|   90|          3|   1|    0|   0|     1|     1|
|  2|    2|18857|     1|   165|  64.0|  130|   70|          3|   1|    0|   0|     0|     1|
|  3|    3|17623|     2|   169|  82.0|  150|  100|          1|   1|    0|   0|     1|     1|
|  4|    4|17474|     1|   156|  56.0|  100|   60|          1|   1|    0|   0|     0|     0|
|  8|    5|21914|     1|   151|  67.0|  120|   80|          2|   2|    0|   0|     0|     0|
|  9|    6|22113|     1|   157|  93.0|  130|   80|          3|   1|    0|   0|     1|     0|
| 12|    7|22584|     2|   178|  95.0|  130|   90|          3|   3|   

In [16]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler  # was missing: NameError on a fresh kernel
from pyspark.sql.functions import round

# Rank candidate predictors of `cardio` by random-forest feature importance.
# NOTE(review): 'index' and 'id' look like row identifiers rather than predictors, and
# 'gender' is not in the list -- confirm this feature selection is intentional.
feature_cols = ['index', 'id', 'age', 'height', 'weight', 'ap_hi', 'ap_lo', 'cholesterol', 'gluc', 'smoke', 'alco', 'active']

# Pack the feature columns into the single vector column named by featuresCol below.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_data = assembler.transform(merged_df)

# Fixed seed so the forest, and hence the importance ranking, is reproducible across runs.
RF_SEED = 42
rf = RandomForestClassifier(featuresCol="features", labelCol="cardio", seed=RF_SEED)
model = rf.fit(assembled_data)

# Each input column is a scalar, so importance i corresponds to feature_cols[i].
importances = model.featureImportances.toArray()

importances_list = [(feature, float(importance)) for feature, importance in zip(feature_cols, importances)]
importance_df = spark.createDataFrame(importances_list, ["Feature", "Importance"])

# Sort on the unrounded values (so near-ties keep their true order), then round for display.
importance_df = importance_df.orderBy("Importance", ascending=False)
importance_df = importance_df.withColumn("Importance", round(importance_df["Importance"], 2))
importance_df.show()




                                                                                

+-----------+----------+
|    Feature|Importance|
+-----------+----------+
|      ap_hi|      0.53|
|      ap_lo|      0.25|
|        age|       0.1|
|cholesterol|       0.1|
|     weight|      0.01|
|       gluc|      0.01|
|     active|       0.0|
|     height|       0.0|
|      smoke|       0.0|
|       alco|       0.0|
|         id|       0.0|
|      index|       0.0|
+-----------+----------+



In [19]:
# Summary statistics (count, mean, stddev, min, max) for every column, shown as a pandas DataFrame.
summary_pdf = merged_df.describe().toPandas()
summary_pdf

                                                                                

Unnamed: 0,summary,id,index,age,gender,height,weight,ap_hi,ap_lo,cholesterol,gluc,smoke,alco,active,cardio
0,count,68056.0,68056.0,68056.0,68056.0,68056.0,68056.0,68056.0,68056.0,68056.0,68056.0,68056.0,68056.0,68056.0,68056.0
1,mean,49961.38948806865,34991.75198366051,19467.79632949336,1.3482132361584578,164.41048548254378,73.72453273774539,126.59862466204302,81.28977606676854,1.3629657928764547,1.2239773128012226,0.0875602445045256,0.0531033266721523,0.8037057717174092,0.4936228987892324
2,stddev,28838.84055511243,20198.679156291822,2466.8985187743706,0.4764075075852865,7.825610733050222,13.423933068620316,16.517518537922864,9.398565292233744,0.6778290292171032,0.5698690836303668,0.2826563674162662,0.2242411697974277,0.3971965790282978,0.4999630041132576
3,min,0.0,0.0,10798.0,1.0,130.0,30.0,90.0,40.0,1.0,1.0,0.0,0.0,0.0,0.0
4,max,99999.0,69999.0,23713.0,2.0,207.0,120.0,220.0,140.0,3.0,3.0,1.0,1.0,1.0,1.0


In [20]:
from pyspark.sql.functions import col, round

# BMI = weight / (height in metres)^2. Height is divided by 100 to get metres,
# so it is presumably stored in centimetres (TODO confirm weight is in kg).
height_m = col("height") / 100
bmi_expr = col("weight") / (height_m ** 2)

# Store BMI rounded to 2 decimal places (replaces any existing 'bmi' column).
merged_df = merged_df.withColumn("bmi", round(bmi_expr, 2))

# Descriptive statistics for the new column.
merged_df.describe("bmi").show()


+-------+------------------+
|summary|               bmi|
+-------+------------------+
|  count|             68056|
|   mean|27.313671829080924|
| stddev| 4.934954139584941|
|    min|             10.73|
|    max|             58.02|
+-------+------------------+



In [22]:
from pyspark.sql.functions import when, col, percentile_approx

# BMI values outside [BMI_LOWER, BMI_UPPER] are treated as outliers.
BMI_LOWER = 16
BMI_UPPER = 35

# Median of bmi. relativeError=0.0 makes approxQuantile compute the exact quantile.
# It returns an empty list when the column has no non-null values, so guard before indexing.
bmi_quantiles = merged_df.approxQuantile("bmi", [0.5], 0.0)

if bmi_quantiles:
    median_bmi = bmi_quantiles[0]

    # Rows outside the plausible range; a null bmi makes this null, so such rows fall to otherwise().
    condition = (col("bmi") < BMI_LOWER) | (col("bmi") > BMI_UPPER)

    # Replace outliers with the median value; in-range and null values are kept as-is.
    merged_df = merged_df.withColumn("bmi", when(condition, median_bmi).otherwise(col("bmi")))
else:
    print("bmi has no non-null values; skipping outlier replacement")

merged_df.describe("bmi").show()


+-------+------------------+
|summary|               bmi|
+-------+------------------+
|  count|             68056|
|   mean|26.348999059599507|
| stddev| 3.571605400963905|
|    min|              16.0|
|    max|              35.0|
+-------+------------------+



In [27]:
from pyspark.sql.functions import col, round

# 'age' arrives as a day count; convert it to years, rounded to 1 decimal place.
# NOTE(review): 365.25 would account for leap years; 365.0 is kept to match earlier results.
DAYS_PER_YEAR = 365.0

# Idempotency guard: inferSchema reads the raw day counts as integers, while the
# converted column is a double. Without this check, re-running the cell would
# divide the already-converted ages by 365 a second time.
if dict(merged_df.dtypes)["age"] in ("int", "bigint"):
    merged_df = merged_df.withColumn("age", round(col("age") / DAYS_PER_YEAR, 1))
else:
    print("'age' is not integer-typed; assuming it is already in years and leaving it unchanged")

merged_df.show()


+---+-----+----+------+------+------+-----+-----+-----------+----+-----+----+------+------+-----+
| id|index| age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|  bmi|
+---+-----+----+------+------+------+-----+-----+-----------+----+-----+----+------+------+-----+
|  0|    0|50.4|     2|   168|  62.0|  110|   80|          1|   1|    0|   0|     1|     0|21.97|
|  1|    1|55.4|     1|   156|  85.0|  140|   90|          3|   1|    0|   0|     1|     1|34.93|
|  2|    2|51.7|     1|   165|  64.0|  130|   70|          3|   1|    0|   0|     0|     1|23.51|
|  3|    3|48.3|     2|   169|  82.0|  150|  100|          1|   1|    0|   0|     1|     1|28.71|
|  4|    4|47.9|     1|   156|  56.0|  100|   60|          1|   1|    0|   0|     0|     0|23.01|
|  8|    5|60.0|     1|   151|  67.0|  120|   80|          2|   2|    0|   0|     0|     0|29.38|
|  9|    6|60.6|     1|   157|  93.0|  130|   80|          3|   1|    0|   0|     1|     0| 26.3|
| 12|    7|61.9|    

[Row(avg(Sales)=400.5)]

Row(avg(Sales)=400.5)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



Great work! At this stage, we're pretty much done with understanding DataFrames, and you can move on to applying an algorithm. We recommend starting with linear regression, then logistic regression, and finishing with tree methods. For each one, start with the documentation example before moving on to the advanced example.