In [1]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)

In [2]:
df = sqlContext.read.load('file:///home/vlad/Documents/github/my_github_repository/coursera/big-data/4-big-data-machine-learning/notebooks/daily_weather.csv', 
                          format='com.databricks.spark.csv', 
                          header='true',inferSchema='true')
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

The dataset has 1095 rows:

In [3]:
df.count()

1095

air_temp_9am has 1090 non-missing values:

In [4]:
df.describe('air_temp_9am').show()

+-------+------------------+
|summary|      air_temp_9am|
+-------+------------------+
|  count|              1090|
|   mean| 64.93300141287072|
| stddev|11.175514003175877|
|    min|36.752000000000685|
|    max| 98.90599999999992|
+-------+------------------+



After dropping every row that has a missing value in any column, 1064 rows remain:

In [5]:
removeAllDF = df.na.drop()
removeAllDF.describe('air_temp_9am').show()

+-------+------------------+
|summary|      air_temp_9am|
+-------+------------------+
|  count|              1064|
|   mean| 65.02260949558733|
| stddev|11.168033449415704|
|    min|36.752000000000685|
|    max| 98.90599999999992|
+-------+------------------+



We can see that the mean and standard deviation are close to the original values: the mean is 64.933 vs. 65.022, and the standard deviation is 11.175 vs. 11.168.
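The effect of na.drop() can be illustrated without Spark. The following is a minimal plain-Python sketch on a hypothetical four-row dataset (None marks a missing value); drop_missing() and summarize() are illustrative helpers, not PySpark functions. It shows that dropping a row because one column is missing also discards that row's values in every other column, which is why the air_temp_9am statistics shift even though air_temp_9am itself was present in some of the dropped rows.

```python
import statistics

# Hypothetical toy rows; None marks a missing value (not data from daily_weather.csv)
rows = [
    {"air_temp_9am": 60.0, "air_pressure_9am": 918.0},
    {"air_temp_9am": None, "air_pressure_9am": 917.0},
    {"air_temp_9am": 70.0, "air_pressure_9am": None},
    {"air_temp_9am": 65.0, "air_pressure_9am": 919.0},
]

def drop_missing(rows):
    """Keep only rows with no missing value in any column (like df.na.drop())."""
    return [r for r in rows if all(v is not None for v in r.values())]

def summarize(values):
    """Count, mean and sample standard deviation of the non-missing values."""
    present = [v for v in values if v is not None]
    return len(present), statistics.mean(present), statistics.stdev(present)

complete = drop_missing(rows)
# Before dropping: three air_temp_9am values are available
print(summarize([r["air_temp_9am"] for r in rows]))
# After dropping: the 70.0 reading is lost because its air_pressure_9am is missing
print(summarize([r["air_temp_9am"] for r in complete]))
```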

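**Impute missing values.** Instead of removing rows containing missing values, let's replace each missing value with the mean of its column. First, we'll start from the original DataFrame. This is not a real copy (imputeDF and df refer to the same DataFrame), but DataFrames are immutable, so each na.fill() below returns a new DataFrame and df itself is left unchanged: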

In [6]:
imputeDF = df

Next, we'll import the avg function and iterate through each column in the DataFrame:
* compute the mean of that column over the complete rows in removeAllDF
* replace any missing values in that column with that mean.

In [7]:
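from pyspark.sql.functions import avg

for x in imputeDF.columns:
    # Mean of column x over the rows with no missing values (removeAllDF)
    meanValue = removeAllDF.agg(avg(x)).first()[0]
    print(x, meanValue)
    # Fill nulls in column x only; na.fill() returns a new DataFrame
    imputeDF = imputeDF.na.fill(meanValue, [x])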

('number', 545.0018796992481)
('air_pressure_9am', 918.9031798641051)
('air_temp_9am', 65.02260949558733)
('avg_wind_direction_9am', 142.30675564934037)
('avg_wind_speed_9am', 5.48579305071369)
('max_wind_direction_9am', 148.48042413321315)
('max_wind_speed_9am', 6.999713658875691)
('rain_accumulation_9am', 0.18202347650615522)
('rain_duration_9am', 266.3936973996037)
('relative_humidity_9am', 34.07743985327709)
('relative_humidity_3pm', 35.14838093290533)


* The agg() function performs an aggregate calculation on the DataFrame and avg(x) specifies to compute the mean on column x. The agg() function returns a DataFrame, first() returns the first Row, and [0] gets the first value.

* The last line of code uses na.fill() to replace the missing values with the mean value (first argument) in column x (second argument).

* The output of this cell prints the mean of each column. The mean for air_temp_9am, 65.022, is the same as the mean we obtained after removing all the missing values (In [5]). That is expected, because every mean here is computed from removeAllDF rather than from df.
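The imputation loop can be mirrored in plain Python to see why the imputed column's mean does not simply equal the fill value. This is a sketch under stated assumptions: the rows are hypothetical, and, as in the notebook, each column's fill value is the mean over the complete rows (the analogue of removeAllDF), not over all non-missing values of that column.

```python
import statistics

# Hypothetical toy data (None = missing); not taken from daily_weather.csv
rows = [
    {"air_temp_9am": 60.0, "air_pressure_9am": 918.0},
    {"air_temp_9am": None, "air_pressure_9am": 917.0},
    {"air_temp_9am": 70.0, "air_pressure_9am": None},
    {"air_temp_9am": 65.0, "air_pressure_9am": 919.0},
]

# Analogue of removeAllDF: rows with no missing value in any column
complete = [r for r in rows if None not in r.values()]

imputed = [dict(r) for r in rows]  # copy so the original rows stay unchanged
for col in rows[0]:
    # As in the notebook, the fill value is the mean over the complete rows only
    fill = statistics.mean(r[col] for r in complete)
    print(col, fill)
    for r in imputed:
        if r[col] is None:
            r[col] = fill  # same role as na.fill(fill, [col])

print(statistics.mean(r["air_temp_9am"] for r in imputed))
```

Here the fill value for air_temp_9am is 62.5, but the imputed column's mean is 64.375, because the 70.0 reading (from a row that is not complete) still contributes. The same pattern appears in the In [8] output: the imputed mean of air_temp_9am is 64.933, not the fill value 65.022.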

In [8]:
imputeDF.describe('air_temp_9am').show()

+-------+------------------+
|summary|      air_temp_9am|
+-------+------------------+
|  count|              1095|
|   mean| 64.93341058219825|
| stddev|11.149948199920228|
|    min|36.752000000000685|
|    max| 98.90599999999992|
+-------+------------------+



In [9]:
df.describe('air_temp_9am').show()

+-------+------------------+
|summary|      air_temp_9am|
+-------+------------------+
|  count|              1090|
|   mean| 64.93300141287072|
| stddev|11.175514003175877|
|    min|36.752000000000685|
|    max| 98.90599999999992|
+-------+------------------+



**1. If we remove all missing values from the data, how many air pressure at 9am measurements have values between 911.736 and 914.67?**

In [10]:
# Strict bounds on both sides, referencing removeAllDF's own column
removeAllDF.filter((removeAllDF.air_pressure_9am > 911.736) &
                   (removeAllDF.air_pressure_9am < 914.67)).count()

77
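Whether "between" includes the endpoints matters. The filter above uses strict inequalities, while PySpark's Column.between(lowerBound, upperBound) includes both bounds. A small plain-Python sketch with hypothetical readings (not values from the dataset) shows how the two interpretations differ when a measurement sits exactly on a bound:

```python
# Hypothetical air_pressure_9am readings (not from the dataset)
pressures = [911.736, 912.0, 913.5, 914.67, 915.2, 910.0]

# Strict bounds, matching (col > 911.736) & (col < 914.67) in the filter above
strict = sum(1 for p in pressures if 911.736 < p < 914.67)
# Inclusive bounds, the semantics of Column.between()
inclusive = sum(1 for p in pressures if 911.736 <= p <= 914.67)
print(strict, inclusive)
```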

**2. If we impute the missing values with the minimum value, how many air temperature at 9am measurements are less than 42.292?**

In [11]:
from pyspark.sql.functions import min as spark_min

imputeMin = df
for x in imputeMin.columns:
    # Minimum of column x over the complete rows (removeAllDF)
    minValue = removeAllDF.agg(spark_min(x)).first()[0]
    imputeMin = imputeMin.na.fill(minValue, [x])

print(imputeMin.filter(imputeMin.air_temp_9am < 42.292).count())

28
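Imputing with the minimum has a predictable effect on this question: whenever the fill value is below the threshold, every imputed row is counted. A plain-Python sketch on a single hypothetical column (so "complete rows" reduces to the non-missing values):

```python
# Hypothetical air_temp_9am values; None = missing
temps = [50.0, None, 40.0, 45.0, None, 41.0]
# Stand-in for the column in removeAllDF (only one column here)
complete_temps = [t for t in temps if t is not None]

fill = min(complete_temps)  # minimum over the complete values
imputed = [fill if t is None else t for t in temps]
# Both imputed entries equal the minimum, so both fall below the threshold
below = sum(1 for t in imputed if t < 42.292)
print(imputed, below)
```

In the notebook the fill value for air_temp_9am is the minimum over removeAllDF, 36.752 (In [5]), which is below 42.292, so all 5 rows whose air_temp_9am was missing (1095 - 1090) are among the 28.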


**3. How many samples have missing values for air_pressure_9am?**

In [12]:
df.count() - df.na.drop(subset=['air_pressure_9am']).count()

3
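The subtraction above counts the rows that disappear when dropping on that one column, i.e. the rows where it is missing. A plain-Python sketch of the same idea applied to every column at once; the rows and the count_missing() helper are hypothetical:

```python
# Hypothetical rows; None marks a missing value
rows = [
    {"air_temp_9am": 60.0, "air_pressure_9am": 918.0},
    {"air_temp_9am": None, "air_pressure_9am": 917.0},
    {"air_temp_9am": 70.0, "air_pressure_9am": None},
    {"air_temp_9am": None, "air_pressure_9am": None},
]

def count_missing(rows, col):
    """Total rows minus rows kept when dropping on this column alone."""
    kept = [r for r in rows if r[col] is not None]  # like df.na.drop(subset=[col])
    return len(rows) - len(kept)

missing = {col: count_missing(rows, col) for col in rows[0]}
print(missing)
```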

**4. Which column in the weather dataset has the most missing values?**

In [13]:
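# After transposing describe(), column 0 holds the 'count' row for every column
counts = df.describe().toPandas().transpose()[0]
print(counts)
# The counts are strings (plus the 'summary' label), so convert before taking the minimum
print('\nMinimum value : %d' % min(int(c) for c in counts.drop('summary')))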

summary                   count
number                     1095
air_pressure_9am           1092
air_temp_9am               1090
avg_wind_direction_9am     1091
avg_wind_speed_9am         1092
max_wind_direction_9am     1092
max_wind_speed_9am         1091
rain_accumulation_9am      1089
rain_duration_9am          1092
relative_humidity_9am      1095
relative_humidity_3pm      1095
Name: 0, dtype: object

Minimum value : 1089
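
The smallest count, 1089, belongs to rain_accumulation_9am, so that column has the most missing values (1095 - 1089 = 6).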
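The describe() counts answer the question indirectly: the column with the smallest non-missing count has the most missing values. A plain-Python sketch that makes the subtraction explicit, using the counts printed above:

```python
# Non-missing counts copied from the describe() output above
counts = {
    "number": 1095,
    "air_pressure_9am": 1092,
    "air_temp_9am": 1090,
    "avg_wind_direction_9am": 1091,
    "avg_wind_speed_9am": 1092,
    "max_wind_direction_9am": 1092,
    "max_wind_speed_9am": 1091,
    "rain_accumulation_9am": 1089,
    "rain_duration_9am": 1092,
    "relative_humidity_9am": 1095,
    "relative_humidity_3pm": 1095,
}
total_rows = 1095  # df.count()

# Missing values per column = total rows - non-missing count
missing = {col: total_rows - n for col, n in counts.items()}
worst = max(missing, key=missing.get)
print(worst, missing[worst])
```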

**5. When we remove all the missing values from the dataset, the number of rows is 1064, yet the column with the most missing values still has 1089 non-missing values. Why did the number of rows decrease so much?**

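Because the missing values in different columns fall in different rows. na.drop() removes a row if any of its columns is missing, so the losses add up across columns: the per-column missing counts (3 + 5 + 4 + 3 + 3 + 4 + 6 + 3 = 31) sum to exactly 1095 - 1064 = 31 dropped rows, which means no row is missing more than one value.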
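The argument can be made precise with two bounds. Every row with at least one missing value is dropped, so at least max(missing) rows go (if all gaps were packed into the same rows) and at most sum(missing) rows go (if every gap is in a different row). A plain-Python sketch using the per-column counts derived from the describe() output above:

```python
# Missing values per column: 1095 minus each describe() count above
# (columns with no missing values are omitted)
missing = {
    "air_pressure_9am": 3,
    "air_temp_9am": 5,
    "avg_wind_direction_9am": 4,
    "avg_wind_speed_9am": 3,
    "max_wind_direction_9am": 3,
    "max_wind_speed_9am": 4,
    "rain_accumulation_9am": 6,
    "rain_duration_9am": 3,
}
total_rows, rows_after_drop = 1095, 1064
rows_dropped = total_rows - rows_after_drop

# Lower bound: the column with the most gaps loses all of those rows
lower = max(missing.values())
# Upper bound: every gap sits in a different row
upper = sum(missing.values())
print(lower, rows_dropped, upper)
```

The best case would leave 1095 - 6 = 1089 rows, which is the intuition behind the question; the observed drop of 31 rows hits the upper bound instead.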