In [None]:
%matplotlib inline
import pandas as pd 
import numpy as np

import torch 
from torch.utils.data import Dataset, DataLoader, random_split
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn import preprocessing



# Pick the compute device once. `torch.accelerator` is only present in recent
# PyTorch releases, so fall back to the older per-backend checks when it is missing
# instead of failing with AttributeError.
if hasattr(torch, "accelerator"):
    device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
elif torch.cuda.is_available():
    device = "cuda"
elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")


In [None]:

# Heart CSV hosted under download.tensorflow.org. Fetched over HTTPS (the original
# used plain HTTP) so the file cannot be altered in transit.
file_url = "https://storage.googleapis.com/download.tensorflow.org/data/heart.csv"
heart_csv_df = pd.read_csv(file_url)

# Fail here, next to the download, if the columns HeartRateDataset reads are absent.
assert {"thal", "target"} <= set(heart_csv_df.columns), "heart.csv is missing expected columns"

heart_csv_df

In [None]:
class HeartRateDataset (torch.utils.data.Dataset):
    """Map-style dataset over the heart table: scaled features plus a label.

    The input frame is copied, its string ``thal`` column is replaced by an integer
    ``thal_code`` column, the ``target`` column is split off as the labels, and the
    remaining columns are passed through ``MinMaxScaler``.

    NOTE: the scaler is fitted on every row of ``raw_df``. If the dataset is split
    into train/test afterwards, the held-out rows have influenced the scaling.

    Args:
        raw_df: DataFrame with a ``thal`` column, a ``target`` column and the
            feature columns. It is not modified.

    Raises:
        ValueError: if ``thal`` contains a value with no entry in the mapping
            (missing values included). Such values would otherwise turn into NaN
            features without any warning.
    """

    def __init__(self, raw_df):

        df_copy = raw_df.copy()

        thal_mappings = {
            "1": 1,
            "2": 2,
            "normal": 3,
            "reversible": 4,
            "fixed": 5,
        }

        ## this is the only string categorical column
        thal_codes = df_copy["thal"].map(thal_mappings)

        # Series.map yields NaN for keys that are not in the dict; surface that
        # immediately rather than letting NaN reach the scaler and the model.
        unmapped = thal_codes.isna()
        if unmapped.any():
            bad_values = sorted(df_copy.loc[unmapped, "thal"].astype(str).unique())
            raise ValueError(f"Unrecognized 'thal' value(s): {bad_values}")

        df_copy["thal_code"] = thal_codes
        # remove this since the rest of the columns are ready to be normalized
        df_copy.pop("thal")

        # set aside the labels
        self.label_column = "target"
        self.labels_ndarray = df_copy.pop(self.label_column).values

        scaler = preprocessing.MinMaxScaler()
        self.features_ndarray = scaler.fit_transform(df_copy)

    def __len__(self):
        """Return the number of rows (samples) in the dataset."""
        return self.features_ndarray.shape[0]

    def __getitem__(self, idx):
        """Return ``(features, label)`` for row ``idx`` as float32 tensors."""
        features_tensor = torch.tensor(self.features_ndarray[idx], dtype=torch.float32)
        # float32 label so it can be fed straight into a BCE-style loss
        label_tensor = torch.tensor(self.labels_ndarray[idx], dtype=torch.float32)

        return features_tensor, label_tensor




In [None]:
## TODO consider skip last!
# NOTE(review): presumably this refers to DataLoader(drop_last=True) - confirm.


# NOTE(review): not referenced by any cell visible here; the split below is by fraction.
NUM_TEST_RECORDS = 90

# Seed for the train/test partition so a re-run evaluates on the same held-out rows.
SPLIT_SEED = 123

overall_ds = HeartRateDataset(heart_csv_df)

