In [0]:
# Schema (database) that the rest of the notebook reads from and writes to.
SCHEMA_NAME = "resp_health_db"

# Make it the session default so later cells can use unqualified table names.
spark.sql(f"USE {SCHEMA_NAME}")

# Sanity check: list the tables visible in the selected schema.
tables_df = spark.sql("SHOW TABLES")
tables_df.show(truncate=False)


In [0]:
from pyspark.sql import functions as F

# SQL that (re)creates the temp view `virus_daily`: one row per
# (report_date, province) aggregated from `respiratory_activity`.
#   * avg_positivity       - mean of metric_value over rows whose metric_type
#                            contains "percent" or "positiv" (case-insensitive)
#   * total_clinical_count - sum of metric_value (NULL treated as 0) over rows
#                            whose metric_type matches neither pattern
# NOTE(review): a row with a NULL metric_type satisfies neither CASE condition
# (NOT applied to NULL is NULL), so it contributes to neither column — confirm
# that this is intended.
virus_daily_sql = """
CREATE OR REPLACE TEMP VIEW virus_daily AS
SELECT
  report_date AS date,
  province,
  -- average positivity across viruses where metric_type looks like a percentage
  AVG(CASE
        WHEN LOWER(metric_type) LIKE '%percent%' 
          OR LOWER(metric_type) LIKE '%positiv%'
        THEN metric_value
      END) AS avg_positivity,
  -- sum of other clinical metrics (outbreaks, hospitalizations, etc.)
  SUM(CASE
        WHEN NOT (LOWER(metric_type) LIKE '%percent%' 
               OR LOWER(metric_type) LIKE '%positiv%')
        THEN COALESCE(metric_value, 0)
      END) AS total_clinical_count
FROM respiratory_activity
GROUP BY report_date, province
"""

# Register the view, then preview up to 10 rows without truncating cell values.
spark.sql(virus_daily_sql)
spark.sql("SELECT * FROM virus_daily LIMIT 10").show(truncate=False)


In [0]:
# SQL that (re)creates the temp view `weather_daily`: one row per
# (calendar day of `timestamp`, province) from `weather_conditions`, holding
# the per-day means of temperature_c, wind_chill_c and humidity_percent.
weather_daily_sql = """
CREATE OR REPLACE TEMP VIEW weather_daily AS
SELECT
  DATE(timestamp) AS date,
  province,
  AVG(temperature_c)    AS avg_temp,
  AVG(wind_chill_c)     AS avg_wind_chill,
  AVG(humidity_percent) AS avg_humidity
FROM weather_conditions
GROUP BY DATE(timestamp), province
"""

# Register the view, then preview up to 10 rows without truncating cell values.
spark.sql(weather_daily_sql)
spark.sql("SELECT * FROM weather_daily LIMIT 10").show(truncate=False)


In [0]:
from pyspark.sql import Window

# Inputs: the two per-day temp views registered by the SQL cells above.
virus_df = spark.table("virus_daily")
weather_df = spark.table("weather_daily")

# Keep only the (date, province) pairs that exist in both views.
joined = virus_df.alias("v").join(
    weather_df.alias("w"),
    on=["date", "province"],
    how="inner",
)

# All min/max statistics below are computed within a province.
w_prov = Window.partitionBy("province")


def _add_min_max_score(frame, value_col, prefix):
    """Append `<prefix>_min`, `<prefix>_max` and `<prefix>_risk_score` to `frame`.

    The score is `value_col` min-max scaled to the 0-100 range inside each
    province (window `w_prov`); it is 0.0 when the province's min and max are
    equal, which avoids dividing by zero.
    """
    lowest = F.col(f"{prefix}_min")
    highest = F.col(f"{prefix}_max")
    rescaled = (F.col(value_col) - lowest) / (highest - lowest) * 100.0
    return (
        frame
        .withColumn(f"{prefix}_min", F.min(value_col).over(w_prov))
        .withColumn(f"{prefix}_max", F.max(value_col).over(w_prov))
        .withColumn(
            f"{prefix}_risk_score",
            F.when(highest == lowest, 0.0).otherwise(rescaled),
        )
    )


# ----- VIRUS RISK -----
# Intensity = positivity + clinical count, each defaulting to 0 when NULL.
virus_base = joined.withColumn(
    "virus_intensity",
    F.coalesce(F.col("avg_positivity"), F.lit(0.0))
    + F.coalesce(F.col("total_clinical_count"), F.lit(0.0)),
)
virus_scaled = _add_min_max_score(virus_base, "virus_intensity", "virus")

# ----- COLD RISK -----
# Negate so that colder days get a larger index; wind chill is preferred and
# plain temperature is the fallback when wind chill is NULL.
cold_base = virus_scaled.withColumn(
    "cold_index",
    -F.coalesce(F.col("avg_wind_chill"), F.col("avg_temp")),
)
cold_scaled = _add_min_max_score(cold_base, "cold_index", "cold")

