<a href="https://colab.research.google.com/github/tomassalcedas/dataeng/blob/main/batch_processing_evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [42]:
import json

import requests
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import expr, col, explode, countDistinct, sum, avg
from pyspark.sql.functions import explode_outer

In [43]:
!mkdir -p /content/lake/bronze
!mkdir -p /content/lake/silver
!mkdir -p /content/lake/gold

In [44]:
# Spark session in local mode; later cells reference it through the global name `spark`.
spark = (
    SparkSession.builder
    .master("local")
    .appName("CarrisETL")
    .getOrCreate()
)

In [45]:
def fetch_endpoint(endpoint: str, schema: StructType):
    """Download one Carris Metropolitana API endpoint and parse it into a Spark DataFrame.

    Args:
        endpoint: Path appended to ``https://api.carrismetropolitana.pt/``.
        schema: Schema applied while parsing the returned records.

    Returns:
        A DataFrame with one row per record returned by the API.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status code.
        requests.Timeout: If the API does not answer within the timeout.
    """
    url = f"https://api.carrismetropolitana.pt/{endpoint}"
    # Bound the wait so a stalled connection cannot hang the notebook forever.
    response = requests.get(url, timeout=30)
    # Fail loudly on HTTP errors instead of trying to parse an error page.
    response.raise_for_status()
    payload = response.json()
    # A single JSON object is treated as a one-record payload; iterating a dict
    # directly would hand its keys to Spark instead of the record itself.
    records = payload if isinstance(payload, list) else [payload]
    # spark.read.json on an RDD expects JSON text. Non-string elements are
    # converted with str(), which yields Python reprs (None/True/False) that are
    # not valid JSON and end up as all-null rows. Serialise explicitly instead.
    json_lines = [json.dumps(record) for record in records]
    return spark.read.schema(schema).json(spark.sparkContext.parallelize(json_lines))

In [46]:
class ETLFlow:
    """Base ETL helper exposing an extract step and a load step around a Spark session."""

    def __init__(self, spark: SparkSession):
        # Keep the session on the instance so methods can reach it via self.spark.
        self.spark = spark

    def extract_from_api(self, endpoint: str, schema: StructType):
        """Return the result of ``fetch_endpoint`` for ``endpoint`` parsed with ``schema``."""
        extracted = fetch_endpoint(endpoint, schema)
        return extracted

    def load(self, df: DataFrame, format: str, path: str, partition_column: str = None):
        """Write ``df`` to ``path`` in the given ``format``, replacing existing output.

        Args:
            df: DataFrame to persist; it is coalesced to one partition before writing.
            format: Name of the output format handed to the writer (e.g. ``"parquet"``).
            path: Destination path.
            partition_column: Optional column used with ``partitionBy``; when falsy
                the data is written unpartitioned.
        """
        single_partition_df = df.coalesce(1)
        base_writer = single_partition_df.write.mode("overwrite").format(format)
        final_writer = base_writer.partitionBy(partition_column) if partition_column else base_writer
        final_writer.save(path)

