In [1]:
import requests
import os

base_url_1 = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/{}/99495199999.csv"
base_url_2 = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/{}/72429793812.csv"

years = range(2015, 2024)

base_output_dir = "./weather_data/"

for year in years:
    year_dir = os.path.join(base_output_dir, str(year))
    os.makedirs(year_dir, exist_ok=True)

    for base_url, station_id in [(base_url_1, "99495199999"), (base_url_2, "72429793812")]:
        url = base_url.format(year)
        response = requests.get(url)

        if response.status_code == 200:

            file_path = os.path.join(year_dir, f"{station_id}.csv")
            with open(file_path, "wb") as file:
                file.write(response.content)
            print(f"Downloaded: {file_path}")
        else:
            print(f"Failed to download {url}. Status code: {response.status_code}")

Downloaded: ./weather_data/2015/99495199999.csv
Downloaded: ./weather_data/2015/72429793812.csv
Failed to download https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/2016/99495199999.csv. Status code: 404
Downloaded: ./weather_data/2016/72429793812.csv
Downloaded: ./weather_data/2017/99495199999.csv
Downloaded: ./weather_data/2017/72429793812.csv
Downloaded: ./weather_data/2018/99495199999.csv
Downloaded: ./weather_data/2018/72429793812.csv
Downloaded: ./weather_data/2019/99495199999.csv
Downloaded: ./weather_data/2019/72429793812.csv
Downloaded: ./weather_data/2020/99495199999.csv
Downloaded: ./weather_data/2020/72429793812.csv
Downloaded: ./weather_data/2021/99495199999.csv
Downloaded: ./weather_data/2021/72429793812.csv
Downloaded: ./weather_data/2022/99495199999.csv
Downloaded: ./weather_data/2022/72429793812.csv
Downloaded: ./weather_data/2023/99495199999.csv
Downloaded: ./weather_data/2023/72429793812.csv


In [2]:
import os
import pandas as pd

base_input_dir = "/content/weather_data"
base_output_dir = "/content/cleaned_weather_data"

invalid_values = {
    "MXSPD": 999.9,
    "MAX": 9999.9,
}

for year in range(2015, 2023):
    year_dir = os.path.join(base_input_dir, str(year))

    if not os.path.exists(year_dir):
        print(f"Year directory not found: {year_dir}")
        continue

    for station_id in ["99495199999", "72429793812"]:
        file_path = os.path.join(year_dir, f"{station_id}.csv")

        if not os.path.exists(file_path):
            print(f"File not found: {file_path}")
            continue

        df = pd.read_csv(file_path)

        for column, invalid_value in invalid_values.items():
            if column in df.columns:
                df = df[df[column] != invalid_value]

        output_year_dir = os.path.join(base_output_dir, str(year))
        os.makedirs(output_year_dir, exist_ok=True)

        cleaned_file_path = os.path.join(output_year_dir, f"{station_id}.csv")
        df.to_csv(cleaned_file_path, index=False)
        print(f"Cleaned data saved to: {cleaned_file_path}")


Cleaned data saved to: /content/cleaned_weather_data/2015/99495199999.csv
Cleaned data saved to: /content/cleaned_weather_data/2015/72429793812.csv
File not found: /content/weather_data/2016/99495199999.csv
Cleaned data saved to: /content/cleaned_weather_data/2016/72429793812.csv
Cleaned data saved to: /content/cleaned_weather_data/2017/99495199999.csv
Cleaned data saved to: /content/cleaned_weather_data/2017/72429793812.csv
Cleaned data saved to: /content/cleaned_weather_data/2018/99495199999.csv
Cleaned data saved to: /content/cleaned_weather_data/2018/72429793812.csv
Cleaned data saved to: /content/cleaned_weather_data/2019/99495199999.csv
Cleaned data saved to: /content/cleaned_weather_data/2019/72429793812.csv
Cleaned data saved to: /content/cleaned_weather_data/2020/99495199999.csv
Cleaned data saved to: /content/cleaned_weather_data/2020/72429793812.csv
Cleaned data saved to: /content/cleaned_weather_data/2021/99495199999.csv
Cleaned data saved to: /content/cleaned_weather_data/

In [3]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("WeatherAnalysis").getOrCreate()

base_path = "/content/cleaned_weather_data"

hottest_days = {}

