In [0]:
# Notebook-wide imports: standard library / HTTP helpers first, then the
# PySpark column functions and schema types.
import datetime
import requests
import json  # needed to read the response returned by the weather API
from pyspark.sql.functions import (
    col,            
    lit,            
    when,           
    to_timestamp,   
    date_format,    
    hour,           
    dayofweek,      
    count,          
    # NOTE(review): importing `sum` here rebinds the name for the whole
    # notebook, so Python's builtin sum() is no longer reachable as `sum`.
    sum             
)
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DoubleType,
    TimestampType
)

# Visible confirmation that the import cell ran to completion.
print("Toolbox opened successfully 🧙‍♂️")

In [0]:
# Read the bronze_vra table from the catalog. This raw DataFrame is the input
# that the following cell filters and converts into the silver DataFrame.
df_raw_silver = spark.read.table("personal_projects.gru_airport.bronze_vra")

# The table read above is the bronze one, so the status message names it
# correctly (it previously claimed that silver data had been loaded).
print("Bronze data loaded successfully!!")
display(df_raw_silver)

In [0]:
from pyspark.sql.functions import col, to_timestamp

# ANAC writes its date/time values as text in day/month/year hour:minute order.
format_anac = "dd/MM/yyyy HH:mm"

# Keep only the flights that either depart from or arrive at GRU (SBGR).
touches_gru = (col("sigla_icao_origem") == "SBGR") | (col("sigla_icao_destino") == "SBGR")
df_silver = df_raw_silver.filter(touches_gru)

# Replace each scheduled/actual departure and arrival column, in place, with
# the timestamp parsed from its original text value.
for ts_column in ("partida_prevista", "partida_real", "chegada_prevista", "chegada_real"):
    df_silver = df_silver.withColumn(ts_column, to_timestamp(col(ts_column), format_anac))

print("Loaded, transformed and filtered data succesfully")
display(df_silver)

In [0]:
# Persist the filtered and converted DataFrame as the silver table.
# Table identifier layout: catalog.schema.table_name
silver_writer = (
    df_silver.write
    .format("delta")
    # Each run replaces the table's previous contents; overwriteSchema is set
    # so the stored schema may be replaced together with the data.
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
silver_writer.saveAsTable("personal_projects.gru_airport.silver_vra")

print("Silver Table saved successfully!")