In [105]:
import pandas as pd
import re
import math
import numpy as np

# NOTE(review): the three paths below are absolute and machine-specific; a
# configurable data directory would make the notebook portable.
# Station metadata; merged with the PM table on 'grid_id' further down.
metadata = pd.read_csv('/mnt/d/airdata/metadata_with_station.csv')
# Raw weather observations; column-pruned and parsed in the following cells.
df = pd.read_csv('/mnt/d/airdata/noaa_redownload/la_2017_2021_unique_station.csv')
# PM table (presumably the training split, judging by the file name); merged
# with the daily weather aggregates at the end of the notebook.
pm = pd.read_csv('/mnt/d/airdata/pm/la_train.csv')

In [106]:
def clean_weather_data_columns(data, remove_cols, missing_threshold=0.7, keep_cols=None):
    # TODO: Explore what the columns mean and what can be removed,
    # and whether columns with much missing data are important.
    '''
    Removes irrelevant columns and columns with many missing values
    from the historical weather dataset.

    Args:
        data: weather dataframe (not modified; a new frame is returned)
        remove_cols: column labels to remove unconditionally
        missing_threshold: a column is dropped when its fraction of missing
            values is strictly greater than this value (default 0.7)
        keep_cols: optional column labels that must never be dropped for
            sparsity, e.g. sparse columns that later processing still reads
    Returns:
        Cleaned dataframe
    Raises:
        KeyError: if a label in remove_cols is not a column of data
    '''
    data = data.drop(remove_cols, axis=1)
    protected = set(keep_cols) if keep_cols is not None else set()
    # Fraction of missing rows per column. The fraction is taken over the
    # number of ROWS; the previous code compared against the length of the
    # column name and never recorded anything to drop.
    missing_fraction = data.isna().mean()
    sparse_cols = [
        col for col, fraction in missing_fraction.items()
        if fraction > missing_threshold and col not in protected
    ]
    return data.drop(columns=sparse_cols)

def convert_to_int(x, err, acceptable_quality):
    '''
    Convert one raw field to a number, with two special cases.

    Despite the name, successfully parsed values are returned as float.

    Args:
        x: raw field value (normally a string token)
        err: value returned when x cannot be parsed as a float
        acceptable_quality: sequence of accepted quality codes
    Returns:
        acceptable_quality[0] when str(x) is itself a member of
        acceptable_quality (this is how non-numeric codes such as 'C' are
        mapped onto an accepted value); otherwise float(x); otherwise err
        when float(x) raises ValueError.
    '''
    as_text = str(x)
    if as_text in acceptable_quality:
        # Any accepted textual code collapses to the first accepted code.
        return acceptable_quality[0]
    try:
        number = float(x)
    except ValueError:
        number = err
    return number

In [107]:
# Columns removed up front; none of them is read by the parsing cells below.
remove_cols = [ 'SOURCE', 'NAME', 'REPORT_TYPE', 'CALL_SIGN', 'QUALITY_CONTROL']
# NOTE: `df` is rebound here, so re-running this cell without reloading the
# data fails (the columns listed above are already gone).
df = clean_weather_data_columns(df, remove_cols)
# Parse timestamps so that the `.dt` accessor can be used for daily grouping.
df['DATE'] = pd.to_datetime(df['DATE'])

In [108]:
def clean_wind(wnd):
    '''
    Parse a wind string into speed and direction.

    The string is split on commas into exactly five fields, in the order
    direction, direction quality, (ignored), speed, speed quality.
    A value is replaced by NaN when its quality code is not accepted or when
    it equals the missing-value sentinel (999 for direction, 9999 for speed;
    9999 is also what an unparseable field is converted to).

    Args:
        wnd: comma-separated wind string
    Returns:
        (wind_speed, wind_dir) tuple; each is a float or NaN
    NOTE(review): values are returned exactly as stored in the file. If the
    source format stores speed with a scaling factor, it is not applied
    here - confirm the units (original note: direction 0-360 deg, speed m/s).
    '''
    good_codes = (0, 1, 4, 5, 9)  # see docs
    dir_sentinel, spd_sentinel = 999, 9999
    fields = [convert_to_int(token, spd_sentinel, good_codes) for token in wnd.split(",")]
    direction, quality_dir, _, speed, quality_spd = fields

    direction_ok = (
        quality_dir in good_codes
        and direction != dir_sentinel
        and direction != spd_sentinel
    )
    speed_ok = quality_spd in good_codes and speed != spd_sentinel

    wind_dir = direction if direction_ok else np.nan
    wind_speed = speed if speed_ok else np.nan
    return wind_speed, wind_dir
    

