# Analyze merged weather data set

## Objectives
- Load merged weather data set
- Compute statistical distribution
- Review the relationship between average hourly humidity and temperature

## Preamble


In [0]:
import os
import math
import mlflow
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, hour, to_date
sns.set_theme(style="darkgrid")

In [0]:
experiment_name = "/Lab_Project_FruitDropZGS/mlflow_experiment/analyze_merged_weather_data_set_experiment"
mlflow.set_experiment(experiment_name)

In [0]:
run_mlflow = mlflow.start_run()

local_tmp_artifact_dir_path = "/Workspace/Lab_Project_FruitDropZGS/notebook_artifacts/tmp/" + run_mlflow.info.run_id + "/artifacts/"

if not os.path.exists(local_tmp_artifact_dir_path):
  os.makedirs(local_tmp_artifact_dir_path)
  
local_tmp_artifact_dir_path

## Configure parameters

In [0]:
from merged_weather_params_all_stations import params_all_stations
from merged_weather_params_select_stations import params_select_stations

# params = params_all_stations
params = params_select_stations

mlflow.log_params(params)

params

In [0]:
if params["station"]:
  station_condition = col("station").isin(params["station"])
  mlflow.log_params({"station": params["station"]})
else:
  station_condition = col("station").isNotNull() | col("station").isNull()

## Load data

In [0]:
from merged_weather_data_expected_schema import merged_weather_data_expected_schema

merged_weather_data_expected_schema

In [0]:
initial_sdf = spark.table(params["merged_weather_data_set"])

merged_weather_data_set_sdf = initial_sdf.filter(station_condition).cache()

merged_weather_data_set_count_sdf = merged_weather_data_set_sdf.count()
mlflow.log_metric("merge_weather_data_set_count", merged_weather_data_set_count_sdf)

merged_weather_data_set_sdf.printSchema()
merged_weather_data_set_sdf.show()
merged_weather_data_set_count_sdf

In [0]:
assert merged_weather_data_expected_schema == merged_weather_data_set_sdf.schema

## Compute statistical distribution

In [0]:
merged_weather_data_set_summary_sdf = merged_weather_data_set_sdf.summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "90%", "95%", "98%", "99%", "max")
merged_weather_data_set_summary_sdf.show()

In [0]:
merged_weather_data_set_summary_pdf = merged_weather_data_set_summary_sdf.toPandas()
merged_weather_data_set_summary_pdf.to_csv(local_tmp_artifact_dir_path + 'merged_weather_data_summary.csv', index=False)

## Count of duplicate records by station

In [0]:
count_by_datetime_station_sdf = merged_weather_data_set_sdf.groupBy("datetime", "station").count().filter(col("count") > 1)

count_by_datetime_station_sdf.show()
count_by_datetime_station_sdf.count()

In [0]:
duplicate_records_count_by_station_sdf = count_by_datetime_station_sdf.groupBy("station").agg(F.sum("count").alias("count_duplicates"))

duplicate_records_count_by_station_sdf.show()
duplicate_records_count_by_station_sdf.count()

## Count of unique records

In [0]:
unique_weather_sdf = merged_weather_data_set_sdf.dropDuplicates(['datetime', 'station'])

unique_weather_count_sdf = unique_weather_sdf.count()
mlflow.log_metric("unique_weather_count", unique_weather_count_sdf)

unique_weather_sdf.printSchema()
unique_weather_sdf.show()
unique_weather_count_sdf

In [0]:
merged_weather_data_set_sdf = unique_weather_sdf

## CDF diagram for average hourly temperature by station

In [0]:
station_avg_temperature_C_sdf = merged_weather_data_set_sdf.groupBy(
  "station", "avg_temperature_C"
).count().withColumnRenamed(
  "count", "count_observations"
).na.drop().orderBy(
"avg_temperature_C", ascending=False)

station_avg_temperature_C_sdf.printSchema()
station_avg_temperature_C_sdf.show()
station_avg_temperature_C_sdf.count()

In [0]:
station_avg_temperature_C_pdf = station_avg_temperature_C_sdf.toPandas()

percentiles = np.arange(0, 1.02, 0.02)

grouped = station_avg_temperature_C_pdf.groupby("station")
quantiles_pdf = grouped['avg_temperature_C'].quantile(q=percentiles).unstack()
quantiles_station_avg_temperature_C_pdf = quantiles_pdf.transpose().rename_axis(columns='quantiles')

quantiles_station_avg_temperature_C_pdf

In [0]:
fig, ax = plt.subplots(figsize=(28, 18))

