<a href="https://colab.research.google.com/github/savinthie/Final_Year_Project_IDP_2024-2025/blob/main/ANN_CNN_IDP_implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from tensorflow.keras.layers import Layer
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Layer
import numpy as np

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import StandardScaler, MinMaxScaler

from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, Flatten, Dense, ReLU, Concatenate, MaxPooling1D
from tensorflow.keras.optimizers import Adam
import joblib

from scipy.special import expit as sigmoid
import matplotlib.pyplot as plt
import joblib


In [None]:
# Data Collection
# Mount Google Drive (Colab only) so the dataset stored there can be read.
from google.colab import drive
drive.mount('/content/drive')

# header=1 takes the column names from the second row of the file; every
# missing cell is replaced with 0 immediately after loading.
df = pd.read_csv(
    '/content/drive/MyDrive/FYP 2024 25/USDataset.csv',
    header=1,
).fillna(0)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Data Preprocessing
# Normalise the headers: lower-case them and drop every space and dot.
df.columns = [
    ''.join(ch for ch in col.lower() if ch not in ' .')
    for col in df.columns
]

# Columns carried forward into the working frame.
cols_to_keep = [
    'stateabv', 'county', 'family',
    'housing', 'food', 'transportation', 'healthcare',
    'othernecessities', 'childcare', 'taxes', 'total',
    'median_family_income', 'num_counties_in_st',
]
df1 = df.loc[:, cols_to_keep].copy()

# Income values may contain commas (presumably thousands separators — TODO
# confirm against the CSV); strip them before converting to float.
df1['median_family_income'] = (
    df1['median_family_income']
    .replace(',', '', regex=True)
    .astype(float)
)

In [None]:
# Feature Engineering
# Household composition is read from fixed character positions of `family`:
# character 0 -> number of parents, character 2 -> number of children
# (assumes codes shaped like "2p3c" — TODO confirm against the dataset).
df1['n_parents'] = df1['family'].str.slice(stop=1).astype(int)
df1['n_children'] = df1['family'].str.slice(start=2, stop=3).astype(int)
df1['n_members'] = df1['n_parents'].add(df1['n_children'])

# Ratio of median family income to the total cost.
df1['financial_stability'] = df1['median_family_income'].div(df1['total'])

# Total cost spread evenly over the household members, then re-aggregated
# for the children and for the parents.
df1['per_member_cost'] = df1['total'].div(df1['n_members'])
df1['child_expense_cost'] = df1['per_member_cost'].mul(df1['n_children'])
df1['parent_expense_cost'] = df1['per_member_cost'].mul(df1['n_parents'])

# NOTE(review): child + parent cost equals per_member_cost * n_members, i.e.
# `total`, so this column is zero up to floating-point rounding — confirm
# whether a different definition was intended.
df1['other_expense_cost'] = df1['total'].sub(
    df1['child_expense_cost'].add(df1['parent_expense_cost'])
)

In [None]:
# Splitting the data
# Names of the seven expense categories the model predicts.
target_col_list = ['housing', 'food', 'transportation', 'healthcare', 'othernecessities', 'childcare', 'taxes']

# Model inputs: raw totals/household descriptors followed by the engineered cost features.
X = df1[[
    'total', 'median_family_income', 'num_counties_in_st',
    'n_children', 'n_parents', 'n_members',
    'per_member_cost', 'child_expense_cost',
    'parent_expense_cost', 'other_expense_cost',
]].values
y_expenses = df1[target_col_list].values

# Scaling
# NOTE(review): both scalers are fitted on the complete dataset here, before
# the train/validation/test split performed later in the notebook, so the
# held-out rows contribute to the scaling statistics.
#
# To reuse a previously saved scaler instead of fitting a new one:
#   scaler_X = joblib.load('scaler_X.pkl')
#   scaler_y = joblib.load('scaler_y.pkl')
scaler_X = StandardScaler().fit(X)
X_scaled = scaler_X.transform(X)

scaler_y = StandardScaler().fit(y_expenses)
y_exp_scaled = scaler_y.transform(y_expenses)

# Persist both fitted scalers next to the notebook.
# Save the X scaler
joblib.dump(scaler_X, 'scaler_X.pkl')

