In [1]:
!mkdir -p data
!wget https://www.ncei.noaa.gov/pub/data/noaa/isd-history.csv -O data/isd-history.csv


--2025-05-05 22:06:47--  https://www.ncei.noaa.gov/pub/data/noaa/isd-history.csv
Resolving www.ncei.noaa.gov (www.ncei.noaa.gov)... 205.167.25.167, 205.167.25.178, 205.167.25.168, ...
Connecting to www.ncei.noaa.gov (www.ncei.noaa.gov)|205.167.25.167|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2907119 (2.8M) [text/csv]
Saving to: ‘data/isd-history.csv’


2025-05-05 22:06:48 (3.62 MB/s) - ‘data/isd-history.csv’ saved [2907119/2907119]



In [3]:
import os
import time
import urllib.request
import urllib.error
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession

In [2]:
## Script to download all NOAA weather data from 1970 - 2023


NOAA_BASE_URL = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access"
BASE_PATH = "/home/jovyan/Project/gsod"
HEADERS = {"User-Agent": "Mozilla/5.0"}
YEARS = list(range(1970, 2024))
MAX_THREADS = 10  

os.makedirs(BASE_PATH, exist_ok=True)

def download_file(file_url, local_path, retries=3):
    for attempt in range(retries):
        try:
            if not os.path.exists(local_path):
                urllib.request.urlretrieve(file_url, local_path)
                return True
            else:
                return False  # Already exists
        except urllib.error.HTTPError as e:
            if e.code == 503:
                wait_time = 2 ** attempt
                print(f"⏳ 503 for {file_url} – retrying in {wait_time} sec...")
                time.sleep(wait_time)
            else:
                print(f"❌ HTTP error for {file_url}: {e}")
                break
        except Exception as e:
            print(f"❌ Other error for {file_url}: {e}")
            break
    return False

start_time = time.time()
total_downloaded = 0
failed_downloads = []

try:
    for year in YEARS:
        print(f"\n📅 Processing year: {year}")
        year_path = os.path.join(BASE_PATH, str(year))
        os.makedirs(year_path, exist_ok=True)

        url = f"{NOAA_BASE_URL}/{year}/"
        req = urllib.request.Request(url, headers=HEADERS)

        try:
            with urllib.request.urlopen(req) as response:
                html = response.read().decode("utf-8")
        except Exception as e:
            print(f"❌ Failed to fetch directory listing for {year}: {e}")
            continue

        soup = BeautifulSoup(html, "html.parser")
        csv_links = [link.get("href") for link in soup.find_all("a") if link.get("href", "").endswith(".csv")]
        print(f"🔍 Found {len(csv_links)} CSV files for {year}")

        tasks = []
        with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
            for filename in csv_links:
                file_url = url + filename
                local_file = os.path.join(year_path, filename)
                tasks.append(executor.submit(download_file, file_url, local_file))

            year_downloaded = 0
            for future, filename in zip(as_completed(tasks), csv_links):
                try:
                    result = future.result()
                    if result:
                        total_downloaded += 1
                        year_downloaded += 1
                        if total_downloaded % 2000 == 0:
                            elapsed = int(time.time() - start_time)
                            print(f"✅ Downloaded {total_downloaded} files in {elapsed} seconds")
                except Exception as e:
                    failed_downloads.append(url + filename)
                    print(f"❌ Failed to download {filename}: {e}")

        print(f"📥 Year {year} complete: {year_downloaded}/{len(csv_links)} files downloaded")

except KeyboardInterrupt:
    print("\n⏹️ Interrupted by user")

#  Save failed downloads
if failed_downloads:
    with open("failed_downloads.txt", "w") as f:
        for url in failed_downloads:
            f.write(url + "\n")
    print(f"⚠️ Logged {len(failed_downloads)} failed downloads to 'failed_downloads.txt'")

#  Done
elapsed = int(time.time() - start_time)
print(f"\n✅ All done! Total downloaded: {total_downloaded} files in {elapsed} seconds.")



📅 Processing year: 1970
🔍 Found 2757 CSV files for 1970
📥 Year 1970 complete: 0/2757 files downloaded

📅 Processing year: 1971
🔍 Found 2491 CSV files for 1971
📥 Year 1971 complete: 0/2491 files downloaded

📅 Processing year: 1972
🔍 Found 587 CSV files for 1972
📥 Year 1972 complete: 0/587 files downloaded

📅 Processing year: 1973
🔍 Found 7996 CSV files for 1973
📥 Year 1973 complete: 0/7996 files downloaded

📅 Processing year: 1974
🔍 Found 8191 CSV files for 1974
📥 Year 1974 complete: 0/8191 files downloaded

📅 Processing year: 1975
🔍 Found 8425 CSV files for 1975
📥 Year 1975 complete: 0/8425 files downloaded