for station in quantiles_station_avg_temperature_C_pdf.columns:
    plt.plot(
    quantiles_station_avg_temperature_C_pdf[station],
    quantiles_station_avg_temperature_C_pdf.index,
    label=station,
    marker="*",
    )

plt.title("CDF diagram for average hourly temperature by station")
plt.ylabel("Quantiles")
plt.xlabel("Average temperature (C)")
plt.legend(title="Station")
plt.grid(True)
plt.show()

fig.savefig(local_tmp_artifact_dir_path + f"cdf_avg_hourly_temp_by_station.png")

## CDF diagram for average hourly humidity by station

In [0]:
station_avg_humidity_percent_sdf = merged_weather_data_set_sdf.groupBy(
  "station", "avg_humidity_percent"
).count().withColumnRenamed(
  "count", "count_observations"
).na.drop().orderBy(
"avg_humidity_percent", ascending=False)

station_avg_humidity_percent_sdf.printSchema()
station_avg_humidity_percent_sdf.show()
station_avg_humidity_percent_sdf.count()

In [0]:
station_avg_humidity_percent_pdf = station_avg_humidity_percent_sdf.toPandas()

percentiles = np.arange(0, 1.02, 0.02)

grouped = station_avg_humidity_percent_pdf.groupby("station")
quantiles_pdf = grouped['avg_humidity_percent'].quantile(q=percentiles).unstack()
quantiles_station_avg_humidity_percent_pdf = quantiles_pdf.transpose().rename_axis(columns='quantiles')

quantiles_station_avg_humidity_percent_pdf

In [0]:
fig, ax = plt.subplots(figsize=(28, 18))

for station in quantiles_station_avg_humidity_percent_pdf.columns:
  plt.plot(
    quantiles_station_avg_humidity_percent_pdf[station],
    quantiles_station_avg_humidity_percent_pdf.index,
    label=station,
    marker="*",
  )

plt.title("CDF diagram for average humidity temperature by station")
plt.ylabel('Quantiles')
plt.xlabel("Average humidity (%)")
plt.legend(title="Station")
plt.grid(True)
plt.show()

fig.savefig(local_tmp_artifact_dir_path + f"cdf_avg_hourly_humidity_by_station.png")

## The relationship between average hourly humidity and temperature

In [0]:
avg_humidity_temperature_count_sdf = merged_weather_data_set_sdf.groupBy("avg_humidity_percent", "avg_temperature_C").count().withColumnRenamed("count", "count_observations")

avg_humidity_temperature_count_sdf.printSchema()
avg_humidity_temperature_count_sdf.show()
avg_humidity_temperature_count_sdf.count()

In [0]:
avg_humidity_temperature_count_pdf = avg_humidity_temperature_count_sdf.toPandas()

fig, ax = plt.subplots(figsize=(28, 18))
avg_humidity_temperature_count_pdf.plot.hexbin(
  "avg_temperature_C",
  "avg_humidity_percent",
  C="count_observations",
  reduce_C_function=np.sum,
  gridsize=50,
  ax=ax,
  edgecolors="grey",
  cmap="inferno",
  mincnt=1
)
ax.grid(True)

fig.savefig(local_tmp_artifact_dir_path + f"avg_humidity_temperature_count_pdf.png")

## The relationship between daily average humidity and temperature across time

In [0]:
avg_humidity_temperature_datetime_sdf = merged_weather_data_set_sdf.select("datetime", "avg_humidity_percent", "avg_temperature_C")

avg_humidity_temperature_datetime_sdf.printSchema()
avg_humidity_temperature_datetime_sdf.show()
avg_humidity_temperature_datetime_sdf.count()

In [0]:
avg_humidity_temperature_datetime_pdf = avg_humidity_temperature_datetime_sdf.toPandas()

resample_avg_humidity_temperature_datetime_pdf = avg_humidity_temperature_datetime_pdf.set_index("datetime").resample("D").mean()

resample_avg_humidity_temperature_datetime_pdf

In [0]:
fig, ax1 = plt.subplots(figsize=(28,18))

ax1.plot(
  resample_avg_humidity_temperature_datetime_pdf.index,
  resample_avg_humidity_temperature_datetime_pdf["avg_humidity_percent"],
  marker="*", 
  color="b"
)

ax1.set_xlabel("datetime")
ax1.set_ylabel("Average Humidity (%)", color="b")
ax1.tick_params("y", colors="b")

ax2 = ax1.twinx()
ax2.plot(
  resample_avg_humidity_temperature_datetime_pdf.index,
  resample_avg_humidity_temperature_datetime_pdf["avg_temperature_C"],
  marker="*",
  color="r"
)

