<a href="https://colab.research.google.com/github/pedrohenrocha/data_processing_module/blob/main/spark/challenges/challenge_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CHALLENGE 1
##  Implement INGESTION process
- Set up path in the "lake"
  - !mkdir -p /content/lake/bronze

- Read data from API https://api.carrismetropolitana.pt/
  - Endpoints:
    - vehicles
    - lines
    - municipalities
  - Use StructFields to enforce schema

- Transformations
  - vehicles
    - create "date" extracted from "timestamp" column (format: date - yyyy-mm-dd or yyyymmdd)

- Write data as PARQUET into the BRONZE layer (/content/lake/bronze)
  - Partition "vehicles" by "date" column
  - Paths:
    - vehicles - path: /content/lake/bronze/vehicles
    - lines - path: /content/lake/bronze/lines
    - municipalities - path: /content/lake/bronze/municipalities
  - Make sure there is only 1 single parquet created
  - Use overwrite as write mode

# Setting up PySpark

In [1]:
%pip install pyspark



In [2]:
!mkdir -p /content/lake/bronze/vehicles
!mkdir -p /content/lake/bronze/lines
!mkdir -p /content/lake/bronze/municipalities

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName('Spark Course').config('spark.ui.port', '4050').getOrCreate()
sc = spark.sparkContext

In [4]:
import requests
from pyspark.sql.types import *

def readFromAPI(url: str, schema: StructType = None):
  response = requests.get(url)
  rdd = sc.parallelize(response.json())

  if schema:
    df = spark.read.schema(schema).json(rdd)
  else:
    df = spark.read.json(rdd)
  return df

In [5]:
#Definition of Schemas

municipalities_schema = StructType([StructField('district_id', StringType(), True),
                                    StructField('district_name', StringType(), True),
                                    StructField('id', StringType(), True),
                                    StructField('name', StringType(), True),
                                    StructField('prefix', StringType(), True),
                                    StructField('region_id', StringType(), True),
                                    StructField('region_name', StringType(), True)])

lines_schema = StructType([StructField('_corrupt_record', StringType(), True),
                           StructField('color', StringType(), True),
                           StructField('facilities', ArrayType(StringType(), True), True),
                           StructField('id', StringType(), True),
                           StructField('localities', ArrayType(StringType(), True), True),
                           StructField('long_name', StringType(), True),
                           StructField('municipalities', ArrayType(StringType(), True), True),
                           StructField('patterns', ArrayType(StringType(), True), True),
                           StructField('routes', ArrayType(StringType(), True), True),
                           StructField('short_name', StringType(), True),
                           StructField('text_color', StringType(), True)])

vehicles_schema = StructType([StructField('bearing', LongType(), True),
                              StructField('block_id', StringType(), True),
                              StructField('current_status', StringType(), True),
                              StructField('id', StringType(), True),
                              StructField('lat', DoubleType(), True),
                              StructField('line_id', StringType(), True),
                              StructField('lon', DoubleType(), True),
                              StructField('pattern_id', StringType(), True),
                              StructField('route_id', StringType(), True),
                              StructField('schedule_relationship', StringType(), True),
                              StructField('shift_id', StringType(), True),
                              StructField('speed', DoubleType(), True),
                              StructField('stop_id', StringType(), True),
                              StructField('timestamp', LongType(), True),
                              StructField('trip_id', StringType(), True)])




In [6]:
#Definition of endpoints as parameters
schemas = {
    "vehicles": vehicles_schema,
    "lines": lines_schema,
    "municipalities": municipalities_schema
}


endpoints = {
    "vehicles": "https://api.carrismetropolitana.pt/vehicles",
    "lines": "https://api.carrismetropolitana.pt/lines",
    "municipalities": "https://api.carrismetropolitana.pt/municipalities"}

In [7]:
# Iteration through the parameters defined above to read the data using the API and then writing them into their respective bronze folder

from pyspark.sql.functions import from_unixtime, col, to_date

for endpoint_name, url in endpoints.items():
  schema = schemas.get(endpoint_name)

  if schema:
    print(f"Reading data from {endpoint_name} endpoint...")
    df = readFromAPI(url, schema=schema)

    print(f"Successfully read data for {endpoint_name}.")

    if endpoint_name == "vehicles":
      df = df.withColumn("date", to_date(from_unixtime(col("timestamp"))))
      df.write.mode("overwrite").partitionBy("date").parquet(f"/content/lake/bronze/vehicles")
    else:
      df.write.mode("overwrite").parquet(f"/content/lake/bronze/{endpoint_name}")

  else:
    print(f"No schema defined for {endpoint_name} endpoint.")


