# Final Project

In [None]:
import time
from typing import Final

import numpy as np
import pandas as pd
import tensorflow as tf
import keras

from keras.wrappers.scikit_learn import KerasClassifier, KerasRegressor
from sklearn.metrics import mean_squared_error, accuracy_score, classification_report, multilabel_confusion_matrix
from sklearn.model_selection import train_test_split, KFold, GridSearchCV
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder


## Data Preprocessing

In [None]:
# Build the complete dataset
#
# Reads the GDP spreadsheet, keeps only the county name, the 2019 GDP level and
# the 2020 percent change, and copies those two values onto the matching
# (state, county) rows of the table loaded from complete.csv.

gdp_raw = pd.read_excel('gdp.xlsx').set_axis(
    ['county', '2017', '2018', '2019', '2020', 'rank 2018', 'percent change 2018',
     'percent change 2019', 'percent change 2020', 'rank 2020'],
    axis=1)
# NOTE(review): the 5:3222 slice presumably skips header/footer rows of the
# spreadsheet -- confirm against the file.
gdp_clean: pd.DataFrame = gdp_raw.drop(
    columns=['rank 2018', '2017', '2018', '2020',
             'rank 2020', 'percent change 2018', 'percent change 2019']
).iloc[5:3222]

state_names = pd.read_csv('us-counties-2020.csv')['state'].unique()

counties_df = pd.read_csv('complete.csv')
counties_df['2019 raw GDP'] = np.nan
counties_df['percent change 2020'] = np.nan

# The loop below treats a row whose first cell is a state name as the header
# for the county rows that follow it, remembering it as the current state.
curr_state = None
for _, gdp_row in gdp_clean.iterrows():
    place_name = gdp_row['county']
    if place_name in state_names:
        curr_state = place_name
        continue

    # Address the target cells by boolean mask and column label rather than by
    # hard-coded integer positions; a county with no match is simply skipped.
    matches = (counties_df['state'] == curr_state) & (
        counties_df['county'] == place_name)
    counties_df.loc[matches, '2019 raw GDP'] = gdp_row['2019']
    counties_df.loc[matches, 'percent change 2020'] = gdp_row['percent change 2020']


print(f"{len(counties_df['state'].unique())} states")
counties_df['2019 raw GDP'] = counties_df['2019 raw GDP'].astype('float32')
print(counties_df)


In [None]:
# Swap the textual state column for one indicator column per state, then
# discard every row that still has a missing value in any column.
# Note: this cell rebinds counties_df without its 'state' column, so it cannot
# be run twice in a row without re-running the cell that builds counties_df.
encoded_state_names = pd.get_dummies(counties_df['state'])
counties_df = (
    counties_df
    .drop(columns=['state'])
    .join(encoded_state_names)
    .dropna(axis='index', how='any')
)

## Building the model

In [None]:
# Setup some constants
# Step size given to the SGD optimizer by the model builders unless overridden.
DEFAULT_LEARNING_RATE: Final[float] = 0.3
# Number of epochs passed to the fit() calls in this notebook.
DEFAULT_EPOCHS: Final[int] = 500

In [None]:
def BuildDefaultModel():
    """Build and compile the baseline feed-forward network.

    The network takes 59 input features, passes them through four ReLU hidden
    layers (12, 12, 6 and 5 units) and emits one unbounded value. It is
    compiled with mean-squared-error loss and plain SGD at
    DEFAULT_LEARNING_RATE.

    Returns:
        keras.Sequential: the compiled, untrained model.
    """
    SGD_optimizer: Final = tf.keras.optimizers.SGD(
        learning_rate=DEFAULT_LEARNING_RATE)
    lossFunction = tf.keras.losses.MeanSquaredError()

    model: keras.Sequential = keras.Sequential([
        # shape must be a tuple; `(59)` without the comma is just the int 59.
        # NOTE(review): 59 has to equal the number of feature columns fed to
        # fit() -- confirm if the feature set changes.
        keras.Input(shape=(59,)),
        tf.keras.layers.Dense(
            units=12, activation='relu', name='hidden_layer_1'),
        tf.keras.layers.Dense(
            units=12, activation='relu', name='hidden_layer_2'),
        tf.keras.layers.Dense(
            units=6, activation='relu', name='hidden_layer_3'),
        tf.keras.layers.Dense(
            units=5, activation='relu', name='hidden_layer_4'),
        # Linear output: the regression target is a percent change, and a ReLU
        # here would make negative predictions impossible.
        tf.keras.layers.Dense(units=1, activation='linear', name='output'),
    ], name='Default_COVID_Classifier')

    # NOTE(review): 'accuracy' is kept so existing callers still see the metric,
    # but it is not informative for a continuous target.
    model.compile(loss=lossFunction, optimizer=SGD_optimizer,
                  metrics=['accuracy'])
    return model