ax2.set_ylabel("Average temperature (C)", color="r")
ax2.tick_params("y", colors="r")

plt.title("Daily average humidity and temperature over time")
plt.show()

fig.savefig(local_tmp_artifact_dir_path + f"daily_avg_humidity_temperature_across_time.png")

## The variance of high hourly temperature by station

In [0]:
high_temp_value_by_station_sdf = merged_weather_data_set_sdf.select("datetime", "station", "avg_temperature_C").filter(merged_weather_data_set_sdf.avg_temperature_C >= params["selected_high_temp_value"])

high_temp_value_by_station_sdf.printSchema()
high_temp_value_by_station_sdf.show()
high_temp_value_by_station_sdf.count()

In [0]:
avg_high_hourly_temp_value_by_station_pdf = high_temp_value_by_station_sdf.toPandas()

fig, ax = plt.subplots(figsize=(18,28))
sns.boxplot(x="avg_temperature_C", y="station", data=avg_high_hourly_temp_value_by_station_pdf, orient="h", ax=ax)
ax.set_title(f"Distribution of average hourly high temperature with {params['selected_high_temp_value']} degree Celsius across stations")
plt.show()

fig.savefig(local_tmp_artifact_dir_path + f"avg_high_hourly_temp_value_by_station.png")

### Percentage of hours with high average temperature by station

In [0]:
total_high_temp_value_by_station_sdf = high_temp_value_by_station_sdf.groupBy("station").agg(
  F.count("datetime").alias("total_hours_of_high_temperature"),
)
total_high_temp_value_by_station_sdf.show()
total_high_temp_value_by_station_sdf.count()

In [0]:
count_total_hours_observation_by_station_sdf = merged_weather_data_set_sdf.filter(col("avg_temperature_C").isNotNull()).groupBy("station").count().withColumnRenamed("count", "total_hours_observation")

count_total_hours_observation_by_station_sdf.show()
count_total_hours_observation_by_station_sdf.count()

In [0]:
joined_sdf = total_high_temp_value_by_station_sdf.join(count_total_hours_observation_by_station_sdf, "station", "left")

joined_sdf.printSchema()
joined_sdf.show()
joined_sdf.count()

In [0]:
percentage_of_total_hourly_high_temp_sdf = joined_sdf.withColumn("percentage", (col("total_hours_of_high_temperature") / col("total_hours_observation")) * 100).orderBy("percentage", ascending=False)

percentage_of_total_hourly_high_temp_sdf.printSchema()
percentage_of_total_hourly_high_temp_sdf.show()
percentage_of_total_hourly_high_temp_sdf.count()

## Filter complete time series weather data

In [0]:
from filter_complete_time_series_weather_data import filterCompleteWeatherTimeSeries

complete_timeseries_sdf = filterCompleteWeatherTimeSeries(merged_weather_data_set_sdf).cache()

complete_timeseries_count_sdf = complete_timeseries_sdf.count()
mlflow.log_metric("complete_timeseries_count", complete_timeseries_count_sdf)

complete_timeseries_sdf.printSchema()
complete_timeseries_sdf.show()
complete_timeseries_count_sdf

In [0]:
count_complete_timeseries_sdf = complete_timeseries_sdf.groupBy("datetime").count().withColumnRenamed("count", "count_records").cache()

count_complete_timeseries_sdf.printSchema()
count_complete_timeseries_sdf.show()
count_complete_timeseries_sdf.count()

In [0]:
count_complete_timeseries_pdf = count_complete_timeseries_sdf.toPandas()

count_complete_timeseries_pdf.set_index("datetime", inplace=True)

count_complete_timeseries_by_week_pdf = count_complete_timeseries_pdf.resample('W').count()

count_complete_timeseries_by_week_pdf

In [0]:
count_complete_timeseries_by_week_pdf.to_csv(local_tmp_artifact_dir_path + 'count_complete_timeseries_by_week.csv')

In [0]:
complete_timeseries_by_week_summary_pdf = count_complete_timeseries_by_week_pdf.describe()
complete_timeseries_by_week_summary_pdf

In [0]:
complete_timeseries_by_week_summary_pdf.reset_index().to_csv(local_tmp_artifact_dir_path + 'complete_timeseries_by_week_summary.csv', index=False)

### Daytime time series from complete time series data

In [0]:
daytime_timeseries_sdf = complete_timeseries_sdf.filter((hour("datetime") >= 6) & (hour("datetime") < 20)).cache()

daytime_timeseries_sdf.printSchema()
daytime_timeseries_sdf.show()
daytime_timeseries_sdf.count()

In [0]:
daytime_timeseries_count_sdf = daytime_timeseries_sdf.groupBy("datetime").count().withColumnRenamed("count", "count_records").cache()