for year in range(2015, 2023):
    year_dir = os.path.join(base_path, str(year))

    if not os.path.exists(year_dir):
        print(f"Year directory not found: {year_dir}")
        continue

    for filename in os.listdir(year_dir):
        if filename.endswith('.csv'):
            file_path = os.path.join(year_dir, filename)

            df = spark.read.csv(file_path, header=True, inferSchema=True)

            if df.rdd.isEmpty():
                print(f"Skipping empty file: {filename}")
                continue

            if "MAX" not in df.columns:
                df = df.withColumn("MAX", F.lit(None))

            max_temp = df.agg(F.max("MAX")).collect()[0][0]

            if max_temp is not None:
                max_day = df.filter(df["MAX"] == max_temp).orderBy(F.desc("DATE")).first()

                if max_day:

                    if year not in hottest_days:
                        hottest_days[year] = (max_day.STATION, max_day.NAME, max_day.DATE, max_day.MAX)

if hottest_days:
    hottest_days_list = [(year, *data) for year, data in hottest_days.items()]
    hottest_days_df = spark.createDataFrame(hottest_days_list, ["YEAR", "STATION", "NAME", "DATE", "MAX"])
    hottest_days_df.show()
else:
    print("No hottest days found across the datasets.")


Skipping empty file: 99495199999.csv
Skipping empty file: 99495199999.csv
+----+-----------+--------------------+----------+----+
|YEAR|    STATION|                NAME|      DATE| MAX|
+----+-----------+--------------------+----------+----+
|2015|99495199999|SEBASTIAN INLET S...|2015-07-28|90.0|
|2016|72429793812|CINCINNATI MUNICI...|2016-07-26|93.9|
|2017|99495199999|SEBASTIAN INLET S...|2017-05-24|88.3|
|2018|99495199999|SEBASTIAN INLET S...|2018-09-15|90.1|
|2019|99495199999|SEBASTIAN INLET S...|2019-09-06|91.6|
|2020|99495199999|SEBASTIAN INLET S...|2020-07-13|91.8|
|2021|72429793812|CINCINNATI MUNICI...|2021-08-25|95.0|
|2022|72429793812|CINCINNATI MUNICI...|2022-06-23|96.1|
+----+-----------+--------------------+----------+----+



In [4]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("Coldest Day in March").getOrCreate()

base_path = "/content/cleaned_weather_data"

march_data = []

for year in range(2015, 2023):
    year_dir = os.path.join(base_path, str(year))

    if not os.path.exists(year_dir):
        print(f"Year directory not found: {year_dir}")
        continue

    for filename in os.listdir(year_dir):
        if filename.endswith('.csv'):
            file_path = os.path.join(year_dir, filename)

            df = spark.read.csv(file_path, header=True, inferSchema=True)


            if df.rdd.isEmpty():
                print(f"Skipping empty file: {filename}")
                continue

            if "DATE" not in df.columns or "MIN" not in df.columns:
                print(f"Skipping {filename} due to missing 'DATE' or 'MIN' column.")
                continue

            march_df = df.filter(df["DATE"].contains('-03-'))

            if march_df.rdd.isEmpty():
                continue

            coldest_day = march_df.orderBy(F.asc("MIN")).first()

            if coldest_day is not None:
                march_data.append((coldest_day.STATION, coldest_day.NAME, coldest_day.DATE, coldest_day.MIN))

if march_data:
    coldest_day_df = spark.createDataFrame(march_data, ["STATION", "NAME", "DATE", "MIN"])

    overall_coldest_day = coldest_day_df.orderBy(F.asc("MIN")).first()

    if overall_coldest_day:
        overall_coldest_day_df = spark.createDataFrame([overall_coldest_day], ["STATION", "NAME", "DATE", "MIN"])
        print("\nOverall coldest day in March:")
        overall_coldest_day_df.show()
else:
    print("No March data found across the datasets.")


Skipping empty file: 99495199999.csv
Skipping empty file: 99495199999.csv

Overall coldest day in March:
+-----------+--------------------+----------+---+
|    STATION|                NAME|      DATE|MIN|
+-----------+--------------------+----------+---+
|72429793812|CINCINNATI MUNICI...|2015-03-06|3.2|
+-----------+--------------------+----------+---+



