In [None]:
import random
from collections import Counter

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

In [None]:
# Reproducibility: seed every RNG the notebook relies on so that a
# Restart & Run All repeats the same weight initialisation, dropout masks and
# DataLoader shuffling.
SEED = 1  # same value as the random_state given to train_test_split further down
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
# Load the annotation table and the precomputed embeddings that go with it.
xiang_filtered = pd.read_csv("xiang_filtered.csv")

# NOTE(review): torch.load unpickles the file - only load .pt files from a trusted source.
# map_location="cpu" lets the file load even when it was saved from a GPU/MPS device.
xiang_filtered_embeddings = torch.load("filtered_embeddings.pt", map_location="cpu")

# The embeddings and the table rows are later split together, so they must
# line up one-to-one; fail here with a clear message instead of further down.
if len(xiang_filtered_embeddings) != len(xiang_filtered):
    raise ValueError(
        f"Expected one embedding per table row, got {len(xiang_filtered_embeddings)} "
        f"embeddings for {len(xiang_filtered)} rows"
    )


In [None]:
# Extender labels that receive a one-hot index. The list is fixed by hand
# rather than derived from the column, so any other value in "Extender"
# has no entry in the mapping built below.
unique_extenders = sorted(
    ['MM', 'M', 'MA21', 'MA3', 'MA10', 'MA22', 'MA5,7,8,10', 'MA15', 'MA4', 'MA6']
)  # alphabetical order -> stable label indices

# Rows whose extender is one of the labels above.
xiang_filtered_extenders = xiang_filtered[xiang_filtered["Extender"].isin(unique_extenders)]

# label -> integer index
extenders_enumerated = {label: idx for idx, label in enumerate(unique_extenders)}
print(len(extenders_enumerated))


In [None]:
# Distinct values of the "Order of module" column.
unique_orders = xiang_filtered["Order of module"].unique()

# Keep rows whose order is one of those values (every value comes from the
# column itself, so this selects on the full set of observed orders).
order_mask = xiang_filtered["Order of module"].isin(unique_orders)
xiang_filtered_orders = xiang_filtered[order_mask]

# Sort in place so each order value gets a stable index.
unique_orders.sort()

# order value -> integer index
orders_enumerated = dict(zip(unique_orders, range(len(unique_orders))))
print(len(orders_enumerated))


In [None]:
# Integer-encode "Order of module" for every row, then expand to one-hot vectors.
order_codes = xiang_filtered["Order of module"].map(orders_enumerated)
if order_codes.isna().any():
    # NaN has no valid integer representation; casting it to torch.long would
    # produce a meaningless class index, so stop with a clear message instead.
    raise ValueError("Some 'Order of module' values have no entry in orders_enumerated")
xiang_filtered["OrderEnumerated"] = order_codes.astype("int64")

# Convert the whole column at once (no per-element tensor round trip).
orders_np = xiang_filtered["OrderEnumerated"].to_numpy()
# NOTE(review): the name xiang_filtered_tensor is reassigned by later cells
# (extender codes, then annotation targets); here it holds the order indices.
xiang_filtered_tensor = torch.tensor(orders_np, dtype=torch.long)

# Size the encoding from the mapping rather than a hard-coded class count.
xiang_orders_onehot = F.one_hot(xiang_filtered_tensor, num_classes=len(orders_enumerated))

In [None]:
# Integer-encode "Extender" for every row, then expand to one-hot vectors.
# extenders_enumerated only covers a hand-picked label list, so rows holding
# any other (or a missing) extender come back from .map() as NaN.
extender_codes = xiang_filtered["Extender"].map(extenders_enumerated)
xiang_filtered["ExtenderEnumerated"] = extender_codes

n_unknown = int(extender_codes.isna().sum())
if n_unknown:
    print(f"{n_unknown} rows have an Extender outside extenders_enumerated; "
          "they get an all-zero one-hot row")

# -1 marks "no known extender". Casting NaN straight to torch.long (as the
# previous version did) yields a meaningless class index.
extender_np = extender_codes.fillna(-1).to_numpy(dtype="int64")
# NOTE(review): the name xiang_filtered_tensor is reassigned by a later cell
# (annotation targets); here it holds the extender indices.
xiang_filtered_tensor = torch.tensor(extender_np, dtype=torch.long)

