# ETL Pipeline for Helsinki Bikes
### Data Engineering Capstone Project

#### Project Summary
The goal is to create a database that data scientists and data analysts can use to describe the data and make predictions. The database can answer, for example, the following questions:

1. What are the rush hours for bike stations?
2. How can we predict how many bikes are needed at which stations, and when?
3. How many bike trips were made?
4. What are the distances and durations on bike trips?
5. How do weather conditions affect bike trips?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
from pyspark.sql import SparkSession
# Spark SQL functions used in the cleaning steps. Importing `round` here
# shadows Python's built-in round() for the rest of the notebook.
from pyspark.sql.functions import (col, unix_timestamp, round, year, month,
                                   dayofweek, date_format, hour)

import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

%matplotlib inline

### Step 1: Scope the Project and Gather Data

#### Scope 

In this project, we gather Helsinki city bike trip data and weather data. The main idea is to clean and prepare the datasets so that data scientists and data analysts can build innovative descriptions and predictions from them. As a first step, I built a simple, straightforward ETL pipeline. Later on, the same pipeline can be reused and evaluated to prepare all historical data and to build OLAP versions of the data for forecasting the ongoing bike business.

The ETL pipeline is simple and effective, but trying and testing it with Spark took a lot of time. I used Spark to process the big files and Pandas to get insights into the data. The raw data was first stored in an AWS S3 bucket; it was then cleaned and processed into dimension and fact tables, which were written back to S3 as Parquet files. An AWS EMR cluster was used to run this processing.

#### Describe and Gather Data 

The datasets can be found here:
- City bike stations' Origin-Destination (OD) data includes all trips made with the city bikes of Helsinki and Espoo. The data includes the trip's origin and destination stations, start and end times, distance (in meters) and duration (in seconds). https://hri.fi/data/en_GB/dataset/helsingin-ja-espoon-kaupunkipyorilla-ajatut-matkat
- Finnish Meteorological Institute weather observations: instantaneous observations are available from 2010 onwards, and daily and monthly observations from the 1960s onwards (depending on the weather station). https://en.ilmatieteenlaitos.fi/download-observations

In [2]:
# Build (or reuse) the Spark session with Hive support for Spark SQL.
# No extra packages are needed: all input files are CSV.
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

## Load datasets

In [3]:
input_data = "s3://helsinki-bikes/"
output_data = "s3://helsinki-bikes/results/" 

In [4]:
# load and read the dataset
bikes_data = input_data + 'bikes_data/2019/*.csv'  # input_data already ends with '/'
df = spark.read.csv(bikes_data, inferSchema=True, header=True, sep=',', encoding='utf-8')

In [5]:
df.count()

3787948

In [6]:
df.limit(3).toPandas()

| | Departure | Return | Departure station id | Departure station name | Return station id | Return station name | Covered distance (m) | Duration (sec.) |
|---|---|---|---|---|---|---|---|---|
| 0 | 2019-06-30 23:59:49 | 2019-07-01 00:11:39 | 51 | Itälahdenkatu | 60 | Porkkalankatu | 2229.0 | 706 |
| 1 | 2019-06-30 23:59:21 | 2019-07-01 00:01:17 | 118 | Fleminginkatu | 117 | Brahen puistikko | 289.0 | 112 |
| 2 | 2019-06-30 23:59:19 | 2019-07-01 00:36:33 | 54 | Gyldenintie | 17 | Varsapuistikko | 4872.0 | 2230 |


In [7]:
# load weather dataset
weather_data = input_data + 'weather/2019/*.csv'  # input_data already ends with '/'
temp_df = spark.read.csv(weather_data, header=True, sep=',', encoding='utf-8')

In [8]:
temp_df.count()

30810

In [14]:
temp_pd = temp_df.toPandas()
temp_pd.head()

| | Date | Time zone | Cloud amount (1/8) | Pressure (msl) (hPa) | Relative humidity (%) | Precipitation intensity (mm/h) | Air temperature (degC) | Dew-point temperature (degC) | Horizontal visibility (m) | Wind direction (deg) | Gust speed (m/s) | Wind speed (m/s) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2019-07-01 00:10:00 | UTC | 3.0 | 996.0 | 83.0 | 0.0 | 16.5 | 13.5 | 47280.0 | 196.0 | 6.1 | 3.8 |
| 1 | 2019-07-01 00:20:00 | UTC | 3.0 | 995.8 | 84.0 | 0.0 | 16.2 | 13.5 | 49920.0 | 196.0 | 6.4 | 3.3 |
| 2 | 2019-07-01 00:30:00 | UTC | 1.0 | 995.6 | 84.0 | 0.0 | 16.3 | 13.5 | 39970.0 | 192.0 | 5.4 | 3.2 |
| 3 | 2019-07-01 00:40:00 | UTC | 1.0 | 995.7 | 83.0 | 0.0 | 16.3 | 13.4 | 43530.0 | 194.0 | 5.8 | 3.7 |
| 4 | 2019-07-01 00:50:00 | UTC | 1.0 | 995.4 | 82.0 | 0.0 | 16.2 | 13.1 | 43860.0 | 206.0 | 7.1 | 4.0 |


In [15]:
# load stations dataset
stations_data = input_data + 'stations/2019/*.csv'  # input_data already ends with '/'
stations_df = spark.read.csv(stations_data, header=True, sep=',', encoding='utf-8')

In [16]:
stations_df.count()

351

In [18]:
stations_pd = stations_df.toPandas()
stations_pd.head(3)

| | FID | ID | Nimi | Namn | Name | Osoite | Adress | Kaupunki | Stad | Operaattor | Kapasiteet | x | y |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 501 | Hanasaari | Hanaholmen | Hanasaari | Hanasaarenranta 1 | Hanaholmsstranden 1 | Espoo | Esbo | CityBike Finland | 10 | 24.840319 | 60.16582 |
| 1 | 2 | 503 | Keilalahti | Kägelviken | Keilalahti | Keilalahdentie 2 | Kägelviksvägen 2 | Espoo | Esbo | CityBike Finland | 28 | 24.827467 | 60.171524 |
| 2 | 3 | 505 | Westendinasema | Westendstationen | Westendinasema | Westendintie 1 | Westendvägen 1 | Espoo | Esbo | CityBike Finland | 16 | 24.805758 | 60.168266 |


### Step 2: Explore and Assess the Data
#### Explore the Data 

I made a preliminary data exploration (after adding a DateTime column to the weather data files) and found out that these datasets had only minor issues with missing or duplicated values. The quality checks at the end of this notebook show how many records were removed. Still, preparing a clean dataset for creating the tables was not an easy task: some Spark behaviour was unexpected, and finding the right workflow took a lot of trial and error.

The main cleaning tasks were casting the columns to the right data types and creating a DateTime (timestamp) key that relates the bikes data to the weather data (temp_df). For this, each departure time is rounded to the nearest 10 minutes, the interval of the weather observations. Rounding timestamps is not as easy in PySpark as in plain Python. The solution looks simple now, but it was hard to find answers on Stack Overflow or elsewhere, and it took many attempts.
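The rounding used in the bikes cleaning cell, `round(unix_timestamp(...) / 600) * 600`, snaps each departure to the nearest 10-minute mark. The same arithmetic in plain Python, as a minimal sketch: `round_to_bucket` is a hypothetical helper name, and it uses integer arithmetic so that ties round up, matching the HALF_UP mode of Spark's `round` rather than Python's built-in `round` (which rounds half to even).

```python
from datetime import datetime, timedelta

# Weather observations are 10 minutes apart, so trips are bucketed to 600 s.
BUCKET_SECONDS = 600
EPOCH = datetime(1970, 1, 1)


def round_to_bucket(ts: datetime, bucket: int = BUCKET_SECONDS) -> datetime:
    """Round a naive timestamp to the nearest multiple of `bucket` seconds.

    Ties (exactly half a bucket) round up, like Spark's HALF_UP `round`.
    Sub-second precision is truncated before rounding.
    """
    seconds = int((ts - EPOCH).total_seconds())
    rounded = (seconds + bucket // 2) // bucket * bucket
    return EPOCH + timedelta(seconds=rounded)


# First departure from the sample rows: 11 s before midnight rounds to 00:00.
print(round_to_bucket(datetime(2019, 6, 30, 23, 59, 49)))  # prints 2019-07-01 00:00:00
```

Note that the example trip moves from 30 June to 1 July, so year, month, weekday and hour derived from the rounded `date` can differ from the raw departure time.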

#### Cleaning Steps
The cleaning steps for each dataset are listed below.

Bikes data:
- cast right data types
- rename columns
- create new features: extract year, month, day, weekday and hour from the timestamp 
- create rounded date column from original timestamp to make relation with weather data
- fill NaN values with 0

Weather data:
- cast right data types
- rename columns
- fill NaN values with 0

Stations data:
- cast right data types
- rename columns
- fill NaN values with 0
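The rename, cast and fill steps are the same for every column, so they can be driven by one mapping instead of repeated `withColumn` calls. A minimal plain-Python sketch for the weather columns, assuming rows arrive as dictionaries (as from `csv.DictReader`); `WEATHER_COLUMNS` and `clean_row` are hypothetical names. In PySpark, the same mapping could feed a single `select` built from `col(source).cast(...).alias(target)`.

```python
import csv
import io

# Source column -> (target column, Python type), mirroring the weather cleaning cell.
# The date stays text here; in Spark it is cast to a timestamp.
WEATHER_COLUMNS = {
    "Date": ("date", str),
    "Cloud amount (1/8)": ("cloud_amount", float),
    "Pressure (msl) (hPa)": ("pressure", float),
    "Relative humidity (%)": ("humidity", float),
    "Precipitation intensity (mm/h)": ("precipitation", float),
    "Air temperature (degC)": ("air_temp", float),
    "Dew-point temperature (degC)": ("dev_point_temp", float),
    "Horizontal visibility (m)": ("visibility", float),
    "Wind direction (deg)": ("wind_direc", float),
    "Gust speed (m/s)": ("gust_speed", float),
    "Wind speed (m/s)": ("wind_speed", float),
}


def clean_row(raw, mapping=WEATHER_COLUMNS, fill_value=0.0):
    """Rename and cast one row; empty or missing numeric fields become `fill_value`."""
    cleaned = {}
    for source, (target, cast) in mapping.items():
        value = (raw.get(source) or "").strip()
        if value == "":
            cleaned[target] = fill_value if cast is float else None
        else:
            cleaned[target] = cast(value)
    return cleaned


sample = io.StringIO(
    "Date,Air temperature (degC),Wind speed (m/s)\n"
    "2019-07-01 00:10:00,16.5,\n"
)
rows = [clean_row(r) for r in csv.DictReader(sample)]
print(rows[0]["air_temp"], rows[0]["wind_speed"])  # prints 16.5 0.0
```

Filling with 0 follows the cleaning steps above; because 0.0 is also a valid air temperature, passing `fill_value=None` would keep missing readings distinguishable.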




### Cleaning Bikes dataset and creating tables 

In [19]:
df.printSchema()

root
 |-- Departure: timestamp (nullable = true)
 |-- Return: timestamp (nullable = true)
 |-- Departure station id: integer (nullable = true)
 |-- Departure station name: string (nullable = true)
 |-- Return station id: integer (nullable = true)
 |-- Return station name: string (nullable = true)
 |-- Covered distance (m): double (nullable = true)
 |-- Duration (sec.): integer (nullable = true)



In [20]:
temp_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time zone: string (nullable = true)
 |-- Cloud amount (1/8): string (nullable = true)
 |-- Pressure (msl) (hPa): string (nullable = true)
 |-- Relative humidity (%): string (nullable = true)
 |-- Precipitation intensity (mm/h): string (nullable = true)
 |-- Air temperature (degC): string (nullable = true)
 |-- Dew-point temperature (degC): string (nullable = true)
 |-- Horizontal visibility (m): string (nullable = true)
 |-- Wind direction (deg): string (nullable = true)
 |-- Gust speed (m/s): string (nullable = true)
 |-- Wind speed (m/s): string (nullable = true)



In [21]:
stations_df.printSchema()

root
 |-- FID: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Nimi: string (nullable = true)
 |-- Namn: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Osoite: string (nullable = true)
 |-- Adress: string (nullable = true)
 |-- Kaupunki: string (nullable = true)
 |-- Stad: string (nullable = true)
 |-- Operaattor: string (nullable = true)
 |-- Kapasiteet: string (nullable = true)
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
For this purpose, a simplified star schema suits best. I prepared a fact table for quick analysis; once it has been explored, it is easy to follow new paths with the dimension tables.

Fact table (fact_table) 3 787 377 records, 1.4.-31.10.2019
- date
- year
- month
- day
- weekday
- hour
- departure station name
- departure time
- return time
- return station name
- departure station id
- return station id
- distance
- duration
- air temperature
- humidity
- wind speed
- longitude
- latitude

Bikes dimension table (dim_bikes) 3 787 946 records
- departure station name
- departure time
- return time
- return station name
- distance
- duration
- date
- year
- month
- day
- weekday
- hour

Weather dimension table (dim_temp) 30 810 records, 1.4.-31.10.2019
- date
- cloud amount
- pressure (msl)
- relative humidity
- precipitation intensity
- air temperature
- dew-point temperature
- horizontal visibility
- wind direction
- gust speed
- wind speed

Stations dimension table (dim_stations) 351 records
- station id
- city
- name
- address
- operator
- capacity
- longitude
- latitude

#### 3.2 Mapping Out Data Pipelines
The pipeline cleans the three datasets and then creates the tables from temporary views:
1. Clean and prepare the bikes data in dataframe df
2. Clean and prepare the weather data in dataframe temp_df
3. Clean and prepare the stations data in dataframe stations_df
4. Create a temp view and the bikes dimension table
5. Create a temp view and the weather dimension table
6. Create a temp view and the stations dimension table
7. Create the fact table by joining the views
8. Run the data quality checks
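The eight steps form a small dependency graph: each view and dimension table depends on its cleaned dataframe, the fact table depends on all three views, and the quality checks run last. A sketch using the standard-library `graphlib` module (Python 3.9+) to derive a valid run order; the step labels are hypothetical, and a scheduler such as Airflow would express the same graph as a DAG of tasks.

```python
from graphlib import TopologicalSorter

# step -> set of steps that must finish first (labels follow steps 1-8 above;
# the dim_* steps register the temp view and build the dimension table)
PIPELINE = {
    "clean_bikes": set(),
    "clean_weather": set(),
    "clean_stations": set(),
    "dim_bikes": {"clean_bikes"},
    "dim_temp": {"clean_weather"},
    "dim_stations": {"clean_stations"},
    "fact_table": {"dim_bikes", "dim_temp", "dim_stations"},
    "quality_checks": {"dim_bikes", "dim_temp", "dim_stations", "fact_table"},
}

# static_order() yields every step after all of its prerequisites.
order = list(TopologicalSorter(PIPELINE).static_order())
print(order)
```

The three cleaning steps have no mutual dependencies, so their relative order may vary; only the prerequisite constraints are guaranteed.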

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.


In [22]:
# Change the data types and create new features for further analysis.
# Rename and cast columns to align with the data model.
df = (
    df
    .withColumn('departure', col('Departure').cast('timestamp'))
    .withColumn('return', col('Return').cast('timestamp'))
    .withColumnRenamed('Departure station id', 'dep_station_id')
    .withColumnRenamed('Departure station name', 'dep_station_name')
    .withColumn('ret_station_id', col('Return station id').cast('integer'))
    .withColumnRenamed('Return station name', 'ret_station_name')
    .withColumn('distance', col('Covered distance (m)').cast('integer'))
    .withColumnRenamed('Duration (sec.)', 'duration')
    # Round the departure time to the nearest 10 minutes (600 s) to match the
    # 10-minute weather observations; `date` is the join key to the weather data.
    .withColumn('date', (round(unix_timestamp(col('departure')) / 600) * 600).cast('timestamp'))
    .withColumn('year', year('date'))
    .withColumn('month', month('date'))
    .withColumn('day', dayofweek('date'))  # day of week: 1 = Sunday ... 7 = Saturday
    .withColumn('weekday', date_format(col('date'), 'EEEE'))  # weekday name, e.g. Monday
    .withColumn('hour', hour('date'))
    # drop the source columns that were cast into new columns above
    .drop('Return station id', 'Covered distance (m)')
    .fillna(0)
)

In [24]:
# Rename and cast columns to align with the data model.
temp_df = (
    temp_df
    .withColumn('date', col('Date').cast('timestamp'))
    .withColumn('cloud_amount', col('Cloud amount (1/8)').cast('float'))
    .withColumn('pressure', col('Pressure (msl) (hPa)').cast('float'))
    .withColumn('humidity', col('Relative humidity (%)').cast('float'))
    .withColumn('precipitation', col('Precipitation intensity (mm/h)').cast('float'))
    .withColumn('air_temp', col('Air temperature (degC)').cast('float'))
    .withColumn('dev_point_temp', col('Dew-point temperature (degC)').cast('float'))
    .withColumn('visibility', col('Horizontal visibility (m)').cast('float'))
    .withColumn('wind_direc', col('Wind direction (deg)').cast('float'))
    .withColumn('gust_speed', col('Gust speed (m/s)').cast('float'))
    .withColumn('wind_speed', col('Wind speed (m/s)').cast('float'))
    .withColumnRenamed('Time zone', 'time_zone')
    # drop the original columns that were cast into new columns above
    .drop('Cloud amount (1/8)', 'Pressure (msl) (hPa)', 'Relative humidity (%)',
          'Precipitation intensity (mm/h)', 'Air temperature (degC)',
          'Dew-point temperature (degC)', 'Horizontal visibility (m)',
          'Wind direction (deg)', 'Gust speed (m/s)', 'Wind speed (m/s)')
    .fillna(0)
)

In [25]:
# Rename and cast columns to align with the data model.
# Spark resolves column names case-insensitively by default, so 'name'
# replaces the English 'Name' column with the Finnish 'Nimi' values.
stations_df = (
    stations_df
    .withColumn('station_id', col('ID').cast('integer'))
    .withColumn('name', col('Nimi').cast('string'))
    .withColumnRenamed('Osoite', 'address')
    .withColumnRenamed('Kaupunki', 'city')
    .withColumnRenamed('Operaattor', 'operator')
    .withColumn('capacity', col('Kapasiteet').cast('integer'))
    .withColumn('longitude', col('x').cast('float'))
    .withColumn('latitude', col('y').cast('float'))
    .fillna(0)
)

### Create views & tables to write parquet files

In [28]:
# create bikes view from cleaned dataframe df
df.createOrReplaceTempView('bikes_view')

# create bikes dimension table
dim_bikes = spark.sql('''

SELECT distinct 
        bikes_view.dep_station_name,
        bikes_view.departure,
        bikes_view.return,
        bikes_view.ret_station_name,
        bikes_view.distance,
        bikes_view.duration,
        bikes_view.date,
        bikes_view.year,
        bikes_view.month,
        bikes_view.day,
        bikes_view.weekday,
        bikes_view.hour
      

FROM bikes_view
''')

In [29]:
# write dim_bikes to parquet files
dim_bikes.write.mode('overwrite').parquet(output_data + 'dim_bikes')

In [30]:
# create temperature view from cleaned dataframe temp_df
temp_df.createOrReplaceTempView('temp_view')

# create temperatures dimension table
dim_temp = spark.sql('''

SELECT 
        temp_view.date,
        temp_view.cloud_amount,
        temp_view.pressure,
        temp_view.humidity,
        temp_view.precipitation,
        temp_view.air_temp,
        temp_view.dev_point_temp,
        temp_view.visibility,
        temp_view.wind_direc,
        temp_view.gust_speed,
        temp_view.wind_speed
        
FROM temp_view
''')

In [31]:
# write dim_temp to parquet files
dim_temp.write.mode('overwrite').parquet(output_data + 'dim_temp')

In [32]:
# create stations view from cleaned dataframe stations_df
stations_df.createOrReplaceTempView('stations_view')

dim_stations = spark.sql('''

SELECT distinct 
        stations_view.station_id,
        stations_view.city,
        stations_view.name,
        stations_view.address,
        stations_view.operator,
        stations_view.capacity,
        stations_view.longitude,
        stations_view.latitude
        
        
        
FROM stations_view
''')

In [33]:
# write dim_stations to parquet files
dim_stations.write.mode('overwrite').parquet(output_data + 'dim_stations')

In [34]:
# bikes_view, temp_view and stations_view were registered in the cells above.
# Create the fact table by joining the bikes, temperature and stations views.
fact_table = spark.sql('''
SELECT DISTINCT
        bikes_view.date,
        bikes_view.year,
        bikes_view.month,
        bikes_view.day,
        bikes_view.weekday,
        bikes_view.hour,
        bikes_view.dep_station_name,
        bikes_view.departure,
        bikes_view.return,
        bikes_view.ret_station_name,
        bikes_view.dep_station_id,
        bikes_view.ret_station_id,
        bikes_view.distance,
        bikes_view.duration,
        temp_view.air_temp,
        temp_view.humidity,
        temp_view.wind_speed,
        stations_view.longitude,
        stations_view.latitude
FROM bikes_view
INNER JOIN temp_view ON bikes_view.date = temp_view.date
INNER JOIN stations_view ON bikes_view.dep_station_id = stations_view.station_id
''')

# write the fact table to parquet files
fact_table.write.mode('overwrite').parquet(output_data + 'fact_table')
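Both joins in the fact-table query are inner joins, so a trip is silently dropped when its rounded `date` has no weather row or its `dep_station_id` is missing from the station list. A minimal plain-Python sketch of an anti-join style check that counts such trips before joining; `dropped_trips` is a hypothetical helper and the tuples are toy values. In Spark, the same check could use a join with `how='left_anti'`.

```python
from collections import Counter

# Toy trips: (rounded departure time, departure station id)
trips = [
    ("2019-07-01 00:00:00", 51),
    ("2019-07-01 00:00:00", 118),
    ("2019-07-01 00:10:00", 54),
    ("2019-07-01 00:20:00", 999),  # no weather row and unknown station
]
weather_times = {"2019-07-01 00:00:00", "2019-07-01 00:10:00"}
station_ids = {51, 54, 118}


def dropped_trips(trips, weather_times, station_ids):
    """Count trips that an inner join with weather or stations would drop."""
    counts = Counter()
    for date, station_id in trips:
        no_weather = date not in weather_times
        no_station = station_id not in station_ids
        counts["no_weather"] += no_weather
        counts["no_station"] += no_station
        counts["dropped"] += no_weather or no_station
    return counts


print(dropped_trips(trips, weather_times, station_ids))
```

A trip can fail both lookups, so `dropped` can be smaller than the sum of the two reasons.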

#### 4.2 Data Quality Checks

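The data quality checks count the records of each dataset and table to ensure completeness: a check fails if a table has no records. Comparing the counts between stages also shows how many records were removed: 2 duplicate rows were dropped from the bikes dimension table, and 571 trips (3 787 948 - 3 787 377) did not make it into the fact table through the DISTINCT and the inner joins.

After running all the checks, the data is ready for analysis.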

In [35]:
def quality_check(df, description):
    '''
    Check that a Spark dataframe (a cleaned dataset, dimension table or fact
    table) is not empty.

    Input: Spark dataframe and a description used in the printed message
    Output: prints whether the data quality check passed or failed
    '''
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))

# Process data quality check
quality_check(df, "bikes dataset")
quality_check(dim_bikes, "bikes dimension table")
quality_check(temp_df, "temperature dataset")
quality_check(dim_temp, "temperature dimension table")
quality_check(stations_df, "stations dataset")
quality_check(fact_table, "bikes fact table")

Data quality check passed for bikes dataset with 3787948 records
Data quality check passed for bikes dimension table with 3787946 records
Data quality check passed for temperature dataset with 30810 records
Data quality check passed for temperature dimension table with 30810 records
Data quality check passed for stations dataset with 351 records
Data quality check passed for bikes fact table with 3787377 records
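`quality_check` only fails on empty tables. Because the counts are printed for each stage, a stricter check could also compare a table with its source and fail when more rows disappear than expected. A minimal sketch; `reconcile_counts` and the 0.1 % threshold are hypothetical choices, and the example call uses the counts printed above.

```python
def reconcile_counts(source_count: int, target_count: int, description: str,
                     max_drop_ratio: float = 0.001) -> float:
    """Fail if the target lost more than `max_drop_ratio` of the source rows."""
    if source_count <= 0:
        raise ValueError(f"{description}: source has no records")
    if target_count > source_count:
        # e.g. a join that unexpectedly multiplied rows
        raise ValueError(f"{description}: target has more rows than source")
    drop_ratio = (source_count - target_count) / source_count
    if drop_ratio > max_drop_ratio:
        raise ValueError(f"{description}: dropped {drop_ratio:.4%} of rows")
    print(f"{description}: dropped {source_count - target_count} rows ({drop_ratio:.4%})")
    return drop_ratio


# Counts from the quality-check output: raw bikes data vs. fact table.
reconcile_counts(3787948, 3787377, "bikes -> fact table")
```

Raising an exception instead of printing lets a scheduled run stop before bad data reaches the output tables.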


#### 4.3 Data dictionary 
The data dictionary is provided in the separate file Data Dictionary.

### Step 5: Complete Project Write Up

Spark was a very good technology for handling big datasets. Spark SQL also made it simple to create the tables with plain SQL queries on temporary views, without a separate database.

* Propose how often the data should be updated and why.

In this first phase, the data can be updated whenever analysts need it. Later on, it can be updated as often as needed, but that requires automating the pipeline with a tool such as Airflow.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
If the data was increased by 100x, I would no longer run the pipeline manually like this. I would automate the process with a tool such as Airflow, and a proper data warehouse implementation would also be needed.

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
If the SLA requires the data behind a dashboard to be updated daily by 7 am, I would schedule the ETL pipeline with a tool such as Airflow.

 * The database needed to be accessed by 100+ people.

If the data needs to be accessed by 100+ people, I could use Spark and Hive with shared Spark SQL views on top of the tables, and of course Apache Airflow to keep them up to date.