In [47]:
class ETLTask(ETLFlow):
    """Pipeline steps for the Carris data: bronze ingestion and silver cleaning."""

    def ingestion_vehicles(self):
        """Fetch the ``vehicles`` endpoint and write it to bronze, partitioned by ``date``."""
        vehicle_fields = (
            ("bearing", IntegerType()),
            ("block_id", StringType()),
            ("current_status", StringType()),
            ("id", StringType()),
            ("lat", FloatType()),
            ("line_id", StringType()),
            ("lon", FloatType()),
            ("pattern_id", StringType()),
            ("route_id", StringType()),
            ("schedule_relationship", StringType()),
            ("shift_id", StringType()),
            ("speed", FloatType()),
            ("stop_id", StringType()),
            ("timestamp", LongType()),
            ("trip_id", StringType()),
        )
        # Every field is declared nullable.
        schema = StructType([StructField(name, dtype, True) for name, dtype in vehicle_fields])
        raw_vehicles = self.extract_from_api("vehicles", schema)
        # The partition key is the calendar date derived from the `timestamp` column.
        dated_vehicles = raw_vehicles.withColumn("date", expr("date(from_unixtime(timestamp))"))
        self.load(dated_vehicles, "parquet", "/content/lake/bronze/vehicles", partition_column="date")

    def ingestion_lines(self):
        """Fetch the ``lines`` endpoint and write it to bronze, unpartitioned."""
        line_fields = (
            ("color", StringType()),
            ("facilities", ArrayType(StringType())),
            ("id", StringType()),
            ("localities", ArrayType(StringType())),
            ("long_name", StringType()),
            ("municipalities", ArrayType(StringType())),
            ("patterns", ArrayType(StringType())),
            ("routes", ArrayType(StringType())),
            ("short_name", StringType()),
            ("text_color", StringType()),
        )
        schema = StructType([StructField(name, dtype, True) for name, dtype in line_fields])
        raw_lines = self.extract_from_api("lines", schema)
        self.load(raw_lines, "parquet", "/content/lake/bronze/lines")

    def ingestion_municipalities(self):
        """Fetch the ``municipalities`` endpoint and write it to bronze, unpartitioned."""
        # All municipality attributes are nullable strings.
        municipality_columns = (
            "district_id",
            "district_name",
            "id",
            "name",
            "prefix",
            "region_id",
            "region_name",
        )
        schema = StructType([StructField(name, StringType(), True) for name in municipality_columns])
        raw_municipalities = self.extract_from_api("municipalities", schema)
        self.load(raw_municipalities, "parquet", "/content/lake/bronze/municipalities")

    def transform_and_save_silver(self):
        """Read each bronze table, deduplicate and filter it, then write the silver copy."""
        # VEHICLES: rename the coordinate columns, deduplicate, require a status.
        bronze_vehicles = self.spark.read.parquet("/content/lake/bronze/vehicles")
        renamed_vehicles = bronze_vehicles.withColumnRenamed("lat", "latitude")
        renamed_vehicles = renamed_vehicles.withColumnRenamed("lon", "longitude")
        silver_vehicles = renamed_vehicles.dropDuplicates().where("current_status IS NOT NULL")
        self.load(silver_vehicles, "parquet", "/content/lake/silver/vehicles", partition_column="date")

        # LINES and MUNICIPALITIES share the same recipe: deduplicate, then keep
        # only rows satisfying the table's not-null condition.
        reference_tables = (
            (
                "/content/lake/bronze/lines",
                "long_name IS NOT NULL",
                "/content/lake/silver/lines",
            ),
            (
                "/content/lake/bronze/municipalities",
                "name IS NOT NULL AND district_name IS NOT NULL",
                "/content/lake/silver/municipalities",
            ),
        )
        for bronze_path, keep_condition, silver_path in reference_tables:
            bronze_df = self.spark.read.parquet(bronze_path)
            silver_df = bronze_df.dropDuplicates().where(keep_condition)
            self.load(silver_df, "parquet", silver_path)

In [48]:
if __name__ == "__main__":
    etl = ETLTask(spark)

    # Each bronze step is announced and then executed, in the listed order.
    bronze_steps = (
        ("📥 Ingesting vehicles...", etl.ingestion_vehicles),
        ("📥 Ingesting lines...", etl.ingestion_lines),
        ("📥 Ingesting municipalities...", etl.ingestion_municipalities),
    )
    for announcement, run_step in bronze_steps:
        print(announcement)
        run_step()

    print("✅ Bronze ingestion complete.")

📥 Ingesting vehicles...
📥 Ingesting lines...
📥 Ingesting municipalities...
✅ Bronze ingestion complete.


In [49]:
etl = ETLTask(spark)

# Bronze: pull the three endpoints again, in the same order as before.
print("📥 Ingesting Bronze data...")
for run_ingestion in (
    etl.ingestion_vehicles,
    etl.ingestion_lines,
    etl.ingestion_municipalities,
):
    run_ingestion()

# Silver: clean the bronze tables and persist them.
print("✨ Transforming & writing Silver data...")
etl.transform_and_save_silver()

print("✅ Bronze and Silver ETL complete.")


📥 Ingesting Bronze data...
✨ Transforming & writing Silver data...
✅ Bronze and Silver ETL complete.


In [50]:
# Schemas used to read the silver tables. Every field is nullable.

# Vehicles: silver column names (latitude/longitude) plus the `date` partition column.
vehicles_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("bearing", IntegerType()),
        ("block_id", StringType()),
        ("current_status", StringType()),
        ("id", StringType()),
        ("latitude", FloatType()),
        ("line_id", StringType()),
        ("longitude", FloatType()),
        ("pattern_id", StringType()),
        ("route_id", StringType()),
        ("schedule_relationship", StringType()),
        ("shift_id", StringType()),
        ("speed", FloatType()),
        ("stop_id", StringType()),
        ("timestamp", LongType()),
        ("trip_id", StringType()),
        ("date", DateType()),
    ]
])

