In [None]:
# Drive mount to Load Dataset
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import OrdinalEncoder, StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, classification_report, f1_score, precision_score, recall_score, confusion_matrix
from imblearn.over_sampling import SMOTE
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
from tqdm.auto import tqdm

In [None]:
# Loading dataset
data_path = "/content/drive/MyDrive/Gradious_Assignments/Mini_Project_ML/gradious_ML_mini_project.csv"
df = pd.read_csv(data_path)

In [None]:
pd.set_option('display.max_columns', None)

In [None]:
# The values the given target can have
df['Outcome'].unique()

### The dataset contains student records; the task is to predict whether a given student has dropped out, has graduated, or is still enrolled in the course.

In [None]:
df.head()

In [None]:
# No null values
df.isna().sum()

In [None]:
# Checking Dataset Balance
print(df['Outcome'].value_counts())
print(df['Outcome'].value_counts(normalize=True))

### The dataset is imbalanced: there are 3 times as many graduated students as students who are still enrolled.

In [None]:

# Dropping Program ID as it is useless in predictions.
# Non-inplace with errors='ignore' so re-running this cell does not raise a
# KeyError once the column is already gone.
df = df.drop(columns=['Program_ID'], errors='ignore')

In [None]:
# Encoding outcome with ordinal encoder as there is an order to our categories
def encoder(df):
  """Return a copy of `df` whose 'Outcome' column is integer-encoded.

  The mapping is fixed by the category order given to the encoder:
  'Dropout' -> 0, 'Enrolled' -> 1, 'Graduate' -> 2.

  Input : DataFrame with a string 'Outcome' column
  Output : New DataFrame; the DataFrame passed in is left untouched
  """
  # Work on a copy so the caller's frame is never mutated as a side effect.
  encoded = df.copy()
  oe = OrdinalEncoder(categories=[['Dropout', 'Enrolled', 'Graduate']])
  # fit_transform returns an (n, 1) float array; flatten it and store as int.
  encoded['Outcome'] = oe.fit_transform(encoded[['Outcome']]).ravel().astype('int')
  return encoded
df = encoder(df)

In [None]:
df.head()

In [None]:
def correlation_plotting(df):
  """Plot an annotated heatmap of the correlations between the numeric
  columns of `df` and return the correlation matrix.

  Input : DataFrame
  Output : DataFrame produced by df.corr(numeric_only=True)
  """
  corr_matrix = df.corr(numeric_only=True)

  # Explicit figure/axes pair instead of the pyplot state machine.
  fig, ax = plt.subplots(figsize=(20, 20))
  sns.heatmap(corr_matrix, annot=True, fmt=".2f", cmap="coolwarm", square=True, ax=ax)
  ax.set_title("Correlation HeatMap")
  fig.tight_layout()
  plt.show()

  return corr_matrix

corr = correlation_plotting(df)

In [None]:
# Select the features whose absolute correlation with the target exceeds a threshold (0.1 by default)
def best_features(df, corr, target='Outcome', threshold=0.1):
  """Rank features by absolute correlation with the target.

  Input :
    df        : unused; kept so existing calls keep working
    corr      : square correlation DataFrame that contains a `target` column
    target    : name of the target column inside `corr`
    threshold : a feature is kept only if |correlation| is strictly greater
  Output : Index of feature names, strongest correlation first. The target
           itself is excluded (its self-correlation of 1 would otherwise
           always rank first).
  """
  # Drop the target's own row before ranking; errors='ignore' keeps this
  # working when `corr` has the target as a column but not as a row.
  target_corr = corr[target].drop(labels=target, errors='ignore')
  ranked = target_corr.abs().sort_values(ascending=False)
  return ranked[ranked > threshold].index
best_features = best_features(df, corr)