# Save the y scaler
joblib.dump(scaler_y, 'scaler_y.pkl')

['scaler_y.pkl']

In [None]:
# Cross-Validation Setup
# Two shuffled folds with a fixed seed, so the fold assignment is repeatable.
# NOTE(review): `kf` is not referenced by any other cell visible in this part
# of the notebook — confirm it is still needed.
kf = KFold(n_splits=2, shuffle=True, random_state=100)

# Metrics
def print_metrics(y_true, y_pred, task_name):
    """Print the MSE and R2 of `y_pred` against `y_true`, prefixed by `task_name`.

    Both scores are shown with four decimal places. Nothing is returned.
    """
    scores = {
        'MSE': mean_squared_error(y_true, y_pred),
        'R2': r2_score(y_true, y_pred),
    }
    summary = ", ".join(f"{label}: {value:.4f}" for label, value in scores.items())
    print(f"{task_name} - {summary}")

def regr_report(x, y):
    """Summarise regression quality as one line of text.

    Args:
        x: Reference values, forwarded as the first argument of every metric.
        y: Predicted values, forwarded as the second argument.

    Returns:
        A string 'MAE: ..., R-Squared: ..., RMSE: ..., MSE: ...' in which
        every figure is rounded to four decimals.
    """
    # RMSE is rounded from the unrounded MSE, so keep the raw value around.
    squared_error = mean_squared_error(x, y)
    figures = {
        'MAE': round(mean_absolute_error(x, y), 4),
        'R-Squared': round(r2_score(x, y), 4),
        'RMSE': round(np.sqrt(squared_error), 4),
        'MSE': round(squared_error, 4),
    }
    return ', '.join(f'{label}: {value}' for label, value in figures.items())

In [None]:

#Create a hybrid
def create_hybrid_model(input_shape, output_shape, target_col_list):
    """Build the multi-output Conv1D + dense regression network.

    Args:
        input_shape: Shape of a single sample, passed straight to `Input`.
        output_shape: Accepted for interface compatibility; it is not used
            when the network is assembled.
        target_col_list: Target names. One single-unit linear head named
            `target_<name>` is created per entry, in list order.

    Returns:
        An uncompiled `Model` with one output per target.
    """
    inputs = Input(shape=input_shape)

    # Convolutional front end: 8 filters of width 3 with 'same' padding,
    # flattened so the dense layers can consume it.
    features = Conv1D(filters=8, kernel_size=3, padding='same', activation='relu')(inputs)
    features = Flatten()(features)

    # Fully connected trunk shared by every output head.
    for width in (128, 64, 32):
        features = Dense(width, activation='relu')(features)

    # One linear regression head per target column.
    heads = []
    for col in target_col_list:
        head = Dense(1, activation='linear', name=f'target_{col}')
        heads.append(head(features))

    return Model(inputs=inputs, outputs=heads)


In [None]:
def train_test_val_split(X_scaled, y_exp_scaled, test_size=0.2, val_size=0.1, random_state=None):
    """
    Splits the data into training, validation, and test sets.

    The data is first divided into a training part and a hold-out part of
    size `test_size + val_size`; the hold-out part is then divided so that
    the test set receives `test_size / (test_size + val_size)` of it and the
    validation set the remainder.

    Args:
        X_scaled: Feature array.
        y_exp_scaled: Target array with the same number of rows as `X_scaled`.
        test_size: Share of the data reserved for the test set.
        val_size: Share of the data reserved for the validation set.
        random_state: Seed forwarded to both `train_test_split` calls. The
            default `None` keeps the previous behaviour (a different split on
            every call); pass an int to make the split reproducible.

    Returns:
        X_train, X_val, X_test, y_train, y_val, y_test
    """
    from sklearn.model_selection import train_test_split

    holdout_size = test_size + val_size

    # Stage 1: carve the combined validation+test hold-out off the training data.
    X_train, X_temp, y_train, y_temp = train_test_split(
        X_scaled, y_exp_scaled,
        test_size=holdout_size,
        random_state=random_state,
    )
    # Stage 2: divide the hold-out between validation and test.
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp,
        test_size=test_size / holdout_size,
        random_state=random_state,
    )
    return X_train, X_val, X_test, y_train, y_val, y_test