# Lines: scalar attributes are strings, list attributes are arrays of strings.
lines_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("color", StringType()),
        ("facilities", ArrayType(StringType())),
        ("id", StringType()),
        ("localities", ArrayType(StringType())),
        ("long_name", StringType()),
        ("municipalities", ArrayType(StringType())),
        ("patterns", ArrayType(StringType())),
        ("routes", ArrayType(StringType())),
        ("short_name", StringType()),
        ("text_color", StringType()),
    ]
])

# Municipalities: all attributes are strings.
municipalities_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in [
        "district_id",
        "district_name",
        "id",
        "name",
        "prefix",
        "region_id",
        "region_name",
    ]
])


In [51]:
# Load the three silver tables, each with its explicit schema, in a fixed order.
vehicles_df, lines_df, municipalities_df = (
    spark.read.schema(table_schema).parquet(table_path)
    for table_schema, table_path in (
        (vehicles_schema, "/content/lake/silver/vehicles"),
        (lines_schema, "/content/lake/silver/lines"),
        (municipalities_schema, "/content/lake/silver/municipalities"),
    )
)

In [52]:
# Join vehicles with lines (line_id → id)
vehicles_lines_df = vehicles_df.join(
    lines_df.select(col("id").alias("line_id_join"), col("long_name"), col("municipalities")),
    vehicles_df.line_id == col("line_id_join"),
    how="left"
)

# Explode municipalities list to prepare for join
vehicles_lines_df = vehicles_lines_df.withColumn("municipality_id", explode("municipalities"))

# Join with municipalities (municipality_id → id)
vehicles_enriched_df = vehicles_lines_df.join(
    municipalities_df.select(col("id").alias("municipality_id_join"), col("name").alias("municipality_name")),
    vehicles_lines_df.municipality_id == col("municipality_id_join"),
    how="left"
)

# Select required columns
vehicles_enriched_df = vehicles_enriched_df.select(
    "bearing", "block_id", "current_status", "id", "latitude", "line_id", "longitude",
    "pattern_id", "route_id", "schedule_relationship", "shift_id", "speed",
    "stop_id", "timestamp", "trip_id", "date",
    col("long_name").alias("line_name"),
    col("municipality_name")
)

In [53]:
# Persist the enriched vehicles to the gold layer, partitioned by `date`,
# replacing whatever was written there before.
(
    vehicles_enriched_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .partitionBy("date")
    .parquet("/content/lake/gold/vehicles_enriched")
)

In [54]:
# Rebind the name to the data as stored in the gold layer, so the cells below
# work from the persisted parquet output.
vehicles_enriched_df = (
    spark.read
    .parquet("/content/lake/gold/vehicles_enriched")
)

In [55]:
# Per-municipality metrics: distinct vehicle ids, summed speed and mean speed.
# `sum` and `avg` here are the Spark column functions imported at the top.
_municipality_metrics = [
    countDistinct("id").alias("vehicle_count"),
    sum("speed").alias("total_speed"),
    avg("speed").alias("average_speed"),
]
aggregated_df = vehicles_enriched_df.groupBy("municipality_name").agg(*_municipality_metrics)

In [56]:
# Top 3 municipalities by number of distinct vehicles.
aggregated_df.orderBy("vehicle_count", ascending=False).show(3)

+-----------------+-------------+-----------------+-----------------+
|municipality_name|vehicle_count|      total_speed|    average_speed|
+-----------------+-------------+-----------------+-----------------+
|           Lisboa|          138|952.2222249507904|6.900161050368046|
|           Sintra|          117|599.7222203612328|  5.1258309432584|
|           Loures|           83|576.1111133098602|6.941097750721208|
+-----------------+-------------+-----------------+-----------------+
only showing top 3 rows



In [57]:
# Top 3 municipalities by average vehicle speed.
aggregated_df.orderBy("average_speed", ascending=False).show(3)

+-----------------+-------------+------------------+-----------------+
|municipality_name|vehicle_count|       total_speed|    average_speed|
+-----------------+-------------+------------------+-----------------+
|          Montijo|           21| 173.8888885974884|8.280423266547066|
|            Moita|           14|114.16666531562805|8.154761808259147|
|        Alcochete|           16|116.94444584846497| 7.30902786552906|
+-----------------+-------------+------------------+-----------------+
only showing top 3 rows