# One-hot the known indices; rows flagged -1 are clamped to a valid index for
# the call and then zeroed out, so they belong to no extender class.
has_extender = xiang_filtered_tensor >= 0
xiang_extenders_onehot = (
    F.one_hot(xiang_filtered_tensor.clamp(min=0), num_classes=len(extenders_enumerated))
    * has_extender.unsqueeze(1)
)

In [None]:
# Inspect the one-hot extender encoding (echoed as this cell's output).
xiang_extenders_onehot

In [None]:
# Inspect the one-hot order encoding (echoed as this cell's output).
xiang_orders_onehot

In [None]:
# Inspect the table, which by now also carries the OrderEnumerated and
# ExtenderEnumerated columns added by the cells above.
xiang_filtered

In [None]:
# Average pooling: take the mean of every embedding over dim 1, then squeeze
# away dim 0 if it has size 1.
# NOTE(review): the original note gave the per-item shape as
# [len_of_seq, 1, 1536]; averaging over dim=1 and squeezing dim 0 only yields
# one vector per sequence if the tensors are laid out as
# (1, len_of_seq, 1536) - confirm the actual layout.
# This overwrites xiang_filtered_embeddings, so the cell should not be re-run
# without reloading the embeddings first.
xiang_filtered_embeddings = [
    torch.mean(embedding, dim=1).squeeze(0)
    for embedding in xiang_filtered_embeddings
]

In [None]:
# Shape of the first embedding after the pooling step above.
print(xiang_filtered_embeddings[0].shape)

In [None]:
# Shape of the second embedding, for comparison with the first.
xiang_filtered_embeddings[1].shape

In [None]:
# Optional feature augmentation (currently disabled): append the one-hot extender / order vectors to each embedding.
#xiang_filtered_embeddings = [torch.cat((xiang_filtered_embeddings[i],xiang_extenders_onehot[i]), dim=0) for i in range(len(xiang_filtered_embeddings))]
#xiang_filtered_embeddings = [torch.cat((xiang_filtered_embeddings[i],xiang_orders_onehot[i]), dim=0) for i in range(len(xiang_filtered_embeddings))]

In [None]:
# Inspect the values of the first embedding (echoed as this cell's output).
xiang_filtered_embeddings[0]

In [None]:
# Stack the per-sample embeddings along a new leading axis to form one tensor
# (this requires every embedding to have the same shape).
xiang_embeddings = torch.stack(xiang_filtered_embeddings, dim=0)

In [None]:
# The targets must be numeric: collect the distinct annotation labels, sort
# them in place, and give each label its position as an integer class id.
annotations_unique = xiang_filtered["Annotation"].unique()
annotations_unique.sort()

# annotation label -> integer class id
annotation_enumerated = dict(zip(annotations_unique, range(len(annotations_unique))))
print(annotation_enumerated)

In [None]:
# Show the raw annotation labels before they are converted to class ids.
print(xiang_filtered["Annotation"])

In [None]:
# Look up each row's annotation in annotation_enumerated and store the
# resulting integer class id as a new column on xiang_filtered.
xiang_filtered["AnnotationEnumerated"] = xiang_filtered["Annotation"].map(annotation_enumerated)

In [None]:
# Show the integer class ids produced by the mapping above.
print(xiang_filtered["AnnotationEnumerated"])

In [None]:
# Class ids as a plain Python list (despite the _np suffix, .to_list() does
# not return a NumPy array).
xiang_filtered_np = xiang_filtered["AnnotationEnumerated"].to_list()

In [None]:
# Training targets: the integer class ids as a single int64 tensor.
# (This replaces the order/extender indices previously stored under this name.)
xiang_filtered_tensor = torch.tensor(data=xiang_filtered_np, dtype=torch.int64)

In [None]:
# Hold out 20% of the samples for testing. stratify keeps the class
# proportions of the targets in both splits; random_state fixes the shuffle.
x_train_tensor, x_test_tensor, y_train_tensor, y_test_tensor = train_test_split(
    xiang_embeddings,
    xiang_filtered_tensor,
    test_size=0.2,
    random_state=1,
    stratify=xiang_filtered_tensor,
)
assert len(x_train_tensor) == len(y_train_tensor), "train features/targets out of sync"
assert len(x_test_tensor) == len(y_test_tensor), "test features/targets out of sync"

