In [1]:
from pyspark.sql.functions import col, trim, initcap, to_timestamp, count, when, split, trim
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, IntegerType
import re
import pandas as pd

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 3, Finished, Available, Finished)

In [2]:
# Function to report null counts for each column
def report_nulls(df):
    """Print a one-column table with the number of null values in each column of ``df``.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        None. The summary is printed, not returned.
    """
    # One aggregate per column: count() only tallies rows where when() produced a value,
    # i.e. rows in which that column is null.
    per_column = []
    for name in df.columns:
        per_column.append(count(when(col(name).isNull(), name)).alias(name))

    # The aggregate yields a single row; transpose it so each column becomes a row label.
    summary = df.select(per_column).toPandas().T
    summary.columns = ['Count of nulls']
    print(f"\nNull counts:\n{summary}")

# Function to drop duplicates and report counts
def drop_duplicates(df):
    """Remove fully identical rows from ``df`` and print how many were dropped.

    Args:
        df: Spark DataFrame to de-duplicate.

    Returns:
        The de-duplicated DataFrame.
    """
    rows_before = df.count()
    deduped = df.dropDuplicates()
    rows_after = deduped.count()

    dropped = rows_before - rows_after
    print(f"\nDropped {dropped} duplicate rows (from {rows_before} to {rows_after})")
    return deduped

# Function to drop rows where specified columns are null + report counts
def drop_na_columns(df, cols):
    """Drop rows in which every one of the given columns is null, and report the count.

    A row is kept as soon as at least one of ``cols`` holds a value.

    Args:
        df: Spark DataFrame to filter.
        cols: A single column name, or an iterable of column names.

    Returns:
        The filtered DataFrame.

    Raises:
        ValueError: If ``cols`` is empty (there is nothing to test, and the
            generated SQL would be the invalid expression ``NOT ()``).
    """
    cols = [cols] if isinstance(cols, str) else list(cols)
    if not cols:
        raise ValueError("drop_na_columns requires at least one column name")

    initial_count = df.count()

    # Build AND condition: all columns are null.
    # Each name is backtick-quoted (embedded backticks doubled) so that names containing
    # spaces, dashes or reserved words still parse. A dotted name is therefore treated as
    # one literal column name rather than as a nested field reference.
    condition = " AND ".join(
        "`{}` IS NULL".format(c.replace("`", "``")) for c in cols
    )

    # Filter out rows where all cols are null
    df = df.filter(f"NOT ({condition})")

    final_count = df.count()
    rows_removed = initial_count - final_count
    col_list_str = ", ".join(cols)
    print(f"\nRemoved {rows_removed} rows where all of these columns are null: {col_list_str}")

    return df

# Function to trim string columns
def trim_string_columns(df, cols=None):
    """Strip leading and trailing whitespace from string columns.

    Args:
        df: Spark DataFrame to clean.
        cols: Column names to trim. When None, every column whose schema type is
            StringType is trimmed.

    Returns:
        The DataFrame with the selected columns replaced by their trimmed values.
    """
    if cols is None:
        # Default to all string-typed columns found in the schema
        cols = []
        for field in df.schema.fields:
            if isinstance(field.dataType, StringType):
                cols.append(field.name)

    trimmed = df
    for name in cols:
        trimmed = trimmed.withColumn(name, trim(col(name)))

    print(f"\nTrimmed strings in columns: {cols}")
    return trimmed

# Function to rename columns to PascalCase
def _to_pascal_case(col_name):
    """Return ``col_name`` in PascalCase, e.g. ``date_int`` -> ``DateInt``."""
    # Insert spaces before capital letters (handles camelCase, PascalCase)
    spaced = re.sub(r'(?<!^)(?=[A-Z])', ' ', col_name)
    # Replace separators with spaces, split, capitalize each, and join
    words = spaced.replace("/", " ").replace("-", " ").replace("_", " ").split()
    # A name made only of separators would collapse to ""; keep the original instead
    return ''.join(word.capitalize() for word in words) or col_name


