<a href="https://colab.research.google.com/github/telmavcosta/data_processing/blob/main/spark/challenges/rep_challenge_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CHALLENGE 4
##  Analyze data

- Query table "vehicles_enriched" in gold layer
- Aggregate data by municipality_name (array)
- Calculate:
  - count of vehicles (id) that pass through each municipality
  - sum of the vehicles' speeds

Questions:
  - What are the top 3 municipalities by number of vehicle routes?
  - What are the top 3 municipalities with the highest average vehicle speed?


Tips:
- explode array into rows -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode.html


# Setting up PySpark

In [8]:
%pip install pyspark

import json

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import *

import requests

class ETLFlow:
    """Generic extract/load helpers built around a single Spark session."""

    def __init__(self, spark: SparkSession) -> None:
        """Store the Spark session used by every extract and load helper."""
        self.spark = spark

    def extract_from_file(self, format: str, path: str, **kwargs) -> DataFrame:
        """Read ``path`` using the data source ``format``.

        Args:
            format: Spark data source name (e.g. ``"parquet"``).
            path: Location to read from.
            **kwargs: Forwarded to the reader as data source options.

        Returns:
            The DataFrame produced by the reader.
        """
        return self.spark.read.format(format).options(**kwargs).load(path)

    def extract_from_api(self, url: str, schema: StructType = None, timeout: float = 30) -> DataFrame:
        """Fetch a JSON payload over HTTP and turn it into a DataFrame.

        Args:
            url: Endpoint queried with an HTTP GET.
            schema: Optional schema applied by the JSON reader; when omitted
                the reader infers one.
            timeout: Seconds passed to ``requests.get`` so a stalled endpoint
                cannot block the job indefinitely.

        Returns:
            A DataFrame with one row per JSON record in the response.

        Raises:
            requests.HTTPError: If the endpoint answers with an error status.
        """
        response = requests.get(url, timeout=timeout)
        # Fail loudly instead of parsing an error body as if it were data.
        response.raise_for_status()

        payload = response.json()
        # A single JSON object is treated as a one-record payload.
        records = payload if isinstance(payload, list) else [payload]

        # The JSON reader needs JSON text. Handing it Python dicts makes
        # PySpark stringify them with str(), producing None/True/False,
        # which are not valid JSON tokens.
        rdd = self.spark.sparkContext.parallelize([json.dumps(record) for record in records])

        reader = self.spark.read
        if schema is not None:
            reader = reader.schema(schema)
        return reader.json(rdd)

    def load(self, df: DataFrame, format: str, path: str, partition_column: str = None, **kwargs) -> None:
        """Write ``df`` to ``path`` in overwrite mode.

        The DataFrame is coalesced to one partition before writing.

        Args:
            df: DataFrame to persist.
            format: Spark data source name used for the output.
            path: Destination location.
            partition_column: When given, output is partitioned by this column.
            **kwargs: Forwarded to the writer as data source options.
        """
        writer = df.coalesce(1).write.mode("overwrite").format(format).options(**kwargs)
        if partition_column:
            writer = writer.partitionBy(partition_column)
        writer.save(path)

