In [280]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, when
from pyspark.sql.utils import AnalysisException

# Build a Spark session that runs on the local master; getOrCreate() hands
# back an already-active session when the cell is re-run.
spark = (
    SparkSession.builder
    .master('local')
    .getOrCreate()
)

In [281]:
# Stations whose yearly CSV exports are loaded (one file per station per year).
STATION_IDS = ('72429793812', '99495199999')
# Measurement columns that later cells compare, sort and aggregate numerically.
NUMERIC_COLUMNS = ('TEMP', 'MAX', 'MIN', 'PRCP', 'WDSP', 'GUST')

# Keep only the files that actually exist; a station may lack some years.
input_files = [
    file_path
    for data_year in range(2015, 2025)
    for file_path in (f'data/{data_year}/{station_id}.csv' for station_id in STATION_IDS)
    if os.path.exists(file_path)
]
if not input_files:
    # Fail with a readable message instead of an opaque Spark error on an empty path list.
    raise FileNotFoundError("No station CSV files found under data/<year>/ for 2015-2024")

csv_df = spark.read.csv(input_files, sep=",", header=True)

# Without a schema every CSV column is read as a string, which makes
# ORDER BY / MIN / MAX on the measurements compare text instead of numbers.
# Cast them once here so every downstream query works on doubles.
for column_name in NUMERIC_COLUMNS:
    csv_df = csv_df.withColumn(column_name, col(column_name).cast('double'))

# Calendar parts derived from DATE, used for grouping and filtering later on.
csv_df = csv_df.withColumn('YEAR', year(col('DATE')))
csv_df = csv_df.withColumn('MONTH', month(col('DATE')))

In [282]:
# Number of TEMP readings per station and year, to show how complete each
# station's record is. The view exposes every loaded row (no filtering).
csv_df.createOrReplaceTempView("WEATHER_DATA")

entries_per_year_sql = """
    SELECT
        WEATHER_DATA.STATION,
        WEATHER_DATA.NAME AS LOCATION,
        WEATHER_DATA.YEAR,
        COUNT(WEATHER_DATA.TEMP) AS NUM_ENTRIES
    FROM WEATHER_DATA
    GROUP BY WEATHER_DATA.YEAR, WEATHER_DATA.NAME, WEATHER_DATA.STATION
    ORDER BY WEATHER_DATA.YEAR, WEATHER_DATA.STATION
"""
queried_data = spark.sql(entries_per_year_sql)
queried_data.show()

+-----------+--------------------+----+-----------+
|    STATION|            LOCATION|YEAR|NUM_ENTRIES|
+-----------+--------------------+----+-----------+
|72429793812|CINCINNATI MUNICI...|2015|        365|
|99495199999|SEBASTIAN INLET S...|2015|        355|
|72429793812|CINCINNATI MUNICI...|2016|        366|
|72429793812|CINCINNATI MUNICI...|2017|        365|
|99495199999|SEBASTIAN INLET S...|2017|        283|
|72429793812|CINCINNATI MUNICI...|2018|        365|
|99495199999|SEBASTIAN INLET S...|2018|        363|
|72429793812|CINCINNATI MUNICI...|2019|        365|
|99495199999|SEBASTIAN INLET S...|2019|        345|
|72429793812|CINCINNATI MUNICI...|2020|        366|
|99495199999|SEBASTIAN INLET S...|2020|        365|
|72429793812|CINCINNATI MUNICI...|2021|        365|
|99495199999|SEBASTIAN INLET S...|2021|        104|
|72429793812|CINCINNATI MUNICI...|2022|        365|
|99495199999|SEBASTIAN INLET S...|2022|        259|
|72429793812|CINCINNATI MUNICI...|2023|        365|
|99495199999

In [283]:
# Hottest day of each year across all loaded stations.
# Rows whose MAX equals the 9999.9 sentinel are dropped before ranking.
csv_df.filter(csv_df.MAX != 9999.9).createOrReplaceTempView("WEATHER_DATA")

# The ranking casts MAX to DOUBLE so the order is numeric even when the CSV
# columns were loaded as strings, and uses DATE/STATION as tie-breakers so
# the selected row is the same on every run.
queried_data = spark.sql("""
    SELECT
        STATION,
        LOCATION,
        DATE,
        MAX_TEMP
    FROM (
        SELECT
            WEATHER_DATA.STATION,
            WEATHER_DATA.NAME AS LOCATION,
            WEATHER_DATA.DATE,
            WEATHER_DATA.MAX AS MAX_TEMP,
            ROW_NUMBER() OVER (
                PARTITION BY WEATHER_DATA.YEAR
                ORDER BY CAST(WEATHER_DATA.MAX AS DOUBLE) DESC, WEATHER_DATA.DATE, WEATHER_DATA.STATION
            ) AS ranked_order
        FROM WEATHER_DATA
    ) AS max_temps
    WHERE max_temps.ranked_order = 1
""")
queried_data.show()