Reading data from vehicles endpoint...
Successfully read data for vehicles.
Reading data from lines endpoint...
Successfully read data for lines.
Reading data from municipalities endpoint...
Successfully read data for municipalities.


# CHALLENGE 2
##  Implement CLEANSING process
- Set up path in the "lake"
  - !mkdir -p /content/lake/silver

- Read data from BRONZE layer as PARQUET:
    - vehicles - path: /content/lake/bronze/vehicles
    - lines - path: /content/lake/bronze/lines
    - municipalities - path: /content/lake/bronze/municipalities

- Transformations
  - vehicles
    - rename "lat" and "lon" to "latitude" and "longitude" respectively
    - remove possible duplicates
    - remove rows when the column CURRENT_STATUS is null
    - remove any corrupted record
  - lines
    - remove duplicates
    - remove rows when the column LONG_NAME is null
    - remove any corrupted record
  - municipalities
    - remove duplicates
    - remove rows when the columns NAME or DISTRICT_NAME are null
    - remove any corrupted record

- Write data as PARQUET into the SILVER layer (/content/lake/silver)
  - Partition "vehicles" by "date"(created in the ingestion)
  - Paths:
    - vehicles - path: /content/lake/silver/vehicles
    - lines - path: /content/lake/silver/lines
    - municipalities - path: /content/lake/silver/municipalities

In [12]:
# Creation of silver directories

!mkdir -p /content/lake/silver/vehicles
!mkdir -p /content/lake/silver/lines
!mkdir -p /content/lake/silver/municipalities

In [8]:
# Reading the parquet files from the bronze layer
bronze_paths = {
    "vehicles": "/content/lake/bronze/vehicles",
    "lines": "/content/lake/bronze/lines",
    "municipalities": "/content/lake/bronze/municipalities"
}

df_vehicles = spark.read.parquet(bronze_paths["vehicles"])
df_lines = spark.read.parquet(bronze_paths["lines"])
df_municipalities = spark.read.parquet(bronze_paths["municipalities"])

In [9]:
# Transformations in Vehicles

df_vehicles = df_vehicles.withColumnRenamed("lat", "latitude") \
                         .withColumnRenamed("lon", "longitude") \
                         .dropDuplicates() \
                         .filter(col("current_status").isNotNull())

if "_corrupt_record" in df_vehicles.columns:
  df_vehicles = df_vehicles.filter(col("_corrupt_record").isNull())

In [10]:
# Transformation in Lines

df_lines = df_lines.dropDuplicates() \
                   .filter(col("long_name").isNotNull())

if "_corrupt_record" in df_lines.columns:
  df_lines = df_lines.filter(col("_corrupt_record").isNull())

In [11]:
# Transformation in Municipalties

df_municipalities = df_municipalities.dropDuplicates() \
                                    .filter(col("name").isNotNull() | col("district_name").isNotNull())

if "_corrupt_record" in df_municipalities.columns:
  df_municipalities = df_municipalities.filter(col("_corrupt_record").isNull())

In [12]:
# Writing the dataframes as parquet in the silver layer

silver_paths = {
    "vehicles": "/content/lake/silver/vehicles",
    "lines": "/content/lake/silver/lines",
    "municipalities": "/content/lake/silver/municipalities"
}

df_vehicles.write.mode("overwrite").partitionBy("date").parquet(silver_paths["vehicles"])
df_lines.write.mode("overwrite").parquet(silver_paths["lines"])
df_municipalities.write.mode("overwrite").parquet(silver_paths["municipalities"])

# CHALLENGE 3
##  Implement ENRICH process
- Set up path in the "lake"
  - !mkdir -p /content/lake/gold

- Read data from SILVER layer
  - Paths:
    - vehicles - path: /content/lake/silver/vehicles
    - lines - path: /content/lake/silver/lines
    - municipalities - path: /content/lake/silver/municipalities
  - Use StructFields to enforce schema

- Enrichment
  - Enrich vehicles dataset with information from the line and municipalities
    - join vehicles with lines and municipalities
      - select all columns from vehicles + lines.long_name (name: line_name, format:string) + municipalities.name (name: municipality_name, format: array)
      - Note that "municipalities.name" is an array

- Write data as PARQUET into the GOLD layer (/content/lake/gold)
  - Dataset name: vehicles_enriched
  - Partition "vehicles_enriched" by "date" column
  - Paths:
    - vehicles - path: /content/lake/gold/vehicles_enriched
  - Make sure there is only 1 single parquet created
  - Use overwrite as write mode

In [None]:
!mkdir -p /content/lake/gold

In [13]:
# Reading the parquet files from the silver layer
silver_paths = {
    "vehicles": "/content/lake/silver/vehicles",
    "lines": "/content/lake/silver/lines",
    "municipalities": "/content/lake/silver/municipalities"
}

