## <u>Notebook by John Uzoma</u>

#### Read the XML text file as a string, then convert to bytes

In [None]:
# Location of the raw forecast file (XML content stored as .txt) in the Bronze files area
input_file_path = "Files/Bronze/DublinWeatherForecast.txt"

# textFile() yields one string per line; collect() brings all of them back to the driver
sc = spark.sparkContext
file_lines = sc.textFile(input_file_path).collect()

# Stitch the lines back into a single document, re-inserting a newline between each pair
file_content = "\n".join(file_lines)

# The XML parser in the next cell is handed UTF-8 encoded bytes rather than a str
file_bytes = bytes(file_content, "utf-8")

#### Parse the XML data into DataFrames and join them

In [None]:
import xml.etree.ElementTree as ET

from pyspark.sql.types import StringType, StructField, StructType


def _child_attr(parent, tag, attr):
    """Return attribute ``attr`` of the first ``tag`` child of ``parent``.

    Returns None when ``parent`` is None, when it has no ``tag`` child, or when
    that child lacks ``attr``. A missing element therefore becomes a null cell
    instead of raising AttributeError.
    """
    if parent is None:
        return None
    child = parent.find(tag)
    return None if child is None else child.get(attr)


# Parse XML data
root = ET.fromstring(file_bytes)

# (output column, child element of <location>, attribute that holds the value)
point_fields = [
    ("Temperature_celsius", "temperature", "value"),
    ("WindDirection", "windDirection", "name"),
    ("WindSpeed_mps", "windSpeed", "mps"),
    ("WindGust_mps", "windGust", "mps"),
    ("GlobalRadiation_wpsqm", "globalRadiation", "value"),
    ("Humidity_percent", "humidity", "value"),
    ("Pressure_hPa", "pressure", "value"),
    ("Cloudiness_percent", "cloudiness", "percent"),
    ("DewpointTemperature_celsius", "dewpointTemperature", "value"),
]

# One tuple per <time> element: its "from" attribute followed by the values above.
# Each child element is looked up once per <time>.
data = []
for time_elem in root.findall(".//time"):
    location = time_elem.find("./location")
    data.append(
        (time_elem.get("from"),)
        + tuple(_child_attr(location, tag, attr) for _, tag, attr in point_fields)
    )

# Define column names
columns = ["DateTime"] + [name for name, _, _ in point_fields]

# Every value is an attribute string or None. An explicit all-string schema replaces
# schema inference, which fails on an empty list or on a column that is entirely null.
schema = StructType([StructField(name, StringType(), True) for name in columns])

# Create the DataFrame, dropping every record that has any null value
# (i.e. <time> elements missing one or more of the elements listed above)
df1 = spark.createDataFrame(data, schema).na.drop()

In [None]:
from pyspark.sql.types import StringType, StructField, StructType


def _child_attr(parent, tag, attr):
    """Return attribute ``attr`` of the first ``tag`` child of ``parent``.

    Returns None when ``parent`` is None, when it has no ``tag`` child, or when
    that child lacks ``attr``. A missing element therefore becomes a null cell
    instead of raising AttributeError. Defined in this cell so that the cell
    does not depend on helpers from any other cell.
    """
    if parent is None:
        return None
    child = parent.find(tag)
    return None if child is None else child.get(attr)


# (output column, child element of <location>, attribute that holds the value)
interval_fields = [
    ("Precipitation_mm", "precipitation", "value"),
    ("WeatherType", "symbol", "id"),
]

# One tuple per <time> element: its "from" and "to" attributes followed by the values above
data = []
for time_elem in root.findall(".//time"):
    location = time_elem.find("./location")
    data.append(
        (time_elem.get("from"), time_elem.get("to"))
        + tuple(_child_attr(location, tag, attr) for _, tag, attr in interval_fields)
    )

# Define column names
columns = ["ForecastFrom", "ForecastTo"] + [name for name, _, _ in interval_fields]

# Explicit all-string schema: schema inference fails on an empty list or all-null column
schema = StructType([StructField(name, StringType(), True) for name in columns])