📅 Processing year: 1976
🔍 Found 8387 CSV files for 1976
📥 Year 1976 complete: 0/8387 files downloaded

📅 Processing year: 1977
🔍 Found 8940 CSV files for 1977
📥 Year 1977 complete: 0/8940 files downloaded

📅 Processing year: 1978
🔍 Found 8442 CSV files for 1978
📥 Year 1978 complete: 0/8442 files downloaded

📅 Processing year: 1979
🔍 Found 8550 CSV files for 1979
📥 Year 1979 comple

In [3]:
## Verify size of download

def get_folder_size(path):
    total = 0
    for dirpath, _, filenames in os.walk(path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            if os.path.isfile(fp):
                total += os.path.getsize(fp)
    return total / (1024 ** 3)  

size_gb = get_folder_size("/home/jovyan/Project/gsod")
print(f"Original CSV folder size: {size_gb:.2f} GB")


Original CSV folder size: 32.70 GB


In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GSOD CSV to Parquet") \
    .config("spark.driver.memory", "6g") \
    .getOrCreate()

spark.range(15).show()



+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
+---+



In [6]:
## Convert to Parquet

from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import substring, lpad, concat_ws, col

schema = StructType([
    StructField("STATION", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("LATITUDE", DoubleType(), True),
    StructField("LONGITUDE", DoubleType(), True),
    StructField("ELEVATION", DoubleType(), True),
    StructField("NAME", StringType(), True),
    StructField("TEMP", DoubleType(), True),
    StructField("DEWP", DoubleType(), True),
    StructField("SLP", DoubleType(), True),
    StructField("STP", DoubleType(), True),
    StructField("VISIB", DoubleType(), True),
    StructField("WDSP", DoubleType(), True),
    StructField("MXSPD", DoubleType(), True),
    StructField("GUST", DoubleType(), True),
    StructField("MAX", StringType(), True),
    StructField("MIN", StringType(), True),
    StructField("PRCP", StringType(), True),
    StructField("SNDP", DoubleType(), True),
    StructField("FRSHTT", StringType(), True)
])

df = spark.read \
    .option("header", "true") \
    .option("recursiveFileLookup", "true") \
    .schema(schema) \
    .csv("/home/jovyan/Project/gsod")

df = df.withColumn("year", substring("DATE", 1, 4))

#  Load station metadata
meta = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("data/isd-history.csv")

# Create STATION ID to match GSOD format
meta = meta.withColumn("STATION", concat_ws("",
    lpad(col("USAF").cast("string"), 6, "0"),
    lpad(col("WBAN").cast("string"), 5, "0")
)).withColumnRenamed("ELEV(M)", "ELEV") \
  .select("STATION", "CTRY", "STATE", "LAT", "LON", "ELEV")


#  Join metadata into GSOD DataFrame
df_enriched = df.join(meta, on="STATION", how="left")

# Write to Parquet (with metadata, partitioned by year)
df_enriched.repartition("year").write \
    .mode("overwrite") \
    .partitionBy("year") \
    .parquet("/home/jovyan/Project/gsod_parquet_enriched")

print("Done: CSVs converted to Parquet")



Done: CSVs converted to Parquet


In [7]:
df_enriched = spark.read.parquet("/home/jovyan/Project/gsod_parquet_enriched")
df_enriched.printSchema()
df_enriched.show(5)


root
 |-- STATION: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: double (nullable = true)
 |-- FRSHTT: string (nullable = true)
 |-- CTRY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)
 |-- ELEV: double (nullable = true)
 |-- year: integer (nullable = true)

+-----------+----------+---------+----------+---------+--------------------+-

In [11]:
## Data Cleaning

from pyspark.sql.functions import col, regexp_replace, when
# 1. Drop rows with missing essential fields
essential_cols = ["TEMP", "LATITUDE", "LONGITUDE"]
df_clean = df.dropna(subset=essential_cols)

# 2. Filter out placeholder or unrealistic values
df_clean = df_clean.filter((col("TEMP") > -1750) & (col("TEMP") < 1750)) \
                   .filter((col("SLP") < 1100) & (col("SLP") > 800)) \
                   .filter(~col("PRCP").isin(["99.99", "999.9", "99.9", "999.0"]))

# 3. Remove duplicates
df_clean = df_clean.dropDuplicates(["STATION", "DATE"])

# 4. Clean special flags in MAX/MIN and PRCP columns
df_clean = df_clean.withColumn("MAX", regexp_replace(col("MAX"), "[*]", "").cast("double"))
df_clean = df_clean.withColumn("MIN", regexp_replace(col("MIN"), "[*]", "").cast("double"))

# 5. Remove special flags from PRCP column
df_clean = df_clean.withColumn("PRCP", regexp_replace(col("PRCP"), "[A-Z]", "").cast("double"))
