In [0]:
from pyspark.sql.functions import col, from_json, to_date, when, concat, lit, to_timestamp, current_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, ArrayType

# Bronze location to read from
bronze_table_path = '/mnt/data/bronze/weather'

# Silver targets, declared as (storage path, table name) pairs.
# The table names are only used in the log lines printed after each write.

# Location dimension
silver_dim_location_path, silver_dim_location_name = (
    '/mnt/data/silver/weather_dim_location',
    'weather.silver_dim_location',
)

# Weather measures
silver_weather_path, silver_weather_name = (
    '/mnt/data/silver/weather_fct_cleaned',
    'weather.silver_weather_cleaned',
)

# Air quality measures
silver_air_quality_path, silver_air_quality_name = (
    '/mnt/data/silver/weather_air_quality_cleaned',
    'weather.silver_air_quality_cleaned',
)

In [0]:
# Load the whole bronze Delta table, then keep only the rows ingested today;
# the daily run processes just that slice.
delta_reader = spark.read.format('delta')
bronze_df_full = delta_reader.load(bronze_table_path)

ingested_today = col("ingestion_date") == current_date()
bronze_df_incremental = bronze_df_full.where(ingested_today)

# Each count() below is its own Spark action.
total_bronze_rows = bronze_df_full.count()
print(f"Total bronze table records: {total_bronze_rows}")

new_bronze_rows = bronze_df_incremental.count()
print(f"Found {new_bronze_rows} new records to process today.")

bronze_df_incremental.show()

In [0]:
# JSON schema of the raw payload. Only the fields carried into the silver layer
# are declared. Fields declared as strings here are cast to their final numeric /
# timestamp types when the payload is flattened.

# "location" object
location_schema = (
    StructType()
    .add("name", StringType(), True)
    .add("country", StringType(), True)
    .add("region", StringType(), True)
    .add("lat", StringType(), True)        # converted to Double later
    .add("lon", StringType(), True)        # converted to Double later
    .add("timezone_id", StringType(), True)
    .add("localtime", StringType(), True)  # converted to Timestamp later
)

# "current.air_quality" object: every field is declared as a string.
# The six pollutant readings become Double later, the two indexes become Integer.
air_quality_schema = StructType()
for air_quality_field in ("co", "no2", "o3", "so2", "pm2_5", "pm10", "us-epa-index", "gb-defra-index"):
    air_quality_schema.add(air_quality_field, StringType(), True)

# "current" object
current_schema = (
    StructType()
    .add("observation_time", StringType(), True)  # converted to Timestamp later
    .add("temperature", DoubleType(), True)
    .add("weather_code", IntegerType(), True)
    .add("weather_descriptions", ArrayType(StringType(), True), True)
    .add("air_quality", air_quality_schema, True)
    .add("wind_speed", DoubleType(), True)
    .add("wind_degree", IntegerType(), True)
    .add("wind_dir", StringType(), True)
    .add("pressure", DoubleType(), True)
    .add("precip", DoubleType(), True)
    .add("humidity", IntegerType(), True)
    .add("cloudcover", IntegerType(), True)
    .add("feelslike", DoubleType(), True)
    .add("uv_index", IntegerType(), True)
    .add("visibility", DoubleType(), True)
)

# Top-level payload
json_schema = (
    StructType()
    .add("location", location_schema, True)
    .add("current", current_schema, True)
)

In [0]:
# Surrogate key per known city; any other city name (or a payload that failed
# to parse) ends up with a null city_id.
payload_city_name = col("parsed_json.location.name")
city_id_expr = (
    when(payload_city_name == "Gramado", 1)
    .when(payload_city_name == "Punta Del Este", 2)
    .when(payload_city_name == "Punta Arenas", 3)
    .otherwise(None)
)

# Parse the raw JSON string into a struct column, then attach the city key.
parsed_df = (
    bronze_df_incremental
    .withColumn("parsed_json", from_json(col("raw_payload"), json_schema))
    .withColumn("city_id", city_id_expr)
)

display(parsed_df.select("city_id", "raw_payload", "parsed_json"))

In [0]:
# Flattening the nested columns into 3 dataframes (dimension location, weather measures and air quality measures)

# Location dimension.
# parsed_df carries one row per bronze record, so a city ingested several times in
# the same day would otherwise appear several times in the dimension and fan out
# any join on city_id. distinct() collapses those identical attribute rows.
# NOTE(review): rows whose city name is not mapped keep a null city_id and are
# still included here - confirm whether they should be filtered out.
silver_dim_location_df = parsed_df.select(
    col("city_id"),
    col("parsed_json.location.name").alias("city_name"),
    col("parsed_json.location.country").alias("country"),
    col("parsed_json.location.region").alias("region"),
    (col("parsed_json.location.lat").cast(DoubleType())).alias("latitude"),   # payload delivers lat as a string
    (col("parsed_json.location.lon").cast(DoubleType())).alias("longitude"),  # payload delivers lon as a string
    col("parsed_json.location.timezone_id").alias("timezone")
).distinct()