In [None]:
# Box Plots for numeric data columns vs Outcome
def box_plot(df, num_feats, cat_feats, rows=5, col=3):
  """Plot every listed feature against 'Outcome' on one grid of subplots.

  Numeric features are drawn as box plots grouped by 'Outcome'; categorical
  features are drawn as count plots coloured by 'Outcome'.

  Input :
    df        : DataFrame containing 'Outcome' and all listed features
    num_feats : list of numeric column names
    cat_feats : list of categorical column names
    rows, col : requested grid shape; rows is increased when the grid would
                otherwise be too small for the number of features
  Output : None (the figure is shown)
  """
  total_feats = list(num_feats) + list(cat_feats)

  # Make sure there is one subplot per feature (ceil division).
  rows = max(rows, -(-len(total_feats) // col))
  fig, axes = plt.subplots(rows, col, figsize=(21, 14), squeeze=False)
  axes = axes.flatten()

  for ax, c in zip(axes, total_feats):
    if c in num_feats:
      sns.boxplot(x = 'Outcome', y = c, data = df, ax = ax)
    else:
      sns.countplot(x = c, hue = 'Outcome', data = df, ax = ax)
    # Title the subplot that was just drawn; plt.title would target whichever
    # axes pyplot considers current instead.
    ax.set_title(f'{c} vs Outcome')

  # Hide grid cells that received no plot.
  for ax in axes[len(total_feats):]:
    ax.set_visible(False)

  plt.tight_layout()
  plt.show()


num_feats = ['Passed_2nd_Semester','Grade_2nd_Semester','Passed_1st_Semester',
             'Grade_1st_Semester','Enrollment_Age','Admission_Score','Prior_Qualification_Score']
cat_feats = ['Tuition_Fees_UpToDate_Flag','Scholarship_Recipient_Flag','Outstanding_Debts_Flag',
             'Gender_Code','Application_Method','Enrolled_2nd_Semester','Enrolled_1st_Semester',
             'Student_Displacement_Flag']
box_plot(df, num_feats, cat_feats)

In [None]:
# Feature Engineering over the most impactful features
def feature_engineering(df):
  """Add derived features to a copy of `df`.

  Input : DataFrame with the semester grade / passed / enrolled columns,
          'Enrollment_Age', 'Tuition_Fees_UpToDate_Flag' and
          'Outstanding_Debts_Flag'
  Output : New DataFrame with these extra columns:
    Overall_Grade     : mean of the 1st and 2nd semester grades
    Grade_Improvement : 2nd semester grade minus 1st semester grade
    Sems_Passed       : mean of the two Passed_*_Semester columns
    Total_Credits     : sum of the two Enrolled_*_Semester columns
    Age_Group         : '<20', '20-23' or '24+'
    Tuition_Fees_Flag : 1 when fees are not up to date or debts are outstanding
  """
  df_fe = df.copy()
  # Average grade across both semesters
  df_fe['Overall_Grade'] = (df_fe['Grade_1st_Semester'] + df_fe['Grade_2nd_Semester']) / 2
  # Upward (positive) or downward (negative) grade trend
  df_fe['Grade_Improvement'] = df_fe['Grade_2nd_Semester'] - df_fe['Grade_1st_Semester']
  # Average of the "passed" columns of the two semesters
  df_fe['Sems_Passed'] = (df_fe['Passed_1st_Semester'] + df_fe['Passed_2nd_Semester']) / 2
  # Total of the "enrolled" columns of the two semesters
  df_fe['Total_Credits'] = df_fe['Enrolled_1st_Semester'] + df_fe['Enrolled_2nd_Semester']
  # Grouping age into categories. Bins are left-closed ([0, 20), [20, 24),
  # [24, inf)) so the intervals agree with the labels: age 20 belongs to
  # '20-23', and no upper age falls outside the last bin.
  df_fe['Age_Group'] = pd.cut(df_fe['Enrollment_Age'], bins = [0, 20, 24, float('inf')],
                              right = False, labels = ['<20', '20-23', '24+'])
  # Financial indicator: fees not up to date OR outstanding debts
  df_fe['Tuition_Fees_Flag'] = ((df_fe['Tuition_Fees_UpToDate_Flag'] == 0) | (df_fe['Outstanding_Debts_Flag'] == 1)).astype(int)

  return df_fe

In [None]:
# Function to clip numeric data
def clip_outliers(df, cols = None, factor = 1.5):
  """Clip values to the IQR fences [Q1 - factor*IQR, Q3 + factor*IQR].

  Input :
    df     : DataFrame
    cols   : columns to clip; defaults to every numeric column
    factor : IQR multiplier used for the fences
  Output : New DataFrame with the selected columns clipped; `df` is not
           modified. Columns whose IQR is 0 (or undefined) are left as they
           are.
  """
  df_co = df.copy()
  if cols is None:
    cols = df_co.select_dtypes(include='number').columns
  for c in cols:
    q1, q3 = df_co[c].quantile([0.25, 0.75])
    iqr = q3 - q1
    # With a zero IQR both fences equal Q1, so clipping would overwrite every
    # value with that single constant. `not iqr > 0` also skips a NaN IQR.
    if not iqr > 0:
      continue
    lower, upper = q1 - factor * iqr, q3 + factor * iqr
    df_co[c] = df_co[c].clip(lower, upper)
  return df_co

In [None]:
df_fe = feature_engineering(df)

In [None]:
df_fe.head()

In [None]:
def scaler_encoder(df, num_cols):
  """Clip, standardise and encode a feature-engineered DataFrame.

  Input :
    df       : DataFrame containing `num_cols` and an 'Age_Group' column
               with the labels '<20', '20-23', '24+'
    num_cols : columns to clip with clip_outliers and scale with StandardScaler
  Output : New DataFrame with `num_cols` clipped and standardised and
           'Age_Group' mapped to 0 / 1 / 2
  """
  # Clip the frame that was passed in (not the notebook-level df_fe) so the
  # function works on whatever the caller supplies.
  df_clean = clip_outliers(df, cols = num_cols)

  # Scaling with Standard Scaler our numerical features
  # NOTE(review): the scaler is fitted on every row it receives; when this is
  # called before the train/test split, test rows influence the statistics.
  scaler = StandardScaler()
  df_clean[num_cols] = scaler.fit_transform(df_clean[num_cols])

  # Encoding Age Group; cast to a plain float column so the result does not
  # carry a categorical dtype into the numeric feature matrix.
  df_clean['Age_Group'] = df_clean['Age_Group'].map({'<20': 0, '20-23': 1, '24+': 2}).astype(float)

  return df_clean

# Total numerical columns
num_cols = [
    'Passed_2nd_Semester','Grade_2nd_Semester','Passed_1st_Semester',
    'Grade_1st_Semester','Enrollment_Age','Admission_Score',
    'Prior_Qualification_Score','Overall_Grade','Grade_Improvement',
    'Sems_Passed','Total_Credits', 'Application_Method', 'Mother\'s_Education_Level', 'Application_Sequence', 'Father\'s_Education_Level',
    'Mother\'s_Job_Category', 'Father\'s_Job_Category', 'Local_Unemployment_Rate', 'Enrolled_2nd_Semester', 'Evaluations_2nd_Semester',
    'Enrolled_1st_Semester', 'Evaluations_1st_Semester', 'Credits_1st_Semester', 'Credits_2nd_Semester', 'Prior_Qualification_Code'
]

df_clean = scaler_encoder(df_fe, num_cols)

In [None]:
df_clean.head()

In [None]:
for col in df_clean.columns:
  print(f'{col} -> ')
  print(df_clean[col].unique())
  print('\n')

In [None]:
def data_split(df, test_size = 0.15, random_state = 42):
  """Split `df` into train and test features / targets.

  Input :
    df           : DataFrame with an 'Outcome' target column
    test_size    : fraction of rows held out for testing
    random_state : seed for the shuffle
  Output : X_train, X_test, y_train, y_test
  """
  X = df.drop(columns = 'Outcome')
  y = df['Outcome']
  # Stratify on the target so both splits keep the class proportions of the
  # full data set.
  X_train, X_test, y_train, y_test = train_test_split(
      X, y, test_size = test_size, random_state = random_state, stratify = y)
  return X_train, X_test, y_train, y_test
X_train, X_test, y_train, y_test = data_split(df_clean)

In [None]:
# Setting Training Parameters
num_epochs = 50
batch_size = 64
learning_rate = 0.001
weight_decay = 0.0001

In [None]:
def convert_to_tensor(X_train, X_test, y_train, y_test, loader_batch_size = None):
  """Wrap the pandas splits in PyTorch DataLoaders.

  Input :
    X_train, X_test   : feature DataFrames (all columns convertible to float)
    y_train, y_test   : target Series of integer class ids
    loader_batch_size : batch size for both loaders; when None the
                        notebook-level `batch_size` setting is used
  Output : (train_loader, test_loader); the training loader shuffles, the
           test loader keeps row order
  """
  if loader_batch_size is None:
    # Fall back to the training parameter defined at notebook level.
    loader_batch_size = batch_size

  # convert pandas -> torch.tensor; request float32 from pandas directly so
  # every feature column is converted to a numeric array up front.
  X_train_tensor = torch.tensor(X_train.to_numpy(dtype = "float32"), dtype = torch.float32)
  X_test_tensor = torch.tensor(X_test.to_numpy(dtype = "float32"), dtype = torch.float32)
  # Class ids are stored as torch.long.
  y_train_tensor = torch.tensor(y_train.to_numpy(), dtype = torch.long)
  y_test_tensor = torch.tensor(y_test.to_numpy(), dtype = torch.long)

  # Create TensorDataset
  train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
  test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

  # Create DataLoader
  train_loader = DataLoader(train_dataset, batch_size = loader_batch_size, shuffle = True)
  test_loader = DataLoader(test_dataset, batch_size = loader_batch_size, shuffle = False)

  return train_loader, test_loader

In [None]:
# tensorizing
train_loader, test_loader = convert_to_tensor(X_train, X_test, y_train, y_test)

In [None]:
train_loader.sampler.data_source[0]

In [None]:
NN_input_dim = len(train_loader.sampler.data_source[0][0])

In [None]:
# Outcome model architecture
class OutcomeModel(nn.Module):
  """MLP with two hidden layers.

  Each hidden layer is Linear -> BatchNorm1d -> ReLU -> Dropout(p=0.3); the
  final Linear layer returns the raw outputs (no activation is applied).
  """

  def __init__(self, input_dim, hidden_dim1, hidden_dim2, output_dim):
    super().__init__()
    # First hidden layer
    self.fc1 = nn.Linear(input_dim, hidden_dim1)
    self.batchnorm1 = nn.BatchNorm1d(hidden_dim1)

    # Second hidden layer
    self.fc2 = nn.Linear(hidden_dim1, hidden_dim2)
    self.batchnorm2 = nn.BatchNorm1d(hidden_dim2)

    # Output layer plus the activation / dropout modules shared by both hidden layers
    self.out = nn.Linear(hidden_dim2, output_dim)
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(p=0.3)

  def forward(self, x):
    hidden = self.fc1(x)
    hidden = self.batchnorm1(hidden)
    hidden = self.dropout(self.relu(hidden))

    hidden = self.fc2(hidden)
    hidden = self.batchnorm2(hidden)
    hidden = self.dropout(self.relu(hidden))

    return self.out(hidden)

In [None]:
# Instantiating MLP model
# Seed PyTorch's global RNG first so the weight initialisation (and the other
# draws taken from that RNG afterwards) are the same on every
# Restart & Run All.
torch.manual_seed(42)
model = OutcomeModel(input_dim = NN_input_dim, hidden_dim1 = 128, hidden_dim2 = 64, output_dim = 3)

In [None]:
# Optimizer and Loss Function
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate, weight_decay = weight_decay)
loss_fn = nn.CrossEntropyLoss()

In [None]:
# scheduler to reduce learning rate with drop in validation loss progress
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

In [None]:
def training_function(model, train_loader, val_loader, optimizer, loss_fn, epochs=50, scheduler=None,
                      patience = 7, device = 'cpu', show_progress = True):
  """Train `model` with early stopping and restore the best weights.

  Input :
    model         : nn.Module to train (moved to `device`)
    train_loader  : iterable of (X_batch, y_batch) used for optimisation
    val_loader    : iterable of (X_batch, y_batch) used for validation
    optimizer     : optimizer bound to the model's parameters
    loss_fn       : callable(output, target) -> scalar loss tensor
    epochs        : maximum number of epochs
    scheduler     : optional scheduler stepped with the mean validation loss
    patience      : epochs without validation-loss improvement before stopping
    device        : device the model and batches are moved to
    show_progress : wrap the training batches in a tqdm progress bar
  Output : the same model, loaded with the weights of the epoch that had the
           lowest mean validation loss

  Per-epoch losses and validation accuracy are printed.
  """
  def snapshot_weights():
    # state_dict() hands back references to the live tensors, which keep
    # changing as training continues; clone them to freeze this epoch.
    return {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}

  model.to(device)
  best_validation_loss = float('inf')
  epochs_without_improvement = 0
  best_model_state = snapshot_weights()

  for epoch in range(epochs):
    model.train()
    training_loss = 0

    # Training loop, optionally with a tqdm progress bar
    batches = train_loader
    if show_progress:
      batches = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", leave=False)

    for X_batch, y_batch in batches:
      X_batch, y_batch = X_batch.to(device), y_batch.to(device)

      optimizer.zero_grad()
      output = model(X_batch)

      loss = loss_fn(output, y_batch)
      loss.backward()
      optimizer.step()

      training_loss += loss.item()

    # Validation Loop
    model.eval()
    validation_loss = 0
    correct = 0
    total = 0

    with torch.inference_mode():
      for X_batch, y_batch in val_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        output = model(X_batch)
        loss = loss_fn(output, y_batch)
        validation_loss += loss.item()

        preds = torch.argmax(output, dim=1)
        correct += (preds == y_batch).sum().item()
        total += y_batch.size(0)

    # Losses are averaged over the number of batches
    avg_train_loss = training_loss / len(train_loader)
    avg_validation_loss = validation_loss / len(val_loader)
    validation_accuracy = correct / total

    if scheduler:
      scheduler.step(avg_validation_loss)

    print(f"Epoch {epoch+1}/{epochs}, Training Loss: {avg_train_loss:.4f}, Validation Loss: {avg_validation_loss:.4f}, Validation Accuracy: {validation_accuracy:.4f}")

    # Early Stopping Check
    if avg_validation_loss < best_validation_loss:
      best_validation_loss = avg_validation_loss
      epochs_without_improvement = 0
      best_model_state = snapshot_weights()
    else:
      epochs_without_improvement += 1
      if epochs_without_improvement >= patience:
        print(f"Early stopping at epoch {epoch+1}")
        break

  print("Training Complete. Loading Best Model Weights")
  model.load_state_dict(best_model_state)
  return model

In [None]:
# Early stopping, LR scheduling and best-weight selection all depend on the
# validation loss, so the validation data must not be the test set that is
# used for the final report. Hold out part of the training data instead and
# keep test_loader untouched for evaluation.
VAL_FRACTION = 0.15
train_dataset_full = train_loader.dataset
n_val = max(1, int(len(train_dataset_full) * VAL_FRACTION))
fit_dataset, val_dataset = torch.utils.data.random_split(
    train_dataset_full,
    [len(train_dataset_full) - n_val, n_val],
    generator = torch.Generator().manual_seed(42))

loader_batch = train_loader.batch_size
# BatchNorm1d cannot run in training mode on a batch holding a single sample,
# so drop the last training batch only when it would have exactly one row.
fit_loader = DataLoader(fit_dataset, batch_size = loader_batch, shuffle = True,
                        drop_last = (len(fit_dataset) % loader_batch == 1))
val_loader = DataLoader(val_dataset, batch_size = loader_batch, shuffle = False)

model = training_function(model, fit_loader, val_loader, optimizer, loss_fn, scheduler=scheduler)

In [None]:
# Classification report and confusion matrix for the batches in test_loader
model.eval()
all_preds = []
all_labels = []

with torch.inference_mode():
  for features, targets in test_loader:
    # Predicted class = index of the largest model output per row
    preds = model(features).argmax(dim = 1)
    all_preds += preds.cpu().tolist()
    all_labels += targets.cpu().tolist()

print("\n Classificaiton Report:")
print(classification_report(all_labels, all_preds))

# Rows of the matrix are the actual classes, columns the predicted ones
cm = confusion_matrix(all_labels, all_preds)
fig, ax = plt.subplots()
sns.heatmap(cm, annot = True, fmt = "d", cmap = "Blues", ax = ax)
ax.set_xlabel("Predicted")
ax.set_ylabel("Actual")
plt.show()