In [87]:
import sys
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, regexp_extract, regexp_replace, initcap, to_timestamp, to_date
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
base_paths = [
    "s3://allweatherdatastatelaglongwd3/files/"
]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [115]:
mapping_path = "s3://citystatelatlongregion/city_state_lat_long_region/cities_with_region.csv"
output_path_daily = "s3://wd3sampledata/wd3/"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
df_all = spark.read.option("header", True).parquet(*base_paths)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [110]:
df_cleaned = df_all.withColumn(
    "city",
    regexp_extract(col("file_only"), r"/([^/]+?)(?:_\d+)?\.csv$", 1)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [89]:
df_with_city = df_cleaned.withColumn("city", regexp_replace("city", "%20", " ")) \
               .withColumn("city", initcap("city")) \
               .withColumn("city", regexp_replace("city", " ", ""))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [90]:
df_filled = df_with_city.fillna({"snow_depth": 0})


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [91]:
# ------------------- City Classification -------------------
city_file_counts = df_with_city.groupBy("city") \
    .agg(F.countDistinct("file_only").alias("file_count"))

multi_city_list = city_file_counts.filter("file_count > 1").select("city").rdd.flatMap(lambda x: x).collect()
single_city_list = city_file_counts.filter("file_count = 1").select("city").rdd.flatMap(lambda x: x).collect()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [92]:
# ------------------- Aggregation Function -------------------
def create_daily_agg(df_input):
    df = df_input.withColumn("datetime_ts", to_timestamp("date")) \
                 .withColumn("date", to_date(col("datetime_ts"))) \
                 .withColumn("year", F.year(col("date"))) \
                 .withColumn("month", F.month(col("date")))

    agg_exprs = [
        F.avg("temperature_2m").alias("temperature_2m"),
        F.avg("relative_humidity_2m").alias("relative_humidity_2m"),
        F.avg("dew_point_2m").alias("dew_point_2m"),
        F.avg("apparent_temperature").alias("apparent_temperature"),

        F.sum("precipitation").alias("precipitation"),
        F.sum("rain").alias("rain"),
        F.sum("snowfall").alias("snowfall"),
        F.sum("snow_depth").alias("snow_depth"),

        F.avg("pressure_msl").alias("pressure_msl"),
        F.avg("surface_pressure").alias("surface_pressure"),

        F.avg("cloud_cover").alias("cloud_cover"),
        F.avg("cloud_cover_low").alias("cloud_cover_low"),
        F.avg("cloud_cover_mid").alias("cloud_cover_mid"),
        F.avg("cloud_cover_high").alias("cloud_cover_high"),

        F.avg("wind_speed_10m").alias("wind_speed_10m"),
        F.avg("wind_speed_100m").alias("wind_speed_100m"),
        F.avg("wind_direction_10m").alias("wind_direction_10m"),
        F.avg("wind_direction_100m").alias("wind_direction_100m"),
        F.max("wind_gusts_10m").alias("wind_gusts_10m")
    ]
    df_daily = df.groupBy("date", "year", "month", "city").agg(*agg_exprs)
    return df_daily.orderBy("date", "city")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [93]:
# ==================== MULTI-FILE MERGING ====================
df_multi = df_filled.filter(col("city").isin(multi_city_list))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [94]:
# Merge multiple files by date + city
numeric_cols = [c for c, t in df_multi.dtypes if t in ['double', 'int', 'float', 'long'] and c not in ['date', 'city']]
string_cols = [c for c, t in df_multi.dtypes if t in ['string'] and c not in ['date', 'city']]

merge_exprs = (
    [F.avg(col(c)).alias(c) for c in numeric_cols] +
    [F.first(col(c), ignorenulls=True).alias(c) for c in string_cols]
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [96]:
df_merged = df_multi.groupBy("date", "city").agg(*merge_exprs)
df_12hr_multi = create_daily_agg(df_merged)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [97]:

# ==================== SINGLE FILE CITIES ====================
df_single = df_filled.filter(col("city").isin(single_city_list))
df_12hr_single = create_daily_agg(df_single)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [100]:
df_12hr_single.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['date', 'year', 'month', 'city', 'temperature_2m', 'relative_humidity_2m', 'dew_point_2m', 'apparent_temperature', 'precipitation', 'rain', 'snowfall', 'snow_depth', 'pressure_msl', 'surface_pressure', 'cloud_cover', 'cloud_cover_low', 'cloud_cover_mid', 'cloud_cover_high', 'wind_speed_10m', 'wind_speed_100m', 'wind_direction_10m', 'wind_direction_100m', 'wind_gusts_10m']

In [101]:
# ==================== FINAL UNION ====================
df_12hr_final = df_12hr_multi.unionByName(df_12hr_single).orderBy("date", "city")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [116]:
# ------------------- Mapping Data with Region -------------------
df_mapping = (
    spark.read.option("header", True).csv(mapping_path)
    .withColumn("latitude", col("latitude").cast("double"))
    .withColumn("longitude", col("longitude").cast("double"))
    .withColumn("city", regexp_replace("city", "%20", " "))
    .withColumn("city", initcap("city"))
    .withColumn("city", regexp_replace("city", " ", ""))
    .withColumn("state", initcap("state"))
    .withColumn("region", initcap("region")) 
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [117]:
df_mapped = df_daily_final.join(
    df_mapping.select("city", "state", "latitude", "longitude","region"),
    on="city", how="left"
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [118]:
df_mapped.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['city', 'date', 'year', 'month', 'temperature_2m', 'relative_humidity_2m', 'dew_point_2m', 'apparent_temperature', 'max_temperature_2m', 'min_temperature_2m', 'precipitation', 'rain', 'snowfall', 'snow_depth', 'pressure_msl', 'surface_pressure', 'cloud_cover', 'cloud_cover_low', 'cloud_cover_mid', 'cloud_cover_high', 'wind_speed_10m', 'wind_speed_100m', 'wind_direction_10m', 'wind_direction_100m', 'wind_gusts_10m', 'state', 'latitude', 'longitude', 'region']

In [119]:
df_mapped.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+----------+----+-----+--------------+--------------------+------------------+--------------------+------------------+------------------+-------------+----+--------+----------+------------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-------------------+--------------+----------+--------+---------+------+
|     city|      date|year|month|temperature_2m|relative_humidity_2m|      dew_point_2m|apparent_temperature|max_temperature_2m|min_temperature_2m|precipitation|rain|snowfall|snow_depth|      pressure_msl| surface_pressure|      cloud_cover|   cloud_cover_low|   cloud_cover_mid| cloud_cover_high|   wind_speed_10m|   wind_speed_100m|wind_direction_10m|wind_direction_100m|wind_gusts_10m|     state|latitude|longitude|region|
+---------+----------+----+-----+--------------+--------------------+------------------+--------------------+------------------+------------------+-

In [120]:
df_mapped.write.mode("overwrite").partitionBy("state").parquet(output_path_daily)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…