# Install Dependencies
This runs a pip install using the requirements.txt file that installs the dependencies for this notebook. Remove the # when first running this notebook on your machine. Afterward you can add it again.


In [40]:
#!pip install -r requirements.txt

# Docker Setup

remember to navigate into the `./nceiDatabase` directory and execute the command `docker-compose up --build` if it's the first time running this file and `docker-compose up` when have build the docker container previously

# Imports


In [81]:
import numpy as np
import pandas as pd
import psycopg2
from psycopg2 import sql
import os
import requests
from decimal import Decimal


# User Settings

- years: An array of year Numbers that we want to download
- stationsFilePath: the path where the stations data is stored and located
- modifiedStationsFilePath: the path where the modified stations data is stored
- downloadCSVFilePath: where the csv.gz files from the ncei file server should be saved
- modifiedCSVFilePath: where the modified csv files should be saved
- dbname: the name of the database you want to connect to
- dbuser: the user that you want to connect with to the database
- dbpassword: the password for the user
- host: the host address (if running locally use localhost)
- port: the port that the database is exposed on

In [57]:
years = [2000, 2001]
stationsFilePath = "./data/stations/"
modifiedStationsFilePath = "./data/modifiedStations/"
downloadCSVFilePath = "./data/NCEI/ghcn/daily/"
modifiedCSVFilePath = "./data/NCEI/modified/daily/"
dbname = "mydatabase"
dbuser = "myuser"
dbpassword = "mypassword"
host = "localhost"
port = "5432"


# Constants

Don't change these unless you know what you are doing
- httpsLink: The https link to the ncei.noaa folder that contains the zipped daily weather data files



In [58]:
httpsLink = "https://www.ncei.noaa.gov/pub/data/ghcn/daily/by_year/"
possible_years = np.arange(1949, 2020, 1)


# Imports

# Function Definitions for reusability

In [89]:
## These are the Functions to download the data from the website and convert them


def download_stations(file_path_dest="./data/stations/"):
    """
    This function downloads the stations information (readme and ghcnd-stations.txt file) from the NCEI website and saves it in the specified directory
    
    :param file_path_dest: directory where the stations information will be saved
    :return: 
    """
    url = "https://www.ncei.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt"
    response = requests.get(url)
    if response.ok:
        filename = url.rsplit('/', 1)[1]
        print(f"data downloaded. Will be saved as {filename} in {file_path_dest}")
        os.makedirs(file_path_dest, exist_ok=True)
        with open(f"{file_path_dest}{filename}", "wb") as f:
            f.write(response.content)
    else:
        print("An error occured while trying to retrieve the data from the internet.")

    url = "https://www.ncei.noaa.gov/pub/data/ghcn/daily/readme.txt"
    response = requests.get(url)
    if response.ok:
        filename = url.rsplit('/', 1)[1]
        print(f"data downloaded. Will be saved as {filename} in {file_path_dest}")
        os.makedirs(file_path_dest, exist_ok=True)
        with open(f"{file_path_dest}{filename}", "wb") as f:
            f.write(response.content)
    else:
        print("An error occured while trying to retrieve the data from the internet.")

    print("Data downloaded and saved in", file_path_dest)
    return


