In [1]:
import os
import getpass
import pandas as pd
import numpy as np
import csv
import statsmodels.api as sm
import warnings
import copy
import multiprocessing
import traceback
import hashlib
from tqdm import tqdm_notebook as tqdm
from sklearn.model_selection import train_test_split
from itertools import combinations
from scipy import stats
from datetime import datetime
from sklearn.metrics import mean_absolute_error
from datetime import datetime
from dateutil.relativedelta import relativedelta
warnings.filterwarnings("ignore")

# Locate the directory this code runs from.  Looking up `__file__` raises
# NameError when the name is not defined (e.g. in an interactive kernel), in
# which case the current working directory is used instead.
try: 
    __file__
except NameError:
    curr_dir = os.path.abspath('')
else:
    curr_dir = os.path.dirname(os.path.abspath(__file__))
    
# When running from a "src" directory the application root is its parent.
app_root = curr_dir if os.path.basename(curr_dir) != "src" else os.path.dirname(curr_dir)

if getpass.getuser() == "rainfalld":  # docker daemon
    home = os.path.expanduser("~")
    destdir = home                    # /var/cache/rainfall-predictor
else:
    destdir = os.path.join(app_root,'data','manipulated_data')      # non-docker stay in repository


# Load both rainfall tables and index them by their parsed 'Date' column.
file = os.path.join(destdir,'rainfalldata.csv')
rd = pd.read_csv(file)
file2 = os.path.join(destdir,'ncrainfalldata.csv')
ncrd = pd.read_csv(file2)
rd.Date = pd.to_datetime(rd.Date)
rd = rd.set_index('Date')
ncrd.Date = pd.to_datetime(ncrd.Date)
ncrd = ncrd.set_index('Date')

In [8]:
import json
# this cell takes the stored exogen dictionary that is stored in the Data_Wrangling_CAP1 jupyter notebook
# that was imported above.
try:
    %store -r exogen
except NameError:
    f = open(os.path.join(destdir,"exogen.json"),"r")
    exogen = json.load(f)      # read from file, passed from Data_Wrangling
    f.close()


### Function Library
Handles parallel-processing model calculation, mean absolute error calculations, and near-real-time calculation storage

In [2]:
def sarima_model_creation(data, p, d, q, P, D, Q, m, exog=None):
    ''' Builds a statsmodels SARIMAX model and fits it
        args: data = endogenous data handed to SARIMAX
              p, d, q = entries of the `order` argument
              P, D, Q, m = entries of the `seasonal_order` argument
              exog = optional exogenous data handed to SARIMAX [Default = None]
        returns: the object returned by SARIMAX(...).fit(disp=0)
    '''
    sarimax_options = {
        'order': [p, d, q],
        'seasonal_order': [P, D, Q, m],
        'enforce_stationarity': False,
        'enforce_invertibility': False,
        'initialization': 'approximate_diffuse',
    }
    model = sm.tsa.statespace.SARIMAX(data, exog, **sarimax_options)
    return model.fit(disp=0)

In [6]:
def model_creation_pred_one_step(train_data, test_data, exotrain=None, exotest=None, progress_bar=None):
    ''' Makes one-step-ahead forecasts over the test period with a growing training set:
        forecast the next month, move the first test observation into the training data,
        and repeat until the test data is used up.
        args: train_data = large data set to base predictions on
              test_data  = data to test the model against; its observations are moved into
                           the training data one at a time
              exotrain   = exogenous location data that matches the same timeframe of train_data but was not included
              exotest    = exogenous location data that matches the same timeframe of test_data but was not included
              progress_bar = optional object with an update() method, called once per forecast
                             [Default = None, no progress reporting]
        returns: A list of all predictions for the location matching the entire test_data timeframe
                 (always at least one prediction, even when test_data is empty)
    '''
    list_one_step = []

    while True:
        nextMonth = model_based_forecast(train_data, exotrain)
        # take the first forecast value by position, independent of the index type
        list_one_step.append(np.asarray(nextMonth).ravel()[0])
        if progress_bar is not None:
            progress_bar.update()

        # the forecast just made covers the last remaining test observation
        if len(test_data) <= 1:
            break

        # increment data for next month's iteration; iloc[[0]] keeps a one-row object so
        # concat appends a ROW (a bare iloc[0] of a DataFrame is a Series that concat
        # would add as a new column instead)
        train_data = pd.concat([train_data, test_data.iloc[[0]]])
        test_data = test_data.iloc[1:]
        if exotrain is not None:
            exotrain = pd.concat([exotrain, exotest.iloc[[0]]])
            exotest = exotest.iloc[1:]

    return(list_one_step)