daytime_timeseries_count_sdf.printSchema()
daytime_timeseries_count_sdf.show()
daytime_timeseries_count_sdf.count()

In [0]:
daytime_timeseries_count_pdf = daytime_timeseries_count_sdf.toPandas()

daytime_timeseries_count_pdf.set_index("datetime", inplace=True)

count_daytime_timeseries_by_week_pdf = daytime_timeseries_count_pdf.resample('W').count()

count_daytime_timeseries_by_week_pdf

In [0]:
count_daytime_timeseries_by_week_pdf.to_csv(local_tmp_artifact_dir_path + 'count_daytime_timeseries_by_week.csv')

In [0]:
count_daytime_timeseries_by_week_summary_pdf = count_daytime_timeseries_by_week_pdf.describe()
count_daytime_timeseries_by_week_summary_pdf

In [0]:
count_daytime_timeseries_by_week_summary_pdf.reset_index().to_csv(local_tmp_artifact_dir_path + 'count_daytime_timeseries_by_week_summary.csv', index=False)

## What is the difference between missing datetime and high temperature?

In [0]:
merged_weather_data_set_by_station_sdf = merged_weather_data_set_sdf.select(
  "datetime", 
  "station", 
  "avg_temperature_C"
).filter(
  F.col("avg_temperature_C").isNotNull()
).orderBy(
  "datetime", 
  ascending=True
)

merged_weather_data_set_by_station_sdf.printSchema()
merged_weather_data_set_by_station_sdf.show()
merged_weather_data_set_by_station_sdf.count()

In [0]:
grouped_weather_temperature_by_station_sdf = merged_weather_data_set_by_station_sdf.withColumn(
  "date", to_date("datetime")
).groupBy(
  "date",
  "station"
).agg(
  F.mean("avg_temperature_C").alias("daily_avg_temperature_C")
)
grouped_weather_temperature_by_station_sdf.printSchema()
grouped_weather_temperature_by_station_sdf.show()
grouped_weather_temperature_by_station_sdf.count()

### Daily average temperature by station

In [0]:
grouped_weather_temperature_by_station_pdf = grouped_weather_temperature_by_station_sdf.toPandas()

fig, ax = plt.subplots(figsize=(28, 16))

sns.lineplot(x="date", y="daily_avg_temperature_C", data=grouped_weather_temperature_by_station_pdf, hue="station", ax=ax)

ax.set_title("Daily average temperature by station")
ax.set_xlabel("Date")
ax.set_ylabel("Daily average temperature")
ax.grid(True)
ax.legend()

plt.show()
fig.savefig(local_tmp_artifact_dir_path + f"daily_average_temperature_by_station.png")

### Missing datetime from original data sources

In [0]:
from pyspark.sql.functions import expr

start_datetime = merged_weather_data_set_by_station_sdf.select(F.min("datetime")).first()[0]
end_datetime = merged_weather_data_set_by_station_sdf.select(F.max("datetime")).first()[0]

total_hours = (end_datetime - start_datetime).total_seconds() // 3600

complete_hourly_timeseries_sdf = spark.range(total_hours).select(
    (F.lit(start_datetime) + F.expr("INTERVAL 1 HOUR") * F.col("id")).alias("complete_datetime")
)

complete_hourly_timeseries_sdf.printSchema()
complete_hourly_timeseries_sdf.show()
complete_hourly_timeseries_sdf.count()

In [0]:
distinct_station_sdf = merged_weather_data_set_by_station_sdf.select("station").distinct()

complete_hourly_timeseries_with_station_sdf = complete_hourly_timeseries_sdf.crossJoin(distinct_station_sdf).withColumnRenamed("station", "WeatherStation")

complete_hourly_timeseries_with_station_sdf.printSchema()
complete_hourly_timeseries_with_station_sdf.show()
complete_hourly_timeseries_with_station_sdf.count()

In [0]:
joint_complete_timeseries_with_weather_features_sdf = complete_hourly_timeseries_with_station_sdf.join(
  merged_weather_data_set_by_station_sdf,
  (complete_hourly_timeseries_with_station_sdf["complete_datetime"] == merged_weather_data_set_by_station_sdf["datetime"]) &
  (complete_hourly_timeseries_with_station_sdf["WeatherStation"] == merged_weather_data_set_by_station_sdf["station"]),
  "left"
).drop("station").withColumnRenamed("WeatherStation", "station")

joint_complete_timeseries_with_weather_features_sdf.printSchema()
joint_complete_timeseries_with_weather_features_sdf.show()
joint_complete_timeseries_with_weather_features_sdf.count()

