# __Traffic Volume Prediction__
<h2 align="center"><b>Advanced Data Science Capstone Project by</b></h2>
<h2 align="center"><b>IBM / Coursera</b></h2>
<h2 align=center>Vasilis Kokkinos (September 2019)</h2>

 ## Introduction / Business Problem

USE CASE: Predictive model of traffic volume. It can be used as a template for similar situations.

DATA SET: Metro Interstate Traffic Volume Data Set

Hourly Interstate 94 Westbound traffic volume for MN DoT ATR station 301, roughly midway between Minneapolis and St Paul, MN. Hourly weather features and holidays are included for their impact on traffic volume.

Source: https://archive.ics.uci.edu/ml/datasets/Metro+Interstate+Traffic+Volume



### __Attribute Information:__

__holiday:__ Categorical. US national holidays plus a regional holiday, the Minnesota State Fair

__temp:__ Numeric. Average temperature in Kelvin

__rain_1h:__ Numeric. Amount of rain, in mm, that occurred in the hour

__snow_1h:__ Numeric. Amount of snow, in mm, that occurred in the hour

__clouds_all:__ Numeric. Percentage of cloud cover

__weather_main:__ Categorical. Short textual description of the current weather

__weather_description:__ Categorical. Longer textual description of the current weather

__date_time:__ DateTime. Hour of the data collection, in local CST

__traffic_volume:__ Numeric. Hourly I-94 ATR 301 reported westbound traffic volume

# __ETL - Data Cleansing__

Since our data set is a single .csv file, no ETL transformations beyond data cleansing are needed.

An __Initial Data Exploration__ step was performed in the notebook: _traffic_volume.data_exp.py.v01.ipynb_

The following observations were made:

* There are duplicates in the data set.
* The temp column has extreme outliers with value 0. Temperature is measured in Kelvin, and 0 K (absolute zero) is an impossible air temperature (Celsius = Kelvin - 273.15).
* The rain_1h column has an extreme outlier with value 9831.3 (mm of rain in a single hour).
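The Kelvin/Celsius relationship above can be made concrete with a small plain-Python sketch (not part of the original notebook). The plausibility window `MIN_PLAUSIBLE_K` / `MAX_PLAUSIBLE_K` (200 K to 330 K, roughly -73 °C to +57 °C) is a hypothetical choice for illustration, not a threshold from the data set documentation.

```python
# Plain-Python sketch: convert Kelvin readings to Celsius and flag
# physically implausible air temperatures. The bounds are hypothetical.
ABSOLUTE_ZERO_C = -273.15

# Hypothetical plausibility window for air temperature (Kelvin)
MIN_PLAUSIBLE_K = 200.0
MAX_PLAUSIBLE_K = 330.0


def kelvin_to_celsius(kelvin):
    """Celsius = Kelvin - 273.15"""
    return kelvin + ABSOLUTE_ZERO_C


def is_plausible_temp(kelvin):
    """True if a Kelvin reading falls inside the (hypothetical) plausible window."""
    return MIN_PLAUSIBLE_K <= kelvin <= MAX_PLAUSIBLE_K


if __name__ == "__main__":
    for reading in (0.0, 255.93, 310.07):
        status = "plausible" if is_plausible_temp(reading) else "implausible"
        print(reading, "K ->", round(kelvin_to_celsius(reading), 2), "C,", status)
```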
    
In this ETL notebook I will fix the issues above. Some further data quality issues will be addressed in another notebook, in the Feature Creation / Transformation step.

----------------------------------------------------------------------------

Import the necessary packages and initialize the Apache Spark session.

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import numpy as np

In [2]:
spark = SparkSession.builder.appName('Traffic Volume Prediction').getOrCreate()
spark

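Create an SQL context so that the data sets can be queried with SQL syntax. (The SparkSession's own `spark.sql()` method, used in the queries below, offers the same functionality.)

In [3]:
sqlContext = SQLContext(spark.sparkContext)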

#### __Define the data set schema__

In [4]:
from pyspark.sql.types import (StructType, StructField, DoubleType,
                               IntegerType, StringType, TimestampType)

traffic_schema = StructType([StructField('holiday', StringType(), True),
                             StructField('temp', DoubleType(), True),
                             StructField('rain_1h', DoubleType(), True),
                             StructField('snow_1h', DoubleType(), True),
                             StructField('clouds_all', IntegerType(), True),
                             StructField('weather_main', StringType(), True),
                             StructField('weather_description', StringType(), True),
                             StructField('date_time', TimestampType(), True),
                             StructField('traffic_volume', IntegerType(), True)
                             ])

#### __Read in the data set__

In [5]:
df = spark.read.csv('Metro_Interstate_Traffic_Volume.csv',
                     header = True, 
                     schema = traffic_schema)
df.createOrReplaceTempView('df')
df.printSchema()

root
 |-- holiday: string (nullable = true)
 |-- temp: double (nullable = true)
 |-- rain_1h: double (nullable = true)
 |-- snow_1h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- date_time: timestamp (nullable = true)
 |-- traffic_volume: integer (nullable = true)



## Basic data set checks

In [6]:
df.show(20, truncate=False)

+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|holiday|temp  |rain_1h|snow_1h|clouds_all|weather_main|weather_description|date_time          |traffic_volume|
+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|None   |288.28|0.0    |0.0    |40        |Clouds      |scattered clouds   |2012-10-02 09:00:00|5545          |
|None   |289.36|0.0    |0.0    |75        |Clouds      |broken clouds      |2012-10-02 10:00:00|4516          |
|None   |289.58|0.0    |0.0    |90        |Clouds      |overcast clouds    |2012-10-02 11:00:00|4767          |
|None   |290.13|0.0    |0.0    |90        |Clouds      |overcast clouds    |2012-10-02 12:00:00|5026          |
|None   |291.14|0.0    |0.0    |75        |Clouds      |broken clouds      |2012-10-02 13:00:00|4918          |
|None   |291.72|0.0    |0.0    |1         |Clear       |sky is clear       |2012-10-02 14:00:00|5181    

In [7]:
print('Number of entries in the dataframe: {}'.format(df.count()))

Number of entries in the dataframe: 48204


### Check for null values

In [8]:
for column in df.columns:
    nulls = spark.sql('select count(*) from df where ' + column + ' is null').first()[0]
    print('null values in ' + column + ': ' + str(nulls))

null values in holiday: 0
null values in temp: 0
null values in rain_1h: 0
null values in snow_1h: 0
null values in clouds_all: 0
null values in weather_main: 0
null values in weather_description: 0
null values in date_time: 0
null values in traffic_volume: 0
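The loop above runs one SQL query per column and only counts true nulls. Note that the `holiday` column stores "no holiday" as the literal string 'None' (Spark's `show()` prints a real null as `null`), so it passes the null check. The plain-Python sketch below, over hypothetical sample rows, counts real nulls and placeholder strings separately in a single pass; the placeholder set is an assumption. In PySpark, a single-pass null count is commonly written as `df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])`.

```python
# Plain-Python sketch: count missing values per column in one pass,
# keeping real nulls (None) and placeholder strings apart.
# SAMPLE_ROWS and PLACEHOLDERS are hypothetical illustrations.
SAMPLE_ROWS = [
    {"holiday": "None", "temp": 288.28, "weather_main": "Clouds"},
    {"holiday": "Columbus Day", "temp": None, "weather_main": "Clear"},
    {"holiday": "None", "temp": 289.36, "weather_main": None},
]

PLACEHOLDERS = {"None", ""}


def count_missing(rows, placeholders=PLACEHOLDERS):
    """Return {column: (null_count, placeholder_count)} for the given rows."""
    counts = {}
    for row in rows:
        for column, value in row.items():
            nulls, fillers = counts.get(column, (0, 0))
            if value is None:
                nulls += 1
            elif isinstance(value, str) and value in placeholders:
                fillers += 1
            counts[column] = (nulls, fillers)
    return counts


if __name__ == "__main__":
    for column, (nulls, fillers) in count_missing(SAMPLE_ROWS).items():
        print(column, "nulls:", nulls, "placeholders:", fillers)
```

Whether a placeholder means "unknown" or, as for `holiday`, a valid "no holiday" category is a modelling decision, which is why the two counts are kept separate.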


#### Get basic statistics of the initial numerical columns

In [9]:
df.describe(['temp', 'rain_1h', 'snow_1h', 'clouds_all']).show(truncate=False)

+-------+------------------+-------------------+---------------------+-----------------+
|summary|temp              |rain_1h            |snow_1h              |clouds_all       |
+-------+------------------+-------------------+---------------------+-----------------+
|count  |48204             |48204              |48204                |48204            |
|mean   |281.2058703012135 |0.33426396149697535|2.2238818355323212E-4|49.36223135009543|
|stddev |13.338231912676308|44.78913303693933  |0.008167611205361601 |39.01575046141368|
|min    |0.0               |0.0                |0.0                  |0                |
|max    |310.07            |9831.3             |0.51                 |100              |
+-------+------------------+-------------------+---------------------+-----------------+
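The summary above already points to the outliers. As a plain-Python sketch (not the notebook's method), the z-score of each column's extreme values, computed from the mean and stddev in the table, makes this explicit: both the `temp` minimum and the `rain_1h` maximum lie more than 20 standard deviations from the mean. The cut-off `Z_THRESHOLD = 4.0` is an arbitrary, hypothetical choice.

```python
# Plain-Python sketch: z-scores of each column's min and max, using the
# values printed by df.describe() above.
SUMMARY = {
    # column: (mean, stddev, min, max) copied from the describe() output
    "temp": (281.2058703012135, 13.338231912676308, 0.0, 310.07),
    "rain_1h": (0.33426396149697535, 44.78913303693933, 0.0, 9831.3),
    "snow_1h": (2.2238818355323212e-4, 0.008167611205361601, 0.0, 0.51),
    "clouds_all": (49.36223135009543, 39.01575046141368, 0.0, 100.0),
}

Z_THRESHOLD = 4.0  # hypothetical cut-off


def z_score(value, mean, stddev):
    return (value - mean) / stddev


def flag_extremes(summary, threshold=Z_THRESHOLD):
    """Return a sorted list of (column, 'min' or 'max', z) whose |z| exceeds threshold."""
    flagged = []
    for column, (mean, stddev, col_min, col_max) in summary.items():
        for label, value in (("min", col_min), ("max", col_max)):
            z = z_score(value, mean, stddev)
            if abs(z) > threshold:
                flagged.append((column, label, round(z, 1)))
    return sorted(flagged)


if __name__ == "__main__":
    for column, label, z in flag_extremes(SUMMARY):
        print(column, label, "z =", z)
```

The `snow_1h` maximum is flagged as well, even though 0.51 mm of snow in an hour is physically plausible: the column is almost always 0, so its stddev is tiny. A z-score is only a first filter; domain checks such as absolute zero or an implausible rain amount are what actually identify the errors.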



### __GENERAL COLUMN VALUES CHECK__
For every non-numeric column I will check the distinct values, looking for values that are written differently (uppercase / lowercase, etc.) but have the same meaning.
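The check described above can be sketched in plain Python: normalize each value (strip whitespace, lower-case it) and report any normalized value that has more than one raw spelling. `SAMPLE_VALUES` is hypothetical; in PySpark the same normalization could be expressed with `F.lower` and `F.trim`.

```python
# Plain-Python sketch: group categorical values that differ only in
# letter case or surrounding whitespace. SAMPLE_VALUES is hypothetical.
from collections import defaultdict

SAMPLE_VALUES = ["sky is clear", "Sky is Clear", "few clouds", "mist", "sky is clear "]


def case_variants(values):
    """Map each normalized value to its set of raw spellings, keeping only
    normalized values that have more than one spelling."""
    groups = defaultdict(set)
    for value in values:
        groups[value.strip().lower()].add(value)
    return {key: spellings for key, spellings in groups.items() if len(spellings) > 1}


if __name__ == "__main__":
    for key, spellings in case_variants(SAMPLE_VALUES).items():
        print(key, "<-", sorted(spellings))
```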

The __'holiday'__ column was checked during the initial data exploration step. It was fine.

__'weather_main'__ column

In [10]:
spark.sql('select weather_main, count(*) as count from df group by weather_main').show(50, truncate=False)

+------------+-----+
|weather_main|count|
+------------+-----+
|Thunderstorm|1034 |
|Drizzle     |1821 |
|Fog         |912  |
|Clear       |13391|
|Smoke       |20   |
|Squall      |4    |
|Mist        |5950 |
|Clouds      |15164|
|Rain        |5672 |
|Snow        |2876 |
|Haze        |1360 |
+------------+-----+



__'weather_description'__ column

In [11]:
spark.sql('select weather_main, weather_description, count(*) as count from df group by weather_main, \
           weather_description order by weather_main').show(50, truncate=False)

+------------+-----------------------------------+-----+
|weather_main|weather_description                |count|
+------------+-----------------------------------+-----+
|Clear       |sky is clear                       |11665|
|Clear       |Sky is Clear                       |1726 |
|Clouds      |few clouds                         |1956 |
|Clouds      |scattered clouds                   |3461 |
|Clouds      |broken clouds                      |4666 |
|Clouds      |overcast clouds                    |5081 |
|Drizzle     |heavy intensity drizzle            |64   |
|Drizzle     |shower drizzle                     |6    |
|Drizzle     |light intensity drizzle            |1100 |
|Drizzle     |drizzle                            |651  |
|Fog         |fog                                |912  |
|Haze        |haze                               |1360 |
|Mist        |mist                               |5950 |
|Rain        |proximity shower rain              |136  |
|Rain        |light rain       

There are 38 distinct values for __'weather_description'__. One obvious value to correct is __'Sky is Clear'__, which will be set to __'sky is clear'__.
I am not an expert in meteorology, so I will leave the other values as they are.

In [12]:
df = df.withColumn('weather_description', F.when(df['weather_description']=='Sky is Clear', 'sky is clear').\
                   otherwise(df["weather_description"]))

# Refreshing the temp view of the dataframe
df.createOrReplaceTempView('df')

Let's make sure it was corrected.

In [13]:
spark.sql('select weather_main, weather_description, count(*) as count from df group by weather_main, weather_description order by weather_main').show(50, truncate=False)

+------------+-----------------------------------+-----+
|weather_main|weather_description                |count|
+------------+-----------------------------------+-----+
|Clear       |sky is clear                       |13391|
|Clouds      |broken clouds                      |4666 |
|Clouds      |few clouds                         |1956 |
|Clouds      |scattered clouds                   |3461 |
|Clouds      |overcast clouds                    |5081 |
|Drizzle     |shower drizzle                     |6    |
|Drizzle     |drizzle                            |651  |
|Drizzle     |heavy intensity drizzle            |64   |
|Drizzle     |light intensity drizzle            |1100 |
|Fog         |fog                                |912  |
|Haze        |haze                               |1360 |
|Mist        |mist                               |5950 |
|Rain        |light rain                         |3372 |
|Rain        |very heavy rain                    |18   |
|Rain        |moderate rain    

### __OUTLIERS__

First I will deal with the outliers that were detected in the initial Data Exploration phase.

#### __temp__ outliers
Check the rows where the value in __'temp' is 0__

In [14]:
spark.sql('select * from df where temp = 0').show(truncate=False)

+-------+----+-------+-------+----------+------------+-------------------+-------------------+--------------+
|holiday|temp|rain_1h|snow_1h|clouds_all|weather_main|weather_description|date_time          |traffic_volume|
+-------+----+-------+-------+----------+------------+-------------------+-------------------+--------------+
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 03:00:00|361           |
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 04:00:00|734           |
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 05:00:00|2557          |
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 06:00:00|5150          |
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-02-02 03:00:00|291           |
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-02-02 04:00:00|284           |
|None   |0

Let's check whether there are duplicate rows for these date_time values.

In [15]:
spark.sql('select * from df where date_time in (select date_time from df where temp = 0) order by date_time').show(truncate=False)

+-------+----+-------+-------+----------+------------+-------------------+-------------------+--------------+
|holiday|temp|rain_1h|snow_1h|clouds_all|weather_main|weather_description|date_time          |traffic_volume|
+-------+----+-------+-------+----------+------------+-------------------+-------------------+--------------+
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 03:00:00|361           |
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 04:00:00|734           |
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 05:00:00|2557          |
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 06:00:00|5150          |
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-02-02 03:00:00|291           |
|None   |0.0 |0.0    |0.0    |0         |Clear       |sky is clear       |2014-02-02 04:00:00|284           |
|None   |0

So these are not duplicates. Most likely something was wrong with the thermometer at these times. Let's check the weather conditions __around__ these date_time values.

In [16]:
date_time_str = '(\'2014-01-31 02:00:00\', \'2014-01-31 07:00:00\', \'2014-02-02 02:00:00\', \'2014-02-02 09:00:00\')'
spark.sql('select * from df where date_time in ' + date_time_str).show(truncate=False)

+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|holiday|temp  |rain_1h|snow_1h|clouds_all|weather_main|weather_description|date_time          |traffic_volume|
+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|None   |255.93|0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 02:00:00|313           |
|None   |255.93|0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 07:00:00|5363          |
|None   |255.37|0.0    |0.0    |0         |Clear       |sky is clear       |2014-02-02 02:00:00|539           |
|None   |255.62|0.0    |0.0    |0         |Clear       |sky is clear       |2014-02-02 09:00:00|2506          |
+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+



The neighboring values are plausible, so:
* for the 2014-01-31 outliers (03:00 to 06:00) I will assign the temp value 255.93
* for the 2014-02-02 outliers (03:00 to 08:00) I will assign the temp value 255.37

This is an educated guess: the new values are most likely close to the real temperatures at these dates and times.
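The notebook fills each gap with the last valid reading before it. A common alternative, not used here, is linear interpolation in time between the valid readings on either side of the gap. The plain-Python sketch below applies it to the neighboring values queried above (255.37 K at 2014-02-02 02:00 and 255.62 K at 09:00); `interpolate_gap` is a hypothetical helper.

```python
# Plain-Python sketch: linearly interpolate hourly readings inside a gap
# from the valid readings at both ends (an alternative to copying the
# previous value).
from datetime import datetime, timedelta


def interpolate_gap(start, start_value, end, end_value):
    """Return {timestamp: value} for every whole hour strictly between start and end."""
    total_hours = (end - start) / timedelta(hours=1)
    filled = {}
    current = start + timedelta(hours=1)
    while current < end:
        fraction = ((current - start) / timedelta(hours=1)) / total_hours
        filled[current] = start_value + fraction * (end_value - start_value)
        current += timedelta(hours=1)
    return filled


if __name__ == "__main__":
    gap = interpolate_gap(datetime(2014, 2, 2, 2), 255.37,
                          datetime(2014, 2, 2, 9), 255.62)
    for ts, value in gap.items():
        print(ts, round(value, 2))
```

For these gaps the two approaches differ by less than 0.25 K, so the choice matters little here; interpolation becomes more relevant for longer gaps or larger temperature swings.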

In [17]:
from pyspark.sql import functions as F

# Hours whose temp reading was 0 K, grouped by replacement value
# (the reading of the last valid hour before each gap)
jan31_hours = ['2014-01-31 {:02d}:00:00'.format(h) for h in range(3, 7)]  # 03:00 to 06:00
feb02_hours = ['2014-02-02 {:02d}:00:00'.format(h) for h in range(3, 9)]  # 03:00 to 08:00

df = df.withColumn('temp',
                   F.when(F.col('date_time').isin(jan31_hours), 255.93)
                    .when(F.col('date_time').isin(feb02_hours), 255.37)
                    .otherwise(F.col('temp')))

# Refreshing the temp view of the dataframe
df.createOrReplaceTempView('df')

# And let's check the new values of all the repaired hours
date_time_str = '(' + ', '.join("'{}'".format(t) for t in jan31_hours + feb02_hours) + ')'
spark.sql('select * from df where date_time in ' + date_time_str + ' order by date_time').show(truncate=False)

+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|holiday|temp  |rain_1h|snow_1h|clouds_all|weather_main|weather_description|date_time          |traffic_volume|
+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|None   |255.93|0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 03:00:00|361           |
|None   |255.93|0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 04:00:00|734           |
|None   |255.93|0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 05:00:00|2557          |
|None   |255.93|0.0    |0.0    |0         |Clear       |sky is clear       |2014-01-31 06:00:00|5150          |
|None   |255.37|0.0    |0.0    |0         |Clear       |sky is clear       |2014-02-02 03:00:00|291           |
|None   |255.37|0.0    |0.0    |0         |Clear       |sky is clear       |2014-02-02 04:00:00|284     

#### __rain_1h__ outliers
Check the row where the value in __'rain_1h' is 9831.3__

In [18]:
spark.sql('select * from df where rain_1h = 9831.3').show(truncate=False)

+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|holiday|temp  |rain_1h|snow_1h|clouds_all|weather_main|weather_description|date_time          |traffic_volume|
+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|None   |302.11|9831.3 |0.0    |75        |Rain        |very heavy rain    |2016-07-11 17:00:00|5535          |
+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+



Again, let's check the weather conditions __around__ this date_time value.

In [19]:
date_time_str = '(\'2016-07-11 16:00:00\', \'2016-07-11 18:00:00\')'
spark.sql('select * from df where date_time in ' + date_time_str).show(truncate=False)

+-------+------+-------+-------+----------+------------+----------------------+-------------------+--------------+
|holiday|temp  |rain_1h|snow_1h|clouds_all|weather_main|weather_description   |date_time          |traffic_volume|
+-------+------+-------+-------+----------+------------+----------------------+-------------------+--------------+
|None   |301.48|0.0    |0.0    |75        |Thunderstorm|proximity thunderstorm|2016-07-11 16:00:00|5934          |
|None   |302.54|0.0    |0.0    |75        |Thunderstorm|proximity thunderstorm|2016-07-11 18:00:00|3900          |
+-------+------+-------+-------+----------+------------+----------------------+-------------------+--------------+



This time I will __delete__ the outlier row: the neighboring hours both report 0.0 mm of rain, so there is no indication of what the actual amount of rain was in that hour.

In [20]:
df = df.filter(df['rain_1h'] < 9831.3)
# Refreshing the temp view of the dataframe
df.createOrReplaceTempView('df')
date_time_str = '(\'2016-07-11 17:00:00\')'
spark.sql('select * from df where date_time in ' + date_time_str).show(truncate=False)

+-------+----+-------+-------+----------+------------+-------------------+---------+--------------+
|holiday|temp|rain_1h|snow_1h|clouds_all|weather_main|weather_description|date_time|traffic_volume|
+-------+----+-------+-------+----------+------------+-------------------+---------+--------------+
+-------+----+-------+-------+----------+------------+-------------------+---------+--------------+



### __Duplicate Rows Deletion__

Since the __'date_time'__ values should be unique in the data set, I will base the duplicate check on that column.
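The duplicate-key check (a `group by date_time ... having count > 1` query) can be mirrored in plain Python with `collections.Counter`. `SAMPLE_TIMESTAMPS` is hypothetical; the sketch only shows the logic, not how Spark executes it.

```python
# Plain-Python sketch of the duplicate-key check: count rows per
# date_time and keep only keys seen more than once, most frequent first.
# SAMPLE_TIMESTAMPS is hypothetical.
from collections import Counter

SAMPLE_TIMESTAMPS = [
    "2013-04-18 22:00:00", "2013-04-18 22:00:00", "2013-04-18 23:00:00",
    "2013-04-19 00:00:00", "2013-04-18 22:00:00", "2013-04-18 23:00:00",
]


def duplicate_keys(keys):
    """Return (key, count) pairs for keys occurring more than once, most frequent first."""
    return [(key, n) for key, n in Counter(keys).most_common() if n > 1]


if __name__ == "__main__":
    for key, n in duplicate_keys(SAMPLE_TIMESTAMPS):
        print(key, n)
```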

In [21]:
spark.sql('select date_time, count(*) as count from df group by date_time having count > 1 order by count desc').show(20, truncate=False)

+-------------------+-----+
|date_time          |count|
+-------------------+-----+
|2013-04-18 22:00:00|6    |
|2013-05-19 10:00:00|6    |
|2016-03-27 04:00:00|6    |
|2013-06-01 02:00:00|5    |
|2018-04-13 21:00:00|5    |
|2012-12-16 19:00:00|5    |
|2013-04-11 01:00:00|5    |
|2016-12-25 21:00:00|5    |
|2012-10-25 15:00:00|5    |
|2013-05-19 09:00:00|5    |
|2012-10-26 04:00:00|5    |
|2017-04-15 07:00:00|5    |
|2013-12-03 13:00:00|5    |
|2013-05-31 00:00:00|5    |
|2013-05-20 17:00:00|5    |
|2013-04-22 19:00:00|5    |
|2018-04-14 09:00:00|5    |
|2017-11-05 01:00:00|5    |
|2013-04-18 23:00:00|5    |
|2013-05-19 08:00:00|5    |
+-------------------+-----+
only showing top 20 rows



To start with, rows that are exact duplicates (identical values in every column) will be removed.

In [22]:
df = df.dropDuplicates()
print('Number of initial distinct entries in the dataframe : {}'.format(df.count()))

# Refresh the view of the dataset
df.createOrReplaceTempView('df')

Number of initial distinct entries in the dataframe : 48186


Let's check again

In [23]:
spark.sql('select date_time, count(*) as count from df group by date_time having count > 1 order by count desc').show(20, truncate=False)

+-------------------+-----+
|date_time          |count|
+-------------------+-----+
|2013-04-18 22:00:00|6    |
|2013-05-19 10:00:00|6    |
|2016-03-27 04:00:00|6    |
|2013-06-01 02:00:00|5    |
|2013-05-20 17:00:00|5    |
|2012-12-16 19:00:00|5    |
|2013-12-03 13:00:00|5    |
|2018-04-13 21:00:00|5    |
|2013-04-22 19:00:00|5    |
|2018-09-20 18:00:00|5    |
|2013-05-19 08:00:00|5    |
|2013-04-11 01:00:00|5    |
|2013-05-31 00:00:00|5    |
|2016-12-25 21:00:00|5    |
|2012-10-25 15:00:00|5    |
|2018-04-14 09:00:00|5    |
|2016-12-25 02:00:00|5    |
|2013-04-18 23:00:00|5    |
|2012-10-26 04:00:00|5    |
|2017-04-15 07:00:00|5    |
+-------------------+-----+
only showing top 20 rows



Let's examine a couple of duplicate examples. Maybe we can identify a category of duplicates.

In [24]:
spark.sql('select * from df where date_time in (\'2013-04-18 22:00:00\', \'2013-05-19 10:00:00\') order by date_time').show(truncate=False)

+-------+------+-------+-------+----------+------------+----------------------------+-------------------+--------------+
|holiday|temp  |rain_1h|snow_1h|clouds_all|weather_main|weather_description         |date_time          |traffic_volume|
+-------+------+-------+-------+----------+------------+----------------------------+-------------------+--------------+
|None   |274.79|0.0    |0.0    |90        |Rain        |moderate rain               |2013-04-18 22:00:00|1532          |
|None   |274.79|0.0    |0.0    |90        |Snow        |heavy snow                  |2013-04-18 22:00:00|1532          |
|None   |274.79|0.0    |0.0    |90        |Mist        |mist                        |2013-04-18 22:00:00|1532          |
|None   |274.79|0.0    |0.0    |90        |Snow        |snow                        |2013-04-18 22:00:00|1532          |
|None   |274.79|0.0    |0.0    |90        |Drizzle     |light intensity drizzle     |2013-04-18 22:00:00|1532          |
|None   |274.79|0.0    |0.0    |

Maybe the columns weather_main and weather_description are responsible for the duplicates.

Let's try and group by those columns too.

In [25]:
spark.sql('select date_time, weather_main, weather_description, count(*) as count \
          from df group by date_time, weather_main, weather_description having count > 1 \
          order by date_time').show(20, truncate=False)

+-------------------+------------+-------------------+-----+
|date_time          |weather_main|weather_description|count|
+-------------------+------------+-------------------+-----+
|2012-11-04 01:00:00|Clouds      |overcast clouds    |2    |
|2014-01-03 20:00:00|Clouds      |overcast clouds    |2    |
|2014-01-16 16:00:00|Clouds      |broken clouds      |2    |
|2014-01-19 16:00:00|Rain        |light rain         |2    |
|2014-01-20 16:00:00|Clouds      |broken clouds      |2    |
|2014-02-25 02:00:00|Clear       |sky is clear       |2    |
|2014-02-25 03:00:00|Clear       |sky is clear       |2    |
|2014-04-21 15:00:00|Clouds      |few clouds         |2    |
|2014-05-10 04:00:00|Clear       |sky is clear       |2    |
|2014-05-31 18:00:00|Rain        |light rain         |2    |
|2014-06-01 05:00:00|Rain        |moderate rain      |2    |
|2014-06-01 05:00:00|Thunderstorm|thunderstorm       |2    |
|2014-07-06 07:00:00|Rain        |moderate rain      |2    |
|2014-07-06 07:00:00|Mis

Let's examine a couple of these examples too. Maybe we can identify another category of duplicates.

In [26]:
spark.sql('select * from df where date_time in (\'2016-12-07 07:00:00\', \'2014-01-03 20:00:00\') order by date_time').show(truncate=False)

+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|holiday|temp  |rain_1h|snow_1h|clouds_all|weather_main|weather_description|date_time          |traffic_volume|
+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|None   |267.97|0.0    |0.0    |90        |Clouds      |overcast clouds    |2014-01-03 20:00:00|2427          |
|None   |268.88|0.0    |0.0    |90        |Clouds      |overcast clouds    |2014-01-03 20:00:00|2427          |
|None   |265.22|0.0    |0.0    |76        |Snow        |light snow         |2016-12-07 07:00:00|5551          |
|None   |265.22|0.0    |0.0    |90        |Snow        |light snow         |2016-12-07 07:00:00|5551          |
+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+



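Clearly, several columns can differ between rows that share the same date_time: temp, clouds_all, weather_main and weather_description in the examples above.

I tried to keep only the first row for each 'date_time' that appears multiple times in the data set, but it was extremely slow. In fact, I had to interrupt the kernel before the operation finished.

So instead I will use `dropDuplicates(['date_time'])`, which keeps exactly one row per 'date_time' value. Note that Spark does not guarantee which of the duplicate rows is kept.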
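`dropDuplicates(['date_time'])` keeps one row per `date_time` but does not promise which one. If a deterministic choice were needed, a common PySpark pattern is a window (`pyspark.sql.window.Window.partitionBy('date_time').orderBy(...)`) with `F.row_number().over(...)`, keeping row number 1; whether that is faster than the approach tried above has not been measured here. The plain-Python sketch below shows the same keep-one-per-key rule on rows adapted from the examples above; ordering by `temp` is an arbitrary assumption.

```python
# Plain-Python sketch: keep exactly one row per key, chosen
# deterministically (the row with the smallest `order_by` value wins).
# ROWS are adapted from the duplicate examples; ordering by temp is arbitrary.
ROWS = [
    {"date_time": "2014-01-03 20:00:00", "temp": 268.88, "weather_main": "Clouds"},
    {"date_time": "2014-01-03 20:00:00", "temp": 267.97, "weather_main": "Clouds"},
    {"date_time": "2016-12-07 07:00:00", "temp": 265.22, "weather_main": "Snow"},
]


def keep_one_per_key(rows, key="date_time", order_by="temp"):
    """Return one row per key (smallest `order_by` value), in first-seen key
    order, like keeping row_number() == 1 over a window partitioned by key."""
    best = {}
    for row in rows:
        k = row[key]
        if k not in best or row[order_by] < best[k][order_by]:
            best[k] = row
    return list(best.values())


if __name__ == "__main__":
    for row in keep_one_per_key(ROWS):
        print(row)
```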

In [27]:
df = df.dropDuplicates(['date_time'])

# Refreshing the temp view of the dataframe
df.createOrReplaceTempView('df')
df.count()

40569

Let's check that the duplicates were actually removed.

In [28]:
spark.sql('select date_time, count(*) as count from df group by date_time having count > 1 order by count desc').show(50, truncate=False)

+---------+-----+
|date_time|count|
+---------+-----+
+---------+-----+



#### Finally, let's get the count of rows per year, which also shows the years covered by the data set

In [29]:
spark.sql('select year(date_time) as year, count(*) from df group by year order by year').show()

+----+--------+
|year|count(1)|
+----+--------+
|2012|    2103|
|2013|    7293|
|2014|    4500|
|2015|    3593|
|2016|    7836|
|2017|    8712|
|2018|    6532|
+----+--------+



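We expect about __8760__ (24 * 365) hourly data points for each full year, or 8784 in a leap year such as 2016. 2012 is only a partial year, since the data set starts in October 2012.

However, many hours are missing: 2014 and 2015, for example, have only 4500 and 3593 rows respectively.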

Since there is no way to get the measurements needed, I will work with the available data.

The missing data could affect the quality of the machine learning models.
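The expected-versus-actual comparison can be quantified with a short plain-Python sketch. It uses the row counts from the per-year query above, accounts for leap years, and assumes that the data starts at 2012-10-02 09:00:00 (the first row shown earlier). The end date of the data is not shown in this notebook, so 2018 is treated as a full year here.

```python
# Plain-Python sketch: expected hourly data points per year (leap years
# have 8784 hours) and the share actually present after cleansing.
import calendar
from datetime import datetime

# Row counts per year from the query above
ROWS_PER_YEAR = {2012: 2103, 2013: 7293, 2014: 4500, 2015: 3593,
                 2016: 7836, 2017: 8712, 2018: 6532}

# Assumption: the data set starts at the first row shown earlier
DATA_START = datetime(2012, 10, 2, 9)


def expected_hours(year):
    """Hours in a full calendar year."""
    return 24 * (366 if calendar.isleap(year) else 365)


def hours_between(start, end):
    """Number of whole hourly time stamps in [start, end)."""
    return int((end - start).total_seconds() // 3600)


if __name__ == "__main__":
    for year, rows in sorted(ROWS_PER_YEAR.items()):
        if year == DATA_START.year:
            expected = hours_between(DATA_START, datetime(year + 1, 1, 1))
        else:
            expected = expected_hours(year)
        print(year, rows, "/", expected, "=", round(100 * rows / expected, 1), "%")
```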

Also, check that each holiday appears only once per year.

In [30]:
spark.sql('select year(date_time) as year, holiday, count(*) from df group by year, holiday order by year, holiday').show(100)

+----+--------------------+--------+
|year|             holiday|count(1)|
+----+--------------------+--------+
|2012|       Christmas Day|       1|
|2012|        Columbus Day|       1|
|2012|                None|    2099|
|2012|    Thanksgiving Day|       1|
|2012|        Veterans Day|       1|
|2013|       Christmas Day|       1|
|2013|        Columbus Day|       1|
|2013|    Independence Day|       1|
|2013|           Labor Day|       1|
|2013|        Memorial Day|       1|
|2013|       New Years Day|       1|
|2013|                None|    7283|
|2013|          State Fair|       1|
|2013|    Thanksgiving Day|       1|
|2013|        Veterans Day|       1|
|2013|Washingtons Birthday|       1|
|2014|Martin Luther Kin...|       1|
|2014|        Memorial Day|       1|
|2014|       New Years Day|       1|
|2014|                None|    4496|
|2014|Washingtons Birthday|       1|
|2015|       Christmas Day|       1|
|2015|        Columbus Day|       1|
|2015|    Independence Day|       1|
|

#### Let's get basic statistics of the initial numerical columns again

In [31]:
df.describe(['temp', 'rain_1h', 'snow_1h', 'clouds_all']).show(truncate=False)

+-------+------------------+-------------------+---------------------+-----------------+
|summary|temp              |rain_1h            |snow_1h              |clouds_all       |
+-------+------------------+-------------------+---------------------+-----------------+
|count  |40569             |40569              |40569                |40569            |
|mean   |281.3785764007    |0.07635041534176344|1.1733096699450319E-4|44.19882176045749|
|stddev |13.099580934750465|0.7696957215092919 |0.005676571537482932 |38.68463638299327|
|min    |243.39            |0.0                |0.0                  |0                |
|max    |310.07            |55.63              |0.51                 |100              |
+-------+------------------+-------------------+---------------------+-----------------+



Finally, save the resulting data frame in __parquet__ format so that it can be read and used in the next phases.

In [32]:
# Write a single output file; mode='overwrite' lets the cell be re-run
df = df.repartition(1)
df.write.parquet('traffic_volume_etl_df.parquet', mode='overwrite')