def convert_stations(file_path="./data/stations/", file_path_dest="./data/modifiedStations/"):
    """
    This converts the ghcnd-stations.txt file to a csv file that we can load into the database
    
    :param file_path: The file path where the original ghcnd-stations.txt file is located
    :param file_path_dest: The path where the modified station file should be saved
    :return: 
    """

    # two helper functions
    def conv_str(x):
        return str(x)

    def conv_float(x):
        return float(x)

    # we get this information from the readme.txt file
    column_specs = [
        (0, 11),  # ID
        (12, 20),  # LATITUDE
        (21, 30),  # LONGITUDE
        (31, 37),  # ELEVATION
        (38, 40),  # STATE
        (41, 71),  # NAME
        (72, 75),  # GSN FLAG
        (76, 79),  # HCN/CRN FLAG
        (80, 85)  # WMO ID
    ]
    column_names = [
        'ID', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'STATE', 'NAME',
        'GSN FLAG', 'HCN/CRN FLAG', 'WMO ID'
    ]

    col_conv = {'ID': conv_str,
                'LATITUDE': conv_float,
                'LONGITUDE': conv_float,
                'ELEVATION': conv_float,
                'STATE': conv_str,
                'NAME': conv_str,
                'GSN FLAG': conv_str,
                'HCN/CRN FLAG': conv_str,
                'WMO ID': conv_str,
                }

    path_to_file = f"{file_path}ghcnd-stations.txt"
    df = pd.read_fwf(path_to_file, colspecs=column_specs, names=column_names, converters=col_conv)
    rename_columns = {
        'ID': 'code',
        'LATITUDE': 'lat',
        'LONGITUDE': 'lon',
        'ELEVATION': 'elevation',
        'STATE': 'state',
        'NAME': 'name',
        'GSN FLAG': 'flag1',
        'HCN/CRN FLAG': 'flag2',
        'WMO ID': 'wmo_id'
    }

    df.rename(columns=rename_columns, inplace=True)

    df['wmo_id'] = df['wmo_id'].astype(str)
    df.replace(['None', 'nan'], np.nan, inplace=True)
    df = df.fillna('')

    os.makedirs(file_path_dest, exist_ok=True)
    df.to_csv(f"{file_path_dest}/modified_stations.csv", index=False)
    print(f"saved the modified stations file to {file_path_dest}/modified_stations.csv")
    return


def download_years(array_of_years=None, file_path="./data/climate/script"):
    """
    This function downloads the specified years from the NCEI website and saves them in a csv file in the specified directory
    
    :param array_of_years: an array of years to download
    :param file_path: the directory to save the csv files
    :return: 
    """

    if array_of_years is None:
        array_of_years = [1994]

    for year in array_of_years:
        print(f"...Downloading data from year {year}....")
        url = f"https://www.ncei.noaa.gov/pub/data/ghcn/daily/by_year/{year}.csv.gz"
        response = requests.get(url)
        if response.ok:
            filename = url.rsplit('/', 1)[1]
            print(f"data downloaded. Will be saved as {filename}")
            directory_path = file_path
            os.makedirs(directory_path, exist_ok=True)
            with open(f"{file_path}{filename}", "wb") as f:
                f.write(response.content)

        else:
            print("An error occured while trying to retrieve the data from the internet.")
        print(f"Data from year {year} downloaded and saved.")

    return


def export_downloaded_years(array_of_years=None, file_path='./data/NCEI/ghcn/daily/',
                            file_path_dest="./data/NCEI/modified/daily/"):
    """
    This imports the specified years from the specified source path modifies it and exports them as a csv file in the specified destination directory
    
    :param array_of_years: the years to load and modify
    :param file_path: the directory where the source csv.gz files can be found
    :param file_path_dest: the directory where the modified csv files should be saved
    :return: 
    """

    if array_of_years is None:
        array_of_years = [1994]
    columns = ["stationcode", "datelabel", "param", "value", "mflag", "qflag", "sflag", "time"]
    for year in array_of_years:

        print(f"...Year {year} processing...")
        try:
            print(f"Loading data from year {year}")
            file_path_source = f"{file_path}{year}.csv.gz"
            df = pd.read_csv(file_path_source, names=columns, compression="gzip")
            print(f"Data from year {year} loaded.")
            
            #convert time to int
            df['time'] = df['time'].fillna(1200)
            df['time'] = df['time'].apply(lambda x: int(Decimal(x))) 
            # convert values to float
            df = df.astype({"value": "float32"})

            # cleanse dataset: keep only the parameters of interest, i.e. TMIN, TMAX, PRCP, SNOW
            keep = ["TMIN", "TMAX", "PRCP", "SNOW"]

            df = df[df["param"].isin(keep)]

            scaling_factors = {"TMIN": 0.1, "TMAX": 0.1, "PRCP": 0.1}

            for k, v in scaling_factors.items():
                df.loc[df["param"] == k, "value"] *= v

            df = df.fillna('')

            os.makedirs(file_path_dest, exist_ok=True)
            df.to_csv(f"{file_path_dest}/modified_{year}.csv", index=True, header=False)
            print(f"Export of year {year} finished.")

        except Exception as error:
            print(f"Error: {error}")

    return

