In [12]:
import os
import csv
import pandas as pd
from tqdm import tqdm
import psycopg2
from sqlalchemy import create_engine

# Output directory for CSV files
data_dir = 'datasets/ghcnd_all'
output_dir = 'datasets/output'  # Directory to store CSV files

engine = create_engine(f'postgresql://mrugeles:@localhost:5432/climate_db')


def load_stations():
    colspecs = [(0, 11), (12, 20), (21, 30), (31, 37), (38, 40), (41, 71), (72, 75), (76, 79), (80, 85)]

    df = pd.read_fwf('datasets/ghcnd-stations.txt', colspecs=colspecs, header=None, names=[
        "id", "latitude", "longitude", "elevation", "state", "name", "gsn_flag", "hcn_crn_flag", "wmo_id"
    ], dtype={
        "id": str,
        "latitude": float,
        "longitude": float,
        "elevation": float,
        "state": str,
        "name": str,
        "gsn_flag": str,
        "hcn_crn_flag": str,
        "wmo_id": str
    })

    df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)
    
    df.to_sql('stations', engine, if_exists='append', index=False)


def load_countries():
    colspecs = [(0, 2), (3, 64)]

    df = pd.read_fwf('datasets/ghcnd-countries.txt', colspecs=colspecs, header=None, names=[
        "code", "name"
    ], dtype={
        "code": str,
        "name": str
    })

    df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

    df.to_sql('countries', engine, if_exists='append', index=False)

def load_states():
    colspecs = [(0, 2), (3, 64)]

    df = pd.read_fwf('datasets/ghcnd-states.txt', colspecs=colspecs, header=None, names=[
        "code", "name"
    ], dtype={
        "code": str,
        "name": str
    })

    df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

    df.to_sql('states', engine, if_exists='append', index=False)


def load_inventory():
    colspecs = [(0, 11), (12, 20), (21, 30), (31, 35), (36, 40), (41, 45)]

    df = pd.read_fwf('datasets/ghcnd-inventory.txt', colspecs=colspecs, header=None, names=[
        "id", "latitude", "longitude", "element", "firstyear", "lastyear"
    ], dtype={
        "id": str,
        "latitude": float,
        "longitude": float,
        "element": str,
        "firstyear": int,  # Note: integer data type
        "lastyear": int     # Note: integer data type
    })

    df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)


    df.to_sql('inventory', engine, if_exists='append', index=False)
    
def process_dly_file(file_path):
    # Extract station ID from filename
    station_id = file_path.split('/')[-1].split('.')[0]

    # Base column names and data types
    base_cols = {
        "id": str,
        "year": int,
        "month": int,
        "element": str
    }

    # Read fixed-width file (only base columns initially)
    df = pd.read_fwf(file_path, colspecs=[(0, 11), (11, 15), (15, 17), (17, 21)], names=list(base_cols.keys()), dtype=base_cols)

    # Dynamically add columns for daily values and flags
    for day in range(1, 32):
        value_col = f'value{day}'
        mflag_col = f'mflag{day}'
        qflag_col = f'qflag{day}'
        sflag_col = f'sflag{day}'

        df[value_col] = pd.to_numeric(df.iloc[:, 3 + (day - 1) * 4].astype(str).str.strip(), errors='coerce')
        df[mflag_col] = df.iloc[:, 4 + (day - 1) * 4].astype(str).str.strip()
        df[qflag_col] = df.iloc[:, 5 + (day - 1) * 4].astype(str).str.strip()
        df[sflag_col] = df.iloc[:, 6 + (day - 1) * 4].astype(str).str.strip()

    # Add station ID column
    df.insert(0, 'station_id', station_id)

    # Insert into database
    df.to_sql('weather_data', engine, if_exists='append', index=False)
    
    
load_stations()
load_countries()
load_states()
load_inventory()

engine.dispose()

ProgrammingError: (psycopg2.errors.UndefinedColumn) column "latitude" of relation "inventory" does not exist
LINE 1: INSERT INTO inventory (id, latitude, longitude, element, fir...
                                   ^

[SQL: INSERT INTO inventory (id, latitude, longitude, element, firstyear, lastyear) VALUES (%(id)s, %(latitude)s, %(longitude)s, %(element)s, %(firstyear)s, %(lastyear)s)]
[parameters: ({'id': 'ACW00011604', 'latitude': 17.1167, 'longitude': -61.7833, 'element': 'TMAX', 'firstyear': 1949, 'lastyear': 1949}, {'id': 'ACW00011604', 'latitude': 17.1167, 'longitude': -61.7833, 'element': 'TMIN', 'firstyear': 1949, 'lastyear': 1949}, {'id': 'ACW00011604', 'latitude': 17.1167, 'longitude': -61.7833, 'element': 'PRCP', 'firstyear': 1949, 'lastyear': 1949}, {'id': 'ACW00011604', 'latitude': 17.1167, 'longitude': -61.7833, 'element': 'SNOW', 'firstyear': 1949, 'lastyear': 1949}, {'id': 'ACW00011604', 'latitude': 17.1167, 'longitude': -61.7833, 'element': 'SNWD', 'firstyear': 1949, 'lastyear': 1949}, {'id': 'ACW00011604', 'latitude': 17.1167, 'longitude': -61.7833, 'element': 'PGTM', 'firstyear': 1949, 'lastyear': 1949}, {'id': 'ACW00011604', 'latitude': 17.1167, 'longitude': -61.7833, 'element': 'WDFG', 'firstyear': 1949, 'lastyear': 1949}, {'id': 'ACW00011604', 'latitude': 17.1167, 'longitude': -61.7833, 'element': 'WSFG', 'firstyear': 1949, 'lastyear': 1949}  ... displaying 10 of 747676 total bound parameter sets ...  {'id': 'ZI000067991', 'latitude': -22.217, 'longitude': 30.0, 'element': 'TMIN', 'firstyear': 1951, 'lastyear': 1990}, {'id': 'ZI000067991', 'latitude': -22.217, 'longitude': 30.0, 'element': 'PRCP', 'firstyear': 1951, 'lastyear': 1990})]
(Background on this error at: https://sqlalche.me/e/14/f405)

In [None]:
os.makedirs(output_dir, exist_ok=True)  # Create output directory if not exists

for file_name in tqdm(os.listdir(data_dir)):
    load_dly_data_to_csv(f"{data_dir}/{file_name}", f'{output_dir}/daily/{file_name[:-4]}.csv')