# Bixi Telemetry Data Warehouse
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build a data warehouse for Bixi, a Montreal-based bike-sharing service. The modeled data will then be used to analyze ride patterns and bike availability at every station, enabling Bixi to optimize replenishment activities.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import requests
import io
import os  # needed for os.path.expanduser when reading the AWS credentials
import zipfile
import json
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import explode
from pyspark.sql import functions as F
from pyspark.sql import types as T
import psycopg2
import sql_queries
import configparser

In [2]:
GBFS_SOURCE = "s3a://bixi-gbfs-data"
GBFS_DESTINATION = "s3a://bixi-gbfs-data-parquet"

STATION_INFO_DATA = "https://gbfs.velobixi.com/gbfs/en/station_information.json"
TRIP_APRIL_DATA = "https://sitewebbixi.s3.amazonaws.com/uploads/docs/biximontreal-rentals-2021-04-87aaed.zip"
TRIP_APRIL_DATA_FILE = "OD_2021_04.csv"

### Step 1: Scope the Project and Gather Data

#### Scope 

This project outlines the steps to obtain the raw data, transform and clean it, write it back to an S3 bucket and finally ingest it into a Redshift data warehouse. The source data is the Bixi GBFS feed, which had previously been extracted every 5 minutes for several weeks and stored in an S3 bucket. For this project, 14 days' worth of GBFS feed was gathered, for a total of 2100 JSON files of 693 records each, totalling roughly 1.4M records. The latest station metadata is retrieved directly from the Bixi GBFS API, and the historical trip data from the Bixi open data platform. Given the data size, the data is read, cleansed and transferred using a Spark cluster for efficiency. The data warehouse is hosted on an Amazon Redshift cluster.

#### Data 
The main dataset is the JSON output of the Bixi API, which exposes Bixi's [GBFS (General Bikeshare Feed Specification)](https://github.com/NABSA/gbfs) feed. The API was queried every 5 minutes for 3 weeks by a Python script scheduled with a cron job, and each JSON response was uploaded to an Amazon S3 bucket.
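The collection script itself is not part of this notebook. The following is a minimal sketch of what it could look like, assuming the standard GBFS `station_status.json` endpoint alongside the `station_information.json` URL in `STATION_INFO_DATA`, `boto3` for the upload, and the `bixi-gbfs-data` bucket behind `GBFS_SOURCE`. The `gbfs_<timestamp>.json` key naming and the `fetch_gbfs.py` module name are hypothetical.

```python
import json
import urllib.request
from datetime import datetime, timezone

# ASSUMPTION: standard GBFS station_status endpoint, sibling of STATION_INFO_DATA
STATION_STATUS_URL = "https://gbfs.velobixi.com/gbfs/en/station_status.json"
# Bucket behind GBFS_SOURCE ("s3a://bixi-gbfs-data"); boto3 takes the bare name
BUCKET = "bixi-gbfs-data"


def object_key(payload: dict) -> str:
    """Hypothetical key scheme: one object per snapshot, named after the
    feed's `last_updated` epoch in UTC, e.g. gbfs_20210415T120500.json."""
    ts = datetime.fromtimestamp(payload["last_updated"], tz=timezone.utc)
    return f"gbfs_{ts:%Y%m%dT%H%M%S}.json"


def fetch_status(url: str = STATION_STATUS_URL) -> dict:
    """Download one GBFS station_status snapshot."""
    with urllib.request.urlopen(url, timeout=30) as resp:
        return json.load(resp)


def upload(payload: dict, bucket: str = BUCKET) -> str:
    """Write the raw JSON snapshot to S3 and return the object key."""
    import boto3  # imported here so object_key() works without AWS libraries

    key = object_key(payload)
    boto3.client("s3").put_object(
        Bucket=bucket, Key=key, Body=json.dumps(payload).encode("utf-8")
    )
    return key


def main() -> None:
    """Entry point for the cron job: fetch one snapshot and store it."""
    upload(fetch_status())
```

A crontab entry such as `*/5 * * * * cd /path/to/collector && python3 -c 'from fetch_gbfs import main; main()'` would run it every 5 minutes (the path is a placeholder). Writing keys that end in `.json` at the bucket root keeps them compatible with the `*.json` glob used when the bucket is read into Spark.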



#### Setting up the Spark cluster and reading in the data

We start by reading the AWS access key ID and secret key, along with the Redshift settings from `dwh.cfg`, then set the AWS credentials in the Spark config and instantiate a Spark session.

In [3]:
# AWS credentials from the default profile of the local AWS CLI credentials file
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_id = config.get("default", "aws_access_key_id")
access_key = config.get("default", "aws_secret_access_key")

# Redshift connection settings
config = configparser.ConfigParser()
with open('dwh.cfg') as f:
    config.read_file(f)

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB = config.get("DWH","DWH_DB")
DWH_DB_USER = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH","DWH_PORT")
DWH_ENDPOINT = config.get("DWH","DWH_ENDPOINT")
DWH_IAM_ROLE_NAME = config.get("DWH", "DWH_IAM_ROLE_NAME")
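`dwh.cfg` itself is not included in the project. Since `configparser` expects an INI-style file, a layout matching the keys read above could look like the sketch below. Every value is a placeholder (5439 is simply Redshift's default port), and the `load_dwh_settings` helper is hypothetical: it only shows how missing keys could be caught before any connection attempt.

```python
import configparser

# Hypothetical dwh.cfg contents: section and key names mirror the
# config.get(...) calls above; all values are placeholders.
SAMPLE_DWH_CFG = """
[DWH]
DWH_CLUSTER_IDENTIFIER = bixi-dwh
DWH_DB = bixi
DWH_DB_USER = awsuser
DWH_DB_PASSWORD = <password>
DWH_PORT = 5439
DWH_ENDPOINT = <cluster-endpoint>
DWH_IAM_ROLE_NAME = <iam-role-name>
"""

REQUIRED_KEYS = [
    "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD",
    "DWH_PORT", "DWH_ENDPOINT", "DWH_IAM_ROLE_NAME",
]


def load_dwh_settings(cfg_text: str) -> dict:
    """Parse the [DWH] section and fail early if a required key is missing."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    missing = [k for k in REQUIRED_KEYS if not parser.has_option("DWH", k)]
    if missing:
        raise KeyError(f"dwh.cfg is missing: {', '.join(missing)}")
    return {k: parser.get("DWH", k) for k in REQUIRED_KEYS}
```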

In [4]:
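conf = SparkConf()
# hadoop-aws provides the s3a:// filesystem; its version must match the Hadoop version Spark runs on
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0')
# Hadoop properties need the "spark.hadoop." prefix to be copied into the Hadoop configuration
conf.set("spark.hadoop.fs.s3a.access.key", access_id)
conf.set("spark.hadoop.fs.s3a.secret.key", access_key)

spark = SparkSession.builder.config(conf=conf).getOrCreate()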

Once the Spark session is created, all the `json` files in the S3 bucket are read into a Spark dataframe. The nested station array is exploded into one row per station, and its fields are split into individual columns. The Unix epoch value in column `last_updated` is converted to a timestamp, from which year, month and day columns are derived; these will later be used to partition the Parquet output.

In [5]:
df = spark.read.json(f"{GBFS_SOURCE}/*.json")

In [6]:
df_flat = (df
    .select("last_updated",explode("data.stations").alias('stations'))
    .select("last_updated", "stations.*")    
)
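To make the effect of `explode` concrete, here is a pure-Python equivalent of the two `select` calls above, applied to a trimmed, made-up payload with the same nesting as a GBFS `station_status` response (only two station fields are kept; the values are illustrative, not real Bixi data).

```python
def flatten_gbfs(payload: dict) -> list:
    """Pure-Python equivalent of
    select("last_updated", explode("data.stations")).select("last_updated", "stations.*"):
    one output row per station, with the feed-level last_updated copied onto it."""
    rows = []
    for station in payload["data"]["stations"]:
        row = {"last_updated": payload["last_updated"]}
        row.update(station)  # "stations.*" expands every station field into a column
        rows.append(row)
    return rows


# Made-up payload with the same nesting as the GBFS feed
sample = {
    "last_updated": 1618488000,
    "ttl": 0,
    "data": {
        "stations": [
            {"station_id": "1", "num_bikes_available": 3},
            {"station_id": "2", "num_bikes_available": 0},
        ]
    },
}

rows = flatten_gbfs(sample)
```

Each JSON file therefore contributes one row per station, which is why roughly 700 stations per snapshot multiply into over a million rows.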

In [7]:
df_flat = df_flat \
    .withColumn("last_updated_dt", F.from_unixtime("last_updated").cast('timestamp')) \
    .withColumn("station_id", df_flat.station_id.cast('long')) \
    .withColumn('is_installed', df_flat.is_installed.cast('boolean')) \
    .withColumn('is_renting', df_flat.is_renting.cast('boolean')) \
    .withColumn('is_returning', df_flat.is_returning.cast('boolean')) \
    .withColumn("year", F.year("last_updated_dt")) \
    .withColumn("month", F.month("last_updated_dt")) \
    .withColumn("day", F.dayofmonth("last_updated_dt"))
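`F.from_unixtime` renders the epoch in the Spark session time zone (`spark.sql.session.timeZone`, which defaults to the JVM's local zone), so the `year`/`month`/`day` partition a snapshot lands in depends on where the cluster runs. The sketch below reproduces the directory names `partitionBy('year', 'month', 'day')` produces for the same epoch in UTC and in Montreal's April offset. EDT (UTC-4) is hard-coded here as an assumption rather than looked up from a time zone database.

```python
from datetime import datetime, timedelta, timezone

# Assumption: Montreal observes EDT (UTC-4) throughout April 2021
EDT = timezone(timedelta(hours=-4))


def partition_path(epoch: int, tz: timezone) -> str:
    """Mimic year/month/day derived from from_unixtime(epoch) in zone `tz`,
    formatted the way partitionBy names its directories."""
    dt = datetime.fromtimestamp(epoch, tz=tz)
    return f"year={dt.year}/month={dt.month}/day={dt.day}"


# 2021-04-16 02:00 UTC is still the evening of April 15 in Montreal
epoch = int(datetime(2021, 4, 16, 2, 0, tzinfo=timezone.utc).timestamp())
utc_path = partition_path(epoch, timezone.utc)
local_path = partition_path(epoch, EDT)
```

Setting `spark.sql.session.timeZone` explicitly (for example to `America/Toronto`) would make the partitioning independent of the cluster's locale.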


In [8]:
df_flat = df_flat.select(
    ["station_id"] + [c for c in df_flat.columns if c not in [
        'station_id','eightd_has_available_keys','last_updated'
    ]]
)

In [9]:
df_flat.printSchema()

root
 |-- station_id: long (nullable = true)
 |-- is_charging: boolean (nullable = true)
 |-- is_installed: boolean (nullable = true)
 |-- is_renting: boolean (nullable = true)
 |-- is_returning: boolean (nullable = true)
 |-- last_reported: long (nullable = true)
 |-- num_bikes_available: long (nullable = true)
 |-- num_bikes_disabled: long (nullable = true)
 |-- num_docks_available: long (nullable = true)
 |-- num_docks_disabled: long (nullable = true)
 |-- num_ebikes_available: long (nullable = true)
 |-- last_updated_dt: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)



Next, the Bixi GBFS API is called to obtain the latest station metadata (`station_information.json`).

In [10]:
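json_output = "data/station_info.json"
response = requests.get(STATION_INFO_DATA, timeout=30)
response.raise_for_status()  # fail fast on HTTP errors rather than saving an error body
with open(json_output, 'w') as f:
    json.dump(response.json(), f)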

In [11]:
df_station = spark.read.json(json_output)

In [12]:
df_station_flat = df_station \
    .select(explode("data.stations").alias('stations')) \
    .select("stations.station_id"
        , "stations.name"
        , 'stations.lat'
        , "stations.lon"
    )

In [13]:
df_station_flat.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)



The last dataset is the April 2021 trip history, which covers the same timeframe as the GBFS feed. It is distributed as a `zip` archive, so the `csv` file is first extracted and then read into a Spark dataframe.

In [14]:
csv_path = f"data/{TRIP_APRIL_DATA_FILE}"

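r = requests.get(TRIP_APRIL_DATA, timeout=60)
r.raise_for_status()
# Open the archive in memory and extract only the April CSV into data/
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extract(TRIP_APRIL_DATA_FILE, path = 'data')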

'data/OD_2021_04.csv'

In [15]:
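# header=True uses the first line as column names; all columns are read as strings
df_trips = spark.read.csv(csv_path, header=True)
# Surrogate key: unique per row but not consecutive (the partition ID sits in the upper bits)
df_trips = df_trips.withColumn("trip_id", F.monotonically_increasing_id())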

In [16]:
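# Like the original "EST" conversion, this assumes the CSV timestamps are in UTC.
# "America/Toronto" (the IANA zone Montreal follows) applies daylight saving time,
# whereas "EST" is a fixed UTC-5 offset, one hour off throughout April (EDT, UTC-4).
df_trips = df_trips \
    .withColumn("start_date", F.from_utc_timestamp("start_date", "America/Toronto")) \
    .withColumn("end_date", F.from_utc_timestamp("end_date", "America/Toronto")) \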
    .withColumn("duration_sec", df_trips.duration_sec.cast('int')) \
    .withColumn("start_station_id", df_trips.emplacement_pk_start.cast('long')) \
    .withColumn("end_station_id", df_trips.emplacement_pk_end.cast('long')) \
    .withColumn("is_member", df_trips.is_member.cast('boolean')) \
    .withColumn("year", F.year("start_date")) \
    .withColumn("month", F.month("start_date")) \
    .withColumn("day", F.dayofmonth("start_date"))

df_trips = df_trips.select(["trip_id"] + [c for c in df_trips.columns if c not in ["trip_id","emplacement_pk_start","emplacement_pk_end"]])

In [17]:
df_trips.printSchema()

root
 |-- trip_id: long (nullable = false)
 |-- start_date: timestamp (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- duration_sec: integer (nullable = true)
 |-- is_member: boolean (nullable = true)
 |-- start_station_id: long (nullable = true)
 |-- end_station_id: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)



### Step 2: Explore and Assess the Data

Since this data is automated telemetry exposed through the Bixi API, we can assume that it is fairly clean. Nevertheless, there are likely some scenarios we would like to exclude.

Let's first check whether any trips have a zero duration, a missing station, or an end datetime earlier than or equal to the start datetime.

In [22]:
# count() runs on the cluster instead of collecting every matching row to the driver
errors = df_trips.where(
    df_trips.end_date <= df_trips.start_date
).count()

print(f"{errors} errors found")

0 errors found


In [23]:
zero_duration = df_trips.where(
    df_trips.duration_sec == 0
).count()

print(f"{zero_duration} errors found")

0 errors found


In [24]:
missing_station = df_trips.where(
    df_trips.start_station_id.isNull() | \
    df_trips.end_station_id.isNull()
).count()

print(f"{missing_station} errors found")

0 errors found


No errors found there. As expected, such records were probably already removed by the Bixi team. We can also check for trips that started and ended at the same station. Since such a round trip can be legitimate (someone riding for fun), we only flag those lasting less than 2 minutes: these are likely not real trips and should be excluded.

In [25]:
dummy_trip = df_trips.where(
    (df_trips.start_station_id == df_trips.end_station_id) & \
    (df_trips.duration_sec < 120)
)

print(f"{dummy_trip.count()} errors found")

778 errors found


There are 778 such trips. We remove them from the dataframe with a left anti join on `trip_id`, which keeps only the trips that do not appear in `dummy_trip`.

In [26]:
df_trips_clean = df_trips.join(dummy_trip, on='trip_id', how='left_anti')
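The filtering rule and the `left_anti` join semantics can be checked on a few in-memory rows. This is a pure-Python sketch; the `Trip` tuple and the helper names are hypothetical and carry only the columns the rule uses.

```python
from typing import List, NamedTuple


class Trip(NamedTuple):
    trip_id: int
    start_station_id: int
    end_station_id: int
    duration_sec: int


MIN_ROUND_TRIP_SEC = 120  # threshold used in the notebook


def is_dummy_trip(t: Trip) -> bool:
    """Same station at both ends and shorter than 2 minutes."""
    return t.start_station_id == t.end_station_id and t.duration_sec < MIN_ROUND_TRIP_SEC


def left_anti_join(trips: List[Trip], flagged: List[Trip]) -> List[Trip]:
    """Keep trips whose trip_id does not appear in `flagged` (what how='left_anti' does)."""
    flagged_ids = {t.trip_id for t in flagged}
    return [t for t in trips if t.trip_id not in flagged_ids]


trips = [
    Trip(1, 10, 10, 119),  # same station, under 2 min: dummy
    Trip(2, 10, 10, 120),  # same station, exactly 2 min: kept
    Trip(3, 10, 11, 30),   # different stations: kept
]
flagged = [t for t in trips if is_dummy_trip(t)]
clean = left_anti_join(trips, flagged)
```

An equivalent Spark formulation is `df_trips.where(~cond)` with the same condition. One difference worth knowing: a negated filter also drops rows where the condition evaluates to null (for example a null `duration_sec`), whereas the anti join keeps them.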

Lastly, the transformed and cleaned dataframes are written back to an S3 bucket as `parquet` and `csv` files. Given the size and frequency of the raw data, the trips and GBFS files are partitioned by year, month and day.

In [23]:
gbfs_file_destination = f"{GBFS_DESTINATION}/gbfs.parquet"

df_flat.write \
    .partitionBy('year','month','day') \
    .parquet(path = gbfs_file_destination, mode = "overwrite") 

In [25]:
stations_destination = f"{GBFS_DESTINATION}/stations.csv"
df_station_flat.write.csv(
    path = stations_destination, 
    mode = "overwrite"
)

In [41]:
trips_destination = f"{GBFS_DESTINATION}/trips.parquet"
df_trips_clean.write \
    .partitionBy('year','month', 'day') \
    .parquet(
        path = trips_destination, 
        mode = "overwrite"
    )

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

![](resources/erd.png)

The `stations` table is the master dimension table for the bike stations; both the `trips` and `gbfs` tables reference it through foreign keys. A `time` dimension table was also created, containing various time parts of the timestamps found in the fact tables. These parts are precomputed because they will likely be heavily used for analytics; computing them once instead of in every query improves query performance and reduces cost.
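The `time` table is built by `sql_queries.copy_time_query`, which is not shown here. As a hedged illustration of the columns listed in the data dictionary, the sketch below derives the same parts from Python `datetime` values. The day-of-week convention (Sunday = 0, as returned by Redshift's `EXTRACT(DOW FROM ...)`) is an assumption about how the actual query computes it.

```python
from datetime import datetime


def time_parts(ts: datetime) -> dict:
    """One row of the `time` dimension for a fact-table timestamp."""
    return {
        "datetime": ts,
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
        # Python's weekday() is Monday=0; shift so Sunday=0 like Redshift's DOW
        "day_of_week": (ts.weekday() + 1) % 7,
        "hour": ts.hour,
    }


def build_time_dimension(timestamps) -> list:
    """Deduplicate timestamps (datetime is the table's unique key) and expand each."""
    return [time_parts(ts) for ts in sorted(set(timestamps))]


rows = build_time_dimension([
    datetime(2021, 4, 15, 8, 30),
    datetime(2021, 4, 15, 8, 30),  # duplicate timestamps collapse to one row
    datetime(2021, 4, 18, 23, 0),
])
```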

#### 3.2 Mapping Out Data Pipelines

![](resources/pipeline.png)

The first step is a Python script, scheduled by a cron job, that fetches the latest GBFS feed data from Bixi every 5 minutes and saves it to an S3 bucket. Once collected, the data is read from S3 by a Spark job in a Jupyter notebook. The notebook script also retrieves the latest station metadata, along with the last month of trip data. The 3 resulting dataframes are stored in a separate S3 bucket in Parquet and CSV format. From there, the notebook script executes queries against the Redshift cluster to create the tables and copy the Parquet and CSV data into them. Finally, the time dimension table is computed.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

First, we open a connection to the Redshift database.

In [18]:
conn = psycopg2.connect(
    dbname= DWH_DB, 
    host= DWH_ENDPOINT, 
    port= DWH_PORT, 
    user= DWH_DB_USER, 
    password= DWH_DB_PASSWORD
)

In [19]:
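def run_query(query):
    """Execute a query; return the fetched rows if it produces any, else None."""
    results = None
    with conn.cursor() as cur:
        try:
            cur.execute(query)
            # cur.description is None for statements that return no rows (DDL, COPY)
            if cur.description is not None:
                results = cur.fetchall()
            conn.commit()
        except psycopg2.Error as e:
            # Roll back so the connection is not left in an aborted transaction
            conn.rollback()
            print(e)
    return results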

Then we create the necessary tables.

In [16]:
for query in sql_queries.create_queries:
    run_query(query)

Next, we run a `COPY` query for each table, which loads the data from S3 directly into our Redshift tables.

In [44]:
trips_destination = f"{GBFS_DESTINATION}/trips.parquet"

copy_trips = sql_queries.copy_table_query.format(
    # Redshift expects s3:// URIs; swap only the scheme, not other "s3a" substrings
    'trips', trips_destination.replace('s3a://', 's3://', 1), access_id, access_key, 'parquet'
)
run_query(copy_trips)

In [45]:
stations_destination = f"{GBFS_DESTINATION}/stations.csv"
copy_stations = sql_queries.copy_table_query.format(
    'stations', stations_destination.replace('s3a://', 's3://', 1), access_id, access_key, 'csv'
)
run_query(copy_stations)

In [46]:
gbfs_file_destination = f"{GBFS_DESTINATION}/gbfs.parquet"
copy_gbfs = sql_queries.copy_table_query.format(
    'gbfs', gbfs_file_destination.replace('s3a://', 's3://', 1), access_id, access_key, 'parquet'
)
run_query(copy_gbfs)
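`sql_queries.copy_table_query` is not shown in the notebook. A hypothetical template consistent with the five positional arguments passed above (table, path, access key ID, secret key, format) could look like the sketch below, using Redshift's `COPY ... FORMAT AS` syntax. Matching on the full `s3a://` prefix avoids rewriting an `s3a` substring elsewhere in the path. Redshift also accepts an `IAM_ROLE '<role-arn>'` clause in place of the key pair, which would keep credentials out of the query text; `DWH_IAM_ROLE_NAME` is already read from `dwh.cfg` but not used.

```python
def to_s3_uri(path: str) -> str:
    """Swap only the scheme: Spark writes via s3a://, Redshift COPY expects s3://."""
    if path.startswith("s3a://"):
        return "s3://" + path[len("s3a://"):]
    return path


# HYPOTHETICAL template: the real sql_queries.copy_table_query is not shown,
# but it is called with (table, path, access key id, secret key, format).
COPY_TABLE_QUERY = """
COPY {}
FROM '{}'
ACCESS_KEY_ID '{}'
SECRET_ACCESS_KEY '{}'
FORMAT AS {};
"""


def build_copy(table: str, path: str, key_id: str, secret: str, fmt: str) -> str:
    """Render the COPY statement for one table."""
    return COPY_TABLE_QUERY.format(table, to_s3_uri(path), key_id, secret, fmt.upper())


q = build_copy("trips", "s3a://bixi-gbfs-data-parquet/trips.parquet", "AKID", "SECRET", "parquet")
```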

Lastly, the time table is computed from the loaded tables' timestamp columns.

In [7]:
run_query(sql_queries.copy_time_query)

We can also run a sample query to test the table joins.

In [8]:
run_query(sql_queries.sample_query)

[(47, 'William / St-Henri', 5, 5050),
 (147, 'Calixa-Lavallée / Sherbrooke', 6, 2020),
 (171, 'Wolfe / Robin', 11, 3939),
 (289, 'Boyer / Beaubien', 14, 4646),
 (344, 'de la Roche / Everett', 4, 1212)]

#### 4.2 Data Quality Checks

Next, we ensure the pipeline ran successfully by comparing the row count of each source dataframe with the row count of the resulting table. A `ValueError` is raised on any mismatch.

In [20]:
def check_row_count(df_loaded, tbl_name):
    df_count = df_loaded.count()
    tbl_count = run_query(sql_queries.check_row_count.format(tbl_name))[0][0]
    if df_count != tbl_count:
        raise ValueError(f"Data check failed for table {tbl_name}. Row count mismatch. Expected {df_count}, got {tbl_count}")
    else:
        print(f"Data check passed from table {tbl_name} passed with {tbl_count} rows")

In [27]:
check_row_count(df_flat, 'gbfs')
check_row_count(df_station_flat, 'stations')
check_row_count(df_trips_clean, 'trips')

Data check passed from table gbfs passed with 1312541 rows
Data check passed from table stations passed with 707 rows
Data check passed from table trips passed with 237177 rows


In [28]:
def check_primary_key_constraint(tbl_name, col_name):
    cnt = run_query(sql_queries.check_primary_key_constraint.format(col_name,tbl_name))[0][0]
    if cnt > 0:
        raise ValueError (f"Data check failed for table {tbl_name}. {cnt} values in column {col_name} are not unique")
    else:
        print(f"Data check passed from table {tbl_name}. Primary key constraint respected.")

In [29]:
check_primary_key_constraint('trips','trip_id')
check_primary_key_constraint('stations', 'station_id')

Data check passed from table trips. Primary key constraint respected.
Data check passed from table stations. Primary key constraint respected.
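The primary key check matters because Redshift treats primary key constraints as informational only: they inform the query planner but are not enforced on load, so a duplicate `trip_id` would be accepted silently. The SQL templates behind both checks are not shown; a hypothetical sketch consistent with the argument order used above (`format(tbl_name)` and `format(col_name, tbl_name)`) might be the following, with `pk_violations` as a pure-Python mirror of the same arithmetic.

```python
# HYPOTHETICAL templates: sql_queries.check_row_count and
# sql_queries.check_primary_key_constraint are not shown in the notebook.
CHECK_ROW_COUNT = "SELECT COUNT(*) FROM {};"
# Rows minus distinct non-null keys: > 0 means duplicate or NULL keys
CHECK_PRIMARY_KEY_CONSTRAINT = "SELECT COUNT(*) - COUNT(DISTINCT {}) FROM {};"


def pk_violations(values) -> int:
    """Pure-Python equivalent of CHECK_PRIMARY_KEY_CONSTRAINT on one column
    (COUNT(DISTINCT ...) ignores NULLs, so NULL keys count as violations)."""
    return len(values) - len({v for v in values if v is not None})
```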


#### 4.3 Data dictionary 

`trips`

| Column | Data Type | Description |
|:--- |:--- |:---|
| trip_id | **integer** | Unique identifier for the trips, added during transformation|
| start_date | **timestamp** | Start datetime of the trip|
| end_date | **timestamp** | End datetime of the trip|
| duration_sec | **integer** | Trip duration in seconds|
| is_member | **boolean** | Flag identifying if trip was made by a Bixi member|
| start_station_id | **integer** | Station identifier where trip began|
| end_station_id | **integer** | Station identifier where trip ended|

`stations`

| Column | Data Type | Description |
|:--- |:--- |:---|
| station_id | **integer** | Unique identifier for the station|
| name | **varchar** | Long name of the station|
| lat | **decimal** | Latitude of the station location|
| lon | **decimal** | Longitude of the station location|

`gbfs`

| Column | Data Type | Description |
|:--- |:--- |:---|
| station_id | **integer** | Unique identifier for the station|
| is_charging | **boolean** | If station is being charged|
| is_installed | **boolean** | If station is installed|
| is_renting | **boolean** | If station is renting out bikes|
| is_returning | **boolean** | If station is accepting bike returns|
| last_reported | **integer** | Epoch unix timestamp when station last reported data|
| num_bikes_available | **integer** | Number of bikes available|
| num_bikes_disabled | **integer** | Number of bikes disabled|
| num_docks_available | **integer** | Number of docks available|
| num_docks_disabled | **integer** | Number of docks disabled|
| num_ebikes_available | **integer** | Number of electric bikes available|
| last_updated_dt | **timestamp** | Time the GBFS feed was last updated, converted from the feed's `last_updated` epoch|

`time`

| Column | Data Type | Description |
|:--- |:--- |:---|
| datetime | **timestamp** | Unique identifier for the timestamp|
| year | **integer** | Year of timestamp|
| month | **integer** | Month # of timestamp|
| day | **integer** | Day of month of timestamp|
| day_of_week | **integer** | Day of week of timestamp|
| hour | **integer** | Hour of timestamp|


### Step 5: Complete Project Write Up

Summary of steps taken for this project:
1. The first step was to routinely call the Bixi GBFS API to grab the latest station data and push the response content to an S3 bucket.
2. Next, a Redshift cluster was set up on AWS and provisioned with the appropriate permissions.
3. Then, a Spark cluster was set up to handle the data load and transformations.
4. The Jupyter notebook script starts by retrieving the GBFS data from S3 and reading it into a dataframe. The latest station information and historical trip data are also retrieved from Bixi and read into separate dataframes.
5. The script then transforms the data by casting columns to specific data types and computing year, month and day columns, which are then used to partition the Parquet files.
6. The data is then assessed, and the trip data is cleansed by removing outlier trips, defined as starting and ending at the same station with a duration of less than 2 minutes.
7. The dataframes are then written back to a separate S3 bucket, in Parquet or CSV format, with appropriate partitioning.
8. Next, a connection to the Redshift cluster is opened, and queries are run to create the tables if they don't already exist, copy each of the 3 datasets into its destination table, and compute the time dimension table.
9. Finally, data checks are run to validate that each table's row count matches its dataframe's row count and that primary keys are unique.


The pipeline architecture was chosen with efficiency and cost in mind. As Bixi generates a lot of data from its infrastructure, that data needs scalable storage, which S3 provides. Furthermore, leveraging the distributed computing power of Spark makes the pipeline even more scalable, as this data will likely grow in volume.

Given that the trip data is available monthly, the pipeline would also be run monthly.

Future use case scenarios:
* If the data were to increase by 100x, S3 storage would scale accordingly. However, the Spark cluster might need to be upgraded with additional, more powerful nodes. The same goes for the Redshift cluster.
* If a dashboard needed to be produced and updated on a daily basis at 7am, the pipeline should also be run daily, early in the morning, to allow ample time for execution. The trip data would also need to be made available more often (for the purposes of this project, the only source was the Bixi open data platform, which is updated monthly; however, Bixi would presumably have access to live data). Additionally, to handle the daily loads, this pipeline could be set up as an Airflow DAG, which would handle scheduling, retries upon failure, and backfills.
* If 100+ people needed access to the database, Redshift should scale, but concurrent traffic could be better controlled, for example with Redshift's workload management (WLM) queues and concurrency scaling. IAM user management would allow granting specific database permissions per role. Furthermore, the notebook should be moved to execute on an EMR cluster.