In [1]:
# Import required libraries
import csv
import os
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import pymysql
import feather
from urllib.request import urlopen
import json

# Station Details

## Objective

- Clean the station details data
- Enrich the data by adding the weather station location details (e.g. city, state, country etc)
- Load the data into the MySQL

## Load Data

In [340]:
station_details = pd.read_csv("/Users/todddequincey/globalwarming/data/station_details.csv")
geo_info = feather.read_dataframe("/Users/todddequincey/globalwarming/data/geo_info.feather")


### Append Geolocation Details

In [141]:
# Read in API key
key = open("/Users/todddequincey/Desktop/api_key.txt","r")
key = key.readline()

FileNotFoundError: [Errno 2] No such file or directory: '/Users/todddequincey/Desktop/api_key.txt'

In [341]:
# Get the location of each weather station
def getlocation(station_id, lat, lon):
    # Set return vars to none in case of errors. Return None to preserve indexing for joining to df
    station_id = station_id
    street_number = None
    street_name = None
    locality = None
    region1 = None
    region2 = None
    country = None
    post_code = None
    formatted_address = None
    lat = lat
    lon = lon
    new_lat = None
    new_lon = None
    results = {}
    
    # Try to get the geocoded data from Google Maps API
    try:
        url = "https://maps.googleapis.com/maps/api/geocode/json?"
        url += "latlng=%s,%s&sensor=false&key=%s" % (lat, lon, key)
        v = urlopen(url).read()
        j = json.loads(v)

        if j['status'] == 'ZERO_RESULTS':
            # pass to store None results
            pass
        else: 
            address_components = j['results'][0]    
            # Check if individual address elements exist and update vars
            for c in address_components['address_components']:
                if "street_number" in c['types']:
                    street_number = c['long_name']
                if "route" in c['types']:
                    street_name = c['long_name']
                if "locality" in c['types']:
                    locality = c['long_name']
                if "administrative_area_level_1" in c['types']:
                    region1 = c['long_name']            
                if "administrative_area_level_2" in c['types']:
                    region2 = c['long_name']                        
                if "country" in c['types']:
                    country = c['long_name']
                if "postal_code" in c["types"]:
                    post_code = c['long_name']
                    
            # Capture formatted address 
            formatted_address = j['results'][0]['formatted_address']   

            # Capture exact rooftop lat and lon used by Google Maps API
            new_lat = j['results'][0]['geometry']['location']['lat']
            new_lon = j['results'][0]['geometry']['location']['lng']

        # Create dictionary of results
        for i in ('station_id',
                  'street_number', 
                  'street_name', 
                  'locality', 
                  'region1', 
                  'region2', 
                  'country', 
                  'post_code', 
                  'formatted_address', 
                  'lat', 
                  'lon',
                  'new_lat', 
                  'new_lon'):
            results[i] = locals()[i]

        return results
    
    # Set fields to None if any errors and return
    except:
        # Create dictionary of results
        for i in ('station_id',
                  'street_number', 
                  'street_name', 
                  'locality', 
                  'region1', 
                  'region2', 
                  'country', 
                  'post_code', 
                  'formatted_address', 
                  'lat', 
                  'lon',
                  'new_lat', 
                  'new_lon'):
            results[i] = locals()[i]

        return results

In [342]:
# Number of weather stations at lat lon (0,0)
len(station_details[((station_details['Lat'] == 0) & (station_details['Lon'] == 0))])

379

In [343]:
# Remove any rows where lat and lon are both 0 (missing data)
station_details = station_details[~((station_details['Lat'] == 0) & (station_details['Lon'] == 0))]

In [344]:
# Number of weather stations with missing lat or lon (np.NaN)
len(station_details[((station_details['Lat'].isnull()) | (station_details['Lon'].isnull()))])

1205

In [345]:
# Remove any rows which missing lat and lon details (np.NaN)
station_details = station_details[~((station_details['Lat'].isnull()) | (station_details['Lon'].isnull()))]
station_details = station_details.reset_index()

In [346]:
# Column names for dataframe
col_names = ['station_id',
             'street_number',
             'street_name',
             'locality',
             'region1',
             'region2',
             'country',
             'post_code',
             'formatted_address',
             'lat',
             'lon',
             'new_lat',
             'new_lon']

# Create an empty dataframe to store the results
#geo_info = pd.DataFrame(columns=col_names)

# Get geoinfo for each weather station and append to geo_info
#for i in range(0,len(station_details)):
#    results = getlocation(station_details.loc[i, 'StationId'], station_details.loc[i, 'Lat'], station_details.loc[i, 'Lon'])
#    geo_info = geo_info.append(results, ignore_index=True)