def model_based_forecast(train_data, exotrain=None):
    ''' creates model from training data & makes a forecast
        args: train_data = data to build forecasting model
              exotrain   = DataFrame of exogenous location's rainfall data [Default = None]
        returns: the one-step forecast exactly as returned by the fitted model's forecast();
                 callers read the predicted value from its first element
    '''
    mod = sarima_model_creation(train_data, 4, 0, 3, 3, 0, 4, 12, exotrain)
    if exotrain is None:
        # no exogenous data: forecast based on the model only
        return(mod.forecast())
    # forecast()'s first positional parameter is the number of steps, so the exogenous
    # values have to be passed by keyword.  The last known exogenous row (previous
    # month, e.g. december when forecasting jan) stands in for next month's values.
    return(mod.forecast(steps=1, exog=exotrain.iloc[[-1]]))

def maeFinder(train_data, test_data, exotrain=None, exotest=None, pbar=None):
    ''' Computes the Mean Absolute Error between test_data and the one-step predictions
        produced by model_creation_pred_one_step.
        args: train_data = large data set to base predictions on
              test_data  = data the predictions are compared against
              exotrain   = optional exogenous data for the train_data timeframe
              exotest    = optional exogenous data for the test_data timeframe
              pbar       = optional tqdm progress bar; a new one sized to test_data is
                           created when omitted
        returns: Mean Absolute Error of the predictions against test_data
    '''
    def private_copy(obj):
        # None stays None, anything else is deep-copied so the caller's data is never touched
        return obj if obj is None else copy.deepcopy(obj)

    working_train = copy.deepcopy(train_data)
    working_test = copy.deepcopy(test_data)
    working_exotrain = private_copy(exotrain)
    working_exotest = private_copy(exotest)

    if pbar is None:
        pbar = tqdm(total=len(test_data))           # initialize counter

    predicted = model_creation_pred_one_step(
        working_train, working_test, working_exotrain, working_exotest, pbar
    )
    return mean_absolute_error(test_data, predicted)

def find_exmae(exog):
    ''' Standalone task method to find mae of a given exogenous variable.
        Intended to be used as the function for the process pool; it relies on the
        module-level names set by initExmaeWorker (lock, keymae, tr, test, pbar) and on
        results_filename / tmp_bettermae_filename.
        args: exog = exogenous location data to be evaluated as a potential associated location to model
        returns: Dictionary {"co": tuple of exog column names, "exmae": mean absolute error}
        side effects: every file access happens while holding `lock`
              - a newly solved exmae is stored in the results file together with the
                sha1 of the exog data it was calculated from
              - an exmae below keymae['mae'] is recorded in the temporary bettermae file;
                this also happens for a cached exmae, so that a partially cached run does
                not lose improvements that were solved earlier
    '''
    extr, extest = train_test_split(exog, test_size=0.2, shuffle=False)
    co = tuple(exog.columns)
    exog_name = ','.join(co)
    city = keymae['city']

    # signature of the data set, used to decide whether a stored exmae is still valid
    data_signature = hashlib.sha1( exog.to_csv().encode('utf-8') ).hexdigest()

    def save_solved_exmae(all_solutions, targetloc, exogloc, exmae, data_hash):
        ''' handler function for adjustment of JSON relating to results_file,
            see adjustfn for update_JSON_file()
            args: all_solutions = loaded python-equivalent of json from file
                  targetloc = keyword of target location of current keymae
                  exogloc = exogenous location name that was evaluated
                  exmae = value of mean absolute error
                  data_hash = sha1 digest of exog data set used to calculate exmae
            returns: dictionary object
        '''
        # setdefault: do not fail when the target location has no entry (yet)
        exogen_results = all_solutions.setdefault(targetloc, {}).setdefault('exogen', {})
        exogen_results[exogloc] = { 'exmae': exmae, 'data_source_sha1': data_hash }
        return(all_solutions)

    def record_if_better(exmae):
        ''' stores exmae in the temporary bettermae file when it beats the keymae;
            must be called while holding `lock`
        '''
        if exmae < keymae['mae']:
            update_JSON_file(tmp_bettermae_filename, None, (exog_name, exmae))   # Save with default adjuster

    # process synchronization on file read
    with lock:
        try:
            with open(results_filename, 'r') as all_results_file:
                all_results = json.loads(all_results_file.read())
        except (OSError, ValueError):
            # missing/unreadable file or invalid JSON: treat as "nothing solved yet"
            all_results = {}

    # reuse a stored result when it was calculated from the very same data
    cached = all_results.get(city, {}).get('exogen', {}).get(exog_name)
    if cached is not None and cached.get('data_source_sha1') == data_signature:
        exmae = cached['exmae']
        with lock:
            record_if_better(exmae)
        return { "co": co, "exmae": exmae }

    exmae = maeFinder(tr, test, extr, extest, pbar)

    with lock:
        # Update status file with solved exmae
        update_JSON_file(results_filename, save_solved_exmae, (city, exog_name, exmae, data_signature))

        # Update bettermae file based on solved exmae if exmae is better than keymae
        record_if_better(exmae)

    return { "co": co, "exmae": exmae }