# Weather measures: one row per parsed bronze record.

# observation_time only carries a clock time (e.g. "10:30 PM"), so it is joined
# with ingestion_date and parsed into a proper timestamp.
weather_observed_at = to_timestamp(
    concat(col("ingestion_date").cast("string"), lit(" "), col("parsed_json.current.observation_time")),
    "yyyy-MM-dd hh:mm a",
)
weather_local_time = col("parsed_json.location.localtime").cast(TimestampType())


def _current_field(field_name):
    """Return the column for a field of the payload's "current" object."""
    return col(f"parsed_json.current.{field_name}")


# Output columns, in the order they appear in the silver table.
weather_columns = [
    col("city_id"),
    weather_observed_at.alias("observation_time_utf"),
    weather_local_time.alias("localtime"),
    _current_field("temperature").alias("temperature_celsius"),
    _current_field("weather_code").alias("weather_code"),
    _current_field("weather_descriptions")[0].alias("weather_description"),  # first description only
    _current_field("wind_speed").alias("wind_speed_kph"),
    _current_field("wind_degree").alias("wind_degree"),
    _current_field("wind_dir").alias("wind_direction"),
    _current_field("pressure").alias("pressure_mb"),
    _current_field("precip").alias("precipitation_mm"),
    _current_field("humidity").alias("humidity"),
    _current_field("cloudcover").alias("cloud_cover"),
    _current_field("feelslike").alias("feels_like_celsius"),
    _current_field("uv_index").alias("uv_index"),
    _current_field("visibility").alias("visibility_km"),
    col("ingestion_date"),  # ingestion metadata is kept
]

silver_weather_df = parsed_df.select(*weather_columns)

# Air quality measures: one row per parsed bronze record.

# observation_time only carries a clock time (e.g. "10:30 PM"), so it is joined
# with ingestion_date and parsed into a proper timestamp.
air_quality_observed_at = to_timestamp(
    concat(col("ingestion_date").cast("string"), lit(" "), col("parsed_json.current.observation_time")),
    "yyyy-MM-dd hh:mm a",
)
air_quality_local_time = col("parsed_json.location.localtime").cast(TimestampType())

air_quality_root = "parsed_json.current.air_quality"

# Pollutant readings arrive as strings and are cast to Double.
pollutant_columns = [
    col(f"{air_quality_root}.{pollutant}").cast(DoubleType()).alias(f"air_{pollutant}")
    for pollutant in ("co", "no2", "o3", "so2", "pm2_5", "pm10")
]

# Index fields contain hyphens, so they are backtick-quoted in the column path.
index_columns = [
    col(f"{air_quality_root}.`{index_field}`").cast(IntegerType()).alias(index_alias)
    for index_field, index_alias in (
        ("us-epa-index", "air_quality_us_epa_index"),
        ("gb-defra-index", "air_quality_gb_defra_index"),
    )
]

silver_air_quality_df = parsed_df.select(
    col("city_id"),
    air_quality_observed_at.alias("observation_time_utf"),
    air_quality_local_time.alias("localtime"),
    *pollutant_columns,
    *index_columns,
    col("ingestion_date"),  # ingestion metadata is kept
)

In [0]:
# Preview the three silver DataFrames before they are written.
for silver_preview_df in (silver_dim_location_df, silver_weather_df, silver_air_quality_df):
    display(silver_preview_df)

In [0]:
# Writing the data to the silver tables.
# Each entry: (DataFrame, Delta path, table name for the log line, save mode, schema option).
# The location dimension is fully overwritten; the two measure tables are appended to.
silver_write_plan = [
    (silver_dim_location_df, silver_dim_location_path, silver_dim_location_name, "overwrite", "overwriteSchema"),
    (silver_weather_df, silver_weather_path, silver_weather_name, "append", "mergeSchema"),
    (silver_air_quality_df, silver_air_quality_path, silver_air_quality_name, "append", "mergeSchema"),
]

for silver_df, target_path, table_name, save_mode, schema_option in silver_write_plan:
    silver_df.write.format("delta").mode(save_mode).option(schema_option, "true").save(target_path)
    # count() is a separate Spark action that runs after the write has finished.
    print(f"{silver_df.count()} rows added to {table_name}.")