# ----- Combined risk -----
# Unweighted mean of the two component scores.
scored = cold_scaled.withColumn(
    "combined_risk_score",
    (F.col("virus_risk_score") + F.col("cold_risk_score")) / 2.0,
)

(
    scored
    .select(
        "date",
        "province",
        "virus_risk_score",
        "cold_risk_score",
        "combined_risk_score",
    )
    .show(10, truncate=False)
)


In [0]:
from pyspark.sql import functions as F

spark.sql("USE resp_health_db")

virus_df = spark.table("virus_daily")
weather_df = spark.table("weather_daily")

# Run the same three checks on both views: row count, date span, provinces.
# Comparing the outputs side by side shows whether the join keys can match.
views_to_check = [("virus_daily", virus_df), ("weather_daily", weather_df)]

for view_name, view_df in views_to_check:
    print(f"{view_name} rows:", view_df.count())

for view_name, view_df in views_to_check:
    print(f"\n{view_name} date range:")
    view_df.agg(
        F.min("date").alias("min_date"),
        F.max("date").alias("max_date"),
    ).show()

for view_name, view_df in views_to_check:
    print(f"\n{view_name} provinces:")
    (
        view_df
        .select("province")
        .distinct()
        .orderBy("province")
        .show(truncate=False)
    )


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

spark.sql("USE resp_health_db")

# Reload the per-day virus view and inspect it before building scores from it.
virus_df = spark.table("virus_daily")

virus_df.show(n=10, truncate=False)

virus_row_count = virus_df.count()
print("virus_daily rows:", virus_row_count)


Compute the `virus_intensity` metric and scale it to a 0–100 virus risk score

In [0]:
# Combine positivity and clinical counts into one intensity value per row;
# a NULL component counts as 0.
virus_base = virus_df.withColumn(
    "virus_intensity",
    F.coalesce(F.col("avg_positivity"), F.lit(0.0)) +
    F.coalesce(F.col("total_clinical_count"), F.lit(0.0))
)

# Author's assumption: province is just 'Canada', so scaling uses the min/max
# of the whole table rather than per-province statistics.
# `w_all` is not used in this cell; the definition is kept so the cell still
# leaves the same names behind.
w_all = Window.orderBy("date")

# Fetch both bounds with a single aggregation: one Spark action instead of the
# two separate collect() calls used previously.
intensity_bounds = virus_base.agg(
    F.min("virus_intensity").alias("min"),
    F.max("virus_intensity").alias("max"),
).collect()[0]
min_val = intensity_bounds["min"]
max_val = intensity_bounds["max"]

print("Intensity min/max:", min_val, max_val)

# Min-max scale to 0-100; a constant series (max == min) scores 0.0 so the
# division below never sees a zero denominator.
virus_scaled = virus_base.withColumn(
    "virus_risk_score",
    F.when(F.lit(max_val) == F.lit(min_val), F.lit(0.0))
     .otherwise(
         (F.col("virus_intensity") - F.lit(min_val)) /
         (F.lit(max_val) - F.lit(min_val)) * 100.0
     )
)

(
    virus_scaled
    .select("date", "province", "avg_positivity", "total_clinical_count", "virus_risk_score")
    .orderBy("date")
    .show(20, truncate=False)
)


In [0]:
from pyspark.sql import Window

# Window over each province's rows in date order; a later cell uses it with
# row_number() to generate the `id` column.
w_all = (
    Window
    .partitionBy("province")
    .orderBy("date")
)


Combined risk scores (virus-only for now; weather is not joined yet)

In [0]:
# Bucket the combined score into four labels using cut-offs at 25, 50 and 75.
combined_score_col = F.col("combined_risk_score")
risk_category_expr = (
    F.when(combined_score_col < 25, "Low")
    .when(combined_score_col < 50, "Moderate")
    .when(combined_score_col < 75, "High")
    .otherwise("Very High")
)

# Final column order, matching the existing risk_scores table structure.
risk_score_columns = [
    "id",
    "date",
    "province",
    "virus_risk_score",
    "cold_risk_score",
    "combined_risk_score",
    "risk_category",
    "created_at",
]

risk_scores_df = (
    virus_scaled
    # No weather data yet: cold score is a typed NULL placeholder ...
    .withColumn("cold_risk_score", F.lit(None).cast("double"))
    # ... and the combined score is simply the virus score.
    .withColumn("combined_risk_score", F.col("virus_risk_score"))
    .withColumn("risk_category", risk_category_expr)
    # Row number within each province (by date) serves as the id.
    .withColumn("id", F.row_number().over(w_all).cast("long"))
    .withColumn("created_at", F.current_timestamp())
    .select(*risk_score_columns)
)