def clean_ceiling_height_data(ceiling):
    '''
    Parse a ceiling string into the ceiling height.

    Per the original notes: the height above ground level of the lowest cloud
    or obscuring phenomena layer aloft with 5/8 or more summation total sky
    cover, or the vertical visibility into a surface-based obstruction, in
    metres; unlimited = 22000.

    The first comma-separated field is the height and the second its quality
    code; any further fields are ignored.

    Args:
        ceiling: comma-separated ceiling string
    Returns:
        The height as a float, or NaN when the quality code is not accepted
        or the height equals the missing-value sentinel 99999.
    '''
    sentinel = 99999
    good_codes = (0, 1, 4, 5, 9)  # see docs
    parsed = [convert_to_int(token, sentinel, good_codes) for token in ceiling.split(",")]
    height, quality, *_ = parsed
    if quality in good_codes and height != sentinel:
        return height
    return np.nan

def clean_visibility_data(vis):
    '''
    Parse a visibility string into the visibility distance.

    Per the original notes: the horizontal distance at which an object can be
    seen and identified (meters).

    The first comma-separated field is the distance and the second its
    quality code; any further fields are ignored.

    Args:
        vis: comma-separated visibility string
    Returns:
        The distance as a float, or NaN when the quality code is not accepted
        or the distance equals the missing-value sentinel 99999.
    '''
    sentinel = 99999
    good_codes = (0, 1, 4, 5, 9)  # see docs
    parsed = [convert_to_int(token, sentinel, good_codes) for token in vis.split(",")]
    distance, quality, *_ = parsed
    usable = quality in good_codes and distance != sentinel
    return distance if usable else np.nan

def clean_temperature_data(tmp):
    '''
    Parse an air-temperature string of the form "value,quality".

    Args:
        tmp: comma-separated string with exactly two fields
    Returns:
        The temperature as a float, or NaN when the quality code is not
        accepted or the value equals the missing-value sentinel 9999.
    NOTE(review): the value is returned exactly as stored in the file. The
    original note says degrees C; if the source format applies a scaling
    factor it is not undone here - confirm.
    '''
    sentinel = 9999
    good_codes = (0, 1, 4, 5, 9, 'C', 'I', 'M', 'P', 'R', 'U')  # see docs
    value, flag = [convert_to_int(part, sentinel, good_codes) for part in tmp.split(",")]
    rejected = flag not in good_codes or value == sentinel
    return np.nan if rejected else value

def clean_pressure_data(pressure):
    '''
    Parse a sea-level-pressure string of the form "value,quality".

    Per the original notes: the air pressure relative to Mean Sea Level (MSL),
    in hectopascals.

    Args:
        pressure: comma-separated string with exactly two fields
    Returns:
        The pressure as a float, or NaN when the quality code is not accepted
        or the value equals the missing-value sentinel 99999.
    NOTE(review): the value is returned exactly as stored in the file; if the
    source format applies a scaling factor it is not undone here - confirm.
    '''
    sentinel = 99999
    good_codes = (0, 1, 4, 5, 9)  # see docs
    tokens = pressure.split(",")
    reading, flag = [convert_to_int(token, sentinel, good_codes) for token in tokens]
    if flag in good_codes and reading != sentinel:
        return reading
    return np.nan

def clean_dew_point_data(dew):
    '''
    Parse a dew-point string of the form "value,quality".

    Per the original notes: the temperature to which a given parcel of air
    must be cooled at constant pressure and water vapor content in order for
    saturation to occur (C).

    Args:
        dew: comma-separated string with exactly two fields
    Returns:
        The dew point as a float, or NaN when the quality code is not
        accepted or the value equals the missing-value sentinel 9999.
    NOTE(review): the value is returned exactly as stored in the file; if the
    source format applies a scaling factor it is not undone here - confirm.
    '''
    sentinel = 9999
    good_codes = (0, 1, 4, 5, 9, 'C', 'I', 'M', 'P', 'R', 'U')  # see docs
    value, flag = [convert_to_int(part, sentinel, good_codes) for part in dew.split(",")]
    if flag not in good_codes:
        return np.nan
    if value == sentinel:
        return np.nan
    return value