def rename_columns(df):
    """Rename every column of ``df`` to PascalCase.

    All new names are computed first and applied in a single ``toDF`` call, so one
    rename can never affect how a later column is matched (which can happen with a
    chain of ``withColumnRenamed`` calls).

    Args:
        df: Spark DataFrame whose columns should be renamed.

    Returns:
        A DataFrame with the same data and PascalCase column names, in the same order.

    Raises:
        ValueError: If two or more columns would end up with the same name
            (compared case-insensitively).
    """
    new_names = [_to_pascal_case(name) for name in df.columns]

    # Group source columns by target name to detect collisions before renaming
    sources_by_target = {}
    for old, new in zip(df.columns, new_names):
        sources_by_target.setdefault(new.lower(), []).append(old)
    clashes = sorted(olds for olds in sources_by_target.values() if len(olds) > 1)
    if clashes:
        raise ValueError(f"Columns collapse to the same PascalCase name: {clashes}")

    df = df.toDF(*new_names)
    print("\nRenamed columns to PascalCase\n")
    return df

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 4, Finished, Available, Finished)

### **Live Times**

In [3]:
# Load the raw live-times table; the cleaning cells below reassign this variable
stg_live_times = spark.read.table("raw_live_times")

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 5, Finished, Available, Finished)

In [4]:
# Preview 3 rows. limit() runs in Spark before toPandas(), so only those rows are
# collected to the driver instead of the whole table.
display(stg_live_times.limit(3).toPandas())

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4af6cc1d-38b0-4b6e-afed-97a760ac5cc6)

In [5]:
from pyspark.sql.functions import from_utc_timestamp

# Cast popularTimesLivePercent to Integer
stg_live_times = stg_live_times.withColumn("popularTimesLivePercent", col("popularTimesLivePercent").cast(IntegerType()))

# Parse the UTC timestamp string and shift it to Vancouver local time (Pacific time,
# standard or daylight depending on the date), replacing the original column.
# NOTE(review): stg_live_times is reassigned in place, so re-running this cell without
# re-running the read cell looks like it would apply the shift a second time - confirm.
stg_live_times = stg_live_times.withColumn(
    'DateTimeCollected', 
    from_utc_timestamp(to_timestamp(col('DateTimeCollected')), 'America/Vancouver')
)

# Apply cleaning steps
report_nulls(stg_live_times)
stg_live_times = drop_duplicates(stg_live_times)
stg_live_times = drop_na_columns(stg_live_times, ["placeId", "popularTimesLivePercent"])
stg_live_times = trim_string_columns(stg_live_times, cols=["placeId"])

# Rename columns
stg_live_times = rename_columns(stg_live_times)
# rename_columns has already turned "placeId" into "PlaceId". Renaming from that exact
# name works whether or not Spark resolves column names case-sensitively; looking up
# the old "placeId" would silently do nothing when resolution is case-sensitive.
stg_live_times = stg_live_times.withColumnRenamed("PlaceId", "PlaceID")

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 7, Finished, Available, Finished)


Null counts:
                         Count of nulls
placeId                              14
popularTimesLivePercent           56361
DateTimeCollected                     0

Dropped 1 duplicate rows (from 386471 to 386470)

Removed 14 rows where all of these columns are null: placeId, popularTimesLivePercent

Trimmed strings in columns: ['placeId']

Renamed columns to PascalCase



In [6]:
# Preview 10 rows; limit() before toPandas() avoids collecting the whole table to the driver
display(stg_live_times.limit(10).toPandas())

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 51cf4e56-761d-40c5-a861-5bddb9d9847e)

In [7]:
from pyspark.sql.functions import col, date_trunc, hour

# Derive two hour-level fields from DateTimeCollected in one chained expression:
#   Datetime - the timestamp truncated to the start of its hour
#   Hour     - the hour-of-day component
stg_live_times = (
    stg_live_times
    .withColumn("Datetime", date_trunc("hour", col("DateTimeCollected")))
    .withColumn("Hour", hour(col("DateTimeCollected")))
)


StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 9, Finished, Available, Finished)

In [8]:
# Preview 10 rows; limit() before toPandas() avoids collecting the whole table to the driver
display(stg_live_times.limit(10).toPandas())

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, af1009a7-75c2-4297-a363-ab9507899072)

In [10]:
# Re-check null counts after cleaning and after adding the Datetime and Hour columns
report_nulls(stg_live_times)

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 12, Finished, Available, Finished)


Null counts:
                         Count of nulls
PlaceID                               0
PopularTimesLivePercent           56347
DateTimeCollected                     0
Datetime                              0
Hour                                  0


In [11]:
# Row count of the cleaned live-times DataFrame (sanity check before saving)
stg_live_times.count()

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 13, Finished, Available, Finished)

386456

In [12]:
# Preview 10 rows; limit() before toPandas() avoids collecting the whole table to the driver
display(stg_live_times.limit(10).toPandas())

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6f1cfb82-aeef-4385-ab5a-c3cc34eee101)

In [13]:
# Save as the Delta table "stg_live_times"; mode("overwrite") replaces any existing contents
stg_live_times.write.format("delta").mode("overwrite").saveAsTable("stg_live_times")

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 15, Finished, Available, Finished)

### **Weather Historical**

In [14]:
# Load the raw historical-weather table; the cleaning cells below reassign this variable
stg_weather_historical = spark.read.table("raw_weather_historical")

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 16, Finished, Available, Finished)

In [15]:
# Preview 10 rows; limit() before toPandas() avoids collecting the whole table to the driver
display(stg_weather_historical.limit(10).toPandas())

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 17, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 47671bb0-b8ad-4e85-89db-ce41bbfa1a6f)

In [16]:
# Raw schema preview, taken before any casting (the columns are read as strings)
stg_weather_historical.printSchema()

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 18, Finished, Available, Finished)

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- date_int: string (nullable = true)
 |-- time_24: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- temp: string (nullable = true)
 |-- feelslike: string (nullable = true)
 |-- dew: string (nullable = true)
 |-- humidity: string (nullable = true)
 |-- precip: string (nullable = true)
 |-- precipprob: string (nullable = true)
 |-- preciptype: string (nullable = true)
 |-- snow: string (nullable = true)
 |-- snowdepth: string (nullable = true)
 |-- windgust: string (nullable = true)
 |-- windspeed: string (nullable = true)
 |-- winddir: string (nullable = true)
 |-- sealevelpressure: string (nullable = true)
 |-- cloudcover: string (nullable = true)
 |-- visibility: string (nullable = true)
 |-- solarradiation: string (nullable = true)
 |-- solarenergy: string (nullable = true)
 |-- uvindex: string (nullable = true)
 |-- severerisk: string (nullable = true)
 |-- conditions: string (nullab

In [17]:
from pyspark.sql.functions import to_date, to_timestamp, col

# Remove the columns that are not carried into the staging table
stg_weather_historical = stg_weather_historical.drop("stations", "date_int",'time_24')

# Parse the date and timestamp columns
stg_weather_historical = (
    stg_weather_historical
    .withColumn("date", to_date("date", "yyyy-MM-dd"))
    .withColumn("datetime", to_timestamp("datetime"))
)

# Cast to DoubleType, one column at a time in the listed order
for _weather_col in (
    "temp", "feelslike", "dew", "humidity", "precip", "snow",
    "snowdepth", "windgust", "windspeed", "sealevelpressure",
    "visibility", "solarenergy",
):
    stg_weather_historical = stg_weather_historical.withColumn(
        _weather_col, col(_weather_col).cast(DoubleType())
    )

# Cast to IntegerType, one column at a time in the listed order
for _weather_col in (
    "precipprob", "winddir", "cloudcover",
    "solarradiation", "uvindex", "severerisk",
):
    stg_weather_historical = stg_weather_historical.withColumn(
        _weather_col, col(_weather_col).cast(IntegerType())
    )

# Report nulls on the typed data, then de-duplicate and switch to PascalCase names
report_nulls(stg_weather_historical)
stg_weather_historical = drop_duplicates(stg_weather_historical)
stg_weather_historical = rename_columns(stg_weather_historical)


StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 19, Finished, Available, Finished)


