# Data Engineering Capstone Project - Metro_Madrid <a class="anchor" id="top"></a>

## Project Summary
This notebook takes the output data of the [metro-big-data-unir](https://github.com/juananthony/metro-big-data-unir) project and builds a database from it for analysis.

[*Metro de Madrid*](https://www.metromadrid.es/) is the subway (tube) service that operates in Madrid, Spain. It has 302 stations on 13 lines, plus a light rail system called *Metro Ligero*. In 2020 the service was used, on average, more than 30 million times per month.

The project follows these steps:
* [Step 1: Scope the Project and Gather Data](#step-1)
* [Step 2: Explore and Assess the Data](#step-2)
* [Step 3: Define the Data Model](#step-3)
* [Step 4: Run ETL to Model the Data](#step-4)
* [Step 5: Project Write Up](#step-5)

***
**Set up the environment**

In [1]:
import os
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
                .builder \
                .appName('udacity-capstone') \
                .master("local[*]") \
                .getOrCreate()
spark.conf.set('spark.sql.session.timeZone', 'CET')

***
[Back to top](#top)
## Step 1: Scope the Project and Gather Data <a class="anchor" id="step-1"></a>

### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [3]:
DATA_PATH = './data'
lines_file = 'lines.csv'
stations_file = 'stations.csv'
mentions_file = 'mentions_20210210.csv'

In [4]:
lines = spark.read.csv(os.path.join(DATA_PATH, lines_file), header=True).dropDuplicates()
lines.count()

16

In [5]:
stations = spark.read.csv(os.path.join(DATA_PATH, stations_file), header=True).dropDuplicates()
stations.count()

281

In [6]:
from schemas.mentions_schema import mentions_schema
mentions = spark.read.csv(os.path.join(DATA_PATH, mentions_file), header=True, multiLine=True, escape='"', schema=mentions_schema)

In [8]:
mentions.groupBy('classification').count().show()

+--------------+------+
|classification| count|
+--------------+------+
|          null| 85465|
|       nothing|698591|
|     complaint|111612|
|         issue| 59499|
+--------------+------+



***
[Back to top](#top)
## Step 2: Explore and Assess the Data <a class="anchor" id="step-2"></a>
### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Lines
Check whether any row in the lines data has a null ```line``` or ```regex``` value.

In [9]:
from pyspark.sql.functions import col
lines.filter(col('line').isNull() | col('regex').isNull()).count()

0

### Cleaning Steps
Document steps necessary to clean the data

In [10]:
from pyspark.sql.functions import col, to_timestamp
classes = ['complaint', 'issue']
mentions = mentions.filter(col('classification').isin(classes))

In [11]:
# Parse the created_at string (e.g. 'Wed Feb 10 08:15:30 +0000 2021') into a timestamp column.
mentions = mentions.withColumn('dt', to_timestamp(col('created_at'), 'E MMM d HH:m:s Z y'))
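
To make the timestamp pattern above concrete, the following Spark-free sketch parses one value in the same layout with Python's standard library. The sample string and the `parse_created_at` helper are illustrative assumptions, not part of the dataset or the pipeline, and the fixed +01:00 offset only stands in for CET in winter.

```python
from datetime import datetime, timedelta, timezone

# Assumed sample value in the layout the Spark pattern targets:
# weekday, month name, day, time, UTC offset, year.
SAMPLE_CREATED_AT = 'Wed Feb 10 08:15:30 +0000 2021'

def parse_created_at(value):
    """Parse a created_at string into a timezone-aware datetime (hypothetical helper)."""
    return datetime.strptime(value, '%a %b %d %H:%M:%S %z %Y')

parsed = parse_created_at(SAMPLE_CREATED_AT)

# CET is UTC+1 in winter; a fixed offset keeps the sketch independent of tz data.
cet = timezone(timedelta(hours=1))
local = parsed.astimezone(cet)
print(local.isoformat())
```

The session time zone set at the top of the notebook plays the role of `cet` here: it decides which local hour and minute later end up in the date dimension.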

In [12]:
from pyspark.sql.functions import coalesce
from pyspark import StorageLevel
# Prefer the extended tweet text and fall back to the short text when it is null.
mentions = mentions.withColumn('full_text',
                               coalesce(col('extended_tweet_full_text'), col('text'))) \
                               .drop('text', 'extended_tweet_full_text') \
                               .persist(StorageLevel.MEMORY_ONLY_SER)

***
[Back to top](#top)
## Step 3: Define the Data Model  <a class="anchor" id="step-3"></a>
### 3.1 Conceptual Data Model
We want to store every message that reports an issue or a complaint about a line or a station. The fact table therefore holds the *inform* fact, which can be either a complaint or an issue. A single tweet can report an issue that affects several lines (e.g., a closed station and all the lines that stop there), so one tweet generates one or more "inform fact" records.

![fact-dimension diagram](./img/class_diagram.png "Fact-Dimension Diagram")

### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
 
#### Line Dimension

***
[Back to top](#top)
## Step 4: Run ETL to Model the Data <a class="anchor" id="step-4"></a>

### 4.1 Create the data model
Build the data pipelines to create the data model.

In [30]:
OUTPUT_DIR = "./out"

#### Line Dimension
This dimension is built from the file that lists all the lines of the Metro de Madrid service.

In [25]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number
lines_w = Window.orderBy('line_name')
lines_dim = lines.withColumnRenamed('line','line_name').withColumn('line_id', row_number().over(lines_w))
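
The `row_number()` window above assigns consecutive integer keys that follow the sorted order of `line_name`. A minimal plain-Python sketch of the same idea, using made-up line names rather than the real file content (`assign_surrogate_keys` is a hypothetical helper):

```python
def assign_surrogate_keys(names):
    """Return {name: id} with ids 1..n following the sorted order of names."""
    return {name: idx for idx, name in enumerate(sorted(names), start=1)}

# Hypothetical sample values, not read from lines.csv.
sample_lines = ['L2', 'L10', 'L1']
line_ids = assign_surrogate_keys(sample_lines)
print(line_ids)
```

Because strings are compared character by character, `'L10'` sorts before `'L2'`. The ids are identifiers only and carry no ordering meaning; they can also shift between runs if the source file gains or loses rows.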

The line dimension is persisted as Parquet files in the ```lines``` folder under ```OUTPUT_DIR```.

In [31]:
lines_file = os.path.join(OUTPUT_DIR, 'lines')
lines_dim.write.mode('overwrite').parquet(lines_file)

***
#### Station Dimension
The station dimension is built from the file that lists all the stations of the Metro de Madrid service.

In [14]:
station_w = Window.orderBy('station_name')
stations_dim = stations.select(stations.station.alias('station_name'), 'regex').withColumn('station_id', row_number().over(station_w))

The station dimension is persisted as Parquet files in the ```stations``` folder under ```OUTPUT_DIR```.

In [33]:
stations_file = os.path.join(OUTPUT_DIR, 'stations')
stations_dim.write.mode('overwrite').parquet(stations_file)

***
#### Class dimension
This dimension contains all the incident classes that can be detected in tweets.

In [15]:
class_df = mentions.select(col('classification').alias('class_name')).distinct()
class_w = Window.orderBy('class_name')
class_dim = class_df.withColumn('class_id', row_number().over(class_w))

The class dimension is persisted as Parquet files in the ```classes``` folder under ```OUTPUT_DIR```.

In [34]:
class_file = os.path.join(OUTPUT_DIR, 'classes')
class_dim.write.mode('overwrite').parquet(class_file)

***
#### Date dimension
This dimension is based on the timestamps of all tweet mentions. The ```created_at``` datetime is split into the following fields: ```year```, ```quarter```, ```month```, ```day```, ```weekday```, ```hour``` and ```minute```.

In [16]:
from pyspark.sql.functions import year, month, quarter, dayofweek, dayofmonth, hour, minute

date_w = Window.orderBy('year', 'month', 'day', 'hour', 'minute')

date_dim = mentions.select(
                        year(col('dt')).alias('year'),
                        quarter(col('dt')).alias('quarter'),
                        month(col('dt')).alias('month'),
                        dayofmonth(col('dt')).alias('day'),
                        dayofweek(col('dt')).alias('weekday'),
                        hour(col('dt')).alias('hour'),
                        minute(col('dt')).alias('minute')) \
                .dropDuplicates() \
                .withColumn('date_id', row_number().over(date_w))
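
As a rough, Spark-free illustration of how one timestamp maps to the fields selected above, the sketch below derives the same parts with Python's `datetime`. The `date_parts` helper and the sample timestamps are assumptions made for this example only.

```python
from datetime import datetime

def date_parts(dt):
    """Split a datetime into the fields of the date dimension."""
    return {
        'year': dt.year,
        'quarter': (dt.month - 1) // 3 + 1,
        'month': dt.month,
        'day': dt.day,
        # Spark's dayofweek counts Sunday=1 ... Saturday=7,
        # while isoweekday() counts Monday=1 ... Sunday=7.
        'weekday': dt.isoweekday() % 7 + 1,
        'hour': dt.hour,
        'minute': dt.minute,
    }

# Hypothetical timestamp: Wednesday, 10 February 2021, 09:15:30.
parts = date_parts(datetime(2021, 2, 10, 9, 15, 30))
print(parts)
```

Seconds are dropped on purpose: the dimension has minute granularity, so all tweets created within the same minute share one `date_id`.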

The date dimension is persisted as Parquet files in the ```date``` folder under ```OUTPUT_DIR```.

In [36]:
date_file = os.path.join(OUTPUT_DIR, 'date')
date_dim.write.mode('overwrite').parquet(date_file)

***
#### User Dimension
This step selects the user columns and renames them:
* ```user.id``` -> ```user_id```
* ```user.name``` -> ```user_name```
* ```user.screen_name``` -> ```screen_name```
* ```user.description``` -> ```description```
* ```user.profile_image_url``` -> ```profile_image_url```
* ```user.profile_image_url_https``` -> ```profile_image_url_https```

In [17]:
# A user appears once per tweet in the mentions data, so keep a single row per user_id.
user_dim = mentions.select(
            'user_id',
            'user_name',
            col('user_screen_name').alias('screen_name'),
            col('user_description').alias('description'),
            col('user_profile_image_url').alias('profile_image_url'),
            col('user_profile_image_url_https').alias('profile_image_url_https')
).dropDuplicates(['user_id'])

The user dimension is persisted as Parquet files in the ```users``` folder under ```OUTPUT_DIR```.

In [37]:
users_file = os.path.join(OUTPUT_DIR, 'users')
user_dim.write.mode('overwrite').parquet(users_file)

***
#### Tweet Dimension

To build this dimension, the mentions are joined to the ```date``` dimension by year, month, day, hour and minute, keeping the following fields:
* tweet_id
* date_id
* user_id
* text
* reply_tweet_id

In [18]:
from pyspark.sql.functions import substring

tweet_dim = mentions.join(date_dim,
              (year(mentions.dt) == date_dim.year) &
              (month(mentions.dt) == date_dim.month) &
              (dayofmonth(mentions.dt) == date_dim.day) &
              (hour(mentions.dt) == date_dim.hour) &
              (minute(mentions.dt) == date_dim.minute),
              'inner'
             ) \
        .select(col('id').alias('tweet_id'),
                'date_id',
                col('`user_id`').alias('user_id'),
                col('full_text').alias('text'),
                col('in_reply_to_status_id').alias('reply_tweet_id')
               )

The tweet dimension is persisted as Parquet files in the ```tweets``` folder under ```OUTPUT_DIR```.

In [46]:
tweets_file = os.path.join(OUTPUT_DIR, 'tweets')
tweet_dim.coalesce(2).write.mode('overwrite').parquet(tweets_file)

***
#### Fact table
Any station or line can be mentioned in the tweet text, so we use the regexes included in the lines and stations datasets to search for them.

First, we extract two dictionaries that map ids to regexes, one for lines and one for stations. Then we search every line and station regex in each tweet text, creating two new columns, each holding an array with the ids of the lines or stations mentioned in that tweet.

<img src="./img/regex_web.png" style="height:450px;margin-left:auto;margin-right:auto;border:1px solid #888;"/>

Three functions are defined first:
* ```gen_tags()```. Returns an array of tags based on the regex dictionary and the given text.
    If a regex matches the given text, its dictionary key is appended to the tag array.
* ```gen_line_tags()```. Returns an array with every line_id found in the given text.
* ```gen_station_tags()```. Returns an array with every station_id found in the given text.

These functions are then wrapped in two ```udf```:
* ```gen_line_tags_udf```
* ```gen_station_tags_udf```

In [20]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
import re

# Map each surrogate key to its regex; both dimensions are small enough to collect.
lines_dict = {elem.line_id: elem.regex for elem in lines_dim.collect()}
stations_dict = {elem.station_id: elem.regex for elem in stations_dim.collect()}

def gen_tags(text, dicc):
    """
    Returns an array with tags. These tags are based on the dictionary with regex and the given text.
    If any regex matches the given text, the key of the dictionary is appended to the tag array.
    """
    tags = []
    if text is None:
        # A tweet without text cannot mention any line or station.
        return tags
    for key, expr in dicc.items():
        # Skip missing patterns so re.search never receives None.
        if expr and re.search(expr, text, re.IGNORECASE):
            tags.append(key)
    return tags

def gen_line_tags(text):
    """
    Returns a tag array with all line_id found in the given text.
    """
    return gen_tags(text, lines_dict)

def gen_station_tags(text):
    """
    Returns a tag array with all station_id found in the given text.
    """
    return gen_tags(text, stations_dict)

# The dictionary keys come from row_number(), so the arrays hold integers.
gen_line_tags_udf = udf(gen_line_tags, ArrayType(IntegerType()))
gen_station_tags_udf = udf(gen_station_tags, ArrayType(IntegerType()))
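
The tagging logic can be tried outside Spark. The sketch below repeats a compact version of `gen_tags()` and runs it against a small, made-up id-to-regex dictionary; the patterns and the sample tweets are assumptions for illustration, since the real regexes come from ```lines.csv``` and ```stations.csv```.

```python
import re

def gen_tags(text, dicc):
    """Return the keys of dicc whose regex matches text (case-insensitive)."""
    if text is None:
        return []
    return [key for key, expr in dicc.items()
            if re.search(expr, text, re.IGNORECASE)]

# Hypothetical id -> regex mapping, not the content of lines.csv.
sample_lines_dict = {
    1: r'\bL1\b',
    6: r'\bL6\b',
    10: r'\bL10\b',
}

tags = gen_tags('Retrasos en l1 y L10 esta mañana', sample_lines_dict)
print(tags)
print(gen_tags('Todo normal hoy', sample_lines_dict))
```

The word boundaries (`\b`) matter in this sketch: without them the pattern for line 1 would also match inside `L10`.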

These ```udf``` functions are used to generate two new array columns: ```lines``` contains every line_id found in the ```full_text``` column by ```gen_line_tags_udf```, and ```stations``` contains every station_id found in ```full_text``` by ```gen_station_tags_udf```.

In [21]:
fact_aux = mentions.withColumn('lines', gen_line_tags_udf(col('full_text'))) \
                     .withColumn('stations', gen_station_tags_udf(col('full_text'))) \
                        .persist(StorageLevel.MEMORY_ONLY_SER)

Once the fact table has the two array columns, we join it to the class dimension to get the class_id. Then an ```explode_outer``` on each array column produces one row per array entry.

In [22]:
from pyspark.sql.functions import explode_outer

fact_w = Window.orderBy('id')

# issue_id is assigned before exploding, so all rows generated by one tweet share it.
# class_id is carried through every select so it reaches the final fact table.
fact_df = fact_aux \
        .join(class_dim, col('classification') == class_dim.class_name, 'inner') \
        .withColumn('issue_id', row_number().over(fact_w)) \
        .select('issue_id', col('id').alias('tweet_id'), 'class_id', 'lines', 'stations') \
        .select('issue_id', 'tweet_id', 'class_id', explode_outer('lines').alias('line_id'), 'stations') \
        .select('issue_id', 'tweet_id', 'class_id', 'line_id', explode_outer('stations').alias('station_id'))
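
To see what the two chained ```explode_outer``` calls do to a single tweet, here is a plain-Python sketch of the same row expansion. `explode_outer_pairs` is a hypothetical helper written for this illustration and the ids are made up.

```python
from itertools import product

def explode_outer_pairs(line_ids, station_ids):
    """Mimic two chained explode_outer calls on one record.

    An empty or missing array yields a single None instead of dropping the record.
    """
    lines = line_ids or [None]
    stations = station_ids or [None]
    return list(product(lines, stations))

print(explode_outer_pairs([1, 6], [42]))
print(explode_outer_pairs([1, 6], []))
print(explode_outer_pairs([], []))
```

A tweet that mentions two lines and two stations therefore produces four fact rows, one per combination, while a tweet that mentions neither still keeps one row with both ids null.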

The fact table is persisted as Parquet files in the ```fact_table``` folder under ```OUTPUT_DIR```.

In [40]:
fact_file = os.path.join(OUTPUT_DIR, 'fact_table')
fact_df.coalesce(4).write.mode('overwrite').parquet(fact_file)

### 4.2 Data Quality Checks
Explain the data quality checks to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [23]:
user_dim.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- user_name: string (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- profile_image_url: string (nullable = true)
 |-- profile_image_url_https: string (nullable = true)



In [24]:
# Keep only the rows where both keys are present.
fact_df = fact_df.filter(col('issue_id').isNotNull() & col('tweet_id').isNotNull())
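
The prompts above mention unique-key and completeness checks. As a hedged, Spark-free sketch of two such checks, the functions below work on plain dictionaries, for example what `Row.asDict()` returns for a small collected sample. The helper names and the sample rows are assumptions for illustration, not part of the pipeline.

```python
def check_unique(rows, key):
    """Return True when no two rows share the same value for key."""
    values = [row[key] for row in rows]
    return len(values) == len(set(values))

def check_foreign_keys(fact_rows, fk, dim_rows, pk):
    """Return the fact values of fk that are missing from the dimension (None is allowed)."""
    known = {row[pk] for row in dim_rows}
    return {row[fk] for row in fact_rows
            if row[fk] is not None and row[fk] not in known}

# Hypothetical sample rows standing in for collected dimension and fact records.
lines_sample = [{'line_id': 1}, {'line_id': 2}]
fact_sample = [
    {'issue_id': 1, 'line_id': 1},
    {'issue_id': 1, 'line_id': 2},
    {'issue_id': 2, 'line_id': None},  # tweet that mentions no line
    {'issue_id': 3, 'line_id': 9},     # dangling reference
]

unique_ok = check_unique(lines_sample, 'line_id')
missing = check_foreign_keys(fact_sample, 'line_id', lines_sample, 'line_id')
print(unique_ok, missing)
```

On full-size tables the same checks would more likely be expressed as Spark aggregations and anti-joins; collecting rows to the driver is only reasonable for small samples or small dimensions.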

### 4.3 Data dictionary 

#### Dimension Tables

* **Line**
    * ```line_id```
        * ```Integer```
        * Line identifier.
    * ```line_name```
        * ```String```
        * Line name.
    * ```regex```
        * ```String```
        * Regex to search the line in a text.
* **Station**
    * ```station_id```
        * ```Long```
        * Station identifier.
    * ```station_name```
        * ```String```
        * Station name.
    * ```regex```
        * ```String```
        * Regex to search the station in a text.
* **Class**
    * ```class_id```
        * ```Integer```
        * Class identifier.
    * ```class_name```
        * ```String```
        * Class name (```complaint``` or ```issue```).
* **User**
    * ```user_id```
        * ```Long```
        * User identifier.
    * ```user_name```
        * ```String```
        * User name.
    * ```screen_name```
        * ```String```
        * User unique string identifier.
    * ```description```
        * ```String```
        * User description.
    * ```profile_image_url```
        * ```String```
        * URL of profile image (HTTP protocol).
    * ```profile_image_url_https```
        * ```String```
        * URL of profile image (HTTPS protocol).
* **Date**
    * ```date_id```
        * ```Long```
        * Date identifier.
    * ```year```
        * ```Integer```
        * Year number (e.g.: 2019, 2020, 2021, ...).
    * ```quarter```
        * ```Integer```
        * Quarter of the year (1, 2, 3 or 4).
    * ```month```
        * ```Integer```
        * Month of the year as an integer (1, 2, 3, ..., 12).
    * ```weekday```
        * ```Integer```
        * Day of the week (Sunday=1, Monday=2, ..., Saturday=7).
    * ```day```
        * ```Integer```
        * Day of the month (1, 2, 3, ...).
    * ```hour```
        * ```Integer```
        * Hour in 24-hour format (0, 1, 2, ..., 22, 23).
    * ```minute```
        * ```Integer```
        * Minute (0, 1, 2, ..., 59).
* **Tweet**
    * ```tweet_id```
        * ```Long```
        * Tweet identifier.
    * ```date_id```
        * ```Long```
        * Date id when the tweet was created.
    * ```user_id```
        * ```Long```
        * User id who is author of this tweet.
    * ```text```
        * ```String```
        * Tweet text.
    * ```reply_tweet_id```
        * ```Long```
        * If this tweet is a reply, this field references the tweet_id of the tweet it replies to.

#### Fact Table

* **Inform**
    * ```issue_id```
        * ```Long```
        * Inform identifier.
    * ```tweet_id```
        * ```Long```
        * Id of the tweet that reports an issue or complaint.
    * ```line_id```
        * ```Long```
        * Service line id.
    * ```station_id```
        * ```Long```
        * Station id.
    * ```class_id```
        * ```Long```
        * Class id of the report (complaint or issue).

***
[Back to top](#top)
## Step 5: Project Write Up <a class="anchor" id="step-5"></a>
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.