# Instantiate the baseline network and show its layer-by-layer summary.
defaultCovidClassifier = BuildDefaultModel()
# summary() prints the table itself and returns None, so wrapping it in print()
# would add a stray "None" line to the output.
defaultCovidClassifier.summary()


In [None]:
# Feature matrix: every column except the regression target and the county name.
input_attributes = counties_df.drop(columns=['percent change 2020', 'county'])

input_attributes['2019 raw GDP'] = input_attributes['2019 raw GDP'].astype('float64')
input_attributes['cases'] = input_attributes['cases'].astype('int32')

# Replace the raw counts with per-capita rates.
input_attributes['Positivity Rate'] = input_attributes['cases'] / input_attributes['2020 population']
input_attributes['Death Rate'] = input_attributes['deaths'] / input_attributes['2020 population']
input_attributes = input_attributes.drop(columns=['cases', 'deaths'])

# Scale every feature to [0, 1]. The scaler returns a bare array, so the column
# names and row index are re-attached to keep the scaled frame aligned with
# counties_df by label as well as by position.
# NOTE(review): the scaler is fitted on all rows, before any train/test split.
normalizedAttributes: Final[pd.DataFrame] = pd.DataFrame(
    MinMaxScaler().fit_transform(input_attributes),
    columns=input_attributes.columns,
    index=input_attributes.index,
)
print(input_attributes)

In [None]:
# Show the min-max scaled feature matrix built in the previous cell.
print(normalizedAttributes)

In [None]:
# Hold out 10% of the rows for testing (fixed seed for repeatability).
# The split uses the min-max scaled features: the unscaled columns differ by
# orders of magnitude, which plain SGD at this learning rate cannot cope with.
# Features and target are paired by row position.
X_Train, X_Test, y_Train, y_Test = train_test_split(
    normalizedAttributes, counties_df['percent change 2020'].astype('float64'), test_size=0.1, random_state=44)

print(X_Train.shape, y_Train.shape)

In [None]:
# Train the baseline model (it was already compiled by its builder) and time
# the fit() call with wall-clock timestamps.
startTime = time.time()
defaultCovidClassifier.fit(
    x=X_Train, y=y_Train,
    epochs=DEFAULT_EPOCHS,
    # NOTE(review): the held-out test split doubles as the validation set here.
    validation_data=(X_Test, y_Test))
endTime = time.time()

print(f'Training took {endTime-startTime} seconds')


In [None]:
# Report the mean squared error of the baseline model on the held-out split.

predicted_y: np.ndarray = defaultCovidClassifier.predict(
    X_Test)  # raw continuous outputs

# scikit-learn metrics take (y_true, y_pred), in that order.
print(f'MSE of test set is: {mean_squared_error(y_Test, predicted_y)}')


In [None]:
def generateValidationData(inputDF: pd.DataFrame, expectedOutputs: np.ndarray,
                           trainIndexes: np.ndarray,
                           testIndexes: np.ndarray):
    """Split features and targets into train/test parts by row position.

    Args:
        inputDF: feature table; rows are selected positionally with ``iloc``.
        expectedOutputs: targets aligned with ``inputDF`` by position. A numpy
            array (or other sequence) is indexed directly; a pandas Series or
            DataFrame is indexed with ``iloc`` so that its index labels are
            never mistaken for positions.
        trainIndexes: integer positions of the training rows.
        testIndexes: integer positions of the test rows.

    Returns:
        Tuple ``(X_Train, X_Test, y_Train, y_Test)``.
    """
    def _take(data, positions):
        # Positional selection that works for both pandas and array-like input.
        if isinstance(data, (pd.Series, pd.DataFrame)):
            return data.iloc[positions]
        return np.asarray(data)[positions]

    X_Train = inputDF.iloc[trainIndexes]
    X_Test = inputDF.iloc[testIndexes]

    y_Train = _take(expectedOutputs, trainIndexes)
    y_Test = _take(expectedOutputs, testIndexes)

    return X_Train, X_Test, y_Train, y_Test


In [None]:
# Do 10-fold validation