df_vehicles_gold = spark.read.parquet(silver_paths["vehicles"])
df_lines_gold = spark.read.parquet(silver_paths["lines"])
df_municipalities_gold = spark.read.parquet(silver_paths["municipalities"])

In [15]:
# Joins to get lines and municipalities names
from pyspark.sql.functions import collect_list, explode

# Explode municipalities array in df_lines_gold to join with municipalities
df_lines_municipalities = df_lines_gold.withColumn("municipality_id_line", explode(col("municipalities")))

# Join lines and municipalities to get municipality names for each line
df_lines_with_municipality_names = df_lines_municipalities.join(
    df_municipalities_gold.select(col("id").alias("municipality_id_mun"), col("name").alias("municipality_name_str")),
    col("municipality_id_line") == col("municipality_id_mun"),
    "left"
)

# Group by line_id and collect municipality names into an array
df_lines_aggregated_municipalities = df_lines_with_municipality_names.groupBy(col("id").alias("line_id_agg")).agg(
    collect_list("municipality_name_str").alias("municipality_name")
).drop("municipality_id_line", "municipality_id_mun", "municipality_name_str")

df_vehicles_enriched = df_vehicles_gold.join(
    df_lines_aggregated_municipalities,
    df_vehicles_gold.line_id == col("line_id_agg"),
    "left"
).join(
    df_lines_gold.select(col("id").alias("line_id_lines"), col("long_name").alias("line_name")),
    df_vehicles_gold.line_id == col("line_id_lines"),
    "left"
).drop("line_id_agg", "line_id_lines")

In [16]:
# Writing the dataframe as parquet in the gold layer
df_vehicles_enriched.write.mode("overwrite").partitionBy("date").parquet("/content/lake/gold/vehicles_enriched")

# CHALLENGE 4
##  Analyze data

- Query table "vehicles_enriched" in gold layer
- Aggregate data by municipality_name (array)
- Calculate:
  - count of vehicles (id) that pass through that municipality
  - sum speed of vehicles

Questions:
  - What are the top 3 municipalities by vehicles routes?
  - What are the top 3 municipalities with higher vehicle speed on average?


Tips:
- explode array into rows -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode.html


In [24]:
# Querying and aggregating the column to be analyzed
from pyspark.sql.functions import count, sum, explode, col, avg

df_analysis = spark.read.parquet("/content/lake/gold/vehicles_enriched")
df_analysis = df_analysis.withColumn("municipality_name", explode(col("municipality_name")))

aggregated_df = df_analysis.groupBy("municipality_name").agg(
    count("id").alias("count_vehicles"),
    avg("speed").alias("average_speed")
)

aggregated_df.show()

+-------------------+--------------+------------------+
|  municipality_name|count_vehicles|     average_speed|
+-------------------+--------------+------------------+
|           Odivelas|            46| 6.956521739130435|
|           Barreiro|             7| 9.325396825396826|
|             Sintra|            44| 6.578282828282831|
|            Palmela|            11|10.126262626262626|
|          Alcochete|             2| 4.027777777777778|
|            Cascais|             9| 6.728395061728395|
|Vila Franca de Xira|            23| 6.340579710144928|
|              Moita|             8| 7.708333333333334|
|            Amadora|            34|5.5964052287581705|
|             Almada|            51|7.7178649237472765|
|              Mafra|            11| 9.722222222222221|
|             Lisboa|           104| 7.852564102564099|
|             Loures|            66| 7.121212121212121|
|            Setúbal|            20|10.097222222222221|
|            Montijo|             9| 9.413580246

In [27]:
# Question 1: What are the top 3 municipalities by vehicles routes?

df_question1 = aggregated_df.orderBy(col("count_vehicles").desc()).limit(3)
df_question1.show()

+-----------------+--------------+------------------+
|municipality_name|count_vehicles|     average_speed|
+-----------------+--------------+------------------+
|           Lisboa|           104| 7.852564102564099|
|           Loures|            66| 7.121212121212121|
|           Almada|            51|7.7178649237472765|
+-----------------+--------------+------------------+



In [25]:
# Question 2: What are the top 3 municipalities with higher vehicle speed on average?

df_question2 = aggregated_df.orderBy(col("average_speed").desc()).limit(3)
df_question2.show()

+-----------------+--------------+------------------+
|municipality_name|count_vehicles|     average_speed|
+-----------------+--------------+------------------+
|          Palmela|            11|10.126262626262626|
|          Setúbal|            20|10.097222222222221|
|            Mafra|            11| 9.722222222222221|
+-----------------+--------------+------------------+