def initExmaeWorker(l, kmae, train, testing, list_exoloc, progress_bar):
    ''' Initializer for the process pool: publishes its arguments as module-level
        names so the task function can read them.
        args: l = synchronization lock object            (stored as `lock`)
              kmae = keymae value                        (stored as `keymae`)
              train = training data                      (stored as `tr`)
              testing = testing data                     (stored as `test`)
              list_exoloc = list of exogenous locations  (stored as `l_exoloc`)
              progress_bar = tqdm object                 (stored as `pbar`)
    '''
    global lock, keymae, tr, test, l_exoloc, pbar
    lock, keymae = l, kmae
    tr, test = train, testing
    l_exoloc, pbar = list_exoloc, progress_bar


def update_JSON_file(filename, adjustfn, arglist=(), kwargs={}, sort=True):
    ''' Generic function to handle JSON file updates.  Reads-in entire file,
        federates out updates with adjustment fn's, and then overwrites original file completely.
        A missing file or a file without valid JSON is treated as an empty dictionary.
        The file is only written after the adjustment succeeded, and every file handle
        is closed even when the adjustment raises.
        args: filename = json-encoded file on disk
              adjustfn = function to perform adjustments to loaded data; None selects a
                         default: dictionaries get arglist=(key, value) set, lists get
                         arglist=(value,) appended
              arglist = positional args to pass on to adjustfn
              kwargs = keyword args to pass on to adjustfn (only read, never modified)
              sort = flag to auto-sort keys when saving to file [Default = True]
        returns: the updated data that was saved to file
        raises: ValueError when adjustfn is None and the file holds neither a dict nor a list
    '''
    def default_dict_adjustfn(data, key, value):
        ''' Generic default function for updating a basic dictionary data file (top level keys only)
            args: data = dictionary representation of JSON data from file
                  key = key name to enter into dictionary
                  value = value to enter into dictionary[key]
            returns: Updated dictionary with key/value added
        '''
        data[key] = value
        return(data)

    def default_list_adjustfn(data, value):
        ''' Generic default function for updating a basic list data file (add to bottom of list)
            args: data = list representation of JSON data from file
                  value = value to append to end of list
            returns: Updated list with value appended
        '''
        data.append(value)
        return(data)

    try:
        with open(filename, "r") as file:
            json_data = json.loads(file.read())
    except (FileNotFoundError, json.JSONDecodeError):
        json_data = {}                         # nothing usable on disk yet, start empty

    if adjustfn is not None:
        json_data = adjustfn(json_data, *arglist, **kwargs)
    elif isinstance(json_data, dict):
        json_data = default_dict_adjustfn(json_data, *arglist, **kwargs)
    elif isinstance(json_data, list):
        json_data = default_list_adjustfn(json_data, *arglist, **kwargs)
    else:
        raise ValueError('Unable to adjust JSON since function not provided or file not of type dict or list!')

    # serialize first so a failure here cannot leave a half-written file behind
    serialized = json.dumps(json_data, sort_keys=sort, indent=4)
    with open(filename, "w") as file:          # "w" replaces the previous content completely
        file.write(serialized)

    return(json_data)