In [347]:
# Save geoinfo results to feather for future use
#feather.write_dataframe(geo_info, "/Users/todddequincey/globalwarming/data/geo_info.feather")

In [348]:
len(station_details.StationId.unique())

28151

In [349]:
len(geo_info.station_id.unique())

28151

In [350]:
# Left join the Google API weather station data to the original dataset
station_details = pd.concat([station_details, geo_info], axis=1)

In [351]:
# Fill missing values with np.nan
station_details = station_details.fillna(value=pd.np.nan)

In [352]:
# Create a list of the unique weather stations which have missing locality (city) or Country location information
null_records = station_details[station_details['locality'].isnull() | 
                              station_details['country'].isnull()]

In [353]:
# Count of weather stations with missing data
len(null_records['StationId'].unique())

9830

In [354]:
# Summarise the country code of weather stations with missing data (more than 100 missing values)
results = null_records.groupby('CountryId').count()
results = results.loc[results['StationId'] >100,'StationId']
results

CountryId
AR     125
AY     199
BR     918
CA     605
CH     102
FI     134
ID     162
IT     176
KZ     191
NO     396
RS    1068
SW     440
TU     169
UK     578
US     867
Name: StationId, dtype: int64

In [355]:
# Convert weather station id to int for all stations with missing location info
# NOTE: To be removed from the weather station records if they exist
stn_no_loc = null_records.StationId
stn_no_loc = stn_no_loc.unique().astype(int)

In [356]:
# Remove all weather stations with missing locality (city) or country location information from the dataset
# Note: Should have done this BEFORE collecting the location data from the Google API
station_details = station_details[station_details['locality'].notnull()] 
station_details = station_details[station_details['country'].notnull()]

In [357]:
# Reset the index the df
station_details = station_details.reset_index()
#station_details = station_details.reindex()

In [358]:
# Remove unnecessary columns
keep_cols = ['StationId', 'StationName', 'Elevation', 'StartDate', 'EndDate', 'locality', 'region1', 'country', 'new_lat', 'new_lon']
station_details = station_details.loc[:,keep_cols]

In [359]:
# Rename the columns
station_details.columns = ['StationId', 'StationName', 'Elevation', 'StartDate', 'EndDate', 'City', 'State', 'Country', 'Lat', 'Lon']

In [360]:
# Save the cleaned dataset to feather format
feather.write_dataframe(station_details, "/Users/todddequincey/globalwarming/data/station_details.feather")

# Station Records

## Objective

- Combine individual weather station files into single annual files
- Consolidate the daily weather recordings into more meaningful monthly averages
- Complete any required data cleaning 
- Load the data into the MySQL

As the size of the monthly summarised data is expected to fit in memory, the data will be saved into a single dataframe before being imported into MySQL.

If this is not the case, each year of data will be loaded into MySQL separately. 

## Load Data

### Functions

In [361]:
# Get the names of all files in the dir
def create_annual_file(folder_path, year):
    '''
        For all of the CSV files in a given folder, combine all of the CSVs into a single annual file
        
        folder_path = String. The folder path where each of the annual folders are saved
        year = String. The calendar year file being created.
        
        returns a DataFrame
    '''
    # Full path name
    full_path = folder_path + year
    
    # Get file names in dir and convert to a list
    file_names = list(os.listdir(full_path))
    
    # Calculate the number of files in the dir
    no_files = len(file_names)
    
    # Combine the CSVs
    combined_file = pd.concat([pd.read_csv(str(full_path + "/" + f), dtype={'STATION': int}) for f in file_names])
    
    # Return combined_csv df
    return combined_file

In [362]:
# Clean the CSV
def clean_files(df):
    '''
        Clean annual files
    
        df = dataframe returned from the combined_csv function. 
        
        returns a DataFrame
    '''
    
    # Create the station_records df 
    df = df.drop(labels = ['LATITUDE', 
                           'LONGITUDE', 
                           'NAME', 
                           'ELEVATION', 
                           'PRCP_ATTRIBUTES', 
                           'TEMP_ATTRIBUTES', 
                           'DEWP_ATTRIBUTES', 
                           'SLP_ATTRIBUTES', 
                           'STP_ATTRIBUTES', 
                           'VISIB_ATTRIBUTES', 
                           'WDSP_ATTRIBUTES', 
                           'MAX_ATTRIBUTES', 
                           'MIN_ATTRIBUTES', 
                           'PRCP_ATTRIBUTES', 
                           'FRSHTT'], 
                 axis=1)

    # Replace all 999.9 and 9999.9 missing values with np.NaN
    df = df.replace(999.9, np.NaN)
    df = df.replace(9999.9, np.NaN)    
    
    # Rename the columns 
    df.columns = ['StationId', 
                  'Date', 
                  'Temp', 
                  'Dew', 
                  'SLP', 
                  'StationPressure', 
                  'Visib', 
                  'WindSpeed', 
                  'MaxWindSpeed', 
                  'Gust', 
                  'MaxTemp', 
                  'MinTemp', 
                  'Precip', 
                  'SnowDepth']

    return df

