<a href="https://colab.research.google.com/github/jorgeneves16/dataeng-dataprocessing/blob/main/dataprocessing_challenge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import *
import requests
from functools import reduce # Import reduce from functools

class ETLFlow:
    """Generic extract/load helpers built on a single SparkSession.

    Subclasses call ``extract_from_api`` / ``extract_from_file`` to obtain a
    DataFrame and ``load`` to persist one.
    """

    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark

    def extract_from_api(self, url: str, schema: StructType = None, timeout: float = 30) -> DataFrame:
        """Fetch a JSON payload over HTTP and return it as a DataFrame.

        Args:
            url: Endpoint queried with an HTTP GET.
            schema: Optional schema applied while parsing; when omitted (or
                empty) Spark infers the schema from the data.
            timeout: Seconds to wait for the server before giving up.

        Returns:
            A DataFrame with one row per JSON record in the response.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
            requests.Timeout: If the server does not answer within *timeout*.
        """
        import json  # local import keeps this block self-contained

        response = requests.get(url, timeout=timeout)
        # Fail loudly instead of trying to parse an error page as data.
        response.raise_for_status()

        payload = response.json()
        # A single JSON object is treated as a one-record dataset.
        records = payload if isinstance(payload, list) else [payload]

        # spark.read.json expects JSON *strings*; handing it Python dicts makes
        # PySpark fall back to str(dict), which is not valid JSON whenever a
        # record contains None/True/False.
        rdd = self.spark.sparkContext.parallelize([json.dumps(record) for record in records])

        reader = self.spark.read
        if schema:
            reader = reader.schema(schema)
        return reader.json(rdd)

    def extract_from_file(self, format: str, path: str, **kwargs) -> DataFrame:
        """Read *path* with the given Spark data source *format*.

        Any extra keyword arguments are forwarded as reader options.
        """
        return self.spark.read.format(format).options(**kwargs).load(path)

    def load(self, df: DataFrame, format: str, path: str, partition_column: str = None, **kwargs) -> None:
        """Write *df* to *path*, replacing whatever is already there.

        The DataFrame is coalesced to one partition before writing, and the
        write mode is always ``overwrite``.

        Args:
            df: Data to persist.
            format: Spark data source format (for example ``"parquet"``).
            path: Destination directory.
            partition_column: Optional column used for ``partitionBy``.
            **kwargs: Extra writer options.
        """
        writer = df.coalesce(1).write.mode("overwrite").format(format).options(**kwargs)
        if partition_column:
            writer = writer.partitionBy(partition_column)
        writer.save(path)

