# LSC Project 2021 Land Sat Temp Machine Learning: Dask on the Midway Cluster (oabreu_sjaisha_gdmorrison)

The raw Landsat 8 image bands for Chicago (2013-2020) used in this notebook are available in the `/project2/macs30123/project_landsat/` directory on Midway2.

To run this notebook, you should log in to the Midway Cluster [via ThinLinc](https://midway2.rcc.uchicago.edu/main/). After you're logged in, run the following lines of code in a terminal window from a login node (to load the Python module we've been working with in the class and then install a couple of additional packages we'll be using):
```
module load python/anaconda-2019.03
pip install --user "dask[complete]" dask-jobqueue dask-ml --upgrade
```
Once you have installed these packages, start up a Jupyter Notebook from the login node (you'll be requesting resources via `dask-jobqueue`'s SLURM functions, on which you will run your code):
```
jupyter notebook
```

## Launching a Dask Cluster via SLURM

To request Midway Cluster resources for our Dask Cluster, we have installed [dask-jobqueue](https://jobqueue.dask.org/en/latest/), which can be used to deploy Dask via common job queuing systems like SLURM (used in the Midway Cluster). Furthermore, it allows us to perform these interactions with SLURM in an interactive context within a Jupyter Notebook.

Specifically, we'll be using the `SLURMCluster` class to request resources, using many of the same keywords that we provide when we write an `sbatch` script or provide arguments for `sinteractive` jobs. For instance, here, we're requesting 12 `broadwl` cores with 40GB of memory, split across 10 worker processes that communicate over the `ib0` (InfiniBand) interface (which we'll have available to us for 6 hours):

In [None]:
#ACTIVATING CLIENT ON MIDWAY
from dask_jobqueue import SLURMCluster

# Compose SLURM script
cluster = SLURMCluster(queue='broadwl', cores=12, memory='40GB', 
                       processes=10, walltime='06:00:00', interface='ib0',
                       job_extra=['--account=macs30123']
                      )

# Request resources
cluster.scale(jobs=1)
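
As a rough sketch of how these arguments divide up a single SLURM job: as far as we understand, `dask-jobqueue` splits the requested `cores` and `memory` evenly across `processes`. The helper below (`per_worker_resources` is a hypothetical name, not part of `dask-jobqueue`) only reproduces that arithmetic, assuming the threads per worker are the integer quotient of cores by processes:

```python
def per_worker_resources(cores, processes, memory_gb):
    """Approximate the threads and memory each Dask worker process receives."""
    threads = cores // processes               # integer division; any remainder is not assigned to a worker
    memory_per_worker = memory_gb / processes  # memory is shared evenly across worker processes
    return threads, memory_per_worker

threads, mem = per_worker_resources(cores=12, processes=10, memory_gb=40)
print(f"{threads} thread(s) and {mem:.1f} GB per worker")
```

Under these assumptions, each of the 10 workers in the request above would run a single thread with roughly 4 GB of memory.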

We'll need to wait a bit for our resources to be provisioned, but we can check on the progress of our resources via `squeue` as usual:

In [None]:
#! squeue -u oabreu
! squeue -u syedajaisha

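Once our resources have been provisioned, we need to tell Dask that it should use them to run its computations. On Midway, we do this by passing `cluster` into our `dask.distributed` client object, i.e. `Client(cluster)`. The cell below instead calls `Client()` with no arguments, which starts a local cluster on the current machine for testing: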

In [108]:
#ACTIVATING CLIENT ON LOCAL MACHINE
import dask
from dask.distributed import Client

client = Client()
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 55777 instead


Client: Scheduler: tcp://127.0.0.1:55778, Dashboard: http://127.0.0.1:55777/status
Cluster: Workers: 4, Cores: 4, Memory: 17.18 GB


That's all we need to do! If we want to see an interactive representation of what our workers are doing at any given time, we can click the link above, as in all the other Dask setups. 

We're now ready to start using Dask. Let's start by reading our Landsat Images Dataframe composed of all the features we extracted/engineered from the band data into a Dask DataFrame:

In [None]:
#ONLY RUN THIS ONE IF ON MIDWAY WITH THE NAME OF THE FILE;ONCE WE GET FILE ON MIDWAY, EDIT THE PATH BELOW
import dask.dataframe as dd

df = dd.read_csv('/project2/macs30123/project_landsat/___.csv')
display(df.head())
display(df.dtypes)
display(df.describe().compute())

In [168]:
#FOR TESTING ONLY; DELETE ONCE TRUE DATAFRAME CODE WORKS
import numpy as np
import pandas as pd

def create_test_df():
    ts_2015 = pd.date_range('2015-01-01', '2015-12-31', periods=4).to_series()
    ts_2016 = pd.date_range('2016-01-01', '2016-12-31', periods=12).to_series()
    ts_2017 = pd.date_range('2017-01-01', '2017-12-31', periods=6).to_series()
    ts_2018 = pd.date_range('2018-01-01', '2018-12-31', periods=8).to_series()
    ts_2019 = pd.date_range('2019-01-01', '2019-12-31', periods=24).to_series()
    ts_2020 = pd.date_range('2020-01-01', '2020-12-31', periods=30).to_series()
    ts_all = pd.concat([ts_2015, ts_2016, ts_2017, ts_2018, ts_2019, ts_2020])
    df = pd.DataFrame({'X': np.random.randint(0, 100, size=ts_all.shape), 
                   'Y': np.random.randint(100, 200, size=ts_all.shape)},
                 index=ts_all)
    df['Year'] = df.index.year
    df['month'] = df.index.month
    df = df.reset_index()
    return df

df = create_test_df()
display(df.head())
display(df.dtypes)

|   | index | X | Y | Year | month |
|---|---|---|---|---|---|
| 0 | 2015-01-01 00:00:00 | 45 | 191 | 2015 | 1 |
| 1 | 2015-05-02 08:00:00 | 54 | 155 | 2015 | 5 |
| 2 | 2015-08-31 16:00:00 | 71 | 134 | 2015 | 8 |
| 3 | 2015-12-31 00:00:00 | 94 | 137 | 2015 | 12 |
| 4 | 2016-01-01 00:00:00 | 39 | 169 | 2016 | 1 |


index    datetime64[ns]
X                 int64
Y                 int64
Year              int64
month             int64
dtype: object

In [170]:
import pandas as pd
import numpy as np
from sklearn import model_selection
from sklearn import metrics
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.preprocessing import PolynomialFeatures
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import accuracy_score, f1_score, \
                            precision_score, recall_score
from sklearn.exceptions import ConvergenceWarning
from sklearn.model_selection import KFold, cross_val_score
import datetime
import warnings
warnings.filterwarnings("ignore", category=ConvergenceWarning)
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

In [None]:
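#DOES NOT WORK LOCALLY DUE TO A DEPENDENCY ISSUE; TRY RUNNING IN A MIDWAY INSTANCE
#NOTE: THIS IMPORT REBINDS THE NAME LinearRegression, REPLACING THE sklearn VERSION IMPORTED ABOVE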
from dask_ml.linear_model import LinearRegression
from dask_ml.preprocessing import MinMaxScaler

In [171]:
def pipe_normalize(df, scaler=None, outputinc=False, outputcol=None):
    '''
    Normalizes dataframe (adapted from Nick Feamster's normalize function)
    Inputs:
        df (Pandas Dataframe)
        scaler (Scaler) :If scaler is not none, use given scaler's means and sds
                         to normalize (input for test set case); else, set 
                         scaler in function
        outputinc (bool): If output is included, set aside to ensure it does not
                          get normalized, default False
        outputcol (str): If output is included, name of output column, default
                         None
    Returns tuple of:
        Normalized DataFrame and scaler used to normalize DataFrame
    '''
    if outputinc:
        # Set the output column aside so it is not normalized
        outcomes = df.loc[:, outputcol]
        df = df.drop(outputcol, axis=1)
    if scaler is None:
        scaler = StandardScaler()
        normalized_features = scaler.fit_transform(df)
    else:
        normalized_features = scaler.transform(df)

    # Label the scaled features with the feature columns and the original index
    normalized_df = pd.DataFrame(normalized_features, index=df.index,
                                 columns=df.columns)
    if outputinc:
        # Re-attach the untouched output column (it becomes the last column)
        normalized_df[outputcol] = outcomes

    return normalized_df, scaler
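
To make the role of the `scaler` argument concrete, here is a minimal standard-library sketch of the same idea: the mean and standard deviation are learned from the training values only and then reused on the test values. The helper names `fit_standardizer` and `apply_standardizer` are hypothetical and only mimic what `StandardScaler.fit` and `StandardScaler.transform` do for a single column:

```python
import statistics

def fit_standardizer(values):
    """Learn the mean and standard deviation from the training values only."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)  # population std (ddof=0), as StandardScaler uses
    return mean, std

def apply_standardizer(values, mean, std):
    """Scale values with previously learned statistics."""
    return [(v - mean) / std for v in values]

train = [2.0, 4.0, 6.0, 8.0]
test = [5.0, 10.0]

mean, std = fit_standardizer(train)
train_z = apply_standardizer(train, mean, std)
test_z = apply_standardizer(test, mean, std)  # reuses the training statistics
print(train_z, test_z)
```

Because the test values are scaled with the training mean and standard deviation, the scaled test set does not necessarily have mean 0 and variance 1, which is the intended behavior.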

In [172]:
def train_val_test_split(df, split=None, ycol=None):
    # Resolve defaults at call time, since the configuration cell that defines
    # default_split and default_ycol appears after this cell
    split = default_split if split is None else split
    ycol = default_ycol if ycol is None else ycol
    k = len(split)

    # Fold i trains on the years in split[i][0] and validates on year split[i][1]
    df_train = [df[df["Year"].isin(split[i][0])] for i in range(k)]
    df_val = [df[df["Year"] == split[i][1]] for i in range(k)]
    df_test = df[df["Year"] == test_year]

    df_train_y = [fold[ycol] for fold in df_train]
    df_train_x = [fold.drop(columns=[ycol, "Year"]) for fold in df_train]
    df_val_y = [fold[ycol] for fold in df_val]
    df_val_x = [fold.drop(columns=[ycol, "Year"]) for fold in df_val]
    df_test_y = df_test[ycol]
    df_test_x = df_test.drop(columns=[ycol, "Year"])

    return df_train_y, df_train_x, df_val_y, df_val_x, df_test_y, df_test_x

def normalize(df_train_x, df_val_x, df_test_x):
    k = len(df_train_x)
    train_norm = []
    valid_norm = []
    for n in range(k):
        # Fit one scaler per fold on that fold's training and validation rows
        df = pd.concat((df_train_x[n], df_val_x[n]))
        df_norm, scaler = pipe_normalize(df)
        tr_norm = df_norm.loc[df_train_x[n].index,:]
        val_norm = df_norm.loc[df_val_x[n].index,:]
        train_norm.append(tr_norm)
        valid_norm.append(val_norm)
    # Scale the test set with the scaler fitted on the last (largest) fold
    test_norm, _ = pipe_normalize(df_test_x, scaler=scaler)
    return train_norm, valid_norm, test_norm

def grid_search_time_series_cv(df_train_y, df_train_x, df_val_y, df_val_x,
                               models, p_grid, ret_int_results = False, verbose = False):
    k = len(df_train_y)
    metric_cols = ["Model", "Params", "RMSE", "MAE", "R^2"]
    val_results = []

    for i in range(k):
        rows = []
        for model_key in models.keys():
            for params in p_grid[model_key]:
                if verbose:
                    print("Training model:", model_key, "|", params)
                model = models[model_key]
                model.set_params(**params)
                fitted_model = model.fit(df_train_x[i], df_train_y[i])
                val_predictions = fitted_model.predict(df_val_x[i])
                # RMSE as sqrt(MSE), which does not rely on the `squared` keyword
                rmse = np.sqrt(mean_squared_error(df_val_y[i], val_predictions))
                mae = mean_absolute_error(df_val_y[i], val_predictions)
                r2 = r2_score(df_val_y[i], val_predictions)
                rows.append([model_key, params, rmse, mae, r2])
        val_results.append(pd.DataFrame(rows, columns = metric_cols))

    # Average each metric over all k folds. Rows line up across folds because
    # every fold iterates over the same (model, params) grid in the same order.
    avg_val_results = val_results[0][["Model", "Params"]].copy()
    for metric in ["RMSE", "MAE", "R^2"]:
        by_fold = np.array([res[metric].values for res in val_results], dtype = float)
        avg_val_results[metric] = by_fold.mean(axis = 0)
        if metric == "RMSE":
            # Spread of the RMSE across folds, computed over all k folds
            avg_val_results["RMSE std dev"] = by_fold.std(axis = 0)

    if ret_int_results:
        return avg_val_results, val_results
    else:
        return avg_val_results

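def select_best_model(avg_val_results, selection_param = None):
    # Resolve the default at call time, since default_selection_param is
    # defined in a later cell
    if selection_param is None:
        selection_param = default_selection_param
    # Picks the row with the smallest value, which suits error metrics such as RMSE and MAE
    best_model = avg_val_results[avg_val_results[selection_param] == avg_val_results[selection_param].min()].iloc[0]
    return best_model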

def select_model(avg_val_results, row):
    chosen_model = avg_val_results.iloc[row]
    return chosen_model

def test_model(df_train_y, df_train_x, df_val_y, df_val_x, df_test_y, df_test_x,
               chosen_model, models):
    k = len(df_train_y)
    model = models[chosen_model["Model"]]
    model.set_params(**chosen_model["Params"])

    df_tv_x = pd.concat([df_train_x[k-1], df_val_x[k-1]])
    df_tv_y = pd.concat([df_train_y[k-1], df_val_y[k-1]])

    fitted_model = model.fit(df_tv_x, df_tv_y)
    test_predictions = fitted_model.predict(df_test_x)
    # RMSE as sqrt(MSE), which does not rely on the `squared` keyword
    rmse = np.sqrt(mean_squared_error(df_test_y, test_predictions))
    mae = mean_absolute_error(df_test_y, test_predictions)
    r2 = r2_score(df_test_y, test_predictions)
    test_results = {"RMSE" : rmse, "MAE" : mae, "r^2" :r2}
    return test_results

def choose_and_test_model(df, models, p_grid, split = None, ycol = None, selection_param = None):
    # Resolve defaults at call time from the configuration cell
    split = default_split if split is None else split
    ycol = default_ycol if ycol is None else ycol
    selection_param = default_selection_param if selection_param is None else selection_param

    df_train_y, df_train_x, df_val_y, df_val_x, df_test_y, df_test_x = train_val_test_split(df, split, ycol)
    df_train_x, df_val_x, df_test_x = normalize(df_train_x, df_val_x, df_test_x)
    avg_val_results = grid_search_time_series_cv(df_train_y, df_train_x, df_val_y, df_val_x, models, p_grid)
    best = select_best_model(avg_val_results, selection_param)
    test_results = test_model(df_train_y, df_train_x, df_val_y, df_val_x, df_test_y, df_test_x, best, models)
    return test_results, best
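
One caveat about selecting by the minimum value: that is appropriate for error metrics such as RMSE and MAE, but for R^2 a larger value is better. The sketch below illustrates a direction-aware choice on plain dictionaries. `pick_best` is a hypothetical helper and the numbers are made up purely for illustration; they are not results from this project:

```python
def pick_best(rows, metric):
    """Return the row with the best value of `metric`.

    Lower is better for error metrics; higher is better for R^2.
    """
    higher_is_better = {"R^2"}
    chooser = max if metric in higher_is_better else min
    return chooser(rows, key=lambda row: row[metric])

# Made-up validation scores, for illustration only
rows = [
    {"Model": "Ridge", "RMSE": 27.1, "MAE": 22.0, "R^2": 0.02},
    {"Model": "Lasso", "RMSE": 26.8, "MAE": 22.5, "R^2": 0.05},
    {"Model": "ElasticNet", "RMSE": 27.4, "MAE": 21.7, "R^2": -0.01},
]

print(pick_best(rows, "RMSE")["Model"])
print(pick_best(rows, "MAE")["Model"])
print(pick_best(rows, "R^2")["Model"])
```

With these illustrative rows, taking the minimum of `R^2` would have selected the weakest candidate, which is why the direction matters when the selection metric is changed.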

In [173]:
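# Expanding-window folds: {fold: [training years, validation year]}.
# The test year (2020) is held out of every fold so that it is never used
# for model selection or for fitting the final model.
default_split = {0: [[2015], 2016],
                 1: [[2015, 2016], 2017],
                 2: [[2015, 2016, 2017], 2018],
                 3: [[2015, 2016, 2017, 2018], 2019]}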
                 
test_year = 2020
default_ycol = "Y"
default_selection_param = "RMSE"
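
Rather than typing the folds by hand, a dictionary of this shape can be generated from a list of years. The following is a minimal sketch, assuming the test year should be kept out of every fold; `make_expanding_splits` is a hypothetical helper, not part of the project code:

```python
def make_expanding_splits(years, test_year):
    """Build {fold: [training_years, validation_year]} without using test_year."""
    usable = sorted(y for y in years if y < test_year)
    # Fold i trains on the first i+1 usable years and validates on the next one
    return {i: [usable[:i + 1], usable[i + 1]] for i in range(len(usable) - 1)}

splits = make_expanding_splits(range(2015, 2021), test_year=2020)
for fold, (train_years, val_year) in splits.items():
    print(fold, train_years, val_year)
```

Each fold adds one more training year than the previous one, so later folds train on more data and validate on more recent years.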

In [174]:
models = {"LinearRegression" : LinearRegression(),
          "Ridge" : Ridge(),
          "Lasso" : Lasso(),
          "ElasticNet" : ElasticNet()}

p_grid = {"LinearRegression" : [{}],
          "Ridge" : [{"alpha" : x} for x in [.1, .5, 1, 5, 10, 50, 100, 500, 1000]],
          "Lasso" : [{"alpha" : x} for x in [.1, .5, 1, 5, 10, 50, 100, 500, 1000]],
          "ElasticNet" : [{"alpha" : x,
                         "l1_ratio" : y} 
                          for x in [.1, 1, 10, 100, 1000] 
                          for y in [.1, .3, .5, .7, .9]]}
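
To get a sense of how much work the grid search does per fold, we can expand the same grids programmatically and count the candidates. This is a small standard-library sketch; `expand_grid` is a hypothetical helper (scikit-learn's `ParameterGrid` offers similar expansion):

```python
from itertools import product

def expand_grid(param_values):
    """Turn {"name": [values, ...]} into a list of parameter dictionaries."""
    names = list(param_values)
    combos = product(*(param_values[name] for name in names))
    return [dict(zip(names, combo)) for combo in combos]

alphas = [.1, .5, 1, 5, 10, 50, 100, 500, 1000]
grid = {
    "LinearRegression": expand_grid({}),  # a single empty parameter set
    "Ridge": expand_grid({"alpha": alphas}),
    "Lasso": expand_grid({"alpha": alphas}),
    "ElasticNet": expand_grid({"alpha": [.1, 1, 10, 100, 1000],
                               "l1_ratio": [.1, .3, .5, .7, .9]}),
}

n_candidates = sum(len(candidates) for candidates in grid.values())
print(n_candidates)  # 1 + 9 + 9 + 25 = 44 model fits per fold
```

The total number of fits is this count multiplied by the number of folds, which is the part of the workload that would benefit from being distributed.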

In [175]:
top_models = []
test_results, best = choose_and_test_model(df, models, p_grid)
top_models.append(test_results)
display(top_models)

[{'RMSE': 27.428745039133098,
  'MAE': 22.780952380952378,
  'r^2': -0.0038345257630534313}]
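
An `r^2` this close to zero (and slightly negative) is what we would expect here, since `X` and `Y` in the test DataFrame are independent random integers. As a reminder of how the three metrics are defined, and why R^2 can drop below zero, here is a small sketch in plain Python; `regression_metrics` is a hypothetical helper that follows the standard definitions used by `sklearn.metrics`:

```python
import math

def regression_metrics(y_true, y_pred):
    """Compute RMSE, MAE, and R^2 from their textbook definitions."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)              # residual sum of squares
    mean_y = sum(y_true) / n
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)  # total sum of squares
    rmse = math.sqrt(ss_res / n)
    mae = sum(abs(e) for e in errors) / n
    r2 = 1 - ss_res / ss_tot
    return rmse, mae, r2

y_true = [1.0, 2.0, 3.0, 4.0]
baseline = regression_metrics(y_true, [2.5] * 4)  # always predicting the mean gives R^2 = 0
worse = regression_metrics(y_true, [3.0] * 4)     # a worse constant prediction gives R^2 < 0
print(baseline, worse)
```

In other words, a negative R^2 means the model predicts the held-out year slightly worse than simply predicting that year's mean.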

In [161]:
#DO NOT RUN: ONEL'S WORKING EDITS
from pandas.api.types import is_datetime64_any_dtype as is_datetime

def time_train_test_split(df, n_splits):
    """
    Splits a dataframe into training, validation, and testing sets for time series
    machine learning.

    Inputs:
        df (DataFrame): dataframe with features for machine learning
            splits; must have a 'Year' column or have the index be a datetime object
            and must have at least three years of data for the splits.

        n_splits (int): number of splits by years of data; typically =
            number of years - 2

    Outputs:
        splits (dict): a dictionary with 'train', 'validation', and 'test' keys,
            each holding a list of dataframes that grow sequentially (by years),
            one entry per split.
    """

    if (n_splits < 3):
        raise ValueError("Number of splits must be (3) or more.")

    if "Year" not in df.columns:
        if is_datetime(df.index):
            df['Year'] = df.index.year
            df = df.reset_index()
        else:
            raise TypeError("Index is not of datetime type to create year column.")
    
    year_list = df['Year'].unique().tolist()
    splits = {'train': [], 'validation':[], 'test': []}

    for idx, yr in enumerate(year_list[:-2]):
        train_yr = year_list[:idx+1]
        valid_yr = year_list[idx+1:idx+2]
        test_yr = [year_list[idx+2]]
        print('TRAIN: {}, VALIDATION: {}, TEST: {}'.format(train_yr, valid_yr, test_yr))
        
        splits['train'].append(df.loc[df.Year.isin(train_yr), :])
        splits['validation'].append(df.loc[df.Year.isin(valid_yr), :])
        splits['test'].append(df.loc[df.Year.isin(test_yr), :])
    return splits

test_df = create_test_df()
split_df_dict = time_train_test_split(test_df, 3)
split_df_dict['test'][0]
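
Unlike the fixed-test-year configuration earlier in the notebook, this version gives every split its own test year. The year windows it produces can be previewed without any DataFrame at all; the sketch below mirrors the slicing in the loop above (`rolling_year_windows` is a hypothetical helper):

```python
def rolling_year_windows(years):
    """Yield (train_years, validation_years, test_years) for each split."""
    years = list(years)
    for idx in range(len(years) - 2):
        # Train on all years up to idx, validate on the next, test on the one after
        yield years[:idx + 1], years[idx + 1:idx + 2], [years[idx + 2]]

windows = list(rolling_year_windows(range(2015, 2021)))
for train, valid, test in windows:
    print(f"TRAIN: {train}, VALIDATION: {valid}, TEST: {test}")
```

For six years of data this yields four splits, the last of which trains on 2015-2018, validates on 2019, and tests on 2020.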