+-----------+--------------------+----------+--------+
|    STATION|            LOCATION|      DATE|MAX_TEMP|
+-----------+--------------------+----------+--------+
|72429793812|CINCINNATI MUNICI...|2015-06-12|    91.9|
|72429793812|CINCINNATI MUNICI...|2016-07-24|    93.9|
|72429793812|CINCINNATI MUNICI...|2017-07-22|    91.9|
|72429793812|CINCINNATI MUNICI...|2018-07-04|    96.1|
|72429793812|CINCINNATI MUNICI...|2019-09-30|    95.0|
|72429793812|CINCINNATI MUNICI...|2020-07-05|    93.9|
|72429793812|CINCINNATI MUNICI...|2021-08-12|    95.0|
|72429793812|CINCINNATI MUNICI...|2022-06-14|    96.1|
|72429793812|CINCINNATI MUNICI...|2023-08-23|    96.1|
|72429793812|CINCINNATI MUNICI...|2024-08-30|   100.9|
+-----------+--------------------+----------+--------+



In [284]:
# Coldest March day(s) across all loaded years and stations.
# Rows whose MIN equals the 9999.9 sentinel are dropped first.
csv_df.filter(csv_df.MIN != 9999.9).createOrReplaceTempView("WEATHER_DATA")

# The subquery finds the lowest March minimum; the outer query must ALSO be
# limited to March, otherwise a day in another month that happens to share
# the same minimum would be reported. Values are compared as DOUBLE so the
# minimum is numeric even when the CSV columns were loaded as strings.
queried_data = spark.sql("""
    SELECT
        WEATHER_DATA.STATION,
        WEATHER_DATA.NAME AS LOCATION,
        WEATHER_DATA.DATE,
        CAST(WEATHER_DATA.MIN AS DOUBLE) AS MIN_TEMP
    FROM WEATHER_DATA
    INNER JOIN (
        SELECT
            MIN(CAST(WEATHER_DATA.MIN AS DOUBLE)) AS MIN_TEMP
        FROM WEATHER_DATA
        WHERE WEATHER_DATA.MONTH = 3
    ) AS min_data ON min_data.MIN_TEMP = CAST(WEATHER_DATA.MIN AS DOUBLE)
    WHERE WEATHER_DATA.MONTH = 3
""")
queried_data.show()

+-----------+--------------------+----------+--------+
|    STATION|            LOCATION|      DATE|MIN_TEMP|
+-----------+--------------------+----------+--------+
|72429793812|CINCINNATI MUNICI...|2015-03-06|     3.2|
+-----------+--------------------+----------+--------+



In [285]:
# Year with the highest average daily precipitation for each station.
# Rows whose PRCP equals the 99.99 sentinel are dropped before averaging.
csv_df.filter(csv_df.PRCP != 99.99).createOrReplaceTempView("WEATHER_DATA")

# Yearly averages are ranked per station (earlier year wins a tie). The
# derived table carries no ORDER BY of its own: the outer query re-sorts the
# result, so an inner sort would only add work without affecting the output.
queried_data = spark.sql("""
    SELECT
        STATION,
        LOCATION,
        YEAR,
        PRECIPITATION
    FROM (
        SELECT
            WEATHER_DATA.STATION,
            WEATHER_DATA.NAME AS LOCATION,
            WEATHER_DATA.YEAR,
            AVG(WEATHER_DATA.PRCP) AS PRECIPITATION,
            ROW_NUMBER() OVER (PARTITION BY WEATHER_DATA.STATION ORDER BY AVG(WEATHER_DATA.PRCP) DESC, WEATHER_DATA.YEAR) AS ranked_order
        FROM WEATHER_DATA
        GROUP BY WEATHER_DATA.YEAR, WEATHER_DATA.STATION, WEATHER_DATA.NAME
    ) AS avg_prcps
    WHERE avg_prcps.ranked_order = 1
    ORDER BY STATION
""")
queried_data.show()

+-----------+--------------------+----+-------------------+
|    STATION|            LOCATION|YEAR|      PRECIPITATION|
+-----------+--------------------+----+-------------------+
|72429793812|CINCINNATI MUNICI...|2018|0.15789041095890405|
|99495199999|SEBASTIAN INLET S...|2015|                0.0|
+-----------+--------------------+----+-------------------+



In [286]:
# Count of 2024 days per station whose GUST holds the 999.9 sentinel.
# Register the unfiltered frame explicitly: relying on whatever view an
# earlier cell left behind would silently skip any rows that cell filtered
# out, and the count would then depend on execution order.
csv_df.createOrReplaceTempView("WEATHER_DATA")

