# Handling Missing Data in PySpark HW Solutions

In this homework assignment you will strengthen your skills in dealing with missing data.

**Review:** you have two basic options for handling missing data (you will have to decide which approach is right for your situation):

1. Drop the missing data points (including the entire row)
2. Fill them in with some other value.

Let's practice some examples of each of these methods!


#### But first!

Start your Spark session

In [1]:
import findspark

# findspark.init() must run before pyspark is imported so that pyspark is on sys.path
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('handling').getOrCreate()

## Read in the dataset for this Notebook

Use the Weather.csv file attached to this lecture.

In [2]:
dataset_path = 'dataset/Weather.csv'

data = spark.read.csv(dataset_path, inferSchema=True, header=True)



## About this dataset

**New York City Taxi Trip - Hourly Weather Data**

Here is some detailed weather data for the New York City Taxi Trips.

**Source:** https://www.kaggle.com/meinertsen/new-york-city-taxi-trip-hourly-weather-data

### Print a view of the first several lines of the dataframe to see what our data looks like

In [3]:
data.show()

+-------------------+-----+-----+------+------+----+-----+-----+------+------+-----+--------+----+----+---------+---------+----------+----------+----------+----------+-------+-------+----------------+------------+---+----+----+----+-------+-------+
|    pickup_datetime|tempm|tempi|dewptm|dewpti| hum|wspdm|wspdi|wgustm|wgusti|wdird|   wdire|vism|visi|pressurem|pressurei|windchillm|windchilli|heatindexm|heatindexi|precipm|precipi|           conds|        icon|fog|rain|snow|hail|thunder|tornado|
+-------------------+-----+-----+------+------+----+-----+-----+------+------+-----+--------+----+----+---------+---------+----------+----------+----------+----------+-------+-------+----------------+------------+---+----+----+----+-------+-------+
|2015-12-31 00:15:00|  7.8| 46.0|   6.1|  43.0|89.0|  7.4|  4.6|  NULL|  NULL|   40|      NE| 4.0| 2.5|   1018.2|    30.07|       6.6|      43.9|      NULL|      NULL|    0.5|   0.02|      Light Rain|        rain|  0|   1|   0|   0|      0|      0|
|201

### Print the schema 

So that we can see if we need to make any corrections to the data types.

In [4]:
data.printSchema()

