# Missing Data

In [1]:
# find and load PySpark
import os

import findspark

# Prefer the SPARK_HOME environment variable so the notebook is not tied to a
# single machine; fall back to the original install location when it is unset
# or empty.
findspark.init(os.environ.get('SPARK_HOME') or '/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark

In [2]:
# import a spark session
from pyspark.sql import SparkSession

In [3]:
# Start (or reuse) the Spark session used by the missing-data examples.
spark = (
    SparkSession.builder
    .appName('missing_data')
    .getOrCreate()
)

In [4]:
# Read the CSV, treating the first line as the column names and letting Spark
# infer each column's type from the data.
df = spark.read.csv(
    'ContainsNull.csv',
    header=True,
    inferSchema=True,
)
# Display the resulting column names and types.
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [5]:
# show the dataframe, including the rows that contain nulls
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [6]:
# DROPPING THE MISSING DATA
df.na.drop().show()    # drops every row that has at least one missing value

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [9]:
# THRESHOLD: `thresh` is the minimum number of NON-null values a row needs to be kept.
# With the 3 columns here, thresh=2 drops only rows that have two or more missing values.
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [11]:
# WE CAN ALSO USE THE PARAMETER "how"
df.na.drop(how='any').show() # drops a row if any of its values is null

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [12]:
df.na.drop(how='all').show()  # drops a row only when every value in it is missing

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [15]:
# we can also restrict the null check to specific columns using SUBSET
df.na.drop(subset=['Sales']).show()  # only a null in the Sales column causes a row to be dropped

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### Substituting missing values

In [19]:
# fill() only touches columns whose datatype matches the fill value, so a
# string value leaves the numeric (int/double/float) columns untouched.
df.na.fill(value='String_Data').show()

+----+-----------+-----+
|  Id|       Name|Sales|
+----+-----------+-----+
|emp1|       John| null|
|emp2|String_Data| null|
|emp3|String_Data|345.0|
|emp4|      Cindy|456.0|
+----+-----------+-----+



In [21]:
# A numeric fill value is applied to numeric columns only; string columns keep their nulls.
df.na.fill(value=0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [23]:
# Target the fill at specific columns.
# Naming the columns through `subset` is recommended whenever data is filled.
df.na.fill(
    value='No_Name',
    subset=['Name'],
).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No_Name| null|
|emp3|No_Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [25]:
from pyspark.sql.functions import mean

In [32]:
# Let's fill the missing sales with the mean value.

# Aggregate the Sales column down to its average; collect() brings the
# result back as a list holding a single Row.
mean_value = df.select(mean(df.Sales)).collect()
mean_value

[Row(avg(Sales)=400.5)]

In [35]:
# get the number from the row object:
# the first [0] picks the only Row in the collected list, the second [0] picks its only field
mean_sales = mean_value[0][0]

In [37]:
# Replace the nulls in the Sales column with the mean sales value.
df.na.fill(
    value=mean_sales,
    subset=['Sales'],
).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



---

## Date and Timestamps

In [38]:
# Session for the date and timestamp examples.
# NOTE(review): a session was already started earlier in this notebook, and
# getOrCreate() returns an existing session when there is one, so the new app
# name presumably does not rename the running application - confirm.
spark = (
    SparkSession.builder
    .appName('date_time')
    .getOrCreate()
)

In [39]:
# Read the stock data set, using the first line as the header and letting
# Spark infer the column types.
df = spark.read.csv(
    'appl_stock.csv',
    header=True,
    inferSchema=True,
)
# Display the resulting column names and types.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [40]:
# peek at the first record; head(1) returns a list containing a single Row
df.head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [42]:
# Inspect the Date column side by side with Volume.
df.select('Date', 'Volume').show()

+--------------------+---------+
|                Date|   Volume|
+--------------------+---------+
|2010-01-04 00:00:...|123432400|
|2010-01-05 00:00:...|150476200|
|2010-01-06 00:00:...|138040000|
|2010-01-07 00:00:...|119282800|
|2010-01-08 00:00:...|111902700|
|2010-01-11 00:00:...|115557400|
|2010-01-12 00:00:...|148614900|
|2010-01-13 00:00:...|151473000|
|2010-01-14 00:00:...|108223500|
|2010-01-15 00:00:...|148516900|
|2010-01-19 00:00:...|182501900|
|2010-01-20 00:00:...|153038200|
|2010-01-21 00:00:...|152038600|
|2010-01-22 00:00:...|220441900|
|2010-01-25 00:00:...|266424900|
|2010-01-26 00:00:...|466777500|
|2010-01-27 00:00:...|430642100|
|2010-01-28 00:00:...|293375600|
|2010-01-29 00:00:...|311488100|
|2010-02-01 00:00:...|187469100|
+--------------------+---------+
only showing top 20 rows



In [43]:
# importing some important date functions
from pyspark.sql.functions import (dayofmonth, dayofyear, 
                                   year, month, hour, weekofyear, 
                                   format_number, date_format)

In [46]:
# Show only the day of the month extracted from each date.
df.select(dayofmonth(df.Date)).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [50]:
# Several date functions can be used in one select; each produces its own column.
df.select(
    dayofmonth(df.Date),
    year(df.Date),
).show()

+----------------+----------+
|dayofmonth(Date)|year(Date)|
+----------------+----------+
|               4|      2010|
|               5|      2010|
|               6|      2010|
|               7|      2010|
|               8|      2010|
|              11|      2010|
|              12|      2010|
|              13|      2010|
|              14|      2010|
|              15|      2010|
|              19|      2010|
|              20|      2010|
|              21|      2010|
|              22|      2010|
|              25|      2010|
|              26|      2010|
|              27|      2010|
|              28|      2010|
|              29|      2010|
|               1|      2010|
+----------------+----------+
only showing top 20 rows



In [58]:
# Calculate the average closing price per year.

# First step: derive a Year column from Date using WITHCOLUMN.
df_year = df.withColumn(
    'Year',
    year(df.Date),
)
df_year.head(1)[0]

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039, Year=2010)

In [71]:
# Group by Year and average only the Close column; the result holds exactly
# the two columns 'Year' and 'avg(Close)'.
avg_close_per_year = df_year.groupBy('Year').mean('Close')

In [72]:
# Give the aggregate column a readable name.
avg_close_per_year = avg_close_per_year.withColumnRenamed(
    "avg(Close)",
    "Average Closing Price",
)

In [74]:
# Format the averages to two decimal places for display.
avg_close_per_year.select(
    'Year',
    format_number('Average Closing Price', 2),
).show()

+----+---------------------------------------+
|Year|format_number(Average Closing Price, 2)|
+----+---------------------------------------+
|2015|                                 120.04|
|2013|                                 472.63|
|2014|                                 295.40|
|2012|                                 576.05|
|2016|                                 104.60|
|2010|                                 259.84|
|2011|                                 364.00|
+----+---------------------------------------+



In [77]:
# Alias the formatted column so the output header stays short.
avg_close_per_year.select(
    'Year',
    format_number('Average Closing Price', 2).alias("Avg Close"),
).show()

+----+---------+
|Year|Avg Close|
+----+---------+
|2015|   120.04|
|2013|   472.63|
|2014|   295.40|
|2012|   576.05|
|2016|   104.60|
|2010|   259.84|
|2011|   364.00|
+----+---------+