KFolder = KFold(n_splits=10)
mseScores: list[float] = []
accuracyScores: list[float] = []

# Targets as a plain array so the fold indexes select rows by position.
kfoldTargets: np.ndarray = counties_df['percent change 2020'].astype('float64').to_numpy()

for trainIndexes, testIndexes in KFolder.split(normalizedAttributes):
    X_Train, X_Test, y_Train, y_Test = generateValidationData(
        normalizedAttributes, kfoldTargets, trainIndexes, testIndexes)

    # Build a fresh model per fold so no fold starts from weights that were
    # already trained on its own test rows.
    foldModel = BuildDefaultModel()
    foldModel.fit(
        x=X_Train, y=y_Train,
        epochs=DEFAULT_EPOCHS,
        validation_data=(X_Test, y_Test))

    predicted_y: np.ndarray = foldModel.predict(X_Test)

    mseScores.append(mean_squared_error(y_Test, predicted_y))
    # evaluate() returns [loss, accuracy] because the model is compiled with a
    # single 'accuracy' metric. NOTE(review): accuracy is not informative for a
    # continuous target; it is recorded only so the summary cell keeps working.
    accuracyScores.append(
        float(foldModel.evaluate(X_Test, y_Test, verbose=0)[1]))


In [None]:
# Report the per-fold scores collected by the cross-validation loop, followed
# by their means.
print(
    f'Accuracy Scores: {accuracyScores}\nAverage accuracy is: {np.average(accuracyScores)}\n')
print(f'MSE Loss: {mseScores}\nAverage MSE is: {np.average(mseScores)}')


## Exercise 3

In [None]:
def BuildModel(numNodesLayer1=12, numNodesLayer2=3, learningRate=DEFAULT_LEARNING_RATE,
               inputDim=59):
    """Build and compile a two-hidden-layer network with tunable sizes.

    Intended as the ``build_fn`` of a scikit-learn wrapper, so every argument
    has a default and can be supplied from a parameter grid.

    Args:
        numNodesLayer1: units in the first hidden layer.
        numNodesLayer2: units in the second hidden layer.
        learningRate: step size for the SGD optimizer.
        inputDim: number of input features. Defaults to 59, the input width
            used by BuildDefaultModel. NOTE(review): must match the width of
            the feature matrix passed to fit().

    Returns:
        keras.Sequential: the compiled, untrained model.
    """
    SGD_optimizer: Final = tf.keras.optimizers.SGD(learning_rate=learningRate)
    lossFunction: Final = tf.keras.losses.MeanSquaredError()

    model: keras.Sequential = keras.Sequential([
        tf.keras.layers.Dense(
            input_dim=inputDim,
            units=numNodesLayer1, activation='relu', name='hidden_layer_1'),
        tf.keras.layers.Dense(
            units=numNodesLayer2, activation='relu', name='hidden_layer_2'),
        # Linear output so negative percent changes can be predicted.
        tf.keras.layers.Dense(units=1, activation='linear', name='output'),
    ], name='Beans_Classifier')

    # 'accuracy' stays in the metric list because the scikit-learn classifier
    # wrapper's score() looks for it; it carries no meaning for regression.
    model.compile(loss=lossFunction, optimizer=SGD_optimizer,
                  metrics=['accuracy'])
    return model


# Wrap the builder so scikit-learn can clone and fit it. The regressor wrapper
# is used because the target is continuous: the classifier wrapper replaces y
# with class indices before fitting, and scores by accuracy.
wrappedCovidClassifier = KerasRegressor(build_fn=BuildModel)

# `epochs` is forwarded to fit(); the other keys are passed to BuildModel(...).
# (`nb_epoch` is not a parameter of fit() and is rejected by the wrapper.)
param_grid = dict(
    epochs=np.arange(500, 1001, 50),
    learningRate=np.array([0.1, 0.3, 0.6]),
    numNodesLayer1=np.array([12, 13, 14]),
    numNodesLayer2=np.array([3, 4, 5]),
)

# NOTE(review): this is 11 * 3 * 3 * 3 = 297 combinations, each fitted 10 times
# for 500-1000 epochs -- expect a very long run.
grid = GridSearchCV(estimator=wrappedCovidClassifier,
                    param_grid=param_grid, n_jobs=-1, cv=10)

grid_result = grid.fit(X_Train, y_Train)


In [None]:
# Big reveal of best parameters
# best_params_ holds the parameter combination that achieved the best mean
# cross-validated score during the grid search.
grid_result.best_params_