In [83]:
## These are the functions that establish the connection to the database and can be used to load the modified data into the database

def connect_to_db(db_name="mydatabase", db_user="myuser", db_password="mypassword", db_host="localhost",
                  db_port="5432"):
    """
    This creates a connection to an existing posgresql database using the provided parameteers
    
    :param db_name: the name of the database
    :param db_user: the user that you want to connect with
    :param db_password: the password of the user
    :param db_host: the host of the database
    :param db_port: the port of the database
    :return: connection, cursor if connection is successful
    """

    try:
        connection = psycopg2.connect(
            dbname=db_name,
            user=db_user,
            password=db_password,
            host=db_host,  # Connect to the host where Docker is running
            port=db_port
        )
        cursor = connection.cursor()
        print(f"Connected to database {db_name} with user {db_user} with password {db_password}")

        return connection, cursor

    except Exception as error:
        print(f"Error: {error}")

    return


def insert_copy(file_path="", table_name="", columns=None):
    """
    Inserts the specified csv file into the specified table
    
    :param file_path: the path to the csv file to be inserted into the database
    :param table_name: the name of the table that we want to insert the data into
    :param columns: the columns of the file
    :return: 
    """
    if columns is None:
        columns = []

    connection, cursor = connect_to_db(dbname, dbuser, dbpassword, host, port)
    print("Connection established.")

    try:
        
        copy_command = sql.SQL("""
            COPY {table} ({columns}) FROM STDIN WITH CSV HEADER
        """).format(
            table=sql.Identifier(table_name),
            columns=sql.SQL(',').join(map(sql.Identifier, columns))
        )
        
        print(f"Copying file {file_path} to database {table_name}")
        with open(file_path, 'r') as file:
            cursor.copy_expert(copy_command, file)

        print(f"Insert with copy of file {file_path} to table: {table_name} done.")

        # Commit changes
        connection.commit()

        print(f"Checking if the data was succesfully inserted into the Table.")
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        row_count = cursor.fetchone()[0]
        print(f"Row count after insertion in table {table_name}: {row_count}")

    except Exception as error:
        print(f"Error: {error}")
        if connection:
            connection.rollback()

    finally:
        cursor.close()
        connection.close()

    return


def create_stations_table():
    """
    Creates a Station Table in the database
    
    :return: 
    """
    # establish connection to database
    connection, cursor = connect_to_db(dbname, dbuser, dbpassword, host, port)
    print("Connection established.")

    name = "Station"

    # createe Station table if not exist
    try:
        # Define table name and column names
        table_name = sql.Identifier(name)
        columns = [
            sql.Identifier("id"), sql.Identifier("latitude"), sql.Identifier("longitude"),
            sql.Identifier("elevation"), sql.Identifier("state"), sql.Identifier("name"),
            sql.Identifier("gsn_flag"), sql.Identifier("hcn_crn_flag"), sql.Identifier("wmo_id")
        ]

        # Define column types
        column_types = [
            sql.SQL("VARCHAR(100) PRIMARY KEY"), sql.SQL("FLOAT"), sql.SQL("FLOAT"),
            sql.SQL("FLOAT"), sql.SQL("VARCHAR(100)"), sql.SQL("VARCHAR(100)"),
            sql.SQL("VARCHAR(100)"), sql.SQL("VARCHAR(100)"), sql.SQL("VARCHAR(100)")
        ]

        # Create the SQL command
        create_table_command = sql.SQL('''
            CREATE TABLE IF NOT EXISTS {table} (
                {id} {id_type},
                {latitude} {latitude_type},
                {longitude} {longitude_type},
                {elevation} {elevation_type},
                {state} {state_type},
                {name} {name_type},
                {gsn_flag} {gsn_flag_type},
                {hcn_crn_flag} {hcn_crn_flag_type},
                {wmo_id} {wmo_id_type}
            )
        ''').format(
            table=table_name,
            id=columns[0], id_type=column_types[0],
            latitude=columns[1], latitude_type=column_types[1],
            longitude=columns[2], longitude_type=column_types[2],
            elevation=columns[3], elevation_type=column_types[3],
            state=columns[4], state_type=column_types[4],
            name=columns[5], name_type=column_types[5],
            gsn_flag=columns[6], gsn_flag_type=column_types[6],
            hcn_crn_flag=columns[7], hcn_crn_flag_type=column_types[7],
            wmo_id=columns[8], wmo_id_type=column_types[8]
        )

        # Execute the command
        cursor.execute(create_table_command)

        # Commit changes
        connection.commit()
        print(f"{table_name} table created.")

    except Exception as error:
        print(f"Error: {error}")
        if connection:
            connection.rollback()
    finally:
        cursor.close()
        connection.close()

    return