queried_data = spark.sql("""
    SELECT
        WEATHER_DATA.STATION,
        WEATHER_DATA.NAME AS LOCATION,
        COUNT(WEATHER_DATA.GUST) AS MISSING_GUSTS
    FROM WEATHER_DATA
    WHERE WEATHER_DATA.GUST = 999.9 AND WEATHER_DATA.YEAR = 2024
    GROUP BY WEATHER_DATA.STATION, WEATHER_DATA.NAME
    ORDER BY STATION
""")
queried_data.show()

+-----------+--------------------+-------------+
|    STATION|            LOCATION|MISSING_GUSTS|
+-----------+--------------------+-------------+
|72429793812|CINCINNATI MUNICI...|          137|
|99495199999|SEBASTIAN INLET S...|          133|
+-----------+--------------------+-------------+



In [287]:
# Monthly mean / median / mode / population std-dev of TEMP for station
# 72429793812 in 2020.
# The view is (re)registered here so the statistics do not depend on the
# filter an earlier cell happened to apply, and rows whose TEMP equals the
# 9999.9 sentinel are excluded so they cannot skew the aggregates.
csv_df.filter(csv_df.TEMP != 9999.9).createOrReplaceTempView("WEATHER_DATA")

queried_data = spark.sql("""
    SELECT
        MONTH,
        AVG(TEMP) AS MEAN,
        MEDIAN(TEMP) AS MEDIAN,
        MODE(TEMP) AS MODE,
        STDDEV_POP(TEMP) AS STDEV
    FROM WEATHER_DATA
    WHERE STATION = 72429793812 AND YEAR = 2020
    GROUP BY MONTH
    ORDER BY MONTH
""")
queried_data.show()

+-----+------------------+------+------+------------------+
|MONTH|              MEAN|MEDIAN|  MODE|             STDEV|
+-----+------------------+------+------+------------------+
|    1| 37.94516129032259|  37.7|  24.7| 8.210097587321375|
|    2|  36.5896551724138|  36.0|  30.8| 7.764168131721883|
|    3|  49.0741935483871|  47.8|  53.2| 8.636642408457773|
|    4|51.779999999999994|  51.1|  53.2| 7.190243389482725|
|    5| 60.89032258064518|  63.7|  73.9| 9.163298280630869|
|    6| 72.54666666666667| 73.95|  74.2| 4.817588147149521|
|    7|              77.6|  77.9|  79.7| 2.299929872703694|
|    8| 73.34516129032258|  73.7|  73.2| 3.431151288931137|
|    9|              66.1| 66.15|  54.7|  6.99861891137577|
|   10|55.193548387096776|  54.0|  59.4| 6.619274664671979|
|   11|48.003333333333345|  47.7|  47.7| 6.711208700541371|
|   12| 35.99354838709677|  35.2|  37.4|6.5347673396336425|
+-----+------------------+------+------+------------------+



In [331]:
# Add a WIND_CHILL column, then list the ten lowest values for station
# 72429793812 in 2017.
# NOTE(review): the coefficients look like the NWS wind-chill formula, which
# expects wind speed in mph - confirm the unit of WDSP in the source data.
temperature = col('TEMP')
wind_factor = col('WDSP')**0.16

# The index is only computed for TEMP < 50 and WDSP > 3; every other row gets
# the 9999.9 placeholder, which sorts after real values in the query below.
wind_chill_applies = (temperature < 50) & (col('WDSP') > 3)
wind_chill_value = 35.74 + 0.6215 * temperature - 35.75 * wind_factor + 0.4275 * temperature * wind_factor
csv_df = csv_df.withColumn('WIND_CHILL', when(wind_chill_applies, wind_chill_value).otherwise(9999.9))

# Expose only rows where neither TEMP nor WDSP holds its sentinel value.
rows_with_readings = csv_df.filter(csv_df.TEMP != 9999.9).filter(csv_df.WDSP != 999.9)
rows_with_readings.createOrReplaceTempView("WEATHER_DATA")

lowest_wind_chill_sql = """
    SELECT
        DATE,
        WIND_CHILL
    FROM WEATHER_DATA
    WHERE STATION = 72429793812 AND YEAR = 2017
    ORDER BY WIND_CHILL, DATE
    LIMIT 10
"""
queried_data = spark.sql(lowest_wind_chill_sql)
queried_data.show()

+----------+-------------------+
|      DATE|         WIND_CHILL|
+----------+-------------------+
|2017-01-07|-0.4140156367932173|
|2017-12-31| 2.0339767075993116|
|2017-12-27|  3.820645509123832|
|2017-12-28|  4.533355269061226|
|2017-01-06|  4.868933041653884|
|2017-01-08|  7.929748208036862|
|2017-12-25| 14.285113218297408|
|2017-12-30| 14.539211253038193|
|2017-01-05| 14.748861828163854|
|2017-12-26| 15.688977805634499|
+----------+-------------------+