class ETLTask(ETLFlow):

    def __init__(self, spark: SparkSession) -> None:
        """Initialise the task runner with the Spark session to use.

        Delegates to the base initializer so any state it sets up is
        inherited instead of being duplicated here.
        """
        super().__init__(spark)

    def ingestion_lines(self):
        """Pull the lines endpoint and store it as parquet in the bronze layer."""
        # Columns in the order they appear in the schema; those listed in
        # ``array_columns`` are arrays of strings, the rest plain strings.
        column_names = [
            'color', 'facilities', 'id', 'localities', 'long_name',
            'municipalities', 'patterns', 'routes', 'short_name', 'text_color',
        ]
        array_columns = {'facilities', 'localities', 'municipalities', 'patterns', 'routes'}

        lines_schema = StructType([
            StructField(
                name,
                ArrayType(StringType(), True) if name in array_columns else StringType(),
                True,
            )
            for name in column_names
        ])

        # Ingest from the API, then persist the raw result.
        lines_df = self.extract_from_api(
            url="https://api.carrismetropolitana.pt/lines",
            schema=lines_schema,
        )
        self.load(df=lines_df, format="parquet", path="/content/lake/bronze/lines")


    def ingestion_vehicles(self):
        """Pull the vehicles endpoint and store it in bronze, partitioned by date."""
        # (column name, Spark type) pairs in schema order; every field is nullable.
        typed_columns = [
            ('bearing', IntegerType()),
            ('block_id', StringType()),
            ('current_status', StringType()),
            ('id', StringType()),
            ('lat', FloatType()),
            ('line_id', StringType()),
            ('lon', FloatType()),
            ('pattern_id', StringType()),
            ('route_id', StringType()),
            ('schedule_relationship', StringType()),
            ('shift_id', StringType()),
            ('speed', FloatType()),
            ('stop_id', StringType()),
            ('timestamp', TimestampType()),
            ('trip_id', StringType()),
        ]
        vehicles_schema = StructType(
            [StructField(name, spark_type, True) for name, spark_type in typed_columns]
        )

        raw_df = self.extract_from_api(
            url="https://api.carrismetropolitana.pt/vehicles",
            schema=vehicles_schema,
        )

        # Derive the partition column from the record timestamp.
        dated_df = raw_df.withColumn("date", expr("date(timestamp)"))

        self.load(
            df=dated_df,
            format="parquet",
            path="/content/lake/bronze/vehicles",
            partition_column="date",
        )

    def ingestion_municipalities(self):
        """Pull the municipalities endpoint and store it as parquet in bronze."""
        # Every column is a nullable string, listed in schema order.
        column_names = (
            'district_id', 'district_name', 'id', 'name',
            'prefix', 'region_id', 'region_name',
        )
        municipalities_schema = StructType(
            [StructField(name, StringType(), True) for name in column_names]
        )

        municipalities_df = self.extract_from_api(
            url="https://api.carrismetropolitana.pt/municipalities",
            schema=municipalities_schema,
        )
        self.load(
            df=municipalities_df,
            format="parquet",
            path="/content/lake/bronze/municipalities",
        )

    def cleansing_vehicles(self):
        """Clean bronze vehicles and write them to silver, partitioned by date.

        Renames ``lat``/``lon`` to ``latitude``/``longitude``, removes
        duplicate rows, and drops rows without a ``current_status``. If a
        ``_corrupt_record`` column is present, rows flagged as corrupt are
        dropped as well.
        """
        df = self.extract_from_file(format="parquet", path="/content/lake/bronze/vehicles")

        df = (
            df.withColumnRenamed("lat", "latitude")
            .withColumnRenamed("lon", "longitude")
            .dropDuplicates()
        )
        df = df.filter(df.current_status.isNotNull())

        if "_corrupt_record" in df.columns:
            # A populated _corrupt_record marks a row that failed to parse,
            # so the clean rows are the ones where it is null.
            df = df.filter(df["_corrupt_record"].isNull())

        # load silver layer partitioned by date
        self.load(df=df, format="parquet", path="/content/lake/silver/vehicles", partition_column="date")


    def cleansing_lines(self):
        """Clean bronze lines and write them to the silver layer.

        Removes duplicate rows and rows without a ``long_name``. If a
        ``_corrupt_record`` column is present, rows flagged as corrupt are
        dropped as well.
        """
        df = self.extract_from_file(format="parquet", path="/content/lake/bronze/lines")

        df = df.dropDuplicates()
        df = df.filter(df.long_name.isNotNull())

        if "_corrupt_record" in df.columns:
            # A populated _corrupt_record marks a row that failed to parse,
            # so the clean rows are the ones where it is null.
            df = df.filter(df["_corrupt_record"].isNull())

        # load silver layer
        self.load(df=df, format="parquet", path="/content/lake/silver/lines")


    def cleansing_municipalities(self):
        """Clean bronze municipalities and write them to the silver layer.

        Removes duplicate rows and rows where ``name`` or ``district_name``
        is null. If a ``_corrupt_record`` column is present, rows flagged as
        corrupt are dropped as well.
        """
        df = self.extract_from_file(format="parquet", path="/content/lake/bronze/municipalities")

        df = df.dropDuplicates()

        # remove rows when the columns NAME or DISTRICT_NAME are null
        df = df.filter(df.name.isNotNull())
        df = df.filter(df.district_name.isNotNull())

        if "_corrupt_record" in df.columns:
            # A populated _corrupt_record marks a row that failed to parse,
            # so the clean rows are the ones where it is null.
            df = df.filter(df["_corrupt_record"].isNull())

        # load silver layer
        self.load(df=df, format="parquet", path="/content/lake/silver/municipalities")

    def enrich(self):
        """Build the gold ``vehicles_enriched`` table from the silver layer.

        Reads silver vehicles, lines and municipalities with explicit
        schemas, attaches each vehicle's line name and the names of the
        municipalities its line covers, and writes the result to the gold
        layer partitioned by ``date``. All reads go through ``self.spark``
        so the method does not depend on a module-level ``spark`` variable.
        """
        vehicles_schema = StructType([
            StructField('bearing', IntegerType(), True),
            StructField('block_id', StringType(), True),
            StructField('current_status', StringType(), True),
            StructField('id', StringType(), True),
            StructField('latitude', FloatType(), True),
            StructField('line_id', StringType(), True),
            StructField('longitude', FloatType(), True),
            StructField('pattern_id', StringType(), True),
            StructField('route_id', StringType(), True),
            StructField('schedule_relationship', StringType(), True),
            StructField('shift_id', StringType(), True),
            StructField('speed', FloatType(), True),
            StructField('stop_id', StringType(), True),
            StructField('timestamp', TimestampType(), True),
            StructField('trip_id', StringType(), True),
            StructField('date', DateType(), True),
        ])

        lines_schema = StructType([
            StructField('color', StringType(), True),
            StructField('facilities', ArrayType(StringType(), True), True),
            StructField('id', StringType(), True),
            StructField('localities', ArrayType(StringType(), True), True),
            StructField('long_name', StringType(), True),
            StructField('municipalities', ArrayType(StringType(), True), True),
            StructField('patterns', ArrayType(StringType(), True), True),
            StructField('routes', ArrayType(StringType(), True), True),
            StructField('short_name', StringType(), True),
            StructField('text_color', StringType(), True),
        ])

        municipalities_schema = StructType([
            StructField('district_id', StringType(), True),
            StructField('district_name', StringType(), True),
            StructField('id', StringType(), True),
            StructField('name', StringType(), True),
            StructField('prefix', StringType(), True),
            StructField('region_id', StringType(), True),
            StructField('region_name', StringType(), True),
        ])

        vehicles_df = self.spark.read.schema(vehicles_schema).parquet("/content/lake/silver/vehicles")
        lines_df = self.spark.read.schema(lines_schema).parquet("/content/lake/silver/lines")
        municipalities_df = self.spark.read.schema(municipalities_schema).parquet("/content/lake/silver/municipalities")

        # One row per (line, municipality id) so the ids can be joined to names.
        lines_df = lines_df.withColumn("municipalities", F.explode("municipalities"))

        # join with vehicles df
        df_joined = vehicles_df.join(lines_df, vehicles_df["line_id"] == lines_df["id"], how="inner")

        # all vehicle columns + lines.long_name (as line_name) + municipality id
        vehicles_lines_df = df_joined.select(
            vehicles_df["*"],
            F.col("long_name").alias("line_name"),
            F.col("municipalities"),
        )

        # join with municipalities df
        df_joined_2 = vehicles_lines_df.join(
            municipalities_df,
            vehicles_lines_df["municipalities"] == municipalities_df["id"],
            how="inner",
        )

        # all previous columns + municipalities.name (as municipality_name)
        vehicles_enriched_df = df_joined_2.select(
            vehicles_lines_df["*"],
            F.col("name").alias("municipality_name"),
        )

        # Collapse the exploded rows back to one row per group, rebuilding
        # municipality_name as an array.
        # NOTE(review): the grouping key is (bearing, block_id, line_id), not
        # the vehicle id, so distinct vehicles sharing those three values
        # would be merged into one row - confirm this is intended. first()
        # also picks an arbitrary row's value within each group.
        carried_columns = [
            "current_status", "id", "latitude", "longitude", "pattern_id",
            "route_id", "schedule_relationship", "shift_id", "speed",
            "stop_id", "timestamp", "trip_id", "date", "line_name",
        ]
        aggregations = [F.first(name).alias(name) for name in carried_columns]
        aggregations.append(F.collect_list("municipality_name").alias("municipality_name"))

        df_vehicles_grouped = vehicles_enriched_df.groupBy("bearing", "block_id", "line_id").agg(*aggregations)

        # load data into gold layer, partition by date
        self.load(
            df=df_vehicles_grouped,
            format="parquet",
            path="/content/lake/gold/vehicles_enriched",
            partition_column="date",
        )

    def analyze_data(self):
        """Print per-municipality statistics from the gold ``vehicles_enriched`` table.

        Explodes the ``municipality_name`` array into one row per
        municipality, groups by it, and shows four tables: vehicle count,
        total speed, the top 3 municipalities by number of distinct routes,
        and the top 3 by average speed.

        Spark functions are referenced through ``F`` so the method does not
        rely on a star import shadowing the ``round`` and ``sum`` builtins.
        """
        # Query table "vehicles_enriched" in gold layer
        df_vehicles_enriched = self.extract_from_file(format="parquet", path="/content/lake/gold/vehicles_enriched")
        df_exploded = df_vehicles_enriched.withColumn("municipality_name", F.explode("municipality_name"))

        # Aggregate data by municipality_name
        by_municipality = df_exploded.groupBy("municipality_name")

        # count of vehicles (id) that pass through each municipality
        by_municipality.agg(
            F.count("id").alias("vehicles_by_municipality")
        ).show(25, False)

        # sum of vehicle speeds
        by_municipality.agg(
            F.round(F.sum("speed"), 2).alias("total_speed_by_municipality")
        ).show(25, False)

        # Top 3 municipalities by vehicle routes. Routes are counted
        # distinctly: a plain count of route_id counts rows, i.e. vehicles,
        # which would only repeat the vehicle-count ranking above.
        by_municipality.agg(
            F.countDistinct("route_id").alias("routes_by_municipality")
        ).orderBy(F.desc("routes_by_municipality")).show(3, False)

        # Top 3 municipalities with the highest average vehicle speed
        by_municipality.agg(
            F.round(F.avg("speed"), 2).alias("avg_speed_by_municipality")
        ).orderBy(F.desc("avg_speed_by_municipality")).show(3, False)