def create_climate_table(year):
    """
    Creates a Climate table for the specified year
    
    :param year: int value of the year
    :return: 
    """
    connection, cursor = connect_to_db(dbname, dbuser, dbpassword, host, port)
    print("Connection established.")

    table_name = f"Climate{year}"

    try:

        # Construct the CREATE TABLE command using psycopg2.sql
        create_table_command = sql.SQL('''
            CREATE TABLE IF NOT EXISTS {table} (
                id SERIAL PRIMARY KEY,
                stationcode VARCHAR(50) REFERENCES Station(id),
                datelabel DATE,
                param VARCHAR(10),
                value FLOAT,
                mflag VARCHAR(10),
                qflag VARCHAR(10),
                sflag VARCHAR(10),
                time CHAR(4) 
            )
        ''').format(
            table=sql.Identifier(table_name)
        )

        # Execute the CREATE TABLE command
        cursor.execute(create_table_command)
        print(f"Table {table_name} created successfully.")

        # Commit the transaction
        connection.commit()

    except Exception as error:
        print(f"Error: {error}")
        if connection:
            connection.rollback()

    finally:
        if cursor:
            cursor.close()
        if connection:
            connection.close()

    return table_name


def drop_table(table_name):
    """
    Drops the specified table from the database
    
    :param table_name: The name of the table to bee dropped
    :return: 
    """
    connection, cursor = connect_to_db(dbname, dbuser, dbpassword, host, port)

    try:
        drop_table_command = sql.SQL("DROP TABLE IF EXISTS {}").format(sql.Identifier(table_name))
        cursor.execute(drop_table_command)
        connection.commit()
        print(f"Table {table_name} dropped successfully.")
    except Exception as error:
        print(f"Error: {error}")
        if connection:
            connection.rollback()
    finally:
        cursor.close()
        connection.close()

    return


def count_rows(table_name):
    """
    Prints the count of rows in the specified table.
        
    :param table_name: 
    :return: 
    """
    # connect to database
    connection, cursor = connect_to_db(dbname, dbuser, dbpassword, host, port)
    print("Connection established.")
    try:
        # Construct the COUNT query using psycopg2.sql
        count_command = sql.SQL("SELECT COUNT(*) FROM {table}").format(
            table=sql.Identifier(table_name)
        )

        # Execute the COUNT query
        cursor.execute(count_command)
        row_count = cursor.fetchone()[0]

        # Print the row count
        print(f"The table {table_name} contains {row_count} rows.")

    except Exception as error:
        print(f"Error: {error}")
        if connection:
            connection.rollback()

    finally:
        if cursor:
            cursor.close()
        if connection:
            connection.close()

    return



# Code Execution Example

## first adding the stations

first adding the stations

In [84]:
download_stations(stationsFilePath)

KeyboardInterrupt: 

In [None]:
convert_stations(stationsFilePath, modifiedStationsFilePath)

In [69]:
create_stations_table()