# Standardise each feature with statistics from the training split only, and
# apply the same transform to the test split (no test data leaks into mu/sigma).
mu = x_train_tensor.mean(0)
sigma = x_train_tensor.std(0) + 1e-9  # epsilon guards against division by zero
x_train_tensor = (x_train_tensor - mu) / sigma
x_test_tensor = (x_test_tensor - mu) / sigma

print("x train len")
print(len(x_train_tensor))
print("y train len")
print(len(y_train_tensor))

In [None]:
# Earlier, deeper layer stack (BatchNorm + Dropout), kept for reference only.
# It used to be stored as a bare string literal, which the notebook evaluates
# and echoes as cell output; plain comments avoid that.
#
#     nn.BatchNorm1d(1024),
#     nn.ReLU(),
#     nn.Dropout(0.20),
#     nn.Linear(1024, 512),
#     nn.BatchNorm1d(512),
#     nn.ReLU(),
#     nn.Dropout(0.20),
#     nn.Linear(512, 256),
#     nn.BatchNorm1d(256),
#     nn.ReLU(),
#     nn.Dropout(0.20),
#     nn.Linear(256,128),
#     nn.BatchNorm1d(128),
#     nn.ReLU(),
#     nn.Dropout(0.20),

In [None]:
# Length of one embedding vector; kr_predict reads this same expression for
# the input width of its first Linear layer.
xiang_embeddings[0].shape[0]

In [None]:

