# Imports

In [None]:
from sklearn.preprocessing import normalize, MinMaxScaler
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
import math
from itertools import compress, combinations, product
from collections import Counter
from sklearn.metrics import balanced_accuracy_score
from statistics import mean
import matplotlib.pyplot as plt
import warnings
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

In [None]:
import os
import fastprogress
import time

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
from torch.utils.data import DataLoader



# notebook display helper and device selection
from IPython.display import Markdown, display
def printmd(string):
    """
    Displays a string as rendered Markdown in the notebook output
    :param string: Markdown-formatted text
    """
    rendered = Markdown(string)
    display(rendered)

def get_device(cuda_preference=True):
    """
    Prints CUDA/cuDNN availability and picks the torch device to compute on
    :param cuda_preference: whether the GPU should be used when CUDA is available
    :return: torch.device 'cuda:0' if CUDA is preferred and available, otherwise 'cpu'
    """
    cuda_available = torch.cuda.is_available()
    print('cuda available:', cuda_available,
          '; cudnn available:', torch.backends.cudnn.is_available(),
          '; num devices:', torch.cuda.device_count())

    if cuda_preference and cuda_available:
        device = torch.device('cuda:0')
        device_name = torch.cuda.get_device_name(device)
    else:
        device = torch.device('cpu')
        device_name = 'cpu'
    print('Using device', device_name)
    return device


# trains network
def train(train_dataloader, optimizer, model, loss_fn,
          device, master_bar):
    """
    Trains the model for one epoch over the given dataloader
    :param train_dataloader: iterable yielding (X, y) batches
    :param optimizer: optimizer that updates the model parameters
    :param model: network to train
    :param loss_fn: loss function, called as loss_fn(prediction, target)
    :param device: device the batches are moved to
    :param master_bar: fastprogress master bar used as parent of the batch progress bar
    :return: mean of the per-batch losses of this epoch
    """
    # Training mode does not change between batches, so it is set once per epoch
    model.train()

    batch_losses = []
    for X, y in fastprogress.progress_bar(train_dataloader, parent=master_bar):
        # Move the batch to the target device once; no further transfers are needed
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()

        # Forward
        y_pred = model(X)

        # Compute loss, back-propagate and update the parameters
        loss = loss_fn(y_pred, y)
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())
    return np.mean(batch_losses)


# predicts class
def predict_class(model, X_test, device=None):
    """
    Predicts the class of every sample with the given model
    :param model: network returning one score per class for each sample
    :param X_test: tensor of input features
    :param device: device to run the prediction on; if None, the device of the
                   model's parameters is used (falls back to the device of X_test
                   for a model without parameters)
    :return: predicted class indices, raw model output
    """
    if device is None:
        # Follow the model instead of depending on a global `device` variable
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else X_test.device
    model.eval()
    with torch.no_grad():
        output = model(X_test.to(device))
        pred = torch.argmax(output, 1)
    return pred, output

# training wrapper
def run_training(model, loss_fn, lr, eta, train_dataloader, device, num_epochs):
    """
    Runs model training with the Adam optimizer
    :param model: network to train
    :param loss_fn: loss function
    :param lr: learning rate
    :param eta: weight decay passed to Adam
    :param train_dataloader: dataloader yielding the training batches
    :param device: device the batches are moved to
    :param num_epochs: number of passes over the training data
    :return: list with the mean training loss of every epoch
    """
    optimizer = optim.Adam(model.parameters(), lr, weight_decay=eta)

    start_time = time.time()
    master_bar = fastprogress.master_bar(range(num_epochs))
    train_losses = []
    for _ in master_bar:
        epoch_train_loss = train(train_dataloader, optimizer, model, loss_fn, device, master_bar)
        train_losses.append(epoch_train_loss)
        master_bar.write(f'Train loss: {epoch_train_loss:.3f}')
    time_elapsed = round(time.time() - start_time)
    print(f'Finished training after {time_elapsed} seconds.')
    # Hand the collected loss history back to the caller instead of discarding it
    return train_losses

