In [None]:
import magic

# Plotting and miscellaneous imports
import os
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

%matplotlib inline

In [None]:
import warnings
warnings.filterwarnings("ignore")

import torch
import numpy as np
import scanpy as sc
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

# Read the AnnData object stored at ./Baron.h5 (relative to the working directory).
path='./Baron.h5'
data = sc.read_h5ad(path)
# Normalise in place with target_sum=1e4, then apply scanpy's log1p transform.
# Both calls modify `data` directly, so re-running this cell without reloading
# would transform the matrix a second time.
sc.pp.normalize_total(data, target_sum=1e4)
sc.pp.log1p(data)

In [None]:
# Expression matrix and labels pulled out of the AnnData object.
X_all = data.X
# First column of the obs table; presumably the cell-type annotation — TODO confirm.
y_all = data.obs.values[:,0]

print(X_all.shape)
print(y_all.shape)

# Print the overall mean before and after replacing NaN entries with 0.
# NOTE(review): np.isnan / boolean-mask assignment assumes X_all is a dense
# ndarray — confirm data.X is not sparse. The assignment writes into X_all in
# place, so if data.X returned the underlying array, `data` is modified too.
print(np.mean(X_all))
X_all[np.isnan(X_all)] = 0
print(np.mean(X_all))

# Quick look at the shape and first five rows of the obs table.
print(data.obs.values.shape)
print(data.obs.values[0:5,:])

# 80/20 train/test split with a fixed random_state; the split is not stratified.
X_train, X_test, y_train, y_test = train_test_split(X_all, y_all, test_size=0.2, random_state=1)
# (Disabled) further split of a held-out part into validation and test halves.
# X_valid, X_test, y_valid, y_test = train_test_split(X_valid_test, y_valid_test, test_size=0.5, random_state=1)

print(X_train.shape)
# print(X_valid.shape)
print(X_test.shape)
print(y_train.shape)
# print(len(y_valid))
print(y_test.shape)

class CellDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing each row of a feature matrix with its label.

    The feature matrix is copied into a torch tensor on construction; the
    label container is stored as given and indexed directly.
    """

    def __init__(self, X, y):
        """Store a tensor copy of ``X`` and keep a reference to ``y``."""
        features = torch.tensor(X)
        self.X = features
        self.y = y

    def __len__(self):
        """Number of samples, defined by the length of the label container."""
        n_samples = len(self.y)
        return n_samples

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        features = self.X[index]
        label = self.y[index]
        return features, label

# Wrap the train/test splits as datasets (the validation split is disabled).
train_set = CellDataset(X_train, y_train)
# valid_set = CellDataset(X_valid, y_valid)
test_set = CellDataset(X_test, y_test)

# Mini-batch loaders: batches of 128, loaded in the main process (num_workers=0).
# Only the training loader shuffles.
train_loader = DataLoader(dataset=train_set, batch_size=128, shuffle=True,num_workers=0)
# valid_loader = DataLoader(dataset=valid_set, batch_size=128, shuffle=False,num_workers=0)
test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=0)

import numpy as np
# Number of distinct labels seen in the training split.
n_clusters = len(np.unique(y_train))

# Compare how many distinct labels each split contains ...
print(len(np.unique(y_train)))
# print(len(np.unique(y_valid)))
print(len(np.unique(y_test)))

# ... and list them, to spot labels missing from one of the splits.
print(np.unique(y_train))
# print(np.unique(y_valid))
print(np.unique(y_test))

In [None]:
# MAGIC operator built with the library's default arguments; run_model() calls
# fit_transform on this single shared instance for every trial.
magic_op = magic.MAGIC()

In [None]:
import random
from torch import nn

def setup_seed(seed):
    """Seed every random number generator this notebook draws from.

    Seeds Python's ``random`` module, NumPy's legacy global generator and
    PyTorch (CPU and all CUDA devices), and configures cuDNN for repeatable
    behaviour.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Ask cuDNN for deterministic kernels only ...
    torch.backends.cudnn.deterministic = True
    # ... and switch off its autotuner, which may otherwise select a different
    # algorithm from run to run if it was enabled earlier in the session.
    torch.backends.cudnn.benchmark = False


In [None]:
def l1_distance(imputed_data, original_data):
    """Mean absolute difference between the original and the imputed values.

    The mean is taken over every element of the (broadcast) difference.
    """
    abs_error = np.abs(original_data - imputed_data)
    return np.mean(abs_error)

def RMSE(imputed_data, original_data):
    """Root-mean-square difference between the original and the imputed values.

    The mean is taken over every element of the (broadcast) difference.
    """
    residual = original_data - imputed_data
    mean_squared_error = np.mean(residual ** 2)
    return np.sqrt(mean_squared_error)