class kr_predict(nn.Module):
    """Feed-forward classifier over fixed-size embedding vectors.

    The network is a stack of ``Linear -> ReLU -> Dropout`` groups
    (``self.hidden``) followed by a final ``Linear`` layer (``self.out``) that
    returns one raw score (logit) per class; no softmax is applied in
    ``forward``.

    Calling ``kr_predict()`` with no arguments builds exactly the network that
    used to be hard-coded: ``input -> 512 -> 256 -> 9`` with dropout 0.3. The
    previously commented-out three-layer variant corresponds to
    ``kr_predict(hidden_dims=(512, 256, 128))``.

    Args:
        input_dim: Width of the input vectors. ``None`` (default) reads it from
            the notebook-level ``xiang_embeddings`` tensor at construction time.
        hidden_dims: Output width of each hidden ``Linear`` layer, in order.
        num_classes: Number of output logits.
        dropout_rate: Dropout probability applied after every hidden ReLU.
    """

    def __init__(self, input_dim=None, hidden_dims=(512, 256), num_classes=9, dropout_rate=0.3):
        super().__init__()
        if input_dim is None:
            # Default: infer the feature width from the stacked embeddings.
            input_dim = xiang_embeddings[0].shape[0]

        # Layers are created in input-to-output order so the submodule indices
        # (hidden.0, hidden.3, ...) match the previously hard-coded network.
        layers = []
        in_features = input_dim
        for out_features in hidden_dims:
            layers.append(nn.Linear(in_features, out_features))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_rate))
            in_features = out_features
        self.hidden = nn.Sequential(*layers)
        self.out = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return the class logits for a batch ``x`` whose last dim is ``input_dim``."""
        x = self.hidden(x)
        x = self.out(x)
        return x

In [None]:
#xiang_embeddings[0].shape[1]

In [None]:
# Create a fresh, untrained instance of the classifier.
model = kr_predict()

In [None]:
# Apple Silicon: run on the MPS backend when it is available, otherwise on CPU.
use_mps = torch.backends.mps.is_available()
device = torch.device("mps") if use_mps else torch.device("cpu")
model.to(device)

print(
    "MPS is available! Using Apple Silicon GPU."
    if use_mps
    else "MPS is not available. CPU Fallback."
)

In [None]:

# Class balance of the training targets. .tolist() yields plain ints, so the
# printed Counter shows bare class ids rather than NumPy scalar reprs.
class_counts = Counter(y_train_tensor.tolist())
print("Class distribution:", class_counts)

# Author's note: the classes are imbalanced. A class-weighted loss is one
# option and is currently disabled. If re-enabled, weight by the per-class
# *counts* (the earlier draft summed the Counter's keys instead):
#total_samples = sum(class_counts.values())
#class_weights = torch.tensor(
#    [total_samples / (len(class_counts) * class_counts[c]) for c in sorted(class_counts)],
#    dtype=torch.float,
#)
#print("Class weights:", class_weights)
#loss = nn.CrossEntropyLoss(weight=class_weights.to(device))

# Unweighted cross-entropy over the model's raw logits.
loss = nn.CrossEntropyLoss()
adam = optim.Adam(model.parameters(), lr=1e-5)

# Learning-rate schedules tried earlier (disabled):
#scheduler = optim.lr_scheduler.CosineAnnealingLR(adam, T_max=50)
#scheduler = optim.lr_scheduler.StepLR(adam, step_size=800, gamma=0.01)


In [None]:
# Inspect the standardised training features (echoed as this cell's output).
x_train_tensor

In [None]:
# nn.CrossEntropyLoss expects class-index targets as int64; make that explicit.
# (The former `x_train_tensor = x_train_tensor` self-assignment was a no-op.)
y_train_tensor = y_train_tensor.long()

In [None]:
# Show the training targets (integer class ids).
print(y_train_tensor)

In [None]:
# Mini-batch training with TensorDataset / DataLoader from torch.utils.data.
batch_size = 8
num_epochs = 1500
train_dataset = TensorDataset(x_train_tensor, y_train_tensor)
# drop_last=True discards the final incomplete batch of every epoch (added
# originally so that BatchNorm never receives a batch of size 1).
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)

train_losses = []  # mean training loss per epoch, e.g. for plotting afterwards

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    samples_seen = 0

    for seqs, anns in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
        seqs = seqs.to(device)
        anns = anns.to(device)
        output = model(seqs)
        output_loss = loss(output, anns)
        adam.zero_grad()
        output_loss.backward()
        adam.step()
        # The loss is a batch mean; scale by batch size to accumulate a sum.
        epoch_loss += output_loss.item() * seqs.size(0)
        samples_seen += seqs.size(0)
    # Average over the samples actually processed. With drop_last=True this
    # can be fewer than len(train_dataset), which the old denominator ignored
    # and therefore under-reported the loss.
    avg_loss = epoch_loss / max(samples_seen, 1)
    train_losses.append(avg_loss)
    #scheduler.step()
    print(f"Epoch {epoch+1}: Loss: {avg_loss:.4f}")



# TODO: visualize the training loss with a graph, using the same simple training implementation as shown above.


In [None]:
test_dataset = TensorDataset(x_test_tensor, y_test_tensor)
# No drop_last here: every held-out sample has to be scored.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Filled by accuracy(); read afterwards to build the classification report.
all_predictions = []
all_targets = []


def accuracy():
    """Evaluate ``model`` on ``test_loader`` and return the accuracy in percent.

    Side effect: ``all_predictions`` and ``all_targets`` are emptied and then
    refilled with the predicted and true class ids of every test sample, so
    calling this function more than once does not accumulate duplicates.

    Returns:
        Percentage of correctly classified test samples, or ``nan`` when the
        test loader yields no samples.
    """
    model.eval()
    all_predictions.clear()
    all_targets.clear()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)  # one row of class scores per sample
            # Index of the highest score in each row = predicted class id.
            predicted = outputs.argmax(dim=1)
            total += targets.size(0)  # add batch size
            correct += (predicted == targets).sum().item()

            # Kept for the classification report.
            all_predictions.extend(predicted.cpu().numpy())
            all_targets.extend(targets.cpu().numpy())
    if total == 0:
        return float("nan")
    return 100 * correct / total


accuracy()

In [None]:

# Per-class precision / recall / F1 on the held-out test set.
# Display names come from the annotation mapping, ordered by class id, so they
# cannot drift from the ids used as training targets. Passing labels=
# explicitly keeps names and ids aligned even when a class is absent from the
# test predictions and targets (a hard-coded name list raises in that case).
class_names = [
    str(label)
    for label, _ in sorted(annotation_enumerated.items(), key=lambda item: item[1])
]
print(
    classification_report(
        all_targets,
        all_predictions,
        labels=list(range(len(class_names))),
        target_names=class_names,
    )
)

In [None]:
# Echo the tensor last assigned to this name; when the cells are run top to
# bottom that is the integer annotation targets built from AnnotationEnumerated.
xiang_filtered_tensor