Connected to database mydatabase with user myuser with password mypassword
Connection established.
Identifier('Station') table created.


In [7]:
file_path = f"{modifiedStationsFilePath}modified_stations.csv"
cols = ["id", "latitude", "longitude", "elevation", "state", "name", "gsn_flag", "hcn_crn_flag", "wmo_id"]
insert_copy(file_path, "Station", cols)

Connected to database mydatabase with user myuser with password mypassword
Connection established.
Error: duplicate key value violates unique constraint "Station_pkey"
DETAIL:  Key (id)=(ACW00011604) already exists.
CONTEXT:  COPY Station, line 2



In [8]:
count_rows("Station")

Connected to database mydatabase with user myuser with password mypassword
Connection established.
The table Station contains 125988 rows.


## Adding some example years

now we are adding some years

In [87]:
download_years(years, downloadCSVFilePath)

...Downloading data from year 2000....


KeyboardInterrupt: 

In [90]:
export_downloaded_years(years, downloadCSVFilePath, modifiedCSVFilePath)

...Year 2000 processing...
Loading data from year 2000
Data from year 2000 loaded.
Export of year 2000 finished.
...Year 2001 processing...
Loading data from year 2001
Data from year 2001 loaded.
Export of year 2001 finished.


In [92]:
cols = ["id", "stationcode", "datelabel", "param", "value", "mflag", "qflag", "sflag", "time"]

for year in years:
    table_name = create_climate_table(year)
    insert_copy(file_path=f"{modifiedCSVFilePath}/modified_{year}.csv", table_name=table_name, columns=cols)


Connected to database mydatabase with user myuser with password mypassword
Connection established.
Table Climate2000 created successfully.
Connected to database mydatabase with user myuser with password mypassword
Connection established.
Copying file ./data/NCEI/modified/daily//modified_2000.csv to database Climate2000
Insert with copy of file ./data/NCEI/modified/daily//modified_2000.csv to table: Climate2000 done.
Checking if the data was succesfully inserted into the Table.
Error: relation "climate2000" does not exist
LINE 1: SELECT COUNT(*) FROM Climate2000
                             ^

Connected to database mydatabase with user myuser with password mypassword
Connection established.
The table Climate2000 contains 23914226 rows.
Connected to database mydatabase with user myuser with password mypassword
Connection established.
Table Climate2001 created successfully.
Connected to database mydatabase with user myuser with password mypassword
Connection established.
Copying file ./da

In [93]:
count_rows("Climate2000")

Connected to database mydatabase with user myuser with password mypassword
Connection established.
The table Climate2000 contains 23914226 rows.


In [94]:
count_rows("Climate2001")

Connected to database mydatabase with user myuser with password mypassword
Connection established.
The table Climate2001 contains 23913783 rows.


some additional stuff that is temporarily here


In [91]:
for year in years:
    drop_table(f"Climate{year}")

Connected to database mydatabase with user myuser with password mypassword
Table Climate2000 dropped successfully.
Connected to database mydatabase with user myuser with password mypassword
Table Climate2001 dropped successfully.


In [72]:
columns = ["stationcode", "datelabel", "param", "value", "mflag", "qflag", "sflag", "time"]

df = pd.read_csv(f"{downloadCSVFilePath}2000.csv.gz", names=columns, compression="gzip")


In [73]:
df.head(50)

Unnamed: 0,stationcode,datelabel,param,value,mflag,qflag,sflag,time
0,AE000041196,20000101,TMAX,278,,,I,
1,AE000041196,20000101,TMIN,121,,,I,
2,AE000041196,20000101,TAVG,186,H,,S,
3,AEM00041194,20000101,TMAX,251,,,S,
4,AEM00041194,20000101,TMIN,135,,,S,
5,AEM00041194,20000101,TAVG,194,H,,S,
6,AEM00041217,20000101,TMAX,261,,,S,
7,AEM00041217,20000101,TMIN,130,,,S,
8,AEM00041217,20000101,TAVG,194,H,,S,
9,AEM00041218,20000101,TMAX,261,,,S,