# Create the DataFrame, dropping every record that has any null value
# (i.e. <time> elements lacking a "to" attribute, <precipitation> or <symbol>)
df2 = spark.createDataFrame(data, schema).na.drop()

In [None]:
# Pair each df1 row with the df2 row whose interval ends ("ForecastTo") at that row's
# timestamp ("DateTime"). An inner join already returns every column of both inputs
# (df1's columns first, then df2's), so no explicit projection is needed.
# NOTE(review): if several df2 rows share one ForecastTo value, the matching df1 row is
# duplicated here, giving repeated Date/Time keys downstream — confirm the feed has
# one interval per end time.
joined_df = df1.join(df2, on=df1["DateTime"] == df2["ForecastTo"], how="inner")

#### Convert data types

In [None]:
from pyspark.sql.functions import col, to_timestamp, from_utc_timestamp
from pyspark.sql.types import FloatType

# Columns currently holding timestamp strings
timestamp_cols = ["DateTime", "ForecastFrom", "ForecastTo"]

# Columns currently holding numeric strings
float_cols = [
    "Temperature_celsius",
    "WindSpeed_mps",
    "WindGust_mps",
    "GlobalRadiation_wpsqm",
    "Humidity_percent",
    "Pressure_hPa",
    "Cloudiness_percent",
    "DewpointTemperature_celsius",
    "Precipitation_mm",
]

# Build every conversion up front (timestamps first, then floats) and apply them in
# that order, replacing each column in place.
# from_utc_timestamp reads its input as a UTC time and renders it as Europe/Dublin
# wall-clock time.
# NOTE(review): assumes the XML timestamps are UTC and the Spark session time zone is
# UTC as well; otherwise the shift may be applied on top of another offset — confirm.
conversions = [(name, from_utc_timestamp(col(name), "Europe/Dublin")) for name in timestamp_cols]
conversions += [(name, col(name).cast(FloatType())) for name in float_cols]

for col_name, converted in conversions:
    joined_df = joined_df.withColumn(col_name, converted)

#### Add new columns to store previous values of metrics

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# Rows are ordered chronologically. There is no partitionBy, so Spark evaluates the
# window over all rows in a single partition; presumably acceptable because this file
# holds one location's forecast.
windowSpec = Window.orderBy("DateTime")

# (new column, source column): each new column holds the source value from the
# preceding row (null for the first row). Columns are appended in this order.
previous_value_cols = [
    ("PreviousTemperature", "Temperature_celsius"),
    ("PreviousCloudiness", "Cloudiness_percent"),
    ("PreviousWindSpeed", "WindSpeed_mps"),
    ("PreviousWindGust", "WindGust_mps"),
    ("PreviousGlobalRadiation", "GlobalRadiation_wpsqm"),
    ("PreviousHumidity", "Humidity_percent"),
    ("PreviousPressure", "Pressure_hPa"),
    ("PreviousDewpointTemperature", "DewpointTemperature_celsius"),
    ("PreviousPrecipitation", "Precipitation_mm"),
]

for target_name, source_name in previous_value_cols:
    joined_df = joined_df.withColumn(target_name, lag(col(source_name), 1).over(windowSpec))

#### Split DateTime into Date and Time columns and sort

In [None]:
from pyspark.sql.functions import col, date_format, substring, to_date

# Split DateTime into the Date/Time pair that keys the silver table:
#   Date - calendar date of the timestamp
#   Time - time of day rendered as "HH:mm:ss"
# date_format states the output pattern explicitly. Slicing the timestamp's string form
# with substring() is fragile (its third argument is a length, not an end position).
joined_df = (
    joined_df
    .withColumn("Time", date_format(col("DateTime"), "HH:mm:ss"))
    .withColumn("Date", to_date(col("DateTime")))
)

# Drop DateTime column (now represented by Date + Time)
joined_df = joined_df.drop("DateTime")

# Sort chronologically: by Date, then by Time (zero-padded "HH:mm:ss" strings sort in time order)
joined_df = joined_df.orderBy(col("Date").asc(), col("Time").asc())

#### Create the silver Delta table (if it does not exist)

In [None]:
from pyspark.sql.types import TimestampType, StringType, FloatType, DateType
from delta.tables import DeltaTable

# Silver-table layout in column order: (column name, Spark SQL type)
silver_columns = [
    ("Date", DateType()),
    ("Time", StringType()),
    ("Temperature_celsius", FloatType()),
    ("PreviousTemperature", FloatType()),
    ("WindDirection", StringType()),
    ("WindSpeed_mps", FloatType()),
    ("PreviousWindSpeed", FloatType()),
    ("WindGust_mps", FloatType()),
    ("PreviousWindGust", FloatType()),
    ("GlobalRadiation_wpsqm", FloatType()),
    ("PreviousGlobalRadiation", FloatType()),
    ("Humidity_percent", FloatType()),
    ("PreviousHumidity", FloatType()),
    ("Pressure_hPa", FloatType()),
    ("PreviousPressure", FloatType()),
    ("Cloudiness_percent", FloatType()),
    ("PreviousCloudiness", FloatType()),
    ("DewpointTemperature_celsius", FloatType()),
    ("PreviousDewpointTemperature", FloatType()),
    ("ForecastFrom", TimestampType()),
    ("ForecastTo", TimestampType()),
    ("Precipitation_mm", FloatType()),
    ("PreviousPrecipitation", FloatType()),
    ("WeatherType", StringType()),
]

# Create the table only if it does not already exist
silver_builder = DeltaTable.createIfNotExists(spark).tableName("lakehouse.dublinweatherforecast_silver")
for column_name, column_type in silver_columns:
    silver_builder = silver_builder.addColumn(column_name, column_type)
silver_builder.execute()

#### Optimize Delta table writes

In [None]:
 # Session-level Spark settings applied before the silver-table write.
# Statements start at column 0: a leading space before a top-level statement is an
# IndentationError in plain Python (only IPython's leading-indent stripping hides it).
# NOTE(review): these property names are Microsoft Fabric specific and may differ between
# runtime versions (e.g. a "vorder.default" variant) — confirm for the runtime in use.

# Enable V-Order
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")

# Enable automatic Delta optimized write
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")

#### Upsert the DataFrame into the silver table (Delta MERGE)

In [None]:
# Upsert joined_df into the silver table: target rows whose (Date, Time) match a source
# row are updated; source rows with no match are inserted.

# Address the table by the same identifier it was created with. A relative path such as
# 'Tables/dublinweatherforecast_silver' presumably resolves against the notebook's
# default lakehouse, which need not be the one named "lakehouse".
deltaTable = DeltaTable.forName(spark, "lakehouse.dublinweatherforecast_silver")

dfUpdates = joined_df

# Non-key columns, copied from the source row on both update and insert
value_cols = [
    "Temperature_celsius",
    "PreviousTemperature",
    "WindDirection",
    "WindSpeed_mps",
    "PreviousWindSpeed",
    "WindGust_mps",
    "PreviousWindGust",
    "GlobalRadiation_wpsqm",
    "PreviousGlobalRadiation",
    "Humidity_percent",
    "PreviousHumidity",
    "Pressure_hPa",
    "PreviousPressure",
    "Cloudiness_percent",
    "PreviousCloudiness",
    "DewpointTemperature_celsius",
    "PreviousDewpointTemperature",
    "ForecastFrom",
    "ForecastTo",
    "Precipitation_mm",
    "PreviousPrecipitation",
    "WeatherType",
]

# Build both mappings from one list so update and insert cannot drift apart
update_set = {name: f"updates.{name}" for name in value_cols}
insert_values = {"Date": "updates.Date", "Time": "updates.Time", **update_set}

# NOTE(review): Delta MERGE rejects a source with several rows matching the same target
# row, so joined_df should contain at most one row per (Date, Time) — confirm upstream.
(
    deltaTable.alias('silver')
    .merge(
        dfUpdates.alias('updates'),
        'silver.Date = updates.Date and silver.Time = updates.Time'
    )
    .whenMatchedUpdate(set=update_set)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)