# QFSL model
class QFSL(nn.Module):
    """
    Feed-forward classifier built as [Linear -> ReLU ->] Linear -> ReLU -> Linear.
    The weight of the last linear layer can be replaced by fixed, non-trainable
    class attributes with set_weights.
    """

    def __init__(self, dim_in, dim_out, n_classes, dim_hidden = 50, hidden=True):
        """
        :param dim_in: number of input features
        :param dim_out: size of the representation fed into the last linear layer
        :param n_classes: number of outputs of the last linear layer
        :param dim_hidden: size of the optional hidden layer
        :param hidden: whether to insert the hidden layer before the dim_out layer
        """
        super().__init__()
        layers = []
        if hidden:
            layers.append(nn.Linear(dim_in, dim_hidden))
            layers.append(nn.ReLU())
            layers.append(nn.Linear(dim_hidden, dim_out))
        else:
            layers.append(nn.Linear(dim_in, dim_out))
        layers.append(nn.ReLU())
        layers.append(nn.Linear(dim_out, n_classes))
        self.model = nn.ModuleList(layers)

    def forward(self, x):
        """Applies all layers in order and returns the output of the last one."""
        for layer in self.model:
            x = layer(x)
        return x

    # set weights according to mean source information
    def set_weights(self, avg_source):
        """
        Replaces the weight of the last linear layer by a fixed (non-trainable) matrix
        :param avg_source: array-like of weights, one row per output of the last layer
        """
        last_layer = self.model[-1]
        # Keep the dtype and device of the existing weight so the layer stays
        # usable when this is called after .to(device) or a dtype cast
        fixed = torch.as_tensor(avg_source,
                                dtype=last_layer.weight.dtype,
                                device=last_layer.weight.device)
        # detach().clone() avoids sharing memory with the caller's array/tensor
        last_layer.weight = nn.Parameter(fixed.detach().clone(), requires_grad=False)
        return

# creates class attributes from source
def get_class_attributes(X_source, y_source):
    """
    Creates class attributes from source data
    :param X_source: feature DataFrame of the source data
    :param y_source: labels of the source data, used as grouping key
    :return: array of per-class mean feature vectors, passed through sklearn's
             normalize with its default settings
    """
    class_means = X_source.groupby(y_source).mean()
    return normalize(class_means.to_numpy())


# rebalances data
def balance_sampling(X, y, n=100):
    """
    Re-balances data by over-sampling with SMOTE and under-sampling randomly
    :param X: feature matrix
    :param y: labels
    :param n: desired samples per class
    :return: resampled feature matrix, resampled labels
    """
    counts = Counter(y)
    # Classes with at most n samples are over-sampled, all others are under-sampled
    over = [label for label, count in counts.items() if count <= n]
    under = [label for label, count in counts.items() if count > n]

    # Warnings are silenced only while resampling; the caller's warning filters
    # are restored on exit instead of being overridden for the whole session
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')

        if not over:
            under_sam = RandomUnderSampler(sampling_strategy=dict.fromkeys(under, n))
            X_under, y_under = under_sam.fit_resample(X, y)
            return X_under, y_under
        if not under:
            over_sam = SMOTE(sampling_strategy=dict.fromkeys(over, n))
            X_over, y_over = over_sam.fit_resample(X, y)
            return X_over, y_over

        # Mixed case: resample both groups separately (over-sampling first)
        X_over, y_over = _resample_class_subset(SMOTE, X, y, over, n)
        X_under, y_under = _resample_class_subset(RandomUnderSampler, X, y, under, n)

    X_combined_sampling = np.concatenate((X_over, X_under))
    y_combined_sampling = np.concatenate((y_over, y_under))
    return X_combined_sampling, y_combined_sampling


def _resample_class_subset(sampler_cls, X, y, classes, n):
    """
    Resamples only the given classes of (X, y) to n samples each
    :param sampler_cls: sampler class (SMOTE or RandomUnderSampler)
    :param X: feature matrix
    :param y: labels
    :param classes: labels of the classes to resample
    :param n: desired samples per class
    :return: resampled feature matrix, resampled labels of the given classes
    """
    in_subset = np.isin(y, classes)
    strategy = dict.fromkeys(classes, n)
    if len(classes) > 1:
        X_res, y_res = sampler_cls(sampling_strategy=strategy).fit_resample(X[in_subset], y[in_subset])
        return X_res, y_res

    # Tricks the sampler into working with one class: an artificial second class
    # of n constant samples is added and dropped again after resampling.
    # NOTE(review): assumes 10000 is never used as a real class label
    pseudo_label = 10000
    strategy[pseudo_label] = n
    pseudo_X = np.full((n, X.shape[1]), pseudo_label)
    pseudo_y = np.full(n, pseudo_label)
    X_res, y_res = sampler_cls(sampling_strategy=strategy).fit_resample(
        np.concatenate((X[in_subset], pseudo_X)),
        np.concatenate((y[in_subset], pseudo_y)))
    is_real = y_res == classes[0]
    return X_res[is_real], y_res[is_real]

