<a href="https://colab.research.google.com/github/joao-dias-25/dataeng-spark/blob/main/spark/challenges/challenge_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CHALLENGE 4
##  Analyze data

- Query table "vehicles_enriched" in gold layer
- Aggregate data by municipality name (the `municipality_names` array column)
- Calculate:
  - count of vehicles (id) that pass through that municipality
  - sum of the vehicles' speeds

Questions:
  - What are the top 3 municipalities by number of vehicle routes?
  - What are the top 3 municipalities with the highest average vehicle speed?


Tips:
- explode array into rows -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode.html


# Setting up PySpark

In [1]:
%pip install pyspark



In [2]:

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import requests

class ETLFlow:
    """Generic extract/load helpers built around a single SparkSession.

    Attributes:
        spark: Session used for every read issued by the helpers below.
    """

    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark

    def extract_from_file(self, format: str, path: str, **kwargs) -> DataFrame:
        """Read ``path`` using the Spark data source named by ``format``.

        Args:
            format: Data source name passed to ``DataFrameReader.format``.
            path: Location passed to ``DataFrameReader.load``.
            **kwargs: Accepted for interface compatibility; currently unused.

        Returns:
            The DataFrame produced by the read.
        """
        return self.spark.read.format(format).load(path)

    def extract_from_api(self, url: str, schema: StructType = None, timeout: float = 30.0):
        """Fetch a JSON payload over HTTP and load it into a DataFrame.

        Args:
            url: Endpoint to GET.
            schema: Optional schema applied to the JSON reader; when falsy the
                schema is inferred by Spark.
            timeout: Seconds to wait for the HTTP request before giving up.

        Returns:
            A DataFrame with one row per JSON object in the response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        # Local import keeps this block self-contained without touching the
        # file-level import section.
        import json

        response = requests.get(url, timeout=timeout)
        # Fail loudly on 4xx/5xx instead of loading an error body as data.
        response.raise_for_status()

        payload = response.json()
        if isinstance(payload, dict):
            # A single JSON object is treated as a one-record payload.
            payload = [payload]

        # DataFrameReader.json expects an RDD of JSON *strings*. Handing it
        # dicts leaves Spark to parse their Python repr, where None/True/False
        # are not valid JSON, so serialise each record explicitly.
        records = [json.dumps(record) for record in payload]

        # Use the session owned by this instance rather than a module global.
        rdd = self.spark.sparkContext.parallelize(records)
        reader = self.spark.read
        if schema:
            reader = reader.schema(schema)
        return reader.json(rdd)

    def load(self, df: DataFrame, format: str, path: str, **kwargs) -> None:
        """Write ``df`` to ``path`` as a single partition, overwriting existing data.

        Args:
            df: DataFrame to persist.
            format: Data source name passed to ``DataFrameWriter.format``.
            path: Destination passed to ``DataFrameWriter.save``.
            **kwargs: Accepted for interface compatibility; currently unused.
        """
        df.coalesce(1).write.mode("overwrite").format(format).save(path)

    def load_partition(self, df: DataFrame, partition: str, format: str, path: str, **kwargs) -> None:
        """Write ``df`` to ``path`` partitioned by a column, overwriting existing data.

        Args:
            df: DataFrame to persist.
            partition: Column name passed to ``DataFrameWriter.partitionBy``.
            format: Data source name passed to ``DataFrameWriter.format``.
            path: Destination passed to ``DataFrameWriter.save``.
            **kwargs: Accepted for interface compatibility; currently unused.
        """
        df.coalesce(1).write.mode("overwrite").partitionBy(partition).format(format).save(path)

class ETLTask(ETLFlow):
    """Concrete pipeline steps for the Carris Metropolitana data lake.

    Each method reads from the API or from one lake layer and writes parquet
    to the next layer (bronze -> silver -> gold) under ``/content/lake``.
    """

    def __init__(self, spark: SparkSession) -> None:
        # Delegate to the base class so attribute setup lives in one place.
        super().__init__(spark)

    def ingestion_vehicles(self):
        """Fetch vehicles from the API and store them in the bronze layer.

        Adds a ``date`` column (``timestamp`` formatted as ``yyyyMMdd``)
        before writing.
        """
        vehicle_schema = StructType([
            StructField('bearing', IntegerType(), True),
            StructField('block_id', StringType(), True),
            StructField('current_status', StringType(), True),
            StructField('id', StringType(), True),
            StructField('lat', FloatType(), True),
            StructField('line_id', StringType(), True),
            StructField('lon', FloatType(), True),
            StructField('pattern_id', StringType(), True),
            StructField('route_id', StringType(), True),
            StructField('schedule_relationship', StringType(), True),
            StructField('shift_id', StringType(), True),
            StructField('speed', FloatType(), True),
            StructField('stop_id', StringType(), True),
            StructField('timestamp', TimestampType(), True),
            StructField('trip_id', StringType(), True),
        ])

        df = self.extract_from_api(url="https://api.carrismetropolitana.pt/vehicles", schema=vehicle_schema)
        df = df.withColumn('date', date_format('timestamp', "yyyyMMdd"))
        self.load(df=df, format="parquet", path="/content/lake/bronze/vehicles")

    def ingestion_lines(self):
        """Fetch lines from the API and store them in the bronze layer."""
        lines_schema = StructType([
            StructField('_corrupt_record', StringType(), True),
            StructField('color', StringType(), True),
            StructField("facilities", ArrayType(StringType(), True), True),
            StructField('id', StringType(), True),
            StructField("localities", ArrayType(StringType(), True), True),
            StructField('long_name', StringType(), True),
            StructField("municipalities", ArrayType(StringType(), True), True),
            StructField("patterns", ArrayType(StringType(), True), True),
            StructField("routes", ArrayType(StringType(), True), True),
            StructField('short_name', StringType(), True),
            StructField('text_color', StringType(), True),
        ])
        df = self.extract_from_api(url="https://api.carrismetropolitana.pt/lines", schema=lines_schema)

        self.load(df=df, format="parquet", path="/content/lake/bronze/lines")

    def ingestion_municipalities(self):
        """Fetch municipalities from the API and store them in the bronze layer."""
        municipalities_schema = StructType([
            StructField('district_id', StringType(), True),
            StructField('district_name', StringType(), True),
            StructField("id", StringType(), True),
            StructField('name', StringType(), True),
            StructField("prefix", StringType(), True),
            StructField('region_id', StringType(), True),
            StructField("region_name", StringType(), True),
        ])

        df = self.extract_from_api(url="https://api.carrismetropolitana.pt/municipalities", schema=municipalities_schema)

        self.load(df=df, format="parquet", path="/content/lake/bronze/municipalities")

    def cleansing_vehicles(self):
        """Clean bronze vehicles and write them to silver, partitioned by ``date``.

        Renames ``lat``/``lon`` to ``latitude``/``longitude``, removes
        duplicate rows and drops rows whose ``current_status`` is null.
        """
        df = self.extract_from_file(format="parquet", path="/content/lake/bronze/vehicles")

        # transformations
        df = df.withColumnRenamed("lat", "latitude")
        df = df.withColumnRenamed("lon", "longitude")
        df = df.drop_duplicates()
        df = df.filter(df.current_status.isNotNull())

        self.load_partition(df=df, format="parquet", partition='date', path="/content/lake/silver/vehicles")

    def cleansing_lines(self):
        """Clean bronze lines and write them to silver.

        Removes duplicate rows and drops rows whose ``long_name`` is null.
        """
        df = self.extract_from_file(format="parquet", path="/content/lake/bronze/lines")

        # transformations
        df = df.drop_duplicates()
        df = df.filter(df.long_name.isNotNull())
        self.load(df=df, format="parquet", path="/content/lake/silver/lines")

    def cleansing_municipalities(self):
        """Clean bronze municipalities and write them to silver.

        Removes duplicate rows and keeps only rows where both ``name`` and
        ``district_name`` are present.
        """
        df = self.extract_from_file(format="parquet", path="/content/lake/bronze/municipalities")

        # transformations
        df = df.drop_duplicates()
        df = df.filter(df.name.isNotNull() & df.district_name.isNotNull())
        self.load(df=df, format="parquet", path="/content/lake/silver/municipalities")

    def enrich_vehicles(self):
        """Join silver vehicles with their line and resolve municipality names.

        Writes the result to the gold layer, partitioned by ``date``. The
        output keeps every vehicle row (left join) and adds ``long_name``,
        ``municipalities`` (ids) and ``municipality_names`` (ids translated
        through the municipalities table).
        """
        df_v = self.extract_from_file(format="parquet", path="/content/lake/silver/vehicles")
        df_l = self.extract_from_file(format="parquet", path="/content/lake/silver/lines")
        df_m = self.extract_from_file(format="parquet", path="/content/lake/silver/municipalities")

        # transformations
        # Align the join key name with the vehicles table.
        df_l = df_l.withColumnRenamed("id", "line_id")

        # Small lookup table pulled to the driver; only the two columns that
        # feed the map are collected.
        names_by_id = {row['id']: row['name'] for row in df_m.select('id', 'name').collect()}

        df = df_v.join(df_l.select('line_id', 'long_name', 'municipalities'), on='line_id', how='left')

        # create_map takes a flat key, value, key, value, ... sequence.
        map_col = create_map([lit(x) for pair in names_by_id.items() for x in pair])
        df = df.withColumn('municipality_names', transform('municipalities', lambda x: map_col[x]))

        self.load_partition(df=df, format="parquet", partition='date', path="/content/lake/gold/vehicles_enriched")


if __name__ == '__main__':

    # Build (or reuse) the local Spark session shared by the whole pipeline.
    from pyspark.sql import SparkSession
    spark = (
        SparkSession.builder
        .master('local')
        .appName('ETL Program')
        .getOrCreate()
    )

    print("Starting ETL program")
    etl = ETLTask(spark)

    # Steps in execution order: each entry is the progress message printed
    # before the step and the bound method that performs it.
    pipeline = (
        ("Running Task - Ingestion Vehicles", etl.ingestion_vehicles),
        ("Running Task - Ingestion lines", etl.ingestion_lines),
        ("Running Task - ingestion municipalities ", etl.ingestion_municipalities),
        ("Running Task - Cleansing Vehicles", etl.cleansing_vehicles),
        ("Running Task - Cleansing lines", etl.cleansing_lines),
        ("Running Task - Cleansing municipalities", etl.cleansing_municipalities),
        ("Running Task - enrich vehicles", etl.enrich_vehicles),
    )
    for message, run_step in pipeline:
        print(message)
        run_step()

    print("ETL program completed")

Starting ETL program
Running Task - Ingestion Vehicles
Running Task - Ingestion lines
Running Task - ingestion municipalities 
Running Task - Cleansing Vehicles
Running Task - Cleansing lines
Running Task - Cleansing municipalities
Running Task - enrich vehicles
ETL program completed


In [24]:
# Load the gold table and flatten the municipality_names array so that each
# row carries exactly one municipality. `df` is kept as the result name
# because the following cell reuses it.
enriched = ETLTask(spark).extract_from_file(format="parquet", path="/content/lake/gold/vehicles_enriched")
df = enriched.withColumn("municipality", explode(enriched.municipality_names))

# Per-municipality statistics. The count is taken over exploded rows, i.e. one
# per (vehicle record, municipality) pair, despite the 'lines_per_municipality'
# column name.
municipality_stats = df.groupBy('municipality').agg(
    count('municipality').alias('lines_per_municipality'),
    sum('speed').alias('sum_speed'),
    avg('speed').alias('avg_speed'),
)

# Busiest municipalities first.
municipality_stats.orderBy(desc('lines_per_municipality')).show()


+-------------------+----------------------+------------------+------------------+
|       municipality|lines_per_municipality|         sum_speed|         avg_speed|
+-------------------+----------------------+------------------+------------------+
|             Lisboa|                   238|1307.2222224473953| 5.492530346417627|
|             Sintra|                   183| 990.0000005364418| 5.409836068505147|
|             Loures|                   152| 750.0000000596046|4.9342105267079255|
|             Almada|                   107| 559.7222218513489| 5.231048802349055|
|            Amadora|                   104|484.72222077846527|4.6607905844083195|
|             Oeiras|                   104|  436.111108481884| 4.193376043095038|
|           Odivelas|                    91|401.38888919353485| 4.410866914214669|
|             Seixal|                    79|425.27777779102325| 5.383263010012953|
|            Setúbal|                    71| 420.5555543899536|5.9233176674641355|
|   

In [25]:
# Same per-municipality statistics as before, ranked by average speed
# (fastest first).
speed_stats = df.groupBy('municipality').agg(
    count('municipality').alias('lines_per_municipality'),
    sum('speed').alias('sum_speed'),
    avg('speed').alias('avg_speed'),
)
speed_stats.orderBy(desc('avg_speed')).show()

+--------------------+----------------------+------------------+------------------+
|        municipality|lines_per_municipality|         sum_speed|         avg_speed|
+--------------------+----------------------+------------------+------------------+
|       Torres Vedras|                     1| 13.61111068725586| 13.61111068725586|
|           Alcochete|                    23| 220.5555546283722| 9.589371940364009|
|               Mafra|                    50|456.38889133930206|  9.12777782678604|
|        Vendas Novas|                     2| 18.05555534362793| 9.027777671813965|
|             Montijo|                    39| 282.2222223877907| 7.236467240712582|
|             Palmela|                    53| 360.5555557012558| 6.802935013231242|
|Sobral de Monte A...|                     2|13.333333253860474| 6.666666626930237|
|            Alenquer|                     3| 19.44444441795349| 6.481481472651164|
|            Barreiro|                    24| 148.0555546283722| 6.168981442