risk_scores_df.show(20, truncate=False)
print("Rows:", risk_scores_df.count())


In [0]:
# Quick re-check of the frame that is about to be written: sample rows + size.
risk_scores_df.show(n=10, truncate=False)

risk_row_count = risk_scores_df.count()
print("Rows:", risk_row_count)


In [0]:
# Select the schema, then remove any previous risk_scores table so the next
# cell can recreate it from scratch.
for statement in ("USE resp_health_db", "DROP TABLE IF EXISTS risk_scores"):
    spark.sql(statement)


In [0]:
# Persist the scores as a managed table, replacing any existing contents.
risk_scores_df.write.saveAsTable("risk_scores", mode="overwrite")

# Read the table back to confirm what was actually stored.
saved_risk_scores = spark.table("risk_scores")
saved_risk_scores.show(10, truncate=False)
print("Final count:", saved_risk_scores.count())


In [0]:
# Show the stored table with the notebook's rich `display` renderer.
risk_scores_table = spark.table("risk_scores")
display(risk_scores_table)


In [0]:
# Install Meteostat in this cluster
%pip install meteostat


In [0]:
from datetime import datetime
import pandas as pd
from meteostat import Stations, Daily

from pyspark.sql import functions as F

spark.sql("USE resp_health_db")

# Use the same date range as virus_daily
start, end = datetime(2025, 8, 30), datetime(2025, 11, 29)

# 1) Restrict the station catalogue to Canada, then to stations whose
#    inventory reports daily data for the whole [start, end] period.
canadian_stations = Stations().region('CA')
stations = canadian_stations.inventory('daily', (start, end))

# Materialise at most 10 matching stations as a pandas DataFrame.
# NOTE(review): no explicit ordering is applied here, so which 10 stations
# come back depends on the library's default order — confirm this is acceptable.
stations_df = stations.fetch(10)
print(stations_df)


In [0]:
# The national average is built from the first five stations listed in
# `stations_df`; their ids are the DataFrame's index values.
station_ids = stations_df.head(5).index.tolist()
station_ids


In [0]:
# Download the daily series of every selected station, tagging each frame with
# the id of the station it came from.
data_list = [
    Daily(sid, start, end).fetch().assign(station=sid)
    for sid in station_ids
]

# Measures carried into the national average: mean/min/max temperature and
# wind speed.
weather_measures = ["tavg", "tmin", "tmax", "wspd"]

# Stack all stations, then average each measure per day ("time") to get one
# national row per date.
weather_pd = (
    pd.concat(data_list)
    .groupby("time")
    .agg(dict.fromkeys(weather_measures, "mean"))
    .reset_index()
)

print(weather_pd.head())
print(weather_pd.tail())


In [0]:
# Convert pandas -> Spark, normalise the day column and label the rows.
#
# The pandas `time` column is presumably datetime64 (TODO confirm), which Spark
# loads as a timestamp. It is cast to a real DATE before being renamed to
# `date`, so the column's type matches its name and the later join on `date`
# compares calendar days rather than relying on an implicit
# timestamp conversion.
weather_sdf = (
    spark.createDataFrame(weather_pd)
    # Replacing the column in place keeps it first in the column order.
    .withColumn("time", F.to_date(F.col("time")))
    .withColumnRenamed("time", "date")
    # These are national averages, so every row gets the same province label.
    .withColumn("province", F.lit("Canada"))
)

# Expose the frame to SQL and to spark.table(...) in later cells.
weather_sdf.createOrReplaceTempView("weather_daily_meteostat")

weather_sdf.show(10, truncate=False)


In [0]:
# Earliest and latest day present in the Meteostat-based weather frame.
weather_sdf.agg(F.min("date"), F.max("date")).show()


In [0]:
from pyspark.sql import Window

# virus_daily: date, province='Canada', avg_positivity, total_clinical_count
virus_df = spark.table("virus_daily")
# weather_daily_meteostat: date, tavg, tmin, tmax, wspd, province='Canada'
weather_df = spark.table("weather_daily_meteostat")

# Inner join: keep only days present in both sources for the same province.
joined = virus_df.alias("v").join(
    weather_df.alias("w"),
    on=["date", "province"],
    how="inner",
)

print("Joined rows:", joined.count())

# Preview the earliest joined days with the columns used for scoring.
joined_preview_columns = [
    "date",
    "province",
    "avg_positivity",
    "total_clinical_count",
    "tavg",
    "wspd",
]
(
    joined
    .select(*joined_preview_columns)
    .orderBy("date")
    .show(10, truncate=False)
)


In [0]:
# Statistics below are taken over all rows of a province.
w_canada = Window.partitionBy("province")