root
 |-- pickup_datetime: timestamp (nullable = true)
 |-- tempm: double (nullable = true)
 |-- tempi: double (nullable = true)
 |-- dewptm: double (nullable = true)
 |-- dewpti: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- wspdm: double (nullable = true)
 |-- wspdi: double (nullable = true)
 |-- wgustm: double (nullable = true)
 |-- wgusti: double (nullable = true)
 |-- wdird: integer (nullable = true)
 |-- wdire: string (nullable = true)
 |-- vism: double (nullable = true)
 |-- visi: double (nullable = true)
 |-- pressurem: double (nullable = true)
 |-- pressurei: double (nullable = true)
 |-- windchillm: double (nullable = true)
 |-- windchilli: double (nullable = true)
 |-- heatindexm: double (nullable = true)
 |-- heatindexi: double (nullable = true)
 |-- precipm: double (nullable = true)
 |-- precipi: double (nullable = true)
 |-- conds: string (nullable = true)
 |-- icon: string (nullable = true)
 |-- fog: integer (nullable = true)
 |-- rain: integer (nullab

## 1. How much missing data are we working with?

Get the count and percentage of null values for each variable in the dataset to answer this question.

In [5]:
from pyspark.sql.functions import col

def null_value_calc(df):
    """Return (column, null count, null percent) for every column that has nulls."""
    null_columns_counts = []
    num_rows = df.count()
    for k in df.columns:
        null_rows = df.where(col(k).isNull()).count()
        if null_rows > 0:
            null_columns_counts.append((k, null_rows, (null_rows / num_rows) * 100))
    return null_columns_counts

null_columns_calc_list = null_value_calc(data)
null_df = spark.createDataFrame(null_columns_calc_list, ['Column_Name', 'Null_Values_Count', 'Null_Value_Percent'])
null_df.show()

+-----------+-----------------+-------------------+
|Column_Name|Null_Values_Count| Null_Value_Percent|
+-----------+-----------------+-------------------+
|      tempm|                5|0.04770537162484496|
|      tempi|                5|0.04770537162484496|
|     dewptm|                5|0.04770537162484496|
|     dewpti|                5|0.04770537162484496|
|        hum|                5|0.04770537162484496|
|      wspdm|              737|  7.031771777502146|
|      wspdi|              737|  7.031771777502146|
|     wgustm|             8605|  82.10094456635817|
|     wgusti|             8605|  82.10094456635817|
|       vism|              245| 2.3375632096174033|
|       visi|              245| 2.3375632096174033|
|  pressurem|              239| 2.2803167636675887|
|  pressurei|              239| 2.2803167636675887|
| windchillm|             7775|  74.18185287663391|
| windchilli|             7775|  74.18185287663391|
| heatindexm|             9644|  92.01412079000096|
| heatindexi

## 2. How many rows contain at least one null value?

We want to know how many rows we would lose if we dropped every row containing a null with data.na.drop().

In [6]:
from functools import reduce
from pyspark.sql import functions as F

# Build one condition that is true when any column in the row is null
any_null = reduce(lambda a, b: a | b, [F.col(c).isNull() for c in data.columns])
filtered_df = data.filter(any_null)

# Count the number of rows in the filtered DataFrame
num_rows_with_nulls = filtered_df.count()

# Print the result
print(f"Number of rows with at least one null value: {num_rows_with_nulls}")


Number of rows with at least one null value: 10481


## 3. Drop the missing data

Drop any row that contains missing data in any column. Because every row in this dataset contains at least one null value, the result is an empty DataFrame.

In [7]:
drop_missing_data = data.na.drop()
drop_missing_data.show()

+---------------+-----+-----+------+------+---+-----+-----+------+------+-----+-----+----+----+---------+---------+----------+----------+----------+----------+-------+-------+-----+----+---+----+----+----+-------+-------+
|pickup_datetime|tempm|tempi|dewptm|dewpti|hum|wspdm|wspdi|wgustm|wgusti|wdird|wdire|vism|visi|pressurem|pressurei|windchillm|windchilli|heatindexm|heatindexi|precipm|precipi|conds|icon|fog|rain|snow|hail|thunder|tornado|
+---------------+-----+-----+------+------+---+-----+-----+------+------+-----+-----+----+----+---------+---------+----------+----------+----------+----------+-------+-------+-----+----+---+----+----+----+-------+-------+
+---------------+-----+-----+------+------+---+-----+-----+------+------+-----+-----+----+----+---------+---------+----------+----------+----------+----------+-------+-------+-----+----+---+----+----+----+-------+-------+



## 4. Drop with a threshold

Count how many rows would be dropped if we kept only the rows that have at least 12 non-null values (thresh=12 drops every row with fewer than 12 non-null values).

In [8]:
og_len = data.count()
drop_len = data.na.drop(thresh=12).count()
print("Total Rows Dropped:",og_len-drop_len)

Total Rows Dropped: 5


## 5. Drop rows according to specific column value

Now count how many rows would be dropped if you only drop rows whose values in the tempm column are null/NaN

In [9]:
from pyspark.sql import functions as F

# Keep only the rows where tempm is not null
filtered_df = data.filter(F.col("tempm").isNotNull())

# Count the number of rows in the original DataFrame
total_rows = data.count()

# Count the number of rows remaining after filtering
remaining_rows = filtered_df.count()

# Calculate the number of rows that would be dropped
rows_to_drop = total_rows - remaining_rows

# Print the result
print(f"Number of rows to be dropped: {rows_to_drop}")


Number of rows to be dropped: 5


## 6. Drop rows that are null across all columns

Count how many rows would be dropped if you only dropped rows where ALL the values are null

In [10]:
# how='all' drops a row only when every column in that row is null
og_len = data.count()
drop_len = data.na.drop(how='all').count()

# Print the number of dropped rows
print(f"Number of rows dropped if only rows with all null values are removed: {og_len - drop_len}")


Number of rows dropped if only rows with all null values are removed: 0


## 7. Fill in the missing values in all string columns with "N/A"

Make sure you don't edit the data DataFrame itself. Create a copy of the data, then edit that one.

In [13]:
# Passing a string to fillna only fills columns of string type;
# numeric columns are left unchanged. fillna returns a new DataFrame,
# so the original data is not modified.
imputed_df = data.fillna('N/A')

imputed_df.show()


+-------------------+-----+-----+------+------+----+-----+-----+------+------+-----+--------+----+----+---------+---------+----------+----------+----------+----------+-------+-------+----------------+------------+---+----+----+----+-------+-------+
|    pickup_datetime|tempm|tempi|dewptm|dewpti| hum|wspdm|wspdi|wgustm|wgusti|wdird|   wdire|vism|visi|pressurem|pressurei|windchillm|windchilli|heatindexm|heatindexi|precipm|precipi|           conds|        icon|fog|rain|snow|hail|thunder|tornado|
+-------------------+-----+-----+------+------+----+-----+-----+------+------+-----+--------+----+----+---------+---------+----------+----------+----------+----------+-------+-------+----------------+------------+---+----+----+----+-------+-------+
|2015-12-31 00:15:00|  7.8| 46.0|   6.1|  43.0|89.0|  7.4|  4.6|  NULL|  NULL|   40|      NE| 4.0| 2.5|   1018.2|    30.07|       6.6|      43.9|      NULL|      NULL|    0.5|   0.02|      Light Rain|        rain|  0|   1|   0|   0|      0|      0|
|201

## 8. Fill in NaN values with averages for the tempm and tempi columns

*Note: you will first need to compute the averages for each column and then fill in with the corresponding value.*

In [14]:
from pyspark.sql import functions as F

# Compute the averages of tempm and tempi in a single aggregation
averages = data.agg(F.avg("tempm").alias("avg_tempm"), F.avg("tempi").alias("avg_tempi")).first()

# Fill missing values in tempm and tempi with the respective averages.
# The averages are plain Python floats, which fillna accepts in a dict;
# F.coalesce would need them wrapped in F.lit() first.
filled_df = data.fillna({"tempm": averages["avg_tempm"], "tempi": averages["avg_tempi"]})

filled_df.show()

### That's it! Great Job!