# splits unknown cells
def split_masked_cells(X_t, y_t, masked_cells, balance=False, n=500):
    """
    Masks cells for generalized zero-shot learning
    :param X_t: feature matrix of target data
    :param y_t: labels of target data
    :param masked_cells: list of cells to be masked from data
    :param balance: whether to balance seen train data
    :param n: desired number of samples per class
    :return: features of seen classes, features of unseen classes, labels seen classes, labels unseen classes
    """
    # np.isin is the supported replacement for the deprecated np.in1d
    is_masked = np.isin(y_t, masked_cells)
    is_seen = ~is_masked
    X_t_seen, y_seen = X_t[is_seen], y_t[is_seen]
    X_t_unseen, y_unseen = X_t[is_masked], y_t[is_masked]
    if balance:
        # Only the seen (training) part is re-balanced
        X_t_seen, y_seen = balance_sampling(X_t_seen, y_seen, n)
    return X_t_seen, X_t_unseen, y_seen, y_unseen


# Preparations

In [None]:
# Mount Google Drive in the Colab runtime at /content/drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Select the compute device (cuda:0 when CUDA is available, otherwise cpu)
device = get_device()

cuda available: True ; cudnn available: True ; num devices: 1
Using device Tesla T4


In [None]:
# Source data (features + labels) and the per-class attribute vectors derived from it
X_source = pd.read_csv("/content/drive/MyDrive/data/lung_mouse_red_scetm.csv", index_col=0)
y_source = pd.read_csv("/content/drive/MyDrive/data/lung_mouse_red_label.csv", index_col=0)["label"]
X_avg = get_class_attributes(X_source, y_source)
# Train and test splits (features + labels); only the test labels are converted to a numpy array
X_train = pd.read_csv("/content/drive/MyDrive/data/lung_human_red_train_scetm.csv", index_col=0)
y_train = pd.read_csv("/content/drive/MyDrive/data/lung_human_red_train_label.csv", index_col=0)["label"]
X_test = pd.read_csv("/content/drive/MyDrive/data/lung_human_red_test_scetm.csv", index_col=0)
y_test = pd.read_csv("/content/drive/MyDrive/data/lung_human_red_test_label.csv", index_col=0)["label"].to_numpy()
# Re-balance the training data to 300 samples per class
X_train, y_train = balance_sampling(X_train, y_train, 300)

In [None]:
# Labels of the classes that are masked out of the training data
comb = [1, 2]
X_seen, X_unseen, y_seen, y_unseen = split_masked_cells(X_train, y_train, masked_cells=comb)

# Scale features to [0, 1]; the scaler is fitted on the seen training data only
scaler = MinMaxScaler(feature_range=(0, 1))
X_norm = scaler.fit_transform(X_seen)
X_test_norm = scaler.transform(X_test)

# Convert to tensors
X_norm_tens = torch.FloatTensor(X_norm)
y_seen_tens = torch.LongTensor(y_seen)
X_test_norm_tens = torch.FloatTensor(X_test_norm)

# One [features, label] pair per training sample
train_data = [[features, label] for features, label in zip(X_norm_tens, y_seen_tens)]
batch_size = 64
train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=False)

Play around...

In [None]:
# Hyperparameters
lr = .0001
# Weight decay handed to Adam via run_training. `lam` is not defined elsewhere in
# the visible notebook; 0.0 equals Adam's default — TODO confirm the intended value
lam = 0.0
num_epochs = 1000

loss_fn = nn.CrossEntropyLoss(reduction="mean")
model = QFSL(50, 50, 11)
# Fix the last layer's weight to the class attributes before moving the model to the device
model.set_weights(X_avg)
model.to(device)

run_training(model, loss_fn, lr, lam, train_dataloader, device, num_epochs)

# Evaluate on the test set with a row-normalized confusion matrix
pred, output = predict_class(model, X_test_norm_tens)
pred = pred.detach().cpu().numpy()
conf = confusion_matrix(y_test, pred, labels = range(11), normalize = 'true')
ConfusionMatrixDisplay(conf, display_labels = range(11)).plot()