Null counts:
                  Count of nulls
name                           0
date                           0
datetime                       0
temp                           0
feelslike                      0
dew                            0
humidity                       0
precip                         0
precipprob                     0
preciptype                  7258
snow                           0
snowdepth                      0
windgust                       0
windspeed                      0
winddir                        0
sealevelpressure             936
cloudcover                     0
visibility                     0
solarradiation                 0
solarenergy                    0
uvindex                        0
severerisk                     0
conditions                     0
icon                         744

Dropped 48 duplicate rows (from 9143 to 9095)

Renamed columns to PascalCase



In [18]:
# Preview 10 rows; limit() before toPandas() avoids collecting the whole table to the driver
display(stg_weather_historical.limit(10).toPandas())

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 75cedce8-a6c5-4b51-a76e-e3d3ae372568)

In [19]:
# Fill missing Preciptype and add the IsRaining / IsSnowing flag columns
from pyspark.sql.functions import col, when, lit, dayofweek

stg_weather_historical = (
    stg_weather_historical
    # A null precipitation type becomes the literal "none"
    .withColumn("Preciptype", when(col("Preciptype").isNull(), lit("none")).otherwise(col("Preciptype")))
    # 1 when Preciptype or Icon contains "rain", otherwise 0
    .withColumn("IsRaining", when(col("Preciptype").contains("rain") | col("Icon").contains("rain"), 1).otherwise(0))
    # 1 when Preciptype contains "snow", otherwise 0 (Icon is not consulted for snow)
    .withColumn("IsSnowing", when(col("Preciptype").contains("snow"), 1).otherwise(0))
)


# Preview the 10 earliest rows; limit() after orderBy() means only those rows are
# collected to the driver instead of the whole sorted table
display(stg_weather_historical.orderBy("Datetime").limit(10).toPandas())


StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a89bd334-577d-49f6-a5d9-436b684cb60f)

In [20]:
# Save as the Delta table "stg_weather_historical"; mode("overwrite") replaces any existing contents
stg_weather_historical.write.format("delta").mode("overwrite").saveAsTable("stg_weather_historical")

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 22, Finished, Available, Finished)

### **Weather Forecast**

In [21]:
# Load the raw weather-forecast table; the cleaning cells below reassign this variable
stg_weather_forecast = spark.read.table("raw_weather_forecast")

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 23, Finished, Available, Finished)

In [22]:
# Preview 10 rows; limit() before toPandas() avoids collecting the whole table to the driver
display(stg_weather_forecast.limit(10).toPandas())

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 24, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 07e3bdeb-a3c8-4653-8e8a-6a6bbf0f4b5a)

In [23]:
# Raw schema preview, taken before any casting (every column is read as a string)
stg_weather_forecast.printSchema()

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 25, Finished, Available, Finished)

root
 |-- name: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- temp: string (nullable = true)
 |-- feelslike: string (nullable = true)
 |-- dew: string (nullable = true)
 |-- humidity: string (nullable = true)
 |-- precip: string (nullable = true)
 |-- precipprob: string (nullable = true)
 |-- preciptype: string (nullable = true)
 |-- snow: string (nullable = true)
 |-- snowdepth: string (nullable = true)
 |-- windgust: string (nullable = true)
 |-- windspeed: string (nullable = true)
 |-- winddir: string (nullable = true)
 |-- sealevelpressure: string (nullable = true)
 |-- cloudcover: string (nullable = true)
 |-- visibility: string (nullable = true)
 |-- solarradiation: string (nullable = true)
 |-- solarenergy: string (nullable = true)
 |-- uvindex: string (nullable = true)
 |-- severerisk: string (nullable = true)
 |-- conditions: string (nullable = true)
 |-- icon: string (nullable = true)
 |-- stations: string (nullable = true)



In [24]:
from pyspark.sql.functions import to_date, to_timestamp