def pearson_corr(imputed_data, original_data):
    """Pearson correlation between the flattened imputed and original arrays.

    Both inputs are flattened with ``reshape(-1)``, centred on their own mean,
    and the sum of products is divided by the product of the two root
    sum-of-squares terms. If either input is constant the denominator is zero
    and the result is not finite.
    """
    predicted = imputed_data.reshape(-1)
    truth = original_data.reshape(-1)

    predicted_centered = predicted - np.mean(predicted)
    truth_centered = truth - np.mean(truth)

    cross_term = np.sum(predicted_centered * truth_centered)
    predicted_norm = np.sqrt(np.sum(predicted_centered ** 2))
    truth_norm = np.sqrt(np.sum(truth_centered ** 2))

    return cross_term / (predicted_norm * truth_norm)


In [None]:
def DropData(batch_x, d_rate):
    """Randomly zero out entries of ``batch_x`` to simulate dropout events.

    Each entry is independently selected for dropping when a uniform draw from
    ``torch.rand`` is ``<= d_rate``. The draw uses torch's global generator, so
    ``torch.manual_seed`` controls which entries are selected.

    The selection is computed directly from boolean masks rather than via a
    sentinel value, so no genuine data value can be mistaken for a marker.

    Args:
        batch_x: Array-like numeric matrix; it is not modified.
        d_rate: Drop threshold compared against draws from [0, 1).

    Returns:
        A ``(final_mask, final_x)`` tuple of float64 arrays shaped like
        ``batch_x``:
        final_mask is 1.0 where an originally non-zero entry was dropped and
        0.0 elsewhere; final_x is ``batch_x`` with every selected entry set
        to 0.
    """
    batch_x = np.asarray(batch_x)

    # One uniform draw per entry, taken from torch so seeding stays in one place.
    drop_selected = (torch.rand(batch_x.shape) <= d_rate).numpy()
    originally_nonzero = batch_x != 0

    # Only dropped entries that actually held a value count as evaluation targets.
    final_mask = (drop_selected & originally_nonzero).astype(np.float64)
    final_x = np.where(drop_selected, 0.0, batch_x.astype(np.float64))

    return final_mask, final_x

In [None]:
def run_model(myseed, epochs=None, d_rate=0.1):
    """Run one seeded MAGIC imputation trial on the test split and score it.

    Seeds the random generators, artificially drops entries of the global
    ``X_test`` with ``DropData``, imputes the corrupted matrix with the global
    ``magic_op``, and compares imputed against original values on the dropped
    positions.

    Args:
        myseed: Seed passed to ``setup_seed``; determines which entries drop.
        epochs: Unused. Kept (now optional) so existing positional calls keep
            working.
        d_rate: Drop threshold forwarded to ``DropData``.

    Returns:
        ``(pcc, l1, rmse)`` — Pearson correlation, mean absolute error and
        root-mean-square error between the masked imputed and masked original
        matrices. The three values are also printed.

    Side effects:
        Changes NumPy's global print options.
    """
    setup_seed(myseed)

    # Same three settings as before, applied in a single call.
    np.set_printoptions(threshold=np.inf, precision=2, suppress=True)

    final_mask, final_x = DropData(X_test, d_rate)
    x_imp = magic_op.fit_transform(final_x)

    # Keep only the artificially dropped positions; every other entry becomes 0
    # in both operands.
    # NOTE(review): the metrics below still average over the full matrix, so
    # the zeroed positions are included in the means — confirm this is intended.
    masked_imputed = final_mask * x_imp
    masked_original = final_mask * X_test

    pcc = pearson_corr(masked_imputed, masked_original)
    l1 = l1_distance(masked_imputed, masked_original)
    rmse = RMSE(masked_imputed, masked_original)

    print(pcc)
    print(l1)
    print(rmse)

    return pcc, l1, rmse

In [None]:
# Per-trial metrics, one entry per seed.
pcc_list = []
l1_list = []
rmse_list = []

# Ten independent trials; the loop index doubles as the random seed.
for i in range(10):
    print(i)
    myseed = i
    # run_model never reads `epochs`, so pass None instead of the test matrix.
    pcc, l1, rmse = run_model(myseed, epochs=None, d_rate=0.1)

    pcc_list.append(pcc)
    l1_list.append(l1)
    rmse_list.append(rmse)

In [None]:
import csv

# Destination file and the per-trial (pcc, l1, rmse) triples to store in it.
csv_list = "./Baron_Droup0.1_magic.csv"
rows = zip(pcc_list, l1_list, rmse_list)

with open(csv_list, mode='w', newline='') as file:
    writer = csv.writer(file, delimiter='\t')
    # Write one tab-separated line per trial.
    writer.writerows(rows)