class ETLTask(ETLFlow):
    """ETL tasks for the Carris Metropolitana lines/vehicles/municipalities feeds.

    Ingestion tasks write the raw API responses under ``content/lake/bronze``,
    cleansing tasks write filtered copies under ``content/lake/silver`` and
    ``enrich_vehicles`` writes the joined result under ``content/lake/gold``.
    """

    def __init__(self, spark: SparkSession) -> None:
        super().__init__(spark)

    @staticmethod
    def _has_value(column_name: str):
        """Return a Column predicate: not null, not empty and not the literal "NONE"."""
        column = F.col(column_name)
        return column.isNotNull() & (column != "") & (column != "NONE")

    def ingestion_lines(self):
        """Fetch the ``/lines`` endpoint and store it as bronze parquet."""
        # schema
        lines_schema = StructType([
            StructField('color', StringType(), True),
            StructField('facilities', ArrayType(StringType(), True), True),
            StructField('id', StringType(), True),
            StructField('localities', ArrayType(StringType(), True), True),
            StructField('long_name', StringType(), True),
            StructField('municipalities', ArrayType(StringType(), True), True),
            StructField('patterns', ArrayType(StringType(), True), True),
            StructField('routes', ArrayType(StringType(), True), True),
            StructField('short_name', StringType(), True),
            StructField('text_color', StringType(), True),
        ])

        # ingestion
        df = self.extract_from_api(url="https://api.carrismetropolitana.pt/lines", schema=lines_schema)

        # load (load() coalesces to one partition, so a single file is written)
        self.load(df=df, format="parquet", path="content/lake/bronze/lines")

    def ingestion_vehicles(self):
        """Fetch the ``/vehicles`` endpoint and store it as bronze parquet partitioned by date."""
        vehicle_schema = StructType([
            StructField('bearing', IntegerType(), True),
            StructField('block_id', StringType(), True),
            StructField('current_status', StringType(), True),
            StructField('id', StringType(), True),
            StructField('lat', FloatType(), True),
            StructField('line_id', StringType(), True),
            StructField('lon', FloatType(), True),
            StructField('pattern_id', StringType(), True),
            StructField('route_id', StringType(), True),
            StructField('schedule_relationship', StringType(), True),
            StructField('shift_id', StringType(), True),
            StructField('speed', FloatType(), True),
            StructField('stop_id', StringType(), True),
            StructField('timestamp', TimestampType(), True),
            StructField('trip_id', StringType(), True),
        ])

        df = self.extract_from_api(url="https://api.carrismetropolitana.pt/vehicles", schema=vehicle_schema)

        # Partition key: the calendar date of each vehicle timestamp.
        df = df.withColumn("date", F.to_date(F.col("timestamp")))

        self.load(df=df, format="parquet", path="content/lake/bronze/vehicles", partition_column="date")

    def ingestion_municipalities(self):
        """Fetch the ``/municipalities`` endpoint and store it as bronze parquet."""
        municipalities_schema = StructType([
            StructField('id', StringType(), True),
            StructField('name', StringType(), True),
            StructField('prefix', StringType(), True),
            StructField('district_id', StringType(), True),
            StructField('district_name', StringType(), True),
            StructField('region_id', StringType(), True),
            StructField('region_name', StringType(), True),
        ])

        df = self.extract_from_api(url="https://api.carrismetropolitana.pt/municipalities", schema=municipalities_schema)

        self.load(df=df, format="parquet", path="content/lake/bronze/municipalities")

    def cleansing_vehicles(self):
        """Clean bronze vehicles into silver.

        Renames ``lat``/``lon`` to ``latitude``/``longitude``, drops duplicate
        rows, keeps only rows with a usable ``current_status`` and recomputes
        ``date`` from ``timestamp``.
        """
        df = self.extract_from_file(format="parquet", path="content/lake/bronze/vehicles")

        df = (
            df.withColumnRenamed("lat", "latitude")
            .withColumnRenamed("lon", "longitude")
            .drop_duplicates()
            .filter(self._has_value("current_status"))
            .withColumn("date", F.to_date(F.col("timestamp")))
        )

        df.show()

        self.load(df=df, format="parquet", path="content/lake/silver/vehicles", partition_column="date")

    def cleansing_lines(self):
        """Clean bronze lines into silver: drop duplicates and rows without a usable ``long_name``."""
        df = self.extract_from_file(format="parquet", path="content/lake/bronze/lines")

        df = df.drop_duplicates().filter(self._has_value("long_name"))

        df.show()

        self.load(df=df, format="parquet", path="content/lake/silver/lines")

    def cleansing_municipalities(self):
        """Clean bronze municipalities into silver.

        Drops duplicates and rows lacking a usable ``name`` or ``district_name``.
        """
        df = self.extract_from_file(format="parquet", path="content/lake/bronze/municipalities")

        df = df.drop_duplicates().filter(
            self._has_value("name") & self._has_value("district_name")
        )

        df.show()

        self.load(df=df, format="parquet", path="content/lake/silver/municipalities")

    def enrich_vehicles(self):
        """Build the gold vehicles table.

        Each silver vehicle row is extended with ``line_name`` (the line's
        ``long_name``) and ``municipality_name`` (an array with the distinct
        names of the municipalities listed on the vehicle's line). The result
        is written partitioned by ``date``.
        """
        df_vehicles = self.extract_from_file(format="parquet", path="content/lake/silver/vehicles")
        df_vehicles.show()

        df_lines = self.extract_from_file(format="parquet", path="content/lake/silver/lines")
        df_lines.show()

        df_municipalities = self.extract_from_file(format="parquet", path="content/lake/silver/municipalities")
        df_municipalities.show()

        # Explode the municipalities array so each (line, municipality) pair is a row;
        # explode_outer keeps lines whose array is null or empty.
        df_lines_exploded = df_lines.select(
            F.col("id").alias("line_id_from_lines"),
            F.col("long_name").alias("line_name"),
            F.explode_outer("municipalities").alias("municipality_id"),
        )
        df_lines_exploded.show()

        # Left join so vehicles without a matching line are kept.
        df_vehicles_lines = df_vehicles.join(
            df_lines_exploded,
            df_vehicles["line_id"] == df_lines_exploded["line_id_from_lines"],
            how="left",
        ).drop("line_id_from_lines")
        df_vehicles_lines.show()

        df_municipalities_selected = df_municipalities.select(
            F.col("id").alias("municipality_id"),
            F.col("name"),
        )

        # Left join with municipalities to resolve each id to its name.
        df_enriched = df_vehicles_lines.join(
            df_municipalities_selected,
            df_vehicles_lines["municipality_id"] == df_municipalities_selected["municipality_id"],
            how="left",
        )

        # Keep every vehicle column plus line_name and the municipality name.
        vehicle_columns = list(df_vehicles.columns)
        df_selected = df_enriched.select(
            *vehicle_columns,
            F.col("line_name"),
            F.col("name"),
        )
        df_selected.show()

        # Collapse back to one row per vehicle record, gathering the names.
        # collect_list skips nulls and yields an empty array when nothing
        # matched, so no explicit null/empty handling is required.
        grouped_columns = vehicle_columns + ['line_name']
        df_enriched_final = df_selected.groupBy(grouped_columns).agg(
            F.array_distinct(F.collect_list("name")).alias("municipality_name")
        )

        df_enriched_final.show()

        self.load(df=df_enriched_final, format="parquet", path="content/lake/gold/vehicles_enriched", partition_column="date")