In [363]:
# Calculate monthly averages, min and max
def calc_monthly(df):
    # Calculate and add the Year and Month columns to the df
    df['Year'] = pd.DatetimeIndex(df['Date']).year
    df['Month'] = pd.DatetimeIndex(df['Date']).month

    # Drop the date column
    df = df.drop(labels='Date', axis=1)
    
    # Group the rows by StationId, year and month 
    df_grouped = df.groupby(by=['StationId','Year','Month'])

    # Calculate the avg, min and max fields and a field to count how many records are included in each calc
    df_avg = df_grouped.mean()
    df_min = df_grouped.min()
    df_max = df_grouped.max()
    df_count = df_grouped.size().to_frame()

    # Left join df_max, df_min and df_count
    df = df_avg.join(df_max.loc[:, ['MaxTemp', 'MinTemp']], rsuffix='_max')
    df = df.join(df_min.loc[:, ['MaxTemp','MinTemp']], rsuffix='_min')
    df = df.join(df_count)
    
    # Rename Min and Max temp column variants
    col_names = list(df.columns)
    col_names[8] = "AvgMaxTemp"
    col_names[9] = "AvgMinTemp"
    col_names[12] = "MaxTemp"
    col_names[13] = "MaxMinTemp"
    col_names[14] = "MinMaxTemp"
    col_names[15] = "MinTemp"
    col_names[16] = "NumberDailyRecords"
    df.columns = col_names

    # Reset the index of the df so it is no longer grouped
    df = df.reset_index()

    return df

In [364]:
# Run all functions
def run(folder_path, start_year, end_year):
    '''
        Converts individual weather station records into annual summary/file/.
        Cleans the annual summary.
        Calculates monthly summary level data
        
        file_path = String. Path to where all of the folders for each year are saved.
        start_year = Int. Start year to combine data
        end_year = Int. End year to combine data
        
        returns pd.DataFrame
    '''    
    
    # Dataframe to store the results
    results = pd.DataFrame()
    
    # Create range of years
    years = range(start_year, end_year + 1)
    
    for year in years:
        try:
            # Create annual file
            print("Creating annual file for {}...".format(year))
            df = create_annual_file(folder_path, str(year))
            print("{} file created".format(year))

            # Clean annual file
            print("Cleaning {} file...".format(year))
            df = clean_files(df)
            print("{} file cleaned".format(year))    

            # Convert the data into monthly summary
            print("Converting daily records in {} file into monthly summary...".format(year))
            df = calc_monthly(df)
            print("{} converted into monthly summary".format(year))        

            # Append the results to the dataframe
            results = results.append(df)
            print("-----------------------")
        
        # Catch all except clause for any errors
        except Exception as e:
            print("{} error with {}".format(e, year))
            print("-----------------------")

    # Reset the index
    results = results.reset_index(drop=True)
    
    # Print success message
    print("Finished loading data.")
    
    return results

### Read and Save Data

#### Load Pre-Saved Data into Jupyter

In [387]:
# Load data from feather format (fastest read and write format)
station_records = feather.read_dataframe("/Users/todddequincey/globalwarming/data/GSOD/1960-2018_monthly_full.feather")


#### Read in the raw data files

In [388]:
# Run all of the functions and save the df
#station_records = run("/Users/todddequincey/globalwarming/data/GSOD/", 2012, 2018)

# Round the decimals to three places
#station_records = station_records.round(decimals=3)

### Further Cleaning 
Further cleaning is considered necessary to address the issues associated with high level of daily and/or monthly missing weather station records.

#### Missing Daily Records
Any monthly with missing weather station records of 5% or more are to be removed from the dataset. 

In [389]:
# Number of records before data cleaning
len(station_records)

5122512

In [390]:
# Set the number of days in each month (leap year excluded for simplicity)
months_31 = [1,3,5,7,8,10,12]
months_30 = [4,6,9,11]
months_28 = [2]

# Set minimum number of daily records required in each month
days_31 = int(round(31 * 0.95,0))
days_30 = int(round(30 * 0.95,0))
days_28 = int(round(28 * 0.95,0))