# A dedicated generator keeps the split reproducible without touching the global RNG.
train_ds, test_ds = random_split(
    overall_ds,
    [.75, .25],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Aim for roughly ten batches per epoch, but never let the size drop to 0 on a tiny
# dataset (DataLoader rejects batch_size=0).
batch_size = max(1, len(train_ds) // 10)
train_dataloader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
# Evaluation gains nothing from shuffling; a fixed order keeps results repeatable.
test_dataloader = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

print(f'{len(train_ds)} training records in with batch size {batch_size}, {len(test_ds)} records for test')




In [None]:
# Default dropout probability for the classifier's single dropout layer.
DROPOUT_RATE_01 = .2

class HeartRateNN(nn.Module):
    """Small fully connected binary classifier.

    Layout: Linear(in, 32) -> GELU -> Dropout -> Linear(32, 16) -> GELU ->
    Linear(16, 1) -> Sigmoid.

    Because the stack ends in ``nn.Sigmoid``, ``forward`` returns probabilities,
    not logits. Pair it with ``nn.BCELoss``; ``nn.BCEWithLogitsLoss`` would apply
    a second sigmoid.

    Args:
        num_feature_columns: size of the last dimension of the input tensor.
        dropout_rate: probability used by the dropout layer. Defaults to
            ``DROPOUT_RATE_01`` so existing one-argument calls behave as before.
    """

    def __init__(self, num_feature_columns, dropout_rate=DROPOUT_RATE_01):
        super().__init__()
        self.linear_stack = nn.Sequential(
            nn.Linear(num_feature_columns, 32),
            nn.GELU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(32, 16),
            nn.GELU(),
            nn.Linear(16, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return one probability per input row, shaped ``(..., 1)``."""
        return self.linear_stack(x)
    

# Read the input width off one sample instead of hard-coding the column count.
first_training_record, _ = train_ds[0]
num_feature_columns = first_training_record.size(-1)

# Instantiate the network and print its layer layout for inspection.
model = HeartRateNN(num_feature_columns)
print(model)

In [None]:


# Seed PyTorch's generators before the weights are initialised.
SEED = 123
torch.manual_seed(SEED)

if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)
    torch.cuda.manual_seed_all(SEED)

# Build a fresh network sized to the feature vector of one training sample.
first_training_record, _ = train_ds[0]
num_feature_columns = first_training_record.size(-1)

model = HeartRateNN(num_feature_columns).to(device)

# The network already ends in a sigmoid, so plain binary cross entropy applies.
loss_fn = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 300

for epoch in range(num_epochs):
    # Running tallies for this epoch's training accuracy.
    epoch_correct_count = 0
    epoch_pred_count = 0

    for X, y in train_dataloader:
        X, y = X.to(device), y.to(device)

        # Forward pass; reshape the (batch, 1) output to line up with the labels.
        y_pred = model(X).reshape(y.shape)
        loss = loss_fn(y_pred, y)

        # Backward pass and weight update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accuracy bookkeeping: round each probability to 0/1 and compare.
        y_pred_guess = y_pred.round()
        batch_num_correct = y_pred_guess.eq(y).sum()
        epoch_correct_count += batch_num_correct
        epoch_pred_count += y.shape[0]

    print(f"Epoch [{epoch+1}/{num_epochs}], {epoch_correct_count} of {epoch_pred_count} correct {(100*epoch_correct_count/epoch_pred_count):.1f} %")



In [None]:

model.eval()  # evaluation mode: turns the dropout layer off

# The model ends in nn.Sigmoid and therefore outputs probabilities, so the loss has
# to be nn.BCELoss (the same one used for training). nn.BCEWithLogitsLoss expects raw
# logits and would push the outputs through a second sigmoid, misreporting the loss.
criterion = nn.BCELoss()

# Lists to store results
test_losses = []   # mean loss of each batch
batch_sizes = []   # number of samples in each batch, used to weight the average
all_preds = []
all_labels = []

# Iterate over the test batches
with torch.no_grad():  # no gradients are needed for evaluation
    for inputs, labels in test_dataloader:

        # The model lives on `device`; the batch must be moved there as well.
        inputs = inputs.to(device)
        # flatten the labels to 1-D (instead of an array of one-element arrays)
        labels = labels.to(device).reshape(-1)

        y_pred = model(inputs)
        y_pred = y_pred.reshape(labels.shape)  # make sure it matches

        loss = criterion(y_pred, labels)
        test_losses.append(loss.item())
        batch_sizes.append(labels.shape[0])

        y_pred_guess = torch.round(y_pred)

        # .cpu() first: tensors on an accelerator cannot be converted to NumPy directly
        all_preds.extend(y_pred_guess.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate overall metrics
# Weight each batch mean by its size so a short final batch is not over-represented.
avg_test_loss = np.average(test_losses, weights=batch_sizes)
accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)

print(f"Average Test Loss: {avg_test_loss:.4f}")
print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test Precision: {precision:.4f}")
print(f"Test Recall: {recall:.4f}")
print(f"Test F1-Score: {f1:.4f}")