# Remove the columns that are not carried into the staging table
stg_weather_forecast = stg_weather_forecast.drop("stations","severerisk")

# Parse the timestamp column
stg_weather_forecast = stg_weather_forecast.withColumn("datetime", to_timestamp("datetime"))

# Cast to DoubleType, one column at a time in the listed order
for _weather_col in (
    "temp", "feelslike", "dew", "humidity", "precip", "snow",
    "snowdepth", "windgust", "windspeed", "sealevelpressure",
    "visibility", "solarenergy",
):
    stg_weather_forecast = stg_weather_forecast.withColumn(
        _weather_col, col(_weather_col).cast(DoubleType())
    )

# Cast to IntegerType, one column at a time in the listed order.
# severerisk is dropped above, so it is not cast here.
for _weather_col in (
    "precipprob", "winddir", "cloudcover", "solarradiation", "uvindex",
):
    stg_weather_forecast = stg_weather_forecast.withColumn(
        _weather_col, col(_weather_col).cast(IntegerType())
    )

# Report nulls on the typed data, then de-duplicate and switch to PascalCase names
report_nulls(stg_weather_forecast)
stg_weather_forecast = drop_duplicates(stg_weather_forecast)
stg_weather_forecast = rename_columns(stg_weather_forecast)


StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 26, Finished, Available, Finished)


Null counts:
                  Count of nulls
name                           0
datetime                       0
temp                           0
feelslike                      0
dew                            0
humidity                       0
precip                         0
precipprob                     0
preciptype                   127
snow                           0
snowdepth                      0
windgust                       0
windspeed                      0
winddir                        0
sealevelpressure               0
cloudcover                     0
visibility                     0
solarradiation                 0
solarenergy                    0
uvindex                        0
conditions                     0
icon                           0

Dropped 0 duplicate rows (from 192 to 192)

Renamed columns to PascalCase



In [25]:
# Preview 10 rows; limit() before toPandas() avoids collecting the whole table to the driver
display(stg_weather_forecast.limit(10).toPandas())

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 27, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d2b447fb-95ce-4ac3-95f9-a7ce0d00535a)

In [26]:
# Fill missing Preciptype and add the IsRaining / IsSnowing flag columns
from pyspark.sql.functions import col, when, lit, dayofweek

stg_weather_forecast = (
    stg_weather_forecast
    # A null precipitation type becomes the literal "none"
    .withColumn("Preciptype", when(col("Preciptype").isNull(), lit("none")).otherwise(col("Preciptype")))
    # 1 when Preciptype or Icon contains "rain", otherwise 0
    .withColumn("IsRaining", when(col("Preciptype").contains("rain") | col("Icon").contains("rain"), 1).otherwise(0))
    # 1 when Preciptype contains "snow", otherwise 0 (Icon is not consulted for snow)
    .withColumn("IsSnowing", when(col("Preciptype").contains("snow"), 1).otherwise(0))
)

# Preview the 10 earliest rows; limit() after orderBy() means only those rows are
# collected to the driver instead of the whole sorted table
display(stg_weather_forecast.orderBy("Datetime").limit(10).toPandas())

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 28, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 64625a06-86bf-4126-98c1-b39913d8f1ee)

In [27]:
# Final schema preview: typed, PascalCase columns plus the IsRaining / IsSnowing flags
stg_weather_forecast.printSchema()

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 29, Finished, Available, Finished)

root
 |-- Name: string (nullable = true)
 |-- Datetime: timestamp (nullable = true)
 |-- Temp: double (nullable = true)
 |-- Feelslike: double (nullable = true)
 |-- Dew: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Precip: double (nullable = true)
 |-- Precipprob: integer (nullable = true)
 |-- Preciptype: string (nullable = true)
 |-- Snow: double (nullable = true)
 |-- Snowdepth: double (nullable = true)
 |-- Windgust: double (nullable = true)
 |-- Windspeed: double (nullable = true)
 |-- Winddir: integer (nullable = true)
 |-- Sealevelpressure: double (nullable = true)
 |-- Cloudcover: integer (nullable = true)
 |-- Visibility: double (nullable = true)
 |-- Solarradiation: integer (nullable = true)
 |-- Solarenergy: double (nullable = true)
 |-- Uvindex: integer (nullable = true)
 |-- Conditions: string (nullable = true)
 |-- Icon: string (nullable = true)
 |-- IsRaining: integer (nullable = false)
 |-- IsSnowing: integer (nullable = false)