def clean_precipitation_data(rain):
    '''
    Parse one liquid-precipitation episode string into (duration, depth).

    Per the original notes:
    - duration: the quantity of time over which the liquid precipitation was
      measured (hours)
    - depth: the depth of liquid precipitation measured at the time of the
      observation (mm)
    The data contains AA1-AA3 fields for multiple precipitation events; 98% of
    samples do not have more than one event (AA2-AA4 are 98% NaN), so those
    are ignored.
    TODO: check the above statement on the total dataset.
    TODO: decide how to deal with NaN data (no rain event).

    Args:
        rain: comma-separated string with exactly four fields, in the order
            duration, depth, (ignored), quality; or a missing value
            (NaN / None) when no episode was recorded
    Returns:
        (time, depth) tuple. Both are NaN when rain is not a string or the
        quality code is not accepted. depth is NaN when either field equals
        its missing-value sentinel; time is NaN when it equals 99 or could
        not be parsed.
    NOTE(review): depth is returned exactly as stored in the file; if the
    source format applies a scaling factor it is not undone here - confirm.
    '''
    missing_depth, missing_time = 9999, 99
    acceptable_quality = ('0','1','4','5','9','C','I','M','P','R','U') #see docs
    # Anything that is not a string is a missing cell. Checking for `str`
    # (rather than `type(rain) == float`) also covers numpy float NaN and None.
    if not isinstance(rain, str):
        return np.nan, np.nan
    time, depth, _, quality = [
        convert_to_int(field, missing_depth, acceptable_quality)
        for field in rain.split(",")
    ]
    if quality not in acceptable_quality:
        return np.nan, np.nan
    if depth == missing_depth or time == missing_time:
        depth = np.nan
    # missing_depth doubles as the "could not parse" marker from convert_to_int.
    if time == missing_time or time == missing_depth:
        time = np.nan
    return time, depth


def clean(df):
    '''
    Parse the raw weather strings of one observation into numeric fields.

    Despite the parameter name, `df` is a single row: the function is meant
    to be used with `DataFrame.apply(clean, axis=1)`.

    Args:
        df: row with the string fields 'WND', 'CIG', 'VIS', 'TMP', 'SLP',
            'DEW' and, optionally, 'AA1'
    Returns:
        Tuple (wind_speed, wind_dir, height, vis, temperature, pressure,
        dew, time, depth); invalid or missing values are NaN.
    '''
    wind_speed, wind_dir = clean_wind(df['WND'])
    height = clean_ceiling_height_data(df['CIG'])
    vis = clean_visibility_data(df['VIS'])
    temperature = clean_temperature_data(df['TMP'])
    pressure = clean_pressure_data(df['SLP'])
    dew = clean_dew_point_data(df['DEW'])
    # The precipitation parser already treats a missing value as "no episode",
    # so a row without an 'AA1' entry is handled the same way instead of
    # raising KeyError.
    time, depth = clean_precipitation_data(df.get('AA1', np.nan))

    return wind_speed, wind_dir, height, vis, temperature, pressure, dew, time, depth
    
    
# Parse every row with `clean` and store the nine returned values as new
# columns. The column order here must match the order of the tuple returned
# by `clean`. This is a row-wise Python-level apply, so it is slow on large frames.
df[['wind_speed','wind_dir', 'ceiling', 'visibility', 'temperature', 'pressure', 'dew', 'duration', 'depth']] = df.apply(clean, axis=1, result_type ='expand')

In [109]:
# Keep only the station identity/location columns and the parsed numeric
# fields; `.copy()` detaches the result from `df`.
cleaned_df = df.loc[
    :,
    [
        "STATION", "DATE", "LATITUDE", "LONGITUDE", "ELEVATION",
        'wind_speed', 'wind_dir', 'ceiling', 'visibility',
        'temperature', 'pressure', 'dew', 'duration', 'depth',
    ],
].copy()

