# Capstone project
## Data preparation for data analysis

This notebook prepares data for developing machine learning models that give additional insights about the trips.

* This is an overly simplified example of the data engineer's role in the production pipeline.
* In reality, several of these steps (feature engineering) would initially be carried out by the data scientists.
* Once the model has been developed, we can start integrating it into the data pipeline.
* We would possibly separate data preparation and inference into their own Spark jobs, orchestrated in Airflow.
* Results of the inference would be written to the data warehouse with the other data.

PySpark EMR notebooks do not have pandas and matplotlib pre-installed, so we need to install them using the `install_pypi_package` function found in SparkContext (`sc`).

In [2]:
sc.install_pypi_package("pandas")
sc.install_pypi_package("matplotlib")

Collecting pandas
  Downloading pandas-1.0.3-cp36-cp36m-manylinux1_x86_64.whl (10.0 MB)
Collecting python-dateutil>=2.6.1
  Downloading python_dateutil-2.8.1-py2.py3-none-any.whl (227 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.0.3 python-dateutil-2.8.1

Collecting matplotlib
  Downloading matplotlib-3.2.1-cp36-cp36m-manylinux1_x86_64.whl (12.4 MB)
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1
  Downloading pyparsing-2.4.7-py2.py3-none-any.whl (67 kB)
Collecting kiwisolver>=1.0.1
  Downloading kiwisolver-1.2.0-cp36-cp36m-manylinux1_x86_64.whl (88 kB)
Collecting cycler>=0.10
  Downloading cycler-0.10.0-py2.py3-none-any.whl (6.5 kB)
Installing collected packages: pyparsing, kiwisolver, cycler, matplotlib
Successfully installed cycler-0.10.0 kiwisolver-1.2.0 matplotlib-3.2.1 pyparsing-2.4.7

## Imports and Configuration settings for the job

In [33]:
import os
from pyspark.sql.functions import year, month, dayofmonth, hour
# OneHotEncoderEstimator is the Spark 2.3/2.4 name; Spark 3.0 renamed it to OneHotEncoder
from pyspark.ml.feature import OneHotEncoderEstimator

In [32]:
output_conf = {
    's3_bucket': 's3://dend-tomra',
    's3_model_key': 'taxi_ml/model',
    's3_data_key': 'taxi_ml/data',    
}

In [1]:
etl_conf = { 
    "s3_taxi_dir_path":"s3://dend-tomra/chicago-taxi-rides-2016",
    "s3_precip_file_path":"s3://dend-tomra/ghcnd/2016",
    "s3_weather_dir_path":"s3://dend-tomra/historical-hourly-weather-data",
    "s3_holidays_file_path":"s3://dend-tomra/US-Bank-holidays.csv"
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1587285063067_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


## Reading the data

For this first step we'll settle for reading only one month's data.

In [6]:
# use for only one file
filename = 'chicago_taxi_trips_2016_01.csv'

# use for reading all files
# filename = '*'

df = spark.read \
    .format('csv') \
    .options(header=True, inferSchema=True) \
    .load(os.path.join(etl_conf['s3_taxi_dir_path'], filename))
df.printSchema()

root
 |-- taxi_id: integer (nullable = true)
 |-- trip_start_timestamp: timestamp (nullable = true)
 |-- trip_end_timestamp: timestamp (nullable = true)
 |-- trip_seconds: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- pickup_census_tract: string (nullable = true)
 |-- dropoff_census_tract: integer (nullable = true)
 |-- pickup_community_area: integer (nullable = true)
 |-- dropoff_community_area: integer (nullable = true)
 |-- fare: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- extras: double (nullable = true)
 |-- trip_total: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- company: integer (nullable = true)
 |-- pickup_latitude: integer (nullable = true)
 |-- pickup_longitude: integer (nullable = true)
 |-- dropoff_latitude: integer (nullable = true)
 |-- dropoff_longitude: integer (nullable = true)

In [14]:
# Take a look at the top rows
df.limit(5).toPandas()

   taxi_id trip_start_timestamp  ... dropoff_latitude  dropoff_longitude
0       85  2016-01-13 06:15:00  ...            199.0              510.0
1     2776  2016-01-22 09:30:00  ...              NaN                NaN
2     3168  2016-01-31 21:30:00  ...              NaN                NaN
3     4237  2016-01-23 17:30:00  ...            686.0              500.0
4     5710  2016-01-14 05:45:00  ...              NaN                NaN

[5 rows x 20 columns]

In [30]:
# Check initial number of records
df.count()

1705805

## Feature engineering

For this step we assume that the data scientists have also taken a look at the data and told us which features (columns) to use in the model.
Furthermore, we assume that in this specific case we are developing a clustering algorithm to try to identify different customer profiles.

The features of interest are:
* Trip time of day (hour)
* Pickup community area
* Dropoff community area

In addition, we want to include year, month and day for partitioning the resulting data.

The assumption is that the data analysts have found a strong correlation between the pickup and dropoff areas and the trip duration, mileage and cost. Thus we can omit duration, mileage and cost as features in this case.

### Extract hour
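We use the *hour* function from the pyspark.sql.functions library to get the hour from `trip_start_timestamp`. The year, month and day columns used for partitioning are extracted from the same timestamp with *year*, *month* and *dayofmonth*.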

In [25]:
df_with_hour = df.withColumn('year', year(df.trip_start_timestamp))\
                 .withColumn('month', month(df.trip_start_timestamp))\
                 .withColumn('day', dayofmonth(df.trip_start_timestamp))\
                 .withColumn('hour', hour(df.trip_start_timestamp))
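
As a plain-Python illustration (not the Spark code itself), the four derived columns correspond to the following fields of a timestamp. The sample value is the first `trip_start_timestamp` from the preview of the top rows; the helper name `date_parts` is made up for this sketch.

```python
from datetime import datetime


def date_parts(ts):
    """Return (year, month, day, hour), mirroring Spark's year, month, dayofmonth and hour."""
    return ts.year, ts.month, ts.day, ts.hour


sample = datetime(2016, 1, 13, 6, 15)
parts = date_parts(sample)
print(parts)  # (2016, 1, 13, 6)
```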

### Create dataframe with wanted features
We create a new DataFrame containing only the features that will be used by the clustering model, plus the partitioning columns.

In [26]:
df_features = df_with_hour.select('year', 'month', 'day', 'hour', 'pickup_community_area', 'dropoff_community_area')

### Drop null values

Machine learning models generally cannot handle null values, so we need to deal with them. One option is to fill them with, for example, average values. Since we're looking at start and end locations, we decided that the only feasible option is to drop the rows that contain nulls: making wrong assumptions about how the customer travelled would probably result in an erroneous model.

In [27]:
df_no_nulls = df_features.dropna()

In [29]:
df_no_nulls.count()

1382311
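
Comparing this count with the initial record count shows how much data the null handling costs. The short calculation below uses only the two counts reported in this notebook; roughly 19% of the January rows are dropped.

```python
total_rows = 1705805          # df.count() before dropping nulls
rows_without_nulls = 1382311  # df_no_nulls.count()

dropped = total_rows - rows_without_nulls
dropped_share = dropped / total_rows

print(f"dropped {dropped} rows ({dropped_share:.1%} of the month)")
```

A share this large is worth reporting back to the data scientists, since it may affect which customer profiles the clustering can see.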

### Handling categorical features
Although the pickup and dropoff areas are conveniently already in numeric format, we're not yet ready to pass them to a machine learning model. We could, but the results might not be what we expect. The reason is that these features are categorical: the values identify distinct categories, but their magnitude and ordering relative to each other are irrelevant. An area with value 100 is no better or worse than one with value 600; they merely belong to different categories. The same applies to the hour data.

This situation is common in data science and is handled using a method called one-hot encoding.

From the PySpark documentation:
> A one-hot encoder that maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index.

In [43]:
community_area_encoder = OneHotEncoderEstimator() \
    .setInputCols(['hour', 'pickup_community_area', 'dropoff_community_area']) \
    .setOutputCols(['hour_encoded', 'pickup_community_area_encoded', 'dropoff_community_area_encoded'])

In [47]:
community_area_model = community_area_encoder.fit(df_no_nulls)
df_encoded = community_area_model.transform(df_no_nulls)

In [48]:
df_encoded.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- pickup_community_area: integer (nullable = true)
 |-- dropoff_community_area: integer (nullable = true)
 |-- hour_encoded: vector (nullable = true)
 |-- pickup_community_area_encoded: vector (nullable = true)
 |-- dropoff_community_area_encoded: vector (nullable = true)
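
To make the encoded vector columns more concrete, here is a minimal plain-Python sketch of the idea. It is not Spark's implementation: the helper `one_hot` is hypothetical and returns a dense list, whereas Spark stores sparse vectors. It assumes the encoder's default `dropLast=True` setting, under which the last category is represented by an all-zero vector, which is why the documentation says "at most" a single one-value.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a list of 0/1 values.

    With drop_last=True the vector has num_categories - 1 slots and the
    last category maps to all zeros (assumed to mirror Spark's default).
    """
    if not 0 <= index < num_categories:
        raise ValueError("index out of range")
    size = num_categories - 1 if drop_last else num_categories
    vector = [0] * size
    if index < size:
        vector[index] = 1
    return vector


# Hours 0..23 form 24 categories.
print(one_hot(6, 24))                  # a single 1 at position 6, length 23
print(one_hot(23, 24))                 # last category: all zeros
print(one_hot(1, 4, drop_last=False))  # [0, 1, 0, 0]
```

No position in the vector is "larger" than another, so the model can no longer misread hour 22 as being greater than hour 6.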

### Save the model for later reference

In [51]:
bucket = output_conf['s3_bucket']
key = output_conf['s3_model_key']
encoder_name = 'community_area_encoder'
model_name = 'community_area_model'

encoder_path = os.path.join(bucket, key, encoder_name)
community_area_encoder.write().overwrite().save(encoder_path)

model_path = os.path.join(bucket, key, model_name)
community_area_model.write().overwrite().save(model_path)

### Make the final DataFrame
Finally, let's drop the unneeded columns and store the final dataframe to S3.

In [53]:
df_final = df_encoded.select('year',
                             'month',
                             'day',
                             'hour_encoded',
                             'pickup_community_area_encoded',
                             'dropoff_community_area_encoded'
                            )

In [54]:
bucket = output_conf['s3_bucket']
key = output_conf['s3_data_key']

output_path = os.path.join(bucket, key)

df_final.write.partitionBy('year', 'month', 'day') \
        .parquet(output_path, mode='overwrite')
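
`partitionBy` writes Hive-style partition directories, one `column=value` level per partition column. The sketch below only builds the expected directory prefix for a single day as a string; the helper `partition_prefix` is hypothetical, and it uses `posixpath.join` so that the separator is always `/` regardless of the local operating system.

```python
import posixpath


def partition_prefix(base, year, month, day):
    """Build the directory prefix Spark is expected to use for one day's partition."""
    return posixpath.join(base, f"year={year}", f"month={month}", f"day={day}")


base = posixpath.join("s3://dend-tomra", "taxi_ml/data")
prefix = partition_prefix(base, 2016, 1, 13)
print(prefix)  # s3://dend-tomra/taxi_ml/data/year=2016/month=1/day=13
```

A downstream job can then read a single day by pointing at that prefix, or read the whole dataset and filter on the partition columns.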

### Final notes
When the script was run on the whole dataset it failed, because some of the pickup and dropoff data was in string format despite being numeric (otherwise the main ETL would have failed, since it uses a numeric data type for said columns). As a result, the final `data_prep.py` script has two additional steps that are not included here: a `StringIndexer` transformation is applied to both the pickup and dropoff columns, so that all values are converted to numeric indices before being fed to the one-hot encoder.
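
The `data_prep.py` script itself is not shown here, so the following is only a plain-Python sketch of what a `StringIndexer`-style transformation does: each distinct label gets a numeric index, with the most frequent label receiving index 0. The function name `fit_label_index` and the sample values are made up, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter


def fit_label_index(labels):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency, then alphabetically for ties (assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


# Hypothetical community-area values that arrived as strings.
pickup_areas = ["8", "32", "8", "76", "32", "8"]
mapping = fit_label_index(pickup_areas)
indexed = [mapping[value] for value in pickup_areas]

print(mapping)  # {'8': 0, '32': 1, '76': 2}
print(indexed)  # [0, 1, 0, 2, 1, 0]
```

The resulting indices are arbitrary category identifiers rather than the original area numbers, which is fine for one-hot encoding because only category membership matters.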