In [10]:
# JSON files kept in destdir: all solved MAE values, the improving exogenous
# results, and the temporary file the pool tasks write improvements to.
results_filename, bettermae_results_filename, tmp_bettermae_filename = (
    os.path.join(destdir, json_name)
    for json_name in ("allMAE.json", "allBetterMAE.json", "tmp_bettermae.json")
)

def exogenous_var(data, ncloc, l_exoloc):
    ''' Function to evaluate a location model completely.  First, it finds
        a keymae of the current data frame about a location with 20% data split
        (reusing the stored keymae when the data is unchanged, judged by its sha1).
        Secondly, it spawns a pool of processes to calculate each potential
        exogenous location's potential improvement of the model.  Each exmae is printed to
        stdout and if improved, it is stored into the bettermae file.  The NC location
        does not complete until all exmaes have been calculated.
        args: data  = entire dataframe of locations and rainfall amounts over time
              ncloc = Name of NC location (matches column in data)
              l_exoloc = list of exogenous data sets to evaluate for the ncloc parameter
        returns: empty tuple
    '''
    dat = data[ncloc]
    tr, test = train_test_split(dat, test_size=0.2, shuffle=False)
    keymae = { 'city': ncloc }

    # signature of the target data, used to decide whether the stored keymae is still valid
    data_signature = hashlib.sha1( bytes(dat.to_csv(), 'utf-8') ).hexdigest()

    try:
        with open(results_filename, 'r') as all_results_file:
            all_results = json.loads(all_results_file.read())
    except (OSError, ValueError):
        # missing/unreadable file or invalid JSON: nothing has been solved yet
        all_results = {}

    if ncloc in all_results and data_signature == all_results[ncloc]['data_source_sha1']:
        keymae['mae'] = all_results[ncloc]['keymae']
    else:
        keymae['mae'] = maeFinder(tr, test)

        # Save calculation to file
        city_data = {'keymae':keymae['mae'],'data_source_sha1': data_signature,'exogen':{}}
        update_JSON_file(results_filename, None, (keymae['city'], city_data) ) # save with default adjuster

        # wipe bettermae file of any keymae results since keymae has been recalculated
        def delete_location(data, location):
            if location in data:
                del data[location]
            return(data)

        update_JSON_file(bettermae_results_filename, delete_location, (keymae['city'],))

    # process exmaes
    print('keymae of: '+ ncloc +' = '+str(keymae['mae']))

    poolLock = multiprocessing.Lock()

    def on_success(result):
        print('exmae = {}'.format(result["co"]) + ' '+ str(result["exmae"]))
        progressbar.update() # update counter of completion

    def on_error(err):
        print("ERROR: {}".format(err))
        traceback.print_exception(type(err), err, err.__traceback__)

    # leave 1 cpu for basic OS functions, but never ask for fewer than one worker
    # (a pool of 0 processes is rejected by multiprocessing.Pool)
    process_limit = max(1, multiprocessing.cpu_count()-1)
    progressbar = tqdm(total=len(l_exoloc))  # initialize counter (regular)
    # NOTE: counting every single prediction, total=(len(l_exoloc)*len(test)), was tried
    #       and fails due to process collision
    pool = multiprocessing.Pool(processes=process_limit, initializer=initExmaeWorker, initargs=(poolLock, keymae, tr, test, l_exoloc, progressbar))
    try:
        for exog in l_exoloc:
            pool.apply_async(find_exmae, args=(exog,), kwds={}, callback=on_success, error_callback=on_error)

        pool.close()      # no more tasks can be added for the pool to accomplish
        pool.join()       # tell parent to wait until all tasks are accomplished by the process pool
    finally:
        pool.terminate()  # no-op after a clean join, stops the workers if something went wrong
        progressbar.close()

    # Evaluate & save found bettermae
    if os.path.isfile(tmp_bettermae_filename):
        with open(tmp_bettermae_filename, 'r') as tmp_bettermae_file:
            improvement_exog = json.loads(tmp_bettermae_file.read())
        os.remove(tmp_bettermae_filename)                            # tmp file cleanup

        with open(results_filename, 'r') as all_results_file:
            all_results = json.loads(all_results_file.read())

        filtered_results = all_results[keymae['city']]
        filtered_results['exogen'] = dict(improvement_exog)          # keep only the valuable vars

        all_bettermae = update_JSON_file(bettermae_results_filename, None, (keymae['city'], filtered_results)) # save with default adjuster
        print("Improvement_exog: "+keymae['city']+": {}".format(json.dumps(all_bettermae[keymae['city']], indent=4)))

    return()