if __name__ == '__main__':

    # Create (or reuse) the local Spark session for this run.
    from pyspark.sql import SparkSession
    spark = (
        SparkSession.builder
        .master('local')
        .appName('Challenge')
        .getOrCreate()
    )

    # Overwrite only the partitions being written, not the whole table.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    print("Starting ETL program")
    etl = ETLTask(spark)

    # Pipeline steps in execution order: (progress message, task to call).
    pipeline = (
        ("Running Task - Ingestion Vehicles", etl.ingestion_vehicles),
        ("Running Task - Ingestion Lines", etl.ingestion_lines),
        ("Running Task - Ingestion Municipalities", etl.ingestion_municipalities),
        ("Running Task - Cleansing Vehicles", etl.cleansing_vehicles),
        ("Running Task - Cleansing Lines", etl.cleansing_lines),
        ("Running Task - Cleansing Municipalities", etl.cleansing_municipalities),
        ("Running Task - Enrich Process", etl.enrich),
        ("Running Task - Analyze Data", etl.analyze_data),
    )
    for message, task in pipeline:
        print(message)
        task()

    print("Challenge completed")

Starting ETL program
Running Task - Ingestion Vehicles
Running Task - Ingestion Lines
Running Task - Ingestion Municipalities
Running Task - Cleansing Vehicles
Running Task - Cleansing Lines
Running Task - Cleansing Municipalities
Running Task - Enrich Process
Running Task - Analyze Data
+----------------------+------------------------+
|municipality_name     |vehicles_by_municipality|
+----------------------+------------------------+
|Arruda dos Vinhos     |3                       |
|Odivelas              |128                     |
|Vendas Novas          |3                       |
|Barreiro              |43                      |
|Sintra                |256                     |
|Palmela               |52                      |
|Alcochete             |45                      |
|Alenquer              |3                       |
|Cascais               |74                      |
|Vila Franca de Xira   |70                      |
|Moita                 |36                      |
|Amadora   