In [0]:
joint_complete_timeseries_with_weather_features_sdf.groupBy("station").count().show()

### Count of missing hourly datetime by station

In [0]:
joint_complete_timeseries_with_weather_features_pdf = joint_complete_timeseries_with_weather_features_sdf.toPandas()

missing_datetime_count_pdf = joint_complete_timeseries_with_weather_features_pdf.groupby("station")["datetime"].apply(lambda x: x.isnull().sum())
missing_datetime_count_pdf = missing_datetime_count_pdf.reset_index(name = "count_missing_hourly_datetime").sort_values(by="count_missing_hourly_datetime", ascending=False)
missing_datetime_count_pdf

In [0]:
complete_hourly_timeseries_pdf = complete_hourly_timeseries_sdf.toPandas()

complete_count = complete_hourly_timeseries_pdf.count()['complete_datetime']

missing_datetime_count_pdf["complete_hourly_datetime"] = complete_count
missing_datetime_count_pdf['missing_datetime_ratio'] = missing_datetime_count_pdf['count_missing_hourly_datetime'] / missing_datetime_count_pdf['complete_hourly_datetime']

missing_datetime_count_pdf

### Compare count of missing hourly date time and hourly high temperature records

In [0]:
percentage_of_total_hourly_high_temp_pdf = percentage_of_total_hourly_high_temp_sdf.select("station", "total_hours_of_high_temperature").toPandas()

joint_missing_datetime_with_total_hourly_high_temp_pdf = percentage_of_total_hourly_high_temp_pdf.merge(missing_datetime_count_pdf, on="station")

joint_missing_datetime_with_total_hourly_high_temp_pdf.sort_values(by="missing_datetime_ratio", ascending=False, inplace=True)

joint_missing_datetime_with_total_hourly_high_temp_pdf

In [0]:
joint_missing_datetime_with_total_hourly_high_temp_pdf.reset_index().to_csv(local_tmp_artifact_dir_path + 'missing_hourly_datetime_with_total_hourly_high_temp.csv', index=False)

## Are the missing data underestimate the cumulative weather stress? 

In [0]:
joint_complete_timeseries_with_weather_features_pdf.set_index("complete_datetime")

grouped_station = joint_complete_timeseries_with_weather_features_pdf.groupby("station").resample("W", on= "complete_datetime")

weekly_missing_cumulative_weather_stress_pdf = grouped_station.agg({
  "avg_temperature_C": lambda x: (x >= 32).sum(),
  "datetime": lambda x:  x.isnull().sum(),
  "complete_datetime": "count"
}).rename(columns = {
  "avg_temperature_C": "total_hourly_high_temperture",
  "datetime": "total_missing_hourly_datetime",
  "complete_datetime": "total_complete_hourly_datetime",
})

weekly_missing_cumulative_weather_stress_pdf = weekly_missing_cumulative_weather_stress_pdf.reset_index().sort_values(by=["station", "complete_datetime"])
weekly_missing_cumulative_weather_stress_pdf

In [0]:
weekly_missing_cumulative_weather_stress_pdf.reset_index().to_csv(local_tmp_artifact_dir_path + 'weekly_missing_cumulative_weather_stress.csv', index=False)

In [0]:
weekly_missing_cumulative_weather_stress_pdf['missing_hourly_datetime_ratio'] = (
    weekly_missing_cumulative_weather_stress_pdf['total_missing_hourly_datetime'] /
    weekly_missing_cumulative_weather_stress_pdf['total_complete_hourly_datetime']
)

weekly_missing_cumulative_weather_stress_pdf

In [0]:
filterd_weekly_missing_cumulative_weather_stress_pdf = weekly_missing_cumulative_weather_stress_pdf[
 (weekly_missing_cumulative_weather_stress_pdf["complete_datetime"].dt.month >= 5) & 
 (weekly_missing_cumulative_weather_stress_pdf["complete_datetime"].dt.month <= 10)
]

filterd_weekly_missing_cumulative_weather_stress_pdf

In [0]:
filterd_weekly_missing_cumulative_weather_stress_pdf.reset_index().to_csv(local_tmp_artifact_dir_path + 'filterd_weekly_missing_cumulative_weather_stress.csv', index=False)

## Wrap up

In [0]:
mlflow.log_artifacts(local_tmp_artifact_dir_path)

In [0]:
# Remove tmp directory file
tmp_dir = os.path.dirname(os.path.dirname(local_tmp_artifact_dir_path))
dbutils.fs.rm("file:" + tmp_dir, recurse=True)

In [0]:
# End MLflow run
mlflow.end_run()