if __name__ == '__main__':

    # Create (or reuse) the local Spark session every task runs on.
    from pyspark.sql import SparkSession
    spark = (
        SparkSession.builder
        .master('local')
        .appName('ETL Program')
        .getOrCreate()
    )

    try:
        print("Starting ETL program")
        etl = ETLTask(spark)

        # (announcement, task) pairs in execution order:
        # ingestion first, then cleansing, then the final enrichment.
        pipeline = (
            ("Running Task - Ingestion Vehicles", etl.ingestion_vehicles),
            ("Running Task - Ingestion Lines", etl.ingestion_lines),
            ("Running Task - Ingestion Municipalities", etl.ingestion_municipalities),
            ("Running Task - Cleansing Vehicles", etl.cleansing_vehicles),
            ("Running Task - Cleansing Lines", etl.cleansing_lines),
            ("Running Task - Cleansing Municipalities", etl.cleansing_municipalities),
            ("Running Task - Final data enrichment", etl.enrich_vehicles),
        )
        for announcement, run_task in pipeline:
            print(announcement)
            run_task()

    finally:
        # Always release the session, even when a task raised.
        print("Closing Spark Session")
        spark.stop()

Starting ETL program
Running Task - Ingestion Vehicles
Running Task - Ingestion Lines
Running Task - Ingestion Municipalities
Running Task - Cleansing Vehicles
+-------+--------------------+--------------+--------+---------+-------+---------+----------+--------+---------------------+------------+---------+-------+-------------------+--------------------+----------+
|bearing|            block_id|current_status|      id| latitude|line_id|longitude|pattern_id|route_id|schedule_relationship|    shift_id|    speed|stop_id|          timestamp|             trip_id|      date|
+-------+--------------------+--------------+--------+---------+-------+---------+----------+--------+---------------------+------------+---------+-------+-------------------+--------------------+----------+
|    277|20250707-64810274...| IN_TRANSIT_TO|44|12668|38.706974|   4705|-8.978909|  4705_0_1|  4705_0|            SCHEDULED|323530234560|5.2777777| 100083|2025-07-07 20:09:05|4705_0_1|1500|203...|2025-07-07|
|    188

In [12]:
# check results
# The ETL cell stops its Spark session in its `finally` block, so obtain a
# live session first; getOrCreate() returns the current one if it is still up.
spark = SparkSession.builder.master('local').appName('ETL Program').getOrCreate()

for result_path in (
    "content/lake/bronze/vehicles",
    "content/lake/bronze/lines",
    "content/lake/bronze/municipalities",
    "content/lake/gold/vehicles_enriched",
):
    spark.read.parquet(result_path).show()

+-------+--------------------+--------------+--------+---------+-------+---------+----------+--------+---------------------+------------+---------+-------+-------------------+--------------------+----------+
|bearing|            block_id|current_status|      id|      lat|line_id|      lon|pattern_id|route_id|schedule_relationship|    shift_id|    speed|stop_id|          timestamp|             trip_id|      date|
+-------+--------------------+--------------+--------+---------+-------+---------+----------+--------+---------------------+------------+---------+-------+-------------------+--------------------+----------+
|      0|           7_7760-13|    STOPPED_AT| 41|1222|38.793205|   1625|-9.335123|  1625_0_2|  1625_0|            SCHEDULED|        7832|      0.0| 170091|2025-07-07 20:08:58|1625_0_2_2100_212...|2025-07-07|
|      0|       VER_DU_VU3029|    STOPPED_AT| 43|2103|38.622726|   3615|-9.098495|  3615_0_2|  3615_0|            SCHEDULED|      VU3075|      0.0| 140051|2025-07-07 20