In [23]:
def exog_combinations(df, exoe):
    ''' Builds the candidate exogenous data sets for a single NC location.
        args: df   = dataframe of rain data
              exoe = list of exogenous location names (columns of df)
        returns: list of dataframes, each holding the rainfall data of one subset of exoe:
                 first all of exoe together, then every single location, then every
                 combination of 2 .. len(exoe)-1 locations (empty list for an empty exoe)
    '''
    n_exog = len(exoe)
    if n_exog == 0:
        return []

    everything = [df.loc[:, exoe]]
    if n_exog == 1:
        return everything

    singles = [df.loc[:, [name]] for name in exoe]
    partial_groups = [
        df.loc[:, group]
        for size in range(2, n_exog)
        for group in combinations(exoe, size)
    ]
    return everything + singles + partial_groups



### Data Evaluation
Finds combinations of exogenous variable locations and starts model evaluation of combinations

In [24]:
# Select which target locations (keys of the exogen dictionary) get evaluated.
autoExogen = True   # flag for manual use

# Defining set of cities to evaluate
if autoExogen or getpass.getuser() == "rainfalld":       # docker daemon, automatically do all exogen
    todokeys = exogen.keys()
else:    # manual setting of dictionary elements to do
    todokeys = ('ARCOLA, NC', 'HENDERSON 2 NNW, NC', 'LAURINBURG, NC', 'ROANOKE RAPIDS, NC', 'MURFREESBORO, NC', 'LUMBERTON AREA, NC', 'LONGWOOD, NC', 'WHITEVILLE 7 NW, NC', 'CHARLOTTE AREA, NC', 'MOUNT MITCHELL AREA, NC', 'ASHEVILLE AIRPORT, NC', 'BANNER ELK, NC', 'BEECH MOUNTAIN, NC', 'BRYSON CITY 4, NC', 'BREVARD, NC', 'CASAR, NC', 'COWEETA EXP STATION, NC', 'CULLOWHEE, NC', 'FOREST CITY 8 W, NC', 'FRANKLIN, NC', 'GASTONIA, NC', 'GRANDFATHER MTN, NC', 'HENDERSONVILLE 1 NE, NC', 'HIGHLANDS, NC', 'HOT SPRINGS, NC', 'LAKE LURE 2, NC', 'LAKE TOXAWAY 2 SW, NC', 'MARSHALL, NC', 'MONROE 2 SE, NC', 'MOUNT HOLLY 4 NE, NC', 'OCONALUFTEE, NC', 'PISGAH FOREST 3 NE, NC', 'ROBBINSVILLE AG 5 NE, NC', 'ROSMAN, NC', 'SHELBY 2 NW, NC', 'TAPOCO, NC', 'TRYON, NC', 'WAYNESVILLE 1 E, NC', 'BOONE 1 SE, NC', 'DANBURY, NC', 'EDEN, NC', 'MOUNT AIRY 2 W, NC', 'REIDSVILLE 2 NW, NC', 'HAYESVILLE 1 NE, NC', 'MURPHY 4ESE, NC', 'KING, NC')