In [None]:
def cross_val_with_evaluation(X_scaled, y_exp_scaled, target_col_list, scaler_y, num_epochs=200, test_size=0.2, val_size=0.1):
    """Train the hybrid model on one train/validation/test split and print per-target metrics.

    Despite the name, a single hold-out split is made through
    `train_test_val_split`; no folds are iterated.

    Args:
        X_scaled: Scaled feature array, one row per sample.
        y_exp_scaled: Scaled target array, one column per entry of `target_col_list`.
        target_col_list: Target names, used for the output heads and the report headings.
        scaler_y: Fitted scaler whose `inverse_transform` maps targets back to original units.
        num_epochs: Number of training epochs.
        test_size: Share of the data used for the test set.
        val_size: Share of the data used for the validation set.

    Returns:
        The trained model.
    """
    X_train, X_val, X_test, y_train, y_val, y_test = train_test_val_split(
        X_scaled, y_exp_scaled, test_size, val_size)

    # Append an axis of length 1 so each sample has shape (n_features, 1),
    # matching the input shape the model is built with below.
    X_train, X_val, X_test = (part[..., np.newaxis] for part in (X_train, X_val, X_test))

    def per_head_targets(y):
        # One 1-D target vector per output head, in column order.
        return [y[:, k] for k in range(y.shape[1])]

    model = create_hybrid_model((X_train.shape[1], 1), y_train.shape[1], target_col_list)
    model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')

    model.summary()

    # Training
    model.fit(X_train, per_head_targets(y_train),
              validation_data=(X_val, per_head_targets(y_val)),
              epochs=num_epochs, batch_size=64, verbose=1)

    # Predict on each split (train, validation, test — in that order) and map
    # both predictions and references back to the original target units.
    evaluated = []
    for label, X_part, y_part in (('Train Data:', X_train, y_train),
                                  ('Validation Data:', X_val, y_val),
                                  ('Test Data:', X_test, y_test)):
        y_pred_original = scaler_y.inverse_transform(np.column_stack(model.predict(X_part)))
        y_true_original = scaler_y.inverse_transform(y_part)
        evaluated.append((label, y_true_original, y_pred_original))

    # Metrics for each target
    for idx, target in enumerate(target_col_list):
        print(f'Model Results for {target.capitalize()}:')
        for label, y_true_original, y_pred_original in evaluated:
            print(label, regr_report(y_true_original[:, idx], y_pred_original[:, idx]))
        print()

    return model

# Example execution (requires `X_scaled`, `y_exp_scaled`, `scaler_y`, and `target_col_list` to be defined)
# Trains with the default epoch count and split sizes, then writes the trained
# weights to "model_expenses.weights.h5" in the working directory.
model = cross_val_with_evaluation(X_scaled, y_exp_scaled, target_col_list, scaler_y)
model.save_weights("model_expenses.weights.h5")


Epoch 1/200
[1m344/344[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 10ms/step - loss: 2.0442 - target_childcare_loss: 0.2626 - target_food_loss: 0.1845 - target_healthcare_loss: 0.3345 - target_housing_loss: 0.3239 - target_othernecessities_loss: 0.2459 - target_taxes_loss: 0.2337 - target_transportation_loss: 0.4590 - val_loss: 0.9388 - val_target_childcare_loss: 0.0933 - val_target_food_loss: 0.0455 - val_target_healthcare_loss: 0.1894 - val_target_housing_loss: 0.1658 - val_target_othernecessities_loss: 0.0690 - val_target_taxes_loss: 0.0488 - val_target_transportation_loss: 0.3311
Epoch 2/200
[1m344/344[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 5ms/step - loss: 0.8671 - target_childcare_loss: 0.0867 - target_food_loss: 0.0438 - target_healthcare_loss: 0.1800 - target_housing_loss: 0.1330 - target_othernecessities_loss: 0.0599 - target_taxes_loss: 0.0445 - target_transportation_loss: 0.3192 - val_loss: 0.8616 - val_target_childcare_loss: 0.0859 - val_target_fo