In [None]:
'''Step-wise Breakdown of the Code
1. Downloading Weather Data

Purpose: Retrieve weather data for two specific stations (99495199999 and 72429793812) from the NOAA website for the years 2015 to 2023.

Libraries Used:

requests: For making HTTP requests to download CSV files.
os: For file system operations like creating directories and constructing file paths.


Process:

Define base URLs for two weather stations using string formatting.
Iterate over years from 2015 to 2023.
For each year and station:
Construct the URL using the year.
Send an HTTP GET request using requests.get().
If the response status code is 200 (success), save the CSV content to a file in a directory structure (./weather_data/{year}/{station_id}.csv).
Print a success message with the file path or a failure message with the status code (e.g., 404 for not found).




Output Example:

Successful download: Downloaded: ./weather_data/2015/99495199999.csv
Failed download: Failed to download .../2016/99495199999.csv. Status code: 404




2. Cleaning Weather Data

Purpose: Clean the downloaded CSV files by removing rows with invalid or missing data in specific columns.

Libraries Used:

os: For file system operations.
pandas: For reading, manipulating, and saving CSV data as DataFrames.


Process:

Define the input directory (/content/weather_data) and output directory (/content/cleaned_weather_data).
Specify invalid values for columns: MXSPD (999.9) and MAX (9999.9).
Iterate over years 2015 to 2022.
For each year and station:
Check if the year directory and CSV file exist; skip if not.
Load the CSV into a Pandas DataFrame using pd.read_csv().
Filter out rows where MXSPD or MAX equals the invalid values.
Save the cleaned DataFrame to a new CSV file in the output directory (/content/cleaned_weather_data/{year}/{station_id}.csv).
Print the path of the saved cleaned file.




Output Example:

Cleaned data saved to: /content/cleaned_weather_data/2015/99495199999.csv
File not found: /content/weather_data/2016/99495199999.csv




3. Analyzing Weather Data with PySpark - Hottest Day Each Year

Purpose: Identify the hottest day (highest MAX temperature) for each year across the cleaned data.

Libraries Used:

os: For file system operations.
pyspark.sql.SparkSession: For initializing a Spark session.
pyspark.sql.functions (aliased as F): For DataFrame operations like aggregation and ordering.


Process:

Initialize a SparkSession with the app name "WeatherAnalysis".
Define the base path to the cleaned data (/content/cleaned_weather_data).
Create an empty dictionary hottest_days to store results.
Iterate over years 2015 to 2022:
Check if the year directory exists; skip if not.
For each CSV file in the year directory:
Load the CSV into a Spark DataFrame with spark.read.csv(), enabling header and schema inference.
Skip empty files using df.rdd.isEmpty().
Add a MAX column with None if missing using F.lit(None).
Compute the maximum MAX value using df.agg(F.max("MAX")).
Filter the DataFrame for rows matching this maximum, order by DATE descending, and take the first row with first().
Store the station, name, date, and max temperature in hottest_days if not already present for that year.




If data is found, convert the results to a DataFrame and display it with show().


Output Example:
+----+-----------+--------------------+----------+----+
|YEAR|    STATION|                NAME|      DATE| MAX|
+----+-----------+--------------------+----------+----+
|2015|99495199999|SEBASTIAN INLET S...|2015-07-28|90.0|
|2016|72429793812|CINCINNATI MUNICI...|2016-07-26|93.9|
...
+----+-----------+--------------------+----------+----+




4. Analyzing Weather Data with PySpark - Coldest Day in March Across All Years

Purpose: Identify the coldest day (lowest MIN temperature) in March across all years.

Libraries Used: Same as the previous section.

Process:

Initialize a SparkSession with the app name "Coldest Day in March".
Define the base path to the cleaned data (/content/cleaned_weather_data).
Create an empty list march_data to store March-specific results.
Iterate over years 2015 to 2022:
Check if the year directory exists; skip if not.
For each CSV file in the year directory:
Load the CSV into a Spark DataFrame.
Skip empty files or files missing DATE or MIN columns.
Filter for March data using df.filter(df["DATE"].contains('-03-')).
Order by MIN ascending and take the first row with first().
Append the station, name, date, and min temperature to march_data.




If March data is found:
Create a DataFrame from march_data.
Find the overall coldest day by ordering by MIN ascending and taking the first row.
Display the result.




Output Example:
Overall coldest day in March:
+-----------+--------------------+----------+---+
|    STATION|                NAME|      DATE|MIN|
+-----------+--------------------+----------+---+
|72429793812|CINCINNATI MUNICI...|2015-03-06|3.2|
+-----------+--------------------+----------+---+




PySpark Theory Concepts (Detailed Explanation)
1. SparkSession

Definition: The entry point to PySpark's SQL and DataFrame API. It manages the Spark application and provides methods to create DataFrames, execute SQL queries, and configure Spark settings.
Details:
Created with SparkSession.builder.appName("WeatherAnalysis").getOrCreate().
appName: Sets a name for the Spark application, visible in the Spark UI for tracking.
getOrCreate(): Reuses an existing session if available or creates a new one, ensuring resource efficiency.
Internally encapsulates a SparkContext, managing the connection to the Spark cluster and resource allocation.



2. DataFrames

Definition: A distributed collection of data organized into named columns, akin to a table in a relational database or a Pandas DataFrame, but optimized for distributed processing.
Details:
Created with spark.read.csv(file_path, header=True, inferSchema=True):
header=True: Uses the first row as column names.
inferSchema=True: Automatically detects column data types (e.g., integer, string).


Supports SQL-like operations (e.g., filtering, aggregation) executed across a cluster.
Built on top of RDDs but provides a higher-level abstraction for structured data.



3. DataFrame Operations

Filtering:
df.filter(df["DATE"].contains('-03-')): Uses the contains method to match rows where DATE includes "-03-" (March).
Returns a new DataFrame with only the filtered rows.


Aggregation:
df.agg(F.max("MAX")): Computes the maximum value in the MAX column.
Returns a DataFrame with one row and one column; .collect()[0][0] extracts the value.


Ordering:
df.orderBy(F.desc("DATE")): Sorts the DataFrame by DATE in descending order.
F.asc("MIN"): Sorts by MIN in ascending order.
Sorting is a transformation that prepares data for actions like first().


Collecting Data:
df.collect(): Retrieves all rows as a list of Row objects, triggering computation (action).
df.first(): Retrieves the first row as a Row object, also an action.
Row objects allow attribute access (e.g., row.STATION).



4. Functions Module (pyspark.sql.functions)

Definition: A library of built-in functions for manipulating DataFrame columns.
Details:
F.max("MAX"): Returns the maximum value of the MAX column.
F.lit(None): Adds a column with a literal value (e.g., None) for missing columns.
F.asc("MIN") and F.desc("DATE"): Specify ascending or descending sort order.
Functions are lazily evaluated and optimized by Spark’s Catalyst optimizer.



5. RDD (Resilient Distributed Dataset)

Definition: The core data structure in Spark, representing an immutable, partitioned collection of objects that can be processed in parallel.
Details:
df.rdd.isEmpty(): Checks if the DataFrame’s underlying RDD has no data.
DataFrames are built on RDDs, but RDDs are lower-level and less structured.
Used here for emptiness checks, avoiding unnecessary processing.



6. DataFrame Creation from Lists

Definition: Allows creating a DataFrame from a Python list of tuples or Row objects with a specified schema.
Details:
spark.createDataFrame(hottest_days_list, ["YEAR", "STATION", "NAME", "DATE", "MAX"]):
Takes a list of tuples and a list of column names.
Distributes the data across the Spark cluster.


Used to present final results in a tabular format.



7. Lazy Evaluation

Definition: Spark delays execution of transformations until an action is called, optimizing the execution plan.
Details:
Transformations (e.g., filter, orderBy, agg) build a logical plan.
Actions (e.g., collect, first, show) trigger the execution of the plan.
Improves performance by combining operations and minimizing data shuffling.



8. Actions vs. Transformations

Transformations: Create a new DataFrame without immediate computation (e.g., filter, orderBy, withColumn).
Lazy, chainable, and optimized by Spark.


Actions: Trigger computation and return results to the driver (e.g., collect, first, show).
Examples: show() displays the DataFrame, first() retrieves a single row.



9. Handling Missing Data

Details:
Checks for missing columns (e.g., if "MAX" not in df.columns) and adds them with F.lit(None).
Skips files with missing required columns or empty data, ensuring robustness.
Prevents errors during aggregation or filtering.



10. Directory and File Handling with PySpark

Details:
os.path.join: Constructs platform-independent file paths.
os.listdir: Lists CSV files in a directory for processing.
Integrates seamlessly with PySpark’s file reading capabilities.




Summary
This notebook demonstrates a complete weather data analysis pipeline:

Data Acquisition: Downloads CSV files from NOAA using requests.
Data Cleaning: Filters out invalid data using pandas.
Data Analysis: Uses PySpark to:
Find the hottest day each year.
Identify the coldest day in March across all years.



Key PySpark concepts include:

SparkSession: Central to managing Spark applications.
DataFrames: Efficient for structured data processing.
Transformations and Actions: Core to Spark’s lazy evaluation model.
Functions Module: Simplifies complex operations.
Error Handling: Ensures robustness with checks for empty files and missing columns.

This explanation equips you with a thorough understanding of the code and PySpark principles for weather data analysis.'''