# Restrict exogen to the selected keys; every key in todokeys must exist in exogen
# (a missing one raises KeyError here).
sub_exogen = {k: exogen[k] for k in todokeys}

In [25]:
from collections import defaultdict
# For every selected target location, build the list of candidate exogenous
# dataframes (see exog_combinations) from the rd rainfall table.
l_o_dfs = defaultdict(list)
for key,value in tqdm(sub_exogen.items()):
    lo_dfs2 = exog_combinations(rd, value)
    l_o_dfs[key] = lo_dfs2


HBox(children=(IntProgress(value=0, max=43), HTML(value='')))

In [26]:
# best_comb = [[4,3,3,4]]
warnings.filterwarnings("ignore")

# Reset results on new run: delete the temporary bettermae file if it is still around.
files = [tmp_bettermae_filename]
while files:
    stale_file = files.pop()
    try:
        os.remove(stale_file)
    except FileNotFoundError:
        # ignore since non-exist is the desired state
        pass
    except OSError as err:
        traceback.print_exception(type(err), err, err.__traceback__)

# Evaluate every target location against its candidate exogenous data sets.
for key, value in tqdm(l_o_dfs.items()):
    exogenous_var(rd, key, value)

## Predictions for the next 50 years (2019 - 2069)
Includes exogenous locations outside of North Carolina to improve model predictions

In [3]:
# Locations that are left out of the plain (no exogenous data) predictions; the same
# names are the keys of exo_var_dict2.
# NOTE(review): ' MOUNT HOLLY 4 NE, NC' starts with a space - presumably it has to match
# the column label exactly; confirm against the data.
with_exogs = ['WHITEVILLE 7 NW, NC', 'CASAR, NC', 'FOREST CITY 8 W, NC', 'GASTONIA, NC', 'LAKE LURE 2, NC', 
                       'ELIZABETHTOWN, NC', ' MOUNT HOLLY 4 NE, NC','GRANDFATHER MTN, NC']
# Work on a copy of ncrd and drop those columns from it.
ncrd2 = ncrd.copy()
ncrd_less = ncrd2.drop(with_exogs,axis=1)

In [10]:
def prediction_fx(data, begin, end):
    ''' Fits one SARIMA model per column of data and predicts the period begin..end.
        The returned index is the one produced by the models' predictions, so the whole
        requested period is kept (no fixed number of months is assumed).
        args: data  = DataFrame with one column per location
              begin = first date to predict, string formatted as 'YYYY-MM-DD'
              end   = last date to predict, string formatted as 'YYYY-MM-DD'
        returns: DataFrame indexed by prediction date holding, for every column of data,
                 the predicted values (named like the column) followed by the bounds
                 returned by conf_int(alpha=0.5)
    '''
    location_frames = []
    for col in tqdm(data.columns):
        mod_fit1 = sarima_model_creation(data[col], 4,0,3,3,0,4,12)
        future_pred1 = mod_fit1.get_prediction(start=begin, end=end)
        # to_frame(name=...) names the column explicitly; building a DataFrame from a
        # named Series with columns=[col] would select by name and yield only NaN
        point_predictions = future_pred1.predicted_mean.to_frame(name=col)
        future_pred1_ci = future_pred1.conf_int(alpha=0.5)
        location_frames.append(
            pd.merge(point_predictions, future_pred1_ci, left_index=True, right_index=True)
        )

    if not location_frames:
        # no columns to predict: empty frame over the requested monthly range
        return(pd.DataFrame(index=pd.date_range(start=begin, end=end, freq=pd.DateOffset(months=1))))

    # join='inner' keeps only the dates present in every location's prediction
    return(pd.concat(location_frames, axis=1, join='inner'))

In [11]:
# Predictions for the locations in ncrd_less (no exogenous data); show the first 10 rows.
pre_df = prediction_fx(ncrd_less, '2019-05-01', '2069-05-01')
pre_df.head(10)