In [391]:
# Filter the dataframe to ensure no more than 5% missing values in any monthly average
station_records = station_records[
    # Months with 31 days
    ((station_records['Month'].isin(months_31)) & 
    (station_records['NumberDailyRecords'] >= days_31)) |

    # Months with 30 days
    ((station_records['Month'].isin(months_30)) & 
    (station_records['NumberDailyRecords'] >= days_30)) |
    
    # Months with 28 days
    ((station_records['Month'].isin(months_28)) & 
    (station_records['NumberDailyRecords'] >= days_28))
]

In [392]:
# Number of remaining records
len(station_records)

3907240

#### Missing Monthly Records
Any weather station with more than 5% of months which are missing are to be removed from the dataset. 

In [393]:
# Threshold for number of months ()
month_thres = int(round(12 * 0.95,0))
month_thres

11

In [394]:
# Count number of months in each year for each weather station
counter = station_records.groupby(['StationId', 'Year']).count().reset_index()

# Remove redundant columns
counter = counter[["StationId", "Year", "Month"]]

# Number of weather stations which meet the threshold
len(counter[counter['Month'] >= month_thres])

272835

In [395]:
# Add the month counter to the df
station_records = pd.merge(station_records, counter, how="left", on=["StationId", "Year"], suffixes=('','_count'))

In [396]:
# Only keep records which meet the threshold
station_records = station_records[station_records['Month_count'] >= month_thres]

In [397]:
# Drop Month_count columns
station_records = station_records.drop(columns = "Month_count")

In [398]:
# List of weather stationId which meet the threshold
len(station_records)

3227527

# Weather Station Intersection between Datasets

Calculate intersection of weather stations of station_details and station_records

In [399]:
# Unique stations in each dataset
records = set(station_records.StationId.unique().astype(int))
details = set(station_details.StationId.unique().astype(int))

# Intersection of weather stations in both datasets
intersection = records.intersection(details)

In [400]:
# Only keep weather stations in the intersection of the datasets
station_records = station_records[station_records.StationId.isin(intersection)]
station_details = station_details[station_details.StationId.isin(intersection)]

In [401]:
# Number of unique weather stations which remain
len(station_records.StationId.unique())

12068

#### Save the data to feather format

In [402]:
# Save the dataframe to feather format (fastest to read and write for future use in Python)
feather.write_dataframe(station_records, "/Users/todddequincey/globalwarming/data/GSOD/1960-2018_monthly_cleaned.feather")
feather.write_dataframe(station_details, "/Users/todddequincey/globalwarming/data/station_details_cleaned.feather")

## Load Data to MySQL

In [381]:
# Create connection to the database
engine = create_engine('mysql+pymysql://root:@localhost/globalwarming')

In [382]:
# Print progress message
print("Loading station_details data into MySQL...")

# Export data into MySQL
station_details.to_sql(con=engine,
            name='StationDetails', 
            if_exists = 'append', 
            index=False)

# Print success message
print("station_details data loaded to MySQL")

Loading station_details data into MySQL...
station_details data loaded to MySQL


In [403]:
# Print progress message
print("Loading station_records data into MySQL...")

# Export data into MySQL
station_records.to_sql(con=engine,
            name='StationRecords', 
            if_exists = 'append', 
            index=False)

# Print success message
print("station_records data loaded to MySQL")

Loading station_records data into MySQL...
station_records data loaded to MySQL


### Daily Data
Calculate the daily average for individual weather stations. Three year average based on the first three years in the decade (e.g. 1960, 1961, 1962). Calculated for the 60s, 70s and 80s.

In [129]:
# Calculate daily averages
def calc_daily_avg(df, year):
    # Calculate and add the Month and Day columns to the df
    df['Year'] = year
    df['Month'] = pd.DatetimeIndex(df['Date']).month
    df['Day'] = pd.DatetimeIndex(df['Date']).day

    # Drop the date column
    df = df.drop(labels='Date', axis=1)

    # Drop any rows for 29 Feb to avoid leap year issues
    df = df[~((df['Month'] == 2) & (df['Day'] == 29))]
    
    # Group the rows by StationId, month and day
    df_grouped = df.groupby(by=['StationId', 'Month', 'Day'])

    # Calculate the avg 
    df = df_grouped.mean()

    # Reset the index of the df so it is no longer grouped
    df = df.reset_index()

    # Re-construct the date for the average
    df['Date'] = pd.to_datetime(df.loc[:,['Year', 'Month', 'Day']])

    # Drop unneeded columns
    df = df.drop(labels=['Year', 
                         'Month', 
                         'Day', 
                         'Dew', 
                         'SLP', 
                         'StationPressure', 
                         'Visib',
                         'WindSpeed', 
                         'MaxWindSpeed', 
                         'Gust',
                         'Precip',
                         'SnowDepth'], 
                 axis=1)

    # Reorder the columns
    new_order = [0,4,1,2,3]
    df = df[df.columns[new_order]]

    return df