# Intensity = positivity + clinical count; a NULL component counts as 0.
virus_base = joined.withColumn(
    "virus_intensity",
    F.coalesce(F.col("avg_positivity"), F.lit(0.0))
    + F.coalesce(F.col("total_clinical_count"), F.lit(0.0)),
)

# Column handles for the per-province bounds added in the chain below.
virus_lowest = F.col("virus_min")
virus_highest = F.col("virus_max")

# Min-max scale the intensity to 0-100; when the bounds coincide the score is
# 0.0, which also avoids a zero denominator.
virus_scaled = (
    virus_base
    .withColumn("virus_min", F.min("virus_intensity").over(w_canada))
    .withColumn("virus_max", F.max("virus_intensity").over(w_canada))
    .withColumn(
        "virus_risk_score",
        F.when(virus_highest == virus_lowest, 0.0).otherwise(
            (F.col("virus_intensity") - virus_lowest)
            / (virus_highest - virus_lowest)
            * 100.0
        ),
    )
)


In [0]:
# Negated mean temperature: the lower `tavg` is, the larger the index.
cold_base = virus_scaled.withColumn("cold_index", -F.col("tavg"))

# Column handles for the per-province bounds added in the chain below.
cold_lowest = F.col("cold_min")
cold_highest = F.col("cold_max")

# Min-max scale the index to 0-100 within the province window; when the bounds
# coincide the score is 0.0, which also avoids a zero denominator.
cold_scaled = (
    cold_base
    .withColumn("cold_min", F.min("cold_index").over(w_canada))
    .withColumn("cold_max", F.max("cold_index").over(w_canada))
    .withColumn(
        "cold_risk_score",
        F.when(cold_highest == cold_lowest, 0.0).otherwise(
            (F.col("cold_index") - cold_lowest)
            / (cold_highest - cold_lowest)
            * 100.0
        ),
    )
)


In [0]:
# Weights of the two component scores in the blended score.
VIRUS_WEIGHT = 0.6
COLD_WEIGHT = 0.4

scored = cold_scaled.withColumn(
    "combined_risk_score",
    VIRUS_WEIGHT * F.col("virus_risk_score") + COLD_WEIGHT * F.col("cold_risk_score")
)

# Bucket the blended score with cut-offs at 25, 50 and 75.
# A missing score (NULL, or NaN carried over from the weather data) fails every
# `<` comparison and would otherwise fall through to "Very High"; such rows
# get a NULL category instead of a misleading top-risk label.
combined_score = F.col("combined_risk_score")
scored = scored.withColumn(
    "risk_category",
    F.when(combined_score.isNull() | F.isnan(combined_score), F.lit(None).cast("string"))
     .when(combined_score < 25, "Low")
     .when(combined_score < 50, "Moderate")
     .when(combined_score < 75, "High")
     .otherwise("Very High")
)

# NOTE(review): row_number() restarts for every province, so `id` is unique
# only while the data holds a single province — confirm before adding more.
w_all = Window.partitionBy("province").orderBy("date")

risk_scores_df = (
    scored
    .withColumn("id", F.row_number().over(w_all).cast("long"))
    .withColumn("created_at", F.current_timestamp())
    # Final column order of the risk_scores table; helper columns are dropped.
    .select(
        "id",
        "date",
        "province",
        "virus_risk_score",
        "cold_risk_score",
        "combined_risk_score",
        "risk_category",
        "created_at"
    )
)

risk_scores_df.orderBy("date").show(20, truncate=False)
print("Rows:", risk_scores_df.count())


In [0]:
# Recreate the table from scratch: drop any previous version, then write the
# freshly computed scores.
spark.sql("DROP TABLE IF EXISTS risk_scores")
risk_scores_df.write.saveAsTable("risk_scores", mode="overwrite")

# Read the table back to confirm what was actually stored.
stored_risk_scores = spark.table("risk_scores")
stored_risk_scores.show(10, truncate=False)
print("Final count:", stored_risk_scores.count())


In [0]:
# Render the in-memory scores frame with `display` (supplied by the notebook
# environment, not defined in this notebook).
display(risk_scores_df)


In [0]:
# Load the two raw CSV exports. Both files have a header row, and column types
# are inferred from the data rather than declared.
clinical_raw = spark.read.csv(
    "/FileStore/clinical/Clinical data - 2025-12-05.csv",
    header=True,
    inferSchema=True,
)

lab_raw = spark.read.csv(
    "/FileStore/lab/Laboratory data - 2025-12-05.csv",
    header=True,
    inferSchema=True,
)


In [0]:
# Peek at the first five rows of each raw export, clinical first, then lab.
for raw_frame in (clinical_raw, lab_raw):
    raw_frame.show(5)
