This script:
1. Reads multiple CSV files from weather station directories
2. Processes the data
3. Uploads combined data to PostgreSQL

In [1]:
import os
import csv
import io
import psycopg2
from psycopg2.extras import execute_values
import time

In [2]:
data_folder = "../data/historical_weather/data"
target_table = "weather_reports"
conn_string = "postgresql://lizavabistsevits:@localhost:5432/taiwan" # database connection string

column_names = {
	"station_code" : "station",
	"obs_date" : "obs_date",
	"Tx" : "temp",
	"TxMaxAbs" : "temp_max",
	"TxMinAbs" : "temp_min",
	"Td" : "temp_dp",
	"RH" : "rel_humidity",
	"WS" : "wind_speed",
	"WD" : "wind_dir",
	"Precp" : "precp"
}

In [17]:
# Get a list of all station codes that exist in the database
with psycopg2.connect(conn_string) as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT code FROM stations")
        station_codes = [row[0] for row in cursor.fetchall()]

print("Existing station codes: ", station_codes)
print("Total number of unique station codes: ", len(station_codes))

Existing station codes:  ['466850', '466881', '466900', '466910', '466920', '466930', '466940', '466950', '466990', '467050', '467080', '467110', '467270', '467280', '467290', '467300', '467350', '467410', '467420', '467441', '467480', '467490', '467530', '467540', '467550', '467571', '467590', '467610', '467620', '467650', '467660', '467790', '467990', 'C0A520', 'C0A530', 'C0A550', 'C0A570', 'C0A640', 'C0A770', 'C0A860', 'C0A870', 'C0A890', 'C0A931', 'C0A940', 'C0A950', 'C0A970', 'C0A980', 'C0A9C0', 'C0A9F0', 'C0AC40', 'C0AC60', 'C0AC70', 'C0AC80', 'C0ACA0', 'C0AD10', 'C0AD30', 'C0AD40', 'C0AD50', 'C0AG80', 'C0AH00', 'C0AH10', 'C0AH30', 'C0AH40', 'C0AH50', 'C0AH70', 'C0AH80', 'C0AH90', 'C0AI00', 'C0AI10', 'C0AI20', 'C0AI30', 'C0AI40', 'C0AJ20', 'C0AJ30', 'C0AJ40', 'C0AJ50', 'C0AJ60', 'C0AJ70', 'C0AJ80', 'C0AJ90', 'C0AK10', 'C0B010', 'C0B020', 'C0B040', 'C0B050', 'C0B060', 'C0C460', 'C0C480', 'C0C490', 'C0C590', 'C0C620', 'C0C630', 'C0C650', 'C0C660', 'C0C670', 'C0C680', 'C0C700', 'C0C

In [None]:
def get_station_files(data_dir):
    """
    Find all station directories and their CSV files.
    Returns a list of tuples: (station_code, file_path)
    """
    station_files = []
    skipped_stations = 0
    
    # Traverse the main data directory
    for station_dir in os.listdir(data_dir):
        station_path = os.path.join(data_dir, station_dir)
        
        # Skip if not a directory
        if not os.path.isdir(station_path):
            continue
            
        # Get station code from directory name
        station_code = station_dir
        
        # Skip if it is not in the station list
        if station_code not in station_codes:
            skipped_stations += 1
            continue
        
        # Find all CSV files in the station directory
        for filename in os.listdir(station_path):
            if filename.endswith('_daily.csv'):
                file_path = os.path.join(station_path, filename)
                station_files.append((station_code, file_path))
    
    #print(f"Skipped {skipped_stations} stations.")        
    
    return station_files

def process_csv_file(station_code, file_path):
    """
    Process a single CSV file and return its data as a list of dictionaries.
    Handles the specific column mapping and adds station_code to each record.
    """
    data = []
    
    with open(file_path, 'r', newline='') as csvfile:
        
        # Read CSV file
        reader = csv.reader(csvfile, delimiter=',')
        
        # Get headers
        headers = next(reader)
        #print("Headers: ", headers)
        
        # Process each row
        for row in reader:
            if not row:  # Skip empty rows
                continue
                
            # Create record with renamed columns
            record = {'station_code': station_code}

            # Add date as first column
            record['obs_date'] = row[0]
            
            # Add remaining columns
            for i, value in enumerate(row[1:], 1):
                if i < len(headers):
                    orig_col = headers[i]
                    if orig_col in column_names:
                        # Handle  NULL values such as -9999.5, -99.5, -9995 etc
                        if value == "" or value.startswith("-99"):
                            record[orig_col] = None
                        else:
                            record[orig_col] = value

            #print("Record:", record)
            
            data.append(record)
            
    return data

def insert_to_postgres(conn, data, table_name):
    """
    Upload data to PostgreSQL using the INSERT command with execute_values.

        conn: PostgreSQL connection
        data: List of dictionaries containing the data
        table_name: Target PostgreSQL table name
    
    Returns number of records inserted.
    """
    if not data:
        return 0
        
    # Get PostgreSQL column names from mapping
    pg_columns = []
    for csv_col, pg_col in column_names.items():
        pg_columns.append(pg_col)
        
    # Prepare data for insertion
    rows = []
    for record in data:
        row = []
        for csv_col in column_names.keys():
            row.append(record.get(csv_col, None))
        rows.append(row)
    
    # Execute INSERT command using execute_values for efficiency
    cursor = conn.cursor()
    
    # Build the INSERT query
    columns_str = ', '.join(pg_columns)
    query = f"INSERT INTO {table_name} ({columns_str}) VALUES %s"

    try:
        execute_values(cursor, query, rows)
    except:
        raise
    finally:
        conn.commit()
        cursor.close()
    return len(data)

def process_weather_data(data_dir=data_folder, table_name=target_table, 
                        connection_string=conn_string, batch_size=10000):
    """
    Main function to process weather data and upload to PostgreSQL.
    """
    start_time = time.time()
    
    # Connect to PostgreSQL
    conn = psycopg2.connect(connection_string)
    print("Connected to PostgreSQL database")
    
    # Get all station files
    print(f"Scanning data directory: {data_dir}")
    station_files = get_station_files(data_dir)
    print(f"Found {len(station_files)} station CSV files")
    
    # Process files
    total_records = 0
    batch_data = []
    
    # Simple progress tracking
    total_files = len(station_files)
    print(f"Processing {total_files} files...")
    
    for i, (station_code, file_path) in enumerate(station_files):
        # Show progress periodically
        if (i+1) % 10 == 0 or i == 0 or i == total_files-1:
            print(f"Processing file {i+1}/{total_files} ({(i+1)/total_files*100:.1f}%) - Records so far: {total_records}")
        
        # Process the CSV file
        file_data = process_csv_file(station_code, file_path)
        batch_data.extend(file_data)
        
        # Upload in batches to reduce memory usage
        if len(batch_data) >= batch_size:
            inserted = insert_to_postgres(conn, batch_data, table_name)
            total_records += inserted
            batch_data = []
    
    # Insert any remaining records
    if batch_data:
        inserted = insert_to_postgres(conn, batch_data, table_name)
        total_records += inserted
    
    # Report summary
    elapsed_time = time.time() - start_time
    print("\n----- SUMMARY -----")
    print(f"Total files processed: {len(station_files)}")
    print(f"Total records inserted: {total_records:,}")
    print(f"Processing time: {elapsed_time:.2f} seconds")
    print(f"Average speed: {total_records/elapsed_time:.2f} records/second")
    
    # Close connection
    conn.close()
    print("PostgreSQL connection closed")
    
    return total_records

In [18]:
# Run the weather station data processor
process_weather_data()

Connected to PostgreSQL database
Scanning data directory: ../data/historical_weather/data
Found 19939 station CSV files
Processing 19939 files...
Processing file 1/19939 (0.0%) - Records so far: 0
Processing file 10/19939 (0.1%) - Records so far: 0
Processing file 20/19939 (0.1%) - Records so far: 0
Processing file 30/19939 (0.2%) - Records so far: 10292
Processing file 40/19939 (0.2%) - Records so far: 10292
Processing file 50/19939 (0.3%) - Records so far: 10292
Processing file 60/19939 (0.3%) - Records so far: 10292
Processing file 70/19939 (0.4%) - Records so far: 20504
Processing file 80/19939 (0.4%) - Records so far: 20504
Processing file 90/19939 (0.5%) - Records so far: 20504
Processing file 100/19939 (0.5%) - Records so far: 20504
Processing file 110/19939 (0.6%) - Records so far: 30544
Processing file 120/19939 (0.6%) - Records so far: 30544
Processing file 130/19939 (0.7%) - Records so far: 30544
Processing file 140/19939 (0.7%) - Records so far: 40791
Processing file 150/19

NameError: name 'skipped_stations' is not defined