In [112]:
def average_wind_vector(speed, direction):
    '''
    Average wind as a vector instead of averaging angles directly.

    Each observation is decomposed into
        x = speed * cos(direction), y = speed * sin(direction)
    with direction converted from degrees to radians, and the components are
    averaged ignoring NaN.

    Args:
        speed: array-like of wind speeds
        direction: array-like of wind directions in degrees, same length
    Returns:
        tuple of mean x, y wind vector components (NaN if there is no valid
        observation)
    '''
    # NOTE(review): the angle is used as-is (0 deg along +x, counter-clockwise).
    # If the data is calibrated to north as 0 deg, clockwise, a rotation is
    # still needed here - confirm the desired convention.
    speeds = np.asarray(speed, dtype=float)
    angles = np.deg2rad(np.asarray(direction, dtype=float))
    x_vect = speeds * np.cos(angles)
    y_vect = speeds * np.sin(angles)
    return np.nanmean(x_vect), np.nanmean(y_vect)

    
def average_cloud_height(cloud_height):
    '''
    Mean of the logistic sigmoid of the ceiling heights, ignoring NaN.

    Args:
        cloud_height: array-like of ceiling heights (NaN allowed)
    Returns:
        Mean of 1 / (1 + exp(-h)) over the non-NaN heights; NaN when the
        input is empty or contains only NaN.
    NOTE(review): the sigmoid is applied to the raw heights. For heights of a
    few tens or more it is numerically 1.0, so almost every observation maps
    to the same value - consider rescaling the heights first.
    '''
    heights = np.asarray(cloud_height, dtype=float)
    if heights.size == 0:
        # np.vectorize raised on empty input; an empty group simply has no mean.
        return np.nan
    # np.exp is already element-wise, so no np.vectorize wrapper is needed.
    squashed = 1.0 / (1.0 + np.exp(-heights))
    return np.nanmean(squashed)


In [113]:
def group_fnc(df):
    '''
    Collapse one group of cleaned observations into a single row of averages.

    Args:
        df: dataframe with the columns 'wind_speed', 'wind_dir', 'ceiling',
            'visibility', 'temperature', 'pressure', 'dew', 'duration' and
            'depth'
    Returns:
        pd.Series with the averaged wind vector components, the averaged
        (transformed) ceiling and the NaN-ignoring mean of every other field.
        A field with no valid value in the group yields NaN (numpy emits a
        "Mean of empty slice" RuntimeWarning in that case).
    '''
    wind_x, wind_y = average_wind_vector(df['wind_speed'], df['wind_dir'])
    cloud_height = average_cloud_height(df['ceiling'])
    return pd.Series({
        "wind_x": wind_x, "wind_y": wind_y,
        "ceiling": cloud_height,
        "visibility": np.nanmean(df["visibility"]),
        # Temperature is parsed and kept in the cleaned frame but was missing
        # from the aggregate, so it never reached the exported file.
        "temperature": np.nanmean(df['temperature']),
        "pressure": np.nanmean(df['pressure']),
        "dew": np.nanmean(df['dew']),
        "precipitation_duration": np.nanmean(df['duration']),
        "precipitation_depth": np.nanmean(df['depth']),
    })

# One aggregated row per (station, calendar day). The day key is taken from
# `cleaned_df` itself, so the grouping does not depend on `df` still being
# aligned with it.
grouped_df = (
    cleaned_df
    .groupby(['STATION', cleaned_df['DATE'].dt.date])
    .apply(group_fnc)
    .reset_index(drop=False)
)
# `.dt.date` yields plain date objects; convert back to datetime64 so the
# later merge on 'DATE' compares like with like.
grouped_df['DATE'] = pd.to_datetime(grouped_df['DATE'])

  "pressure": np.nanmean(df['pressure']),
  "precipitation_duration": np.nanmean(df['duration']),
  "precipitation_depth": np.nanmean(df['depth']),
  "dew": np.nanmean(df['dew']),
  return np.nanmean(x_vect), np.nanmean(y_vect)
  height = np.nanmean(cloud_height)


Merge the PM dataset with the daily weather dataset.

In [115]:
# Attach station information to each PM record via 'grid_id', keep only the
# station, day and PM/AOD columns, parse the day and expose it as 'DATE' so
# it lines up with the weather aggregates.
pm_values = (
    pm.merge(metadata, on='grid_id')[['STATION', 'day', 'pm25', 'mean_aod', 'min_aod', 'max_aod']]
    .assign(day=lambda frame: pd.to_datetime(frame['day']))
    .rename(columns={'day': "DATE"})
)


In [117]:
# Inner-join the daily weather rows with the PM rows on station and date and
# write the result to the working directory (row index not written).
pd.merge(
    grouped_df,
    pm_values,
    on=['STATION', 'DATE'],
).to_csv('la_weather_with_pm_per_day.csv', index=False)