In [28]:
# Save as the Delta table "stg_weather_forecast"; mode("overwrite") replaces any existing contents
stg_weather_forecast.write.format("delta").mode("overwrite").saveAsTable("stg_weather_forecast")

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 30, Finished, Available, Finished)

### **Weather Combined**

In [29]:
from pyspark.sql.functions import to_date, min, max
from datetime import timedelta

# Read both tables from Silver layer
stg_weather_historical = spark.read.table("stg_weather_historical")
stg_weather_forecast = spark.read.table("stg_weather_forecast")

# Get last date from historical data (None when the table has no Datetime values)
last_date = stg_weather_historical.select(max(to_date("Datetime"))).first()[0]
if last_date is None:
    raise ValueError("stg_weather_historical has no Datetime values; cannot align the forecast")

# Calculate the expected first date for forecast
expected_forecast_start = last_date + timedelta(days=1)

# Dynamically trim forecast data to start from the expected date
stg_weather_forecast = stg_weather_forecast.filter(
    to_date("Datetime") >= expected_forecast_start
)

# Get actual first date from forecast after trimming (None when nothing is left)
first_date = stg_weather_forecast.select(min(to_date("Datetime"))).first()[0]

# Check continuity. Stop here on a gap: the following cells count and save
# stg_weather_merged unconditionally, so merely printing a warning would leave them
# to fail on an undefined name or to overwrite the table with a stale merge.
if first_date != expected_forecast_start:
    raise ValueError(
        f"Gap detected! Expected forecast to start on {expected_forecast_start}, but got {first_date}. "
        "Merge skipped due to non-continuous dates."
    )

print(f"Continuous: historical ends {last_date}, forecast starts {first_date}")

# Get common columns and preserve order from historical
common_cols = [name for name in stg_weather_historical.columns if name in stg_weather_forecast.columns]

# Select only common columns
weather_historical_common = stg_weather_historical.select(common_cols)
weather_forecast_common = stg_weather_forecast.select(common_cols)

# Merge the datasets
stg_weather_merged = weather_historical_common.unionByName(weather_forecast_common)

# Rename columns
stg_weather_merged = stg_weather_merged.withColumnRenamed("Name", "Location")

# Display the first 10 rows sorted by Datetime; limit() after orderBy() means only
# those rows are collected to the driver
display(stg_weather_merged.orderBy("Datetime").limit(10).toPandas())


StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 31, Finished, Available, Finished)

Continuous: historical ends 2025-05-15, forecast starts 2025-05-16


SynapseWidget(Synapse.DataFrame, f6bf9135-6a21-4651-b140-ec9e03cf5cf6)

In [30]:
# Count the historical slice, the forecast slice and their union, in that order
historical_count, forecast_count, merged_count = (
    frame.count()
    for frame in (weather_historical_common, weather_forecast_common, stg_weather_merged)
)

# Print the three totals under a header line
print("Row counts:")
print(f"- Historical Weather     : {historical_count}")
print(f"- Forecast Weather       : {forecast_count}")
print(f"- Merged Weather (Total) : {merged_count}")


StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 32, Finished, Available, Finished)

Row counts:
- Historical Weather     : 9095
- Forecast Weather       : 168
- Merged Weather (Total) : 9263


In [31]:
# Save as the Delta table "stg_weather_merged"; mode("overwrite") replaces any existing contents
stg_weather_merged.write.format("delta").mode("overwrite").saveAsTable("stg_weather_merged")

StatementMeta(, e56a11a8-f336-4209-937a-3f6d96169759, 33, Finished, Available, Finished)