<a href="https://colab.research.google.com/github/vaniamv/dataprocessing/blob/main/spark/challenges/challenge_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CHALLENGE 1
##  Implement INGESTION process
- Set up path in the "lake"
  - !mkdir -p /content/lake/bronze

- Read data from API https://api.carrismetropolitana.pt/
  - Endpoints:
    - vehicles
    - lines
    - municipalities
  - Use StructFields to enforce schema

- Transformations
  - vehicles
    - create "date" extracted from "timestamp" column (format: hh24miss)

- Write data as PARQUET into the BRONZE layer (/content/lake/bronze)
  - Partition "vehicles" by "date" column
  - Paths:
    - vehicles - path: /content/lake/bronze/vehicles
    - lines - path: /content/lake/bronze/lines
    - municipalities - path: /content/lake/bronze/municipalities
  - Make sure there is only 1 single parquet created
  - Use overwrite as write mode

# CHALLENGE 2
##  Implement CLEANSING process
- Set up path in the "lake"
  - !mkdir -p /content/lake/silver

- Read data from BRONZE layer as PARQUET:
    - vehicles - path: /content/lake/bronze/vehicles
    - lines - path: /content/lake/bronze/lines
    - municipalities - path: /content/lake/bronze/municipalities

- Transformations
  - vehicles
    - rename "lat" and "lon" to "latitude" and "longitude" respectively
    - remove possible duplicates
    - remove rows where the column CURRENT_STATUS is null
    - remove any corrupted record
  - lines
    - remove duplicates
    - remove rows where the column LONG_NAME is null
    - remove any corrupted record
  - municipalities
    - remove duplicates
    - remove rows where the columns NAME or DISTRICT_NAME are null
    - remove any corrupted record

- Write data as PARQUET into the SILVER layer (/content/lake/silver)
  - Partition "vehicles" by "date" (created during ingestion)
  - Paths:
    - vehicles - path: /content/lake/silver/vehicles
    - lines - path: /content/lake/silver/lines
    - municipalities - path: /content/lake/silver/municipalities

# CHALLENGE 3
##  Implement ENRICH process
- Set up path in the "lake"
  - !mkdir -p /content/lake/gold

- Read data from SILVER layer
  - Paths:
    - vehicles - path: /content/lake/silver/vehicles
    - lines - path: /content/lake/silver/lines
    - municipalities - path: /content/lake/silver/municipalities
  - Use StructFields to enforce schema

- Enrichment
  - Enrich the vehicles dataset with information from the lines and municipalities datasets
    - join vehicles with lines and municipalities
      - select all columns from vehicles + lines.long_name (name: line_name, format:string) + municipalities.name (name: municipality_name, format: array)
      - Note that "municipalities.name" is an array

- Write data as PARQUET into the GOLD layer (/content/lake/gold)
  - Dataset name: vehicles_enriched
  - Partition "vehicles_enriched" by "date" column
  - Paths:
    - vehicles_enriched - path: /content/lake/gold/vehicles_enriched
  - Make sure there is only 1 single parquet created
  - Use overwrite as write mode

# CHALLENGE 4
##  Analyze data

- Query table "vehicles_enriched" in gold layer
- Aggregate data by municipality_name (array)
- Calculate:
  - count of vehicles (id) that pass through that municipality
  - sum of vehicle speeds

Questions:
  - What are the top 3 municipalities by vehicle routes?
  - What are the top 3 municipalities with the highest average vehicle speed?


Tips:
- explode array into rows -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode.html

# Setting up PySpark

In [None]:
%pip install pyspark



In [None]:
!rm -rf /content/*

In [None]:
!mkdir -p /content/lake/bronze

In [None]:
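from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
import requests

class ETLFlow:
    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark

    def extract_from_file(self, format: str, path: str, **kwargs) -> DataFrame:
        # Extra keyword arguments are passed through as reader options
        df = self.spark.read.format(format).load(path, **kwargs)
        return df

    def extract_from_api(self, url: str, schema: StructType = None) -> DataFrame:
        response = requests.get(url)
        response.raise_for_status()  # fail fast on HTTP errors
        # Serialize each record to a JSON string so Spark parses valid JSON
        records = [json.dumps(record) for record in response.json()]
        rdd = self.spark.sparkContext.parallelize(records)
        if schema:
            df = self.spark.read.schema(schema).json(rdd)
        else:
            df = self.spark.read.json(rdd)
        return df

    def load(self, df: DataFrame, format: str, path: str, **kwargs) -> None:
        # Extra keyword arguments are passed through as writer options
        df.write.mode("overwrite").format(format).save(path, **kwargs)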

class ETLTask(ETLFlow):

    def __init__(self, spark: SparkSession) -> None:
        super().__init__(spark)

    def ingestion_vehicles(self):
      vehicle_schema = StructType([StructField('bearing', IntegerType(), True),
                                  StructField('block_id', StringType(), True),
                                  StructField('current_status', StringType(), True),
                                  StructField('id', StringType(), True),
                                  StructField('lat', FloatType(), True),
                                  StructField('line_id', StringType(), True),
                                  StructField('lon', FloatType(), True),
                                  StructField('pattern_id', StringType(), True),
                                  StructField('route_id', StringType(), True),
                                  StructField('schedule_relationship', StringType(), True),
                                  StructField('shift_id', StringType(), True),
                                  StructField('speed', FloatType(), True),
                                  StructField('stop_id', StringType(), True),
                                  StructField('timestamp', TimestampType(), True),
                                  StructField('trip_id', StringType(), True)])
      df = self.extract_from_api(url="https://api.carrismetropolitana.pt/vehicles", schema=vehicle_schema)
      self.write_partitioned_parquet(df=df, path="/content/lake/bronze/vehicles")


    def ingestion_lines(self):
      lines_schema = StructType([StructField("id", StringType(), True),
                                 StructField("short_name", StringType(), True),
                                 StructField("long_name", StringType(), True),
                                 StructField("municipalities", ArrayType(StringType()), True),
                                 StructField("localities", ArrayType(StringType()), True),
                                 StructField("routes", ArrayType(StringType()), True),
                                 StructField("patterns", ArrayType(StringType()), True),
                                 StructField("facilities", ArrayType(StringType()), True)])
      df = self.extract_from_api(url="https://api.carrismetropolitana.pt/lines", schema=lines_schema)
      self.load(df=df, format="parquet", path="/content/lake/bronze/lines")


    def ingestion_municipalities(self):
        municipalities_schema = StructType([StructField("district_id", StringType(), True),
                                            StructField("district_name", StringType(), True),
                                            StructField("id", StringType(), True),
                                            StructField("name", StringType(), True),
                                            StructField("prefix", StringType(), True),
                                            StructField("region_id", StringType(), True),
                                            StructField("region_name", StringType(), True)])
        df = self.extract_from_api(url="https://api.carrismetropolitana.pt/municipalities", schema=municipalities_schema)
        self.load(df=df, format="parquet", path="/content/lake/bronze/municipalities")

    def cleansing_vehicles(self):
      df = self.extract_from_file(format="parquet", path="/content/lake/bronze/vehicles")
      df = df.withColumnRenamed("lat", "latitude")\
                  .withColumnRenamed("lon", "longitude")
      df = df.drop_duplicates()
      df = df.dropna(subset=['CURRENT_STATUS'])
      self.write_partitioned_parquet(df=df, path="/content/lake/silver/vehicles")

    def cleansing_lines(self):
      df = self.extract_from_file(format="parquet", path="/content/lake/bronze/lines")
      df = df.drop_duplicates()
      df = df.dropna(subset=['LONG_NAME'])
      self.load(df=df, format="parquet", path="/content/lake/silver/lines")

    def cleansing_municipalities(self):
      df = self.extract_from_file(format="parquet", path="/content/lake/bronze/municipalities")
      df = df.drop_duplicates()
      df = df.dropna(subset=['NAME', 'DISTRICT_NAME'])
      self.load(df=df, format="parquet", path="/content/lake/silver/municipalities")

    def write_partitioned_parquet(self, df: DataFrame, path: str):

      df = df.withColumn("date", expr("date(timestamp)"))

      self.spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      (df
        .coalesce(1) #make sure that there is only one parquet created
        .write
        .mode("overwrite")
        .partitionBy("date")
        .format("parquet")
        .save(path))

    def enrich(self):
      vehicles = self.extract_from_file(format="parquet", path="/content/lake/silver/vehicles")
      lines = self.extract_from_file(format="parquet", path="/content/lake/silver/lines")
      municipalities = self.extract_from_file(format="parquet", path="/content/lake/silver/municipalities")

      # Step 1: attach each vehicle's line name (inner join drops vehicles without a matching line)
      vehicles_enriched = vehicles.join(lines, vehicles.line_id == lines.id, "inner").select(vehicles["*"], lines["long_name"])

      # Step 2: per line, map the array of municipality ids to an array of municipality names
      final_df = lines.select("id", explode("municipalities").alias("municipality_id")) \
                .join(municipalities.selectExpr("id as municipality_id", "name"), "municipality_id", "left") \
                .groupBy("id").agg(collect_list("name").alias("municipality_names_array")) \
                .withColumnRenamed("id", "line_id")  # Rename 'id' to 'line_id' so it can be used as the join key

      # Step 3: add the municipality names array to the enriched vehicles
      vehicles_enriched = vehicles_enriched.join(final_df, "line_id", "left")

      self.write_partitioned_parquet(df=vehicles_enriched, path="/content/lake/gold/vehicles")
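
When records from an API response are handed to Spark's JSON reader through an RDD, each element should be a JSON document in string form. The pure-Python sketch below, with a made-up record, shows why `json.dumps` is a safer way to produce that string than `str()`: the Python repr of a dict uses single quotes and `None`/`True`, which are not valid JSON.

```python
import json

# Hypothetical record with a null and a boolean, as an API might return.
record = {"id": "v1", "stop_id": None, "active": True}

as_repr = str(record)         # Python repr: {'id': 'v1', 'stop_id': None, 'active': True}
as_json = json.dumps(record)  # JSON text:   {"id": "v1", "stop_id": null, "active": true}


def is_valid_json(text: str) -> bool:
    """Return True if the text parses as JSON."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


print(as_repr, is_valid_json(as_repr))
print(as_json, is_valid_json(as_json))
```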

In [None]:
if __name__ == '__main__':

    # init spark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.master('local').appName('ETL Program').getOrCreate()

    print("Starting ETL program")
    etl = ETLTask(spark)

    # run tasks
    print("Running Task - Ingestion Vehicles")
    etl.ingestion_vehicles()

    print("Running Task - Ingestion lines")
    etl.ingestion_lines()

    print("Running Task - Ingestion municipalities")
    etl.ingestion_municipalities()

    print("Running Task - Cleansing Vehicles")
    etl.cleansing_vehicles()

    print("Running Task - Cleansing Lines")
    etl.cleansing_lines()

    print("Running Task - Cleansing municipalities")
    etl.cleansing_municipalities()


    etl.enrich()

    print("ETL program completed")

Starting ETL program
Running Task - Ingestion Vehicles
Running Task - Ingestion lines
Running Task - Ingestion municipalities
Running Task - Cleansing Vehicles
Running Task - Cleansing Lines
Running Task - Cleansing municipalities
ETL program completed


# ***Challenge 4 - answers***

In [None]:
vehicles_enriched = spark.read.parquet("/content/lake/gold/vehicles")

In [None]:
vehicles_enriched.show()

+-------+-------+--------------------+--------------+--------+---------+---------+----------+--------+---------------------+------------+---------+-------+-------------------+--------------------+--------------------+------------------------+----------+
|line_id|bearing|            block_id|current_status|      id| latitude|longitude|pattern_id|route_id|schedule_relationship|    shift_id|    speed|stop_id|          timestamp|             trip_id|           long_name|municipality_names_array|      date|
+-------+-------+--------------------+--------------+--------+---------+---------+----------+--------+---------------------+------------+---------+-------+-------------------+--------------------+--------------------+------------------------+----------+
|   2769|     29|             1807-11|    STOPPED_AT| 42|2513| 38.80098|-9.158984|  2769_0_2|  2769_0|            SCHEDULED|        1842|      0.0| 071139|2024-12-02 21:09:12|2769_0_2|1|1|2055...|Cidade Nova - Lis...|    [Loures, Odivelas

In [None]:
# Step 1: Explode array
df_exploded = vehicles_enriched.withColumn("municipality_names_array_exploded", explode(col("municipality_names_array")))
df_exploded.show()

+-------+-------+--------------------+--------------+--------+---------+---------+----------+--------+---------------------+------------+---------+-------+-------------------+--------------------+--------------------+------------------------+----------+---------------------------------+
|line_id|bearing|            block_id|current_status|      id| latitude|longitude|pattern_id|route_id|schedule_relationship|    shift_id|    speed|stop_id|          timestamp|             trip_id|           long_name|municipality_names_array|      date|municipality_names_array_exploded|
+-------+-------+--------------------+--------------+--------+---------+---------+----------+--------+---------------------+------------+---------+-------+-------------------+--------------------+--------------------+------------------------+----------+---------------------------------+
|   2769|     29|             1807-11|    STOPPED_AT| 42|2513| 38.80098|-9.158984|  2769_0_2|  2769_0|            SCHEDULED|        1842

In [None]:
df_exploded.count()

897

In [None]:
if df_exploded.count() > df_exploded.dropDuplicates().count():
  raise ValueError('Data has duplicates')

In [None]:
duplicate_rows = df_exploded.exceptAll(df_exploded.dropDuplicates())
duplicate_rows.show()

+-------+-------+--------+--------------+---+--------+---------+----------+--------+---------------------+--------+-----+-------+---------+-------+---------+------------------------+----+---------------------------------+
|line_id|bearing|block_id|current_status| id|latitude|longitude|pattern_id|route_id|schedule_relationship|shift_id|speed|stop_id|timestamp|trip_id|long_name|municipality_names_array|date|municipality_names_array_exploded|
+-------+-------+--------+--------------+---+--------+---------+----------+--------+---------------------+--------+-----+-------+---------+-------+---------+------------------------+----+---------------------------------+
+-------+-------+--------+--------------+---+--------+---------+----------+--------+---------------------+--------+-----+-------+---------+-------+---------+------------------------+----+---------------------------------+



In [None]:
df_dedup = df_exploded.dropDuplicates()
df_dedup.count()

897

In [None]:
#count of vehicles (id) that pass through that municipality
df_result = df_dedup.groupBy("municipality_names_array_exploded").count()
df_result.sort("count", ascending=False).show()

+---------------------------------+-----+
|municipality_names_array_exploded|count|
+---------------------------------+-----+
|                           Lisboa|  166|
|                           Sintra|   95|
|                           Loures|   92|
|                           Almada|   77|
|                          Amadora|   64|
|                           Seixal|   60|
|                         Odivelas|   56|
|                           Oeiras|   52|
|              Vila Franca de Xira|   37|
|                          Setúbal|   34|
|                          Montijo|   31|
|                          Cascais|   26|
|                         Barreiro|   21|
|                        Alcochete|   21|
|                          Palmela|   19|
|                            Moita|   19|
|                            Mafra|   14|
|                         Sesimbra|   12|
|                         Alenquer|    1|
+---------------------------------+-----+



In [None]:
df_result = df_dedup.groupBy("municipality_names_array_exploded").agg({"speed": "sum"})
df_result.sort("sum(speed)", ascending=False).show(truncate=False)

+---------------------------------+------------------+
|municipality_names_array_exploded|sum(speed)        |
+---------------------------------+------------------+
|Lisboa                           |1282.2222259044647|
|Sintra                           |588.6111105680466 |
|Loures                           |585.2777785658836 |
|Almada                           |508.0555568933487 |
|Amadora                          |353.61111211776733|
|Oeiras                           |339.4444465637207 |
|Vila Franca de Xira              |326.3888885974884 |
|Odivelas                         |305.277776658535  |
|Seixal                           |298.61111056804657|
|Setúbal                          |290.83333545923233|
|Montijo                          |274.1666651368141 |
|Palmela                          |223.88888722658157|
|Alcochete                        |209.72222119569778|
|Cascais                          |175.8333330154419 |
|Moita                            |144.44444274902344|
|Barreiro 

In [None]:
df_result = df_dedup.groupBy("municipality_names_array_exploded").agg({"speed": "avg"})
df_result.sort("avg(speed)", ascending=False).show(truncate=False)

+---------------------------------+------------------+
|municipality_names_array_exploded|avg(speed)        |
+---------------------------------+------------------+
|Palmela                          |11.783625643504294|
|Alcochete                        |9.986772437890371 |
|Mafra                            |9.325396831546511 |
|Montijo                          |8.844085972155295 |
|Vila Franca de Xira              |8.821321313445633 |
|Setúbal                          |8.553921631153893 |
|Lisboa                           |7.72423027653292  |
|Moita                            |7.602339092053865 |
|Cascais                          |6.762820500593919 |
|Almada                           |6.598124115498035 |
|Oeiras                           |6.527777818533091 |
|Barreiro                         |6.3756614071982245|
|Loures                           |6.361714984411779 |
|Sintra                           |6.195906427032069 |
|Sesimbra                         |6.064814865589142 |
|Amadora  

Questions:

1. What are the top 3 municipalities by vehicle routes?
   - Lisboa, Sintra and Loures
2. What are the top 3 municipalities with the highest average vehicle speed?
   - Palmela, Alcochete and Mafra