In [84]:
# Run all functions
def run_daily(folder_path, start_year, end_year):
    '''
        Converts individual weather station records into annual summary/file/.
        Cleans the annual summary.
        Calculates individual weather station daily average for the first three year (e.g. daily average of 1960, 1961 and 1962).
        
        file_path = String. Path to where all of the folders for each year are saved.
        start_year = Int. Start year to combine data
        end_year = Int. End year to combine data
        
        returns pd.DataFrame
    '''    
    
    # Dataframe to store the results
    results = pd.DataFrame()
    
    # Create range of years
    years = range(start_year, end_year + 1)
    
    for year in years:
        try:
            # Create annual file
            print("Creating annual file for {}...".format(year))
            df = create_annual_file(folder_path, str(year))
            print("{} file created".format(year))

            # Clean annual file
            print("Cleaning {} file...".format(year))
            df = clean_files(df)
            print("{} file cleaned".format(year))    

            # Append the results to the dataframe
            results = results.append(df)
            print("-----------------------")
        
        # Catch all except clause for any errors
        except Exception as e:
            print("{} error with {}".format(e, year))
            print("-----------------------")

    # Reset the index
    results = results.reset_index(drop=True)
    
    # Print success message
    print("Finished loading data.")
    
    return results

In [88]:
# Read in data
data_60 = run_daily("/Users/todddequincey/Downloads/", 1960, 1962)
data_70 = run_daily("/Users/todddequincey/Downloads/", 1970, 1972)
data_80 = run_daily("/Users/todddequincey/Downloads/", 1980, 1982)

Creating annual file for 1960...
1960 file created
Cleaning 1960 file...
1960 file cleaned
-----------------------
Creating annual file for 1961...
1961 file created
Cleaning 1961 file...
1961 file cleaned
-----------------------
Creating annual file for 1962...
1962 file created
Cleaning 1962 file...
1962 file cleaned
-----------------------
Finished loading data.
Creating annual file for 1970...
1970 file created
Cleaning 1970 file...
1970 file cleaned
-----------------------
Creating annual file for 1971...
1971 file created
Cleaning 1971 file...
1971 file cleaned
-----------------------
Creating annual file for 1972...
1972 file created
Cleaning 1972 file...
1972 file cleaned
-----------------------
Finished loading data.
Creating annual file for 1980...
1980 file created
Cleaning 1980 file...
1980 file cleaned
-----------------------
Creating annual file for 1981...
1981 file created
Cleaning 1981 file...
1981 file cleaned
-----------------------
Creating annual file for 1982...
1

In [131]:
# Calculate the daily averages
df_60 = calc_daily_avg(data_60, 1960)
df_70 = calc_daily_avg(data_70, 1970)
df_80 = calc_daily_avg(data_80, 1980)

In [132]:
# Combine into a single dataset
daily_avg = pd.concat([df_60, df_70, df_80])

In [197]:
# Round to 2 decimals
daily_avg['Temp'] = round(daily_avg['Temp'], 2)
daily_avg['MaxTemp'] = round(daily_avg['MaxTemp'],2)
daily_avg['MinTemp'] = round(daily_avg['MinTemp'],2)

In [198]:
# Save results to feather for future use
#feather.write_dataframe(df_daily_avg, "/Users/todddequincey/globalwarming/data/daily_avg.feather")

# Read in feather data
daily_avg = feather.read_dataframe("/Users/todddequincey/globalwarming/data/daily_avg.feather")

Calculate intersection of weather stations of station_details and station_records

In [199]:
# Unique stations in each dataset
details = set(station_details.StationId.unique().astype(int))
daily = set(daily_avg.StationId.unique().astype(int))

# Intersection of weather stations in both datasets
intersection = details.intersection(daily)

# Only keep weather stations in the intersection of the datasets
daily_avg = daily_avg[daily_avg.StationId.isin(intersection)]

#### Load Data to MySQL

In [200]:
# Create connection to the database
engine = create_engine('mysql+pymysql://root:@localhost/globalwarming')

In [201]:
# Print progress message
print("Loading daily_avg data into MySQL...")

# Export data into MySQL
daily_avg.to_sql(con=engine,
            name='DailyAvg', 
            if_exists = 'append', 
            index=False)

# Print success message
print("daily_avg data loaded to MySQL")

Loading daily_avg data into MySQL...
daily_avg data loaded to MySQL