In [5]:
# Maps each target location to a DataFrame (columns of rd) with the exogenous
# location(s) used for its predictions.
# NOTE(review): some labels start with a space (' LORIS 2 S, SC', ' MOUNT HOLLY 4 NE, NC') -
# presumably they have to match the column labels of rd exactly; confirm against the data.
exo_var_dict2 = {
    'WHITEVILLE 7 NW, NC': rd[[' LORIS 2 S, SC']],
    'CASAR, NC': rd[['GAFFNEY 6 E, SC']],
    'FOREST CITY 8 W, NC': rd[['GAFFNEY 6 E, SC']],
    'GASTONIA, NC': rd[['FORT MILL 4 NW, SC','GAFFNEY 6 E, SC']],
    'LAKE LURE 2, NC': rd[['CHESNEE 7 WSW, SC']],
    ' MOUNT HOLLY 4 NE, NC': rd[['CHESNEE 7 WSW, SC','GAFFNEY 6 E, SC']],
    'ELIZABETHTOWN, NC': rd[[' LORIS 2 S, SC']],
    'GRANDFATHER MTN, NC': rd[['ELIZABETHTON, TN']]
    
}

In [6]:
def prediction_exog_fx2(data, exog_dict, begin, end):
    ''' Predicts the period begin..end for every target location in exog_dict with a
        SARIMA model that includes the location's exogenous data.  The exogenous values
        of the predicted period are themselves predictions: every exogenous column gets
        its own SARIMA model (without exogenous data).
        The returned index is the one produced by the models' predictions, so the whole
        requested period is kept (no fixed number of months is assumed).
        args: data      = DataFrame holding a column for every key of exog_dict
              exog_dict = dictionary {target location: DataFrame of its exogenous location(s)}
              begin     = first date to predict, string formatted as 'YYYY-MM-DD'
              end       = last date to predict, string formatted as 'YYYY-MM-DD'
        returns: DataFrame indexed by prediction date holding, for every key of exog_dict,
                 the predicted values (named like the key) followed by the bounds
                 returned by conf_int(alpha=0.5)
    '''
    location_frames = []
    for key,value in tqdm(exog_dict.items()):
        mod_fit1 = sarima_model_creation(data[key], 4,0,3,3,0,4, 12,exog=value)

        # predict every exogenous column on its own; naming each prediction after its
        # column keeps the names unique and the column order equal to the training exog
        exog_forecasts = []
        for i in range(value.shape[1]):
            exog_mod_fit = sarima_model_creation(value.iloc[:,i],4,0,3,3,0,4,12)
            exog_forecasts.append(exog_mod_fit.predict(start=begin, end=end).rename(value.columns[i]))
        exog_predictions_df = pd.concat(exog_forecasts, axis=1)

        future_pred = mod_fit1.get_prediction(exog=exog_predictions_df, start=begin, end=end)
        # to_frame(name=...) names the column explicitly; building a DataFrame from a
        # named Series with columns=[key] would select by name and yield only NaN
        future_pred_val = future_pred.predicted_mean.to_frame(name=key)
        future_pred_ci = future_pred.conf_int(alpha=0.5)
        location_frames.append(
            pd.merge(future_pred_val, future_pred_ci, left_index=True, right_index=True)
        )

    if not location_frames:
        # nothing to predict: empty frame over the requested monthly range
        return(pd.DataFrame(index=pd.date_range(start=begin, end=end, freq=pd.DateOffset(months=1))))

    # join='inner' keeps only the dates present in every location's prediction
    return(pd.concat(location_frames, axis=1, join='inner'))

In [7]:
# Predictions for the target locations of exo_var_dict2, using their exogenous data.
e_ci_df = prediction_exog_fx2(rd, exo_var_dict2, '2019-05-01', '2069-05-01')

In [8]:
# Show the first 10 rows of the exogenous-model predictions.
e_ci_df.head(10)

In [12]:
# Combine both prediction tables column-wise; merging on the index keeps only the
# dates present in both (pd.merge joins inner by default).
merged_ci_vals = pd.merge(pre_df, e_ci_df, left_index=True, right_index=True)

In [None]:
# Write the merged predictions to predictions.csv inside destdir.
merged_ci_vals.to_csv(os.path.join(destdir,'predictions.csv'))

In [14]:
# Show the first 10 rows of the merged predictions.
merged_ci_vals.head(10)