## functions

### encoder generator

In [30]:
# Modified from Portal:
# Zhao J, et al. (2022) Adversarial domain translation networks for integrating large-scale atlas-level single-cell datasets. Nature Computational Science 2(5):317-330.

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np


class encoder(nn.Module):
    def __init__(self, n_input, n_latent):
        super(encoder, self).__init__()
        self.n_input = n_input
        self.n_latent = n_latent
        n_hidden = 512

        self.W_1 = nn.Parameter(torch.Tensor(n_hidden, self.n_input).normal_(mean=0.0, std=0.1))
        self.b_1 = nn.Parameter(torch.Tensor(n_hidden).normal_(mean=0.0, std=0.1))

        self.W_2 = nn.Parameter(torch.Tensor(self.n_latent, n_hidden).normal_(mean=0.0, std=0.1))
        self.b_2 = nn.Parameter(torch.Tensor(self.n_latent).normal_(mean=0.0, std=0.1))

    def forward(self, x):
        h = F.relu(F.linear(x, self.W_1, self.b_1))
        z = F.linear(h, self.W_2, self.b_2)
        return z

class generator(nn.Module):
    def __init__(self, n_input, n_latent):
        super(generator, self).__init__()
        self.n_input = n_input
        self.n_latent = n_latent
        n_hidden = 512

        self.W_1 = nn.Parameter(torch.Tensor(n_hidden, self.n_latent).normal_(mean=0.0, std=0.1))
        self.b_1 = nn.Parameter(torch.Tensor(n_hidden).normal_(mean=0.0, std=0.1))

        self.W_2 = nn.Parameter(torch.Tensor(self.n_input, n_hidden).normal_(mean=0.0, std=0.1))
        self.b_2 = nn.Parameter(torch.Tensor(self.n_input).normal_(mean=0.0, std=0.1))

    def forward(self, z):
        h = F.relu(F.linear(z, self.W_1, self.b_1))
        x = F.linear(h, self.W_2, self.b_2)
        return x

class discriminator(nn.Module):
    def __init__(self, n_input):
        super(discriminator, self).__init__()
        self.n_input = n_input
        n_hidden = 512

        self.W_1 = nn.Parameter(torch.Tensor(n_hidden, self.n_input).normal_(mean=0.0, std=0.1))
        self.b_1 = nn.Parameter(torch.Tensor(n_hidden).normal_(mean=0.0, std=0.1))

        self.W_2 = nn.Parameter(torch.Tensor(n_hidden, n_hidden).normal_(mean=0.0, std=0.1))
        self.b_2 = nn.Parameter(torch.Tensor(n_hidden).normal_(mean=0.0, std=0.1))

        self.W_3 = nn.Parameter(torch.Tensor(1, n_hidden).normal_(mean=0.0, std=0.1))
        self.b_3 = nn.Parameter(torch.Tensor(1).normal_(mean=0.0, std=0.1))

    def forward(self, x):
        h = F.relu(F.linear(x, self.W_1, self.b_1))
        h = F.relu(F.linear(h, self.W_2, self.b_2))
        score = F.linear(h, self.W_3, self.b_3)
        return torch.clamp(score, min=-50.0, max=50.0)

class encoder_site(nn.Module):
    def __init__(self, n_input, n_latent):
        super(encoder_site, self).__init__()
        self.n_input = n_input
        self.n_latent = n_latent
        n_hidd_1 = 1000
        n_hidd_2 = 500
        n_hidd_3 = 50
        n_hidd_4 = 10

        self.fc1 = nn.Linear(n_input, n_hidd_1)
        self.fc1_bn = nn.BatchNorm1d(n_hidd_1)
        self.fc2 = nn.Linear(n_hidd_1, n_hidd_2)
        self.fc2_bn = nn.BatchNorm1d(n_hidd_2)
        self.fc3 = nn.Linear(n_hidd_2, n_hidd_3)
        self.fc3_bn = nn.BatchNorm1d(n_hidd_3)
        self.fc4 = nn.Linear(n_hidd_3, n_hidd_4)
        self.fc4_bn = nn.BatchNorm1d(n_hidd_4)
        self.fc5 = nn.Linear(n_hidd_4, n_latent)

    def forward(self, input):
        h1 = F.relu(self.fc1_bn(self.fc1(input)))
        h2 = F.relu(self.fc2_bn(self.fc2(h1)))
        h3 = F.relu(self.fc3_bn(self.fc3(h2)))
        h4 = F.relu(self.fc4_bn(self.fc4(h3)))
        #return F.relu(self.fc5(h4))
        return self.fc5(h4)

class decoder_site(nn.Module):
    def __init__(self, n_input, n_latent):
        super(decoder_site, self).__init__()
        self.n_input = n_input
        self.n_latent = n_latent
        n_hidd_6 = 10
        n_hidd_7 = 50
        n_hidd_8 = 500
        n_hidd_9 = 1000

        self.fc6 = nn.Linear(n_latent, n_hidd_6)
        self.fc6_bn = nn.BatchNorm1d(n_hidd_6)
        self.fc7 = nn.Linear(n_hidd_6, n_hidd_7)
        self.fc7_bn = nn.BatchNorm1d(n_hidd_7)
        self.fc8 = nn.Linear(n_hidd_7, n_hidd_8)
        self.fc8_bn = nn.BatchNorm1d(n_hidd_8)
        self.fc9 = nn.Linear(n_hidd_8, n_hidd_9)
        self.fc9_bn = nn.BatchNorm1d(n_hidd_9)
        self.fc10 = nn.Linear(n_hidd_9, n_input)

    def forward(self, z):
        h6 = F.relu(self.fc6_bn(self.fc6(z)))
        h7 = F.relu(self.fc7_bn(self.fc7(h6)))
        h8 = F.relu(self.fc8_bn(self.fc8(h7)))
        h9 = F.relu(self.fc9_bn(self.fc9(h8)))
        return self.fc10(h9)

### others

In [1]:
import os
import numpy as np
import scanpy as sc
import pandas as pd
import anndata

import ot

import torch
import torch.nn as nn
import torch.nn.functional as F

loss1 = nn.L1Loss()
loss2 = nn.MSELoss()

def get_max_index(
    vector
):
    """This function Returns the index of the vector's maximum value.

        Args:
            vector: A vector that store values.

        Return:
            max_index: The index of the vector's maximum value.
    """
    max_index=np.where(vector==np.max(vector))[0][0]
    return max_index

def trans_plan_b(
    latent_A,
    latent_B,
    metric='correlation',
    reg=0.1,
    numItermax=10,
    device='cpu'
):
    """This function Returns the optimal transport (OT) plan.

        Args:
            latent_A, latent_B: Two set of data points.
            metric: Metric of OT. Default is 'correlation'.
            reg: The weight of entropy regularized term. Default is 0.1.
            numItermax: Iterations of OT. Default is 10.

        Return:
            plan: The index of the vector's maximum value.
    """
    cost = ot.dist(latent_A.detach().cpu().numpy(), latent_B.detach().cpu().numpy(), metric=metric)
    cost = torch.from_numpy(cost).float().to(device)

    length_A = latent_A.shape[0]
    length_B = latent_B.shape[0]

    P = torch.exp(-cost/reg)

    p_s = torch.ones(length_A, 1) / length_A
    p_t = torch.ones(length_B, 1) / length_B
    p_s = p_s.to(device)
    p_t = p_t.to(device)

    u_s = torch.ones(length_A, 1) / length_A
    u_t = torch.ones(length_B, 1) / length_B
    u_s = u_s.to(device)
    u_t = u_t.to(device)

    for i in range(numItermax):
        p_t = u_t / torch.mm(torch.transpose(P, 0, 1), p_s)
        p_s = u_s / torch.mm(P, p_t)

    plan = torch.transpose(p_t, 0, 1) * P * p_s

    return plan

def rand_projections(
    embedding_dim,
    num_samples=50,
    device='cpu'
):
    """This function generates `num_samples` random samples from the latent space's unit sphere.

        Args:
            embedding_dim (int): embedding dimensionality
            num_samples (int): number of random projection samples

        Return:
            torch.Tensor: tensor of size (num_samples, embedding_dim)
    """

    projections = [w / np.sqrt((w**2).sum())  # L2 normalization
                   for w in np.random.normal(size=(num_samples, embedding_dim))]
    projections = np.asarray(projections)
    return torch.from_numpy(projections).type(torch.FloatTensor).to(device)


def _sliced_wasserstein_distance(
    encoded_samples,
    distribution_samples,
    num_projections=50,
    p=2,
    device='cpu'
):
    """ Sliced Wasserstein Distance between encoded samples and drawn distribution samples.

        Args:
            encoded_samples (toch.Tensor): tensor of encoded training samples
            distribution_samples (torch.Tensor): tensor of drawn distribution training samples
            num_projections (int): number of projections to approximate sliced wasserstein distance
            p (int): power of distance metric
            device (torch.device): torch device (default 'cpu')

        Return:
            torch.Tensor: tensor of wasserstrain distances of size (num_projections, 1)
    """

    # derive latent space dimension size from random samples drawn from latent prior distribution
    embedding_dim = distribution_samples.size(1)
    # generate random projections in latent space
    projections = rand_projections(embedding_dim, num_projections).to(device)
    # calculate projections through the encoded samples
    encoded_projections = encoded_samples.matmul(projections.transpose(0, 1).to(device))
    # calculate projections through the prior distribution random samples
    distribution_projections = (distribution_samples.matmul(projections.transpose(0, 1)))
    # calculate the sliced wasserstein distance by
    # sorting the samples per random projection and
    # calculating the difference between the
    # encoded samples and drawn random samples
    # per random projection
    wasserstein_distance = (torch.sort(encoded_projections.transpose(0, 1), dim=1)[0] -
                            torch.sort(distribution_projections.transpose(0, 1), dim=1)[0])
    # distance between latent space prior and encoded distributions
    # power of 2 by default for Wasserstein-2
    wasserstein_distance = torch.pow(wasserstein_distance, p)
    # approximate mean wasserstein_distance for each projection
    return wasserstein_distance.mean()


def sliced_wasserstein_distance(
    encoded_samples,
    transformed_samples,
    num_projections=50,
    p=2,
    device='cpu'
):
    """ Sliced Wasserstein Distance between encoded samples and drawn distribution samples.

        Args:
            encoded_samples (toch.Tensor): tensor of encoded training samples
            distribution_samples (torch.Tensor): tensor of drawn distribution training samples
            num_projections (int): number of projections to approximate sliced wasserstein distance
            p (int): power of distance metric
            device (torch.device): torch device (default 'cpu')

        Return:
            torch.Tensor: tensor of wasserstrain distances of size (num_projections, 1)
    """
    # derive batch size from encoded samples
    # draw random samples from latent space prior distribution

    # approximate mean wasserstein_distance between encoded and prior distributions
    # for each random projection
    swd = _sliced_wasserstein_distance(encoded_samples, transformed_samples, num_projections, p, device)
    return swd

### model

In [None]:
# Modified from Portal:
# Zhao J, et al. (2022) Adversarial domain translation networks for integrating large-scale atlas-level single-cell datasets. Nature Computational Science 2(5):317-330.
import os
import time
import numpy as np
import scanpy as sc
import pandas as pd
import tables
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.decomposition import PCA, IncrementalPCA
import scipy.sparse as sp
from scipy.spatial.distance import cdist
from tqdm import tqdm

#from STALocator.networks import *


class Model3(object):
    def __init__(self,
                 beta=1.0,
                 lambda_Lasso=0.1,
                 resolution = "low", # Resolution of ST dataset. Available options are: "low" and "high".
                 batch_size = 500, # Batch size.
                 train_epoch = 5000, # Epochs.
                 cut_steps = 0.5, # Switch location between integration network and localization network training.
                 seed = 1234, # Random seed.
                 npcs = 30, # Number of top PCs.
                 n_latent = 20, # Dimension of shared latent space on integration network.
                 n_coord = 2, # Dimension of spatial location.
                 sf_coord = 12000, # scaling factor of spatial location.
                 location = "spatial", # storing location of spatial location.
                 rad_cutoff = None, # The distance of the location cell from the nearest spot is used to filter out cells located outside the tissue section.
                 lambdaGAN = 1.0, # The weight of adversarial learning loss on integration network.
                 lambdacos = 20.0, # The weight of cosine similarity on integration network.
                 lambdaAE = 10.0, # The weight of auto-encoder consistency on integration network.
                 lambdaLA = 10.0, # The weight of latent alignment loss on integration network.
                 lambdaSWD = 5.0, # The weight of sliced Wasserstein distance on integration network.
                 lambdalat = 1.0, # The weight of spatial location fitting loss on localization network.
                 lambdarec = 0.01, # The weight of reconstruction loss on localization network.
                 model_path = "models", # Model save path.
                 data_path = "data", # Data save path.
                 result_path = "results", # Result save path.
                 ot = True, # Whether to perform minibatch optimal transport. Available options are: True and False.
                 verbose = True, # Whether to print running information. Available options are: True and False.
                 device = "cpu" # The device of model running. Specific graphic card should be specified if use GPU.
                 ):

        # set random seed
        torch.manual_seed(seed)
        np.random.seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.benchmark = True

        if resolution not in ['low', 'high']:
            raise ValueError("Resolution must be either 'low' or 'high'.")

        if resolution == "high" and not ot:
            raise ValueError("If resolution is 'high', ot must be True.")

        self.resolution = resolution
        self.batch_size = batch_size
        self.train_epoch = train_epoch
        if self.resolution == "low":
            self.cut_steps = cut_steps
        elif self.resolution == "high":
            self.cut_steps = 1

        self.npcs = npcs
        self.n_latent = n_latent
        self.n_coord = n_coord
        self.sf_coord = sf_coord
        self.location = location
        self.rad_cutoff = rad_cutoff
        self.beta = beta
        self.lambda_Lasso = lambda_Lasso
        self.lambdaGAN = lambdaGAN
        self.lambdacos = lambdacos
        self.lambdaAE = lambdaAE
        self.lambdaLA = lambdaLA
        self.lambdaSWD = lambdaSWD

        self.lambdalat = lambdalat
        self.lambdarec = lambdarec

        self.margin = 5.0

        self.model_path = model_path
        self.data_path = data_path
        self.result_path = result_path

        self.ot = ot
        self.verbose = verbose
        self.device = device

        self.K = 21  # GMM组件数量（根据论文设置）
        self.phi = nn.Parameter(torch.ones(self.K)/self.K)  # 混合系数
        self.mu = nn.Parameter(torch.randn(self.K, self.n_latent))  # 均值
        self.sigma = nn.Parameter(torch.eye(self.n_latent).unsqueeze(0).repeat(self.K,1,1))  # 协方差

    def preprocess(self,
                   adata_A_input, # Anndata object of scRNA-seq data
                   adata_B_input, # Anndata object of ST data
                   hvg_num = 4000, # Number of highly variable genes for each anndata
                   save_embedding = False # Save low-dimensional embeddings or not
                   ):

        adata_A_input.obs["batch"] = "scRNA-seq"
        adata_B_input.obs["batch"] = "ST"

        adata_A = adata_A_input.copy()
        adata_B = adata_B_input.copy()

        if self.verbose:
            print("Finding highly variable genes...")
        sc.pp.highly_variable_genes(adata_A, flavor='seurat_v3', n_top_genes=hvg_num)
        sc.pp.highly_variable_genes(adata_B, flavor='seurat_v3', n_top_genes=hvg_num)
        hvg_A = adata_A.var[adata_A.var.highly_variable == True].sort_values(by="highly_variable_rank").index
        hvg_B = adata_B.var[adata_B.var.highly_variable == True].sort_values(by="highly_variable_rank").index
        # hvg_total = hvg_A & hvg_B
        # hvg_total = list(set(hvg_A) & set(hvg_B))
        hvg_total = hvg_A.intersection(hvg_B)
        print('# overlap highly variable genes is: '+str(len(hvg_total)))
        if len(hvg_total) < 100:
            raise ValueError("The total number of highly variable genes is smaller than 100 (%d). Try to set a larger hvg_num." % len(hvg_total))

        if self.verbose:
            print("Normalizing and scaling...")
        sc.pp.normalize_total(adata_A, target_sum=1e4)
        sc.pp.log1p(adata_A)
        adata_A = adata_A[:, hvg_total]
        sc.pp.scale(adata_A, max_value=10)

        sc.pp.normalize_total(adata_B, target_sum=1e4)
        sc.pp.log1p(adata_B)
        # 1. 读取 svg.csv 文件，并提取第一列数据
        #svg_data = pd.read_csv("svg.csv",header=0)  # 假设 csv 文件没有表头

        #adata_B1=adata_B[:, svg_data['genes']]

        #adata_B = adata_B[:, hvg_total]

        sc.pp.scale(adata_B, max_value=204)

        adata_total = adata_A.concatenate(adata_B, index_unique=None)

        if self.verbose:
            print("Dimensionality reduction via PCA...")
        pca = PCA(n_components=self.npcs, svd_solver="arpack", random_state=0)
        adata_total.obsm["X_pca"] = pca.fit_transform(adata_total.X)

        self.emb_A = adata_total.obsm["X_pca"][:adata_A.shape[0], :self.npcs].copy()
        self.emb_B = adata_total.obsm["X_pca"][adata_A.shape[0]:, :self.npcs].copy()
        #self.emb_B = adata_B1.X
        self.coord_B = adata_B.obsm[self.location][:, :self.n_coord].copy()/self.sf_coord
        self.adata_total = adata_total
        self.adata_A_input = adata_A_input
        self.adata_B_input = adata_B_input

        if not os.path.exists(self.data_path):
            os.makedirs(self.data_path)

        if save_embedding:
            np.save(os.path.join(self.data_path, "lowdim_A.npy"), self.emb_A)
            np.save(os.path.join(self.data_path, "lowdim_B.npy"), self.emb_B)

    def train(self,training_idx_rna,training_idx_st,
              num_projections = 500, # The same as sliced_wasserstein_distance function and _sliced_wasserstein_distance function.
              metric = 'correlation', # The same as trans_plan_b function.
              reg = 0.1, # The same as trans_plan_b function.
              numItermax = 10 # The same as trans_plan_b function.
              ):
        begin_time = time.time()
        if self.verbose:
            print("Begining time: ", time.asctime(time.localtime(begin_time)))
        self.E_A = encoder(self.npcs, self.n_latent).to(self.device)
        self.E_B = encoder(self.npcs, self.n_latent).to(self.device)
        self.G_A = generator(self.npcs, self.n_latent).to(self.device)
        self.G_B = generator(self.npcs, self.n_latent).to(self.device)
        self.D_A = discriminator(self.npcs).to(self.device)
        self.D_B = discriminator(self.npcs).to(self.device)
        self.E_s = encoder_site(self.n_latent, self.n_coord).to(self.device)
        self.D_s = decoder_site(self.n_latent, self.n_coord).to(self.device)
        params_G = list(self.E_A.parameters()) + list(self.E_B.parameters()) + list(self.G_A.parameters()) + list(self.G_B.parameters())
        params_S = list(self.E_s.parameters()) + list(self.D_s.parameters())
        optimizer_G = optim.Adam(params_G, lr=0.001, weight_decay=0.)
        optimizer_S = optim.Adam(params_S, lr=0.001, weight_decay=0.)
        params_D = list(self.D_A.parameters()) + list(self.D_B.parameters())
        optimizer_D = optim.Adam(params_D, lr=0.001, weight_decay=0.)
        self.E_A.train()
        self.E_B.train()
        self.G_A.train()
        self.G_B.train()
        self.E_s.train()
        self.D_s.train()
        self.D_A.train()
        self.D_B.train()
        def gmm_log_likelihood(z):
            """计算GMM负对数似然（公式15）"""
            z = z.unsqueeze(1)  # [batch, 1, latent]
            diff = z - self.mu  # [batch, K, latent]
            cov_inv = torch.stack([torch.inverse(s) for s in self.sigma])  # [K, latent, latent]
            exponent = -0.5 * torch.einsum('bki,kij,bkj->bk', diff, cov_inv, diff)
            log_prob = exponent - 0.5 * torch.logdet(self.sigma).unsqueeze(0) + torch.log(self.phi)
            return -torch.logsumexp(log_prob, dim=1).mean()  # 负对数似然
        N_A = len(training_idx_rna)
        N_B = len(training_idx_st)
        if self.ot:
            plan = np.ones(shape=(N_A, N_B))
            plan = plan / (self.batch_size * self.batch_size)
            plan = torch.from_numpy(plan).float().to(self.device)

        for j in range(self.train_epoch):
            index_A = np.random.choice(training_idx_rna, size=self.batch_size)
            index_B = np.random.choice(training_idx_st, size=self.batch_size)
            x_A = torch.from_numpy(self.emb_A[index_A, :]).float().to(self.device)
            x_B = torch.from_numpy(self.emb_B[index_B, :]).float().to(self.device)
            c_B = torch.from_numpy(self.coord_B[index_B, :]).float().to(self.device)#真实的细胞位置
            z_A = self.E_A(x_A)
            z_B = self.E_B(x_B)
            m_B = self.E_s(z_B)
            s_Brecon = self.D_s(m_B)
            x_AtoB = self.G_B(z_A)
            x_BtoA = self.G_A(z_B)
            x_Arecon = self.G_A(z_A)
            x_Brecon = self.G_B(z_B)
            z_AtoB = self.E_B(x_AtoB)
            z_BtoA = self.E_A(x_BtoA)

            if j < int(self.train_epoch*self.cut_steps):
                optimizer_D.zero_grad()
                if j <= 5:
                    # Warm-up
                    loss_D_A = (torch.log(1 + torch.exp(-self.D_A(x_A))) + torch.log(
                        1 + torch.exp(self.D_A(x_BtoA)))).mean()
                    loss_D_B = (torch.log(1 + torch.exp(-self.D_B(x_B))) + torch.log(
                        1 + torch.exp(self.D_B(x_AtoB)))).mean()
                else:
                    loss_D_A = (torch.log(
                        1 + torch.exp(-torch.clamp(self.D_A(x_A), -self.margin, self.margin))) + torch.log(
                        1 + torch.exp(torch.clamp(self.D_A(x_BtoA), -self.margin, self.margin)))).mean()
                    loss_D_B = (torch.log(
                        1 + torch.exp(-torch.clamp(self.D_B(x_B), -self.margin, self.margin))) + torch.log(
                        1 + torch.exp(torch.clamp(self.D_B(x_AtoB), -self.margin, self.margin)))).mean()
                loss_D = loss_D_A + loss_D_B
                loss_D.backward(retain_graph=True)
                optimizer_D.step()

                loss_AE_A = torch.mean((x_Arecon - x_A) ** 2)
                loss_AE_B = torch.mean((x_Brecon - x_B) ** 2)
                loss_AE = loss_AE_A + loss_AE_B

                loss_cos_A = (1 - torch.sum(F.normalize(x_AtoB, p=2) * F.normalize(x_A, p=2), 1)).mean()
                loss_cos_B = (1 - torch.sum(F.normalize(x_BtoA, p=2) * F.normalize(x_B, p=2), 1)).mean()
                loss_cos = loss_cos_A + loss_cos_B

                loss_LA_AtoB = torch.mean((z_A - z_AtoB) ** 2)
                loss_LA_BtoA = torch.mean((z_B - z_BtoA) ** 2)
                loss_LA = loss_LA_AtoB + loss_LA_BtoA

                loss_SWD = sliced_wasserstein_distance(z_A, z_B, num_projections=num_projections, device=self.device)

                if self.ot:
                    plan_tmp = trans_plan_b(z_A, z_B, metric=metric, reg=reg, numItermax=numItermax, device=self.device)

                    coord_list = [[i, j] for i in index_A for j in index_B]
                    coord_list = np.array(coord_list)
                    plan[coord_list[:, 0], coord_list[:, 1]] = plan_tmp.reshape([self.batch_size * self.batch_size, ])

                if j <= 5:
                    # Warm-up
                    loss_GAN = (torch.log(1 + torch.exp(-self.D_A(x_BtoA))) + torch.log(
                        1 + torch.exp(-self.D_B(x_AtoB)))).mean()
                else:
                    loss_GAN = (torch.log(
                        1 + torch.exp(-torch.clamp(self.D_A(x_BtoA), -self.margin, self.margin))) + torch.log(
                        1 + torch.exp(-torch.clamp(self.D_B(x_AtoB), -self.margin, self.margin)))).mean()

                optimizer_G.zero_grad()
                #loss_G =  self.lambdacos * loss_cos + self.lambdaAE * loss_AE + self.lambdaLA * loss_LA + self.lambdaSWD * loss_SWD
                loss_GMM = gmm_log_likelihood(z_A)  # 仅对scRNA数据

                pi = torch.softmax(torch.mm(z_B, self.mu.T), dim=1)  # [batch, K]
            
                # 计算线性组合损失 (公式17第一项)
                spot_emb_comb = torch.mm(pi, self.mu)  # [batch, latent]
                loss_deconv_lin = torch.mean(torch.norm(z_B - spot_emb_comb, dim=1)**2) / (2 * self.n_latent)
                
                # 计算Lasso正则化 (公式17第二项)
                loss_deconv_l1 = torch.mean(torch.norm(pi, p=1, dim=1))     
                loss_deconv = loss_deconv_lin + self.lambda_Lasso * loss_deconv_l1
                loss_G = 0.1* loss_GMM + self.beta *loss_deconv + self.lambdacos * loss_cos + self.lambdaAE * loss_AE + self.lambdaLA * loss_LA + self.lambdaSWD * loss_SWD
                loss_G.backward()
                optimizer_G.step()

                if not j % 500:
                    if self.verbose:
                        print("step %d, total_loss=%.4f, loss_D=%.4f, loss_GAN=%.4f, loss_AE=%.4f, loss_cos=%.4f, loss_LA=%.4f, loss_SWD=%.4f"
                              % (j, loss_G, loss_D, loss_GAN, loss_AE, loss_cos, loss_LA, loss_SWD))

            else:
                optimizer_S.zero_grad()
                loss_lat = loss1(m_B, c_B) + 0.1 * sliced_wasserstein_distance(m_B, c_B, num_projections=num_projections, device=self.device)
                loss_rec = loss2(s_Brecon, z_B) + 0.1 * loss1(s_Brecon, z_B)
                loss_S = self.lambdalat * loss_lat + self.lambdarec * loss_rec
                loss_S.backward()
                optimizer_S.step()

                if not j % 500:
                    if self.verbose:
                        print("step %d, loss_lat=%.4f, loss_rec=%.4f" % (j, loss_lat, loss_rec))


        end_time = time.time()
        if self.verbose:
            print("Ending time: ", time.asctime(time.localtime(end_time)))
        self.train_time = end_time - begin_time
        if self.verbose:
            print("Training takes %.2f seconds" % self.train_time)

        if self.ot:
            self.plan = plan.detach().cpu().numpy()

        if not os.path.exists(self.model_path):
            os.makedirs(self.model_path)

        state = {'D_A': self.D_A.state_dict(), 'D_B': self.D_B.state_dict(),
                 'E_A': self.E_A.state_dict(), 'E_B': self.E_B.state_dict(),
                 'G_A': self.G_A.state_dict(), 'G_B': self.G_B.state_dict(),
                 'E_s': self.E_s.state_dict(), 'D_s': self.D_s.state_dict()}

        torch.save(state, os.path.join(self.model_path, "ckpt.pth"))
        np.save(os.path.join(self.model_path, "pi.npy"), pi.detach().cpu().numpy())
       


    def eval(self,
             D_score = False, # Whether to output discriminated score.
             save_embedding = False, # Save low-dimensional embeddings or not.
             hvg_num = 4000, # Number of highly variable genes for each anndata.
             retain_prop = 1 # The proportion of cells mapped for each sample of ST dataset when data enhancement of extension.
             ):
        self.E_A = encoder(self.npcs, self.n_latent).to(self.device)
        self.E_B = encoder(self.npcs, self.n_latent).to(self.device)
        self.G_A = generator(self.npcs, self.n_latent).to(self.device)
        self.G_B = generator(self.npcs, self.n_latent).to(self.device)
        self.E_s = encoder_site(self.n_latent, self.n_coord).to(self.device)
        self.D_s = decoder_site(self.n_latent, self.n_coord).to(self.device)
        self.E_A.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['E_A'])
        self.E_B.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['E_B'])
        self.G_A.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['G_A'])
        self.G_B.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['G_B'])
        self.E_s.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['E_s'])
        self.D_s.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['D_s'])
        x_A = torch.from_numpy(self.emb_A).float().to(self.device)
        x_B = torch.from_numpy(self.emb_B).float().to(self.device)

        z_A = self.E_A(x_A)
        z_B = self.E_B(x_B)

        m_A = self.E_s(z_A)
        m_B = self.E_s(z_B)

        x_AtoB = self.G_B(z_A)
        x_BtoA = self.G_A(z_B)

        if self.resolution == "high":
            retain_cell = retain_prop * self.plan.T.shape[1]
            if retain_cell < 5:
                raise ValueError(
                    "The retained proportion is smaller than 5 (%d). Try to set a larger retain_prop." % retain_cell)

        self.latent = np.concatenate((z_A.detach().cpu().numpy(), z_B.detach().cpu().numpy()), axis=0)
        self.adata_total.obsm['latent'] = self.latent
        self.data_Aspace = np.concatenate((self.emb_A, x_BtoA.detach().cpu().numpy()), axis=0)
        self.data_Bspace = np.concatenate((x_AtoB.detach().cpu().numpy(), self.emb_B), axis=0)

        if self.resolution == "low":
            self.map = np.concatenate((m_A.detach().cpu().numpy(), m_B.detach().cpu().numpy()), axis=0)*self.sf_coord
            self.map_A = m_A.detach().cpu().numpy()*self.sf_coord
            self.map_B = m_B.detach().cpu().numpy()*self.sf_coord

        self.adata_total.obs["batch"] = self.adata_total.obs["batch"].replace(["0", "1"], ["scRNA-seq", "ST"])

        if not os.path.exists(self.result_path):
            os.makedirs(self.result_path)

        # Add map results, save adata_total
        if self.resolution == "low":
            self.adata_total.obsm['loc'] = self.map
            self.adata_B_input.obsm["loc"] = self.map_B
            self.adata_A_input.obsm["loc"] = self.map_A
        self.adata_total.write(os.path.join(self.result_path, "adata_total.h5ad"))
        if self.resolution == "low":
            dist_with_spot = cdist(self.adata_A_input.obsm["loc"], self.adata_B_input.obsm["spatial"])
            min_dist = np.min(dist_with_spot, axis=1)
            self.adata_A_keep = self.adata_A_input[min_dist <= self.rad_cutoff]
            if 'spatial' in self.adata_B_input.uns:
                self.adata_A_keep.uns["spatial"] = self.adata_B_input.uns["spatial"]
                self.adata_A_keep.obsm["spatial"] = self.adata_A_keep.obsm["loc"]
            self.adata_A_keep.write(os.path.join(self.result_path, "adata_sc_keep.h5ad"))
            if self.verbose:
                print("Localized scRNA-seq dataset has been saved!")

        # save OT plan
        if self.ot:
            self.plan_df = pd.DataFrame(self.plan, index=self.adata_A_input.obs.index,
                                        columns=self.adata_B_input.obs.index)
            self.plan_df.index = self.plan_df.index.values
            self.plan_df.to_csv(os.path.join(self.result_path, "trans_plan.csv"))

        # save latent with batch and celltype
        self.latent_df = pd.DataFrame(self.latent, index=self.adata_total.obs.index)
        self.latent_df.columns = ["latent_" + str(x) for x in range(1, self.n_latent + 1)]
        self.latent_df["batch"] = self.adata_total.obs["batch"]
        #self.latent_df["cell_type"] = self.adata_total.obs["celltype"]
        self.latent_df.to_csv(os.path.join(self.result_path, "latent.csv"))

        # save cell type scores
        if self.ot:
            sc_celltype = self.latent_df[self.latent_df["batch"] == "scRNA-seq"]
            sc_celltype = sc_celltype["cell_type"].astype("str")
            cluster_name = sc_celltype.unique()
            self.cluster_score = np.zeros(shape=(len(cluster_name), self.adata_B_input.shape[0]))
            self.cluster_score = pd.DataFrame(self.cluster_score, index=cluster_name,
                                              columns=self.adata_B_input.obs.index)
            for i in cluster_name:
                self.cluster_score.loc[i, :] = np.mean(self.plan[np.where(sc_celltype == i), :][0], axis=0)
            self.cluster_score = self.cluster_score.T
            self.cluster_score.to_csv(os.path.join(self.result_path, "cluster_score.csv"))

        # save transfer label, maximum probability
        if self.resolution == "high":
            self.trans_label = self.cluster_score.apply(lambda x: get_max_index(x), axis=1)
            self.trans_label = self.trans_label.replace(range(len(self.trans_label.unique())), self.cluster_score.columns)
            self.trans_label = pd.DataFrame(self.trans_label)
            self.trans_label.columns = ["transfer_label"]
            self.trans_label.index = self.adata_B_input.obs.index
            self.trans_label.to_csv(os.path.join(self.result_path, "trans_label.csv"))

        # save transfer data, probability mapping
        if self.resolution == "high":
            retain_cell = retain_prop * self.plan.shape[1]
            plan_filt = self.plan * (np.argsort(np.argsort(self.plan)) >= self.plan.shape[1] - retain_cell)
            self.plan_norm = plan_filt.T / np.sum(plan_filt, 1)
            self.plan_norm = sp.csr_matrix(self.plan_norm)
            self.data_pm = self.plan_norm @ self.adata_A_input.X
            self.adata_ST_pm = sc.AnnData(X=self.data_pm, obs=self.adata_B_input.obs, var=self.adata_A_input.var, obsm=self.adata_B_input.obsm)
            self.adata_ST_pm.write(os.path.join(self.result_path, "adata_ST_pm.h5ad"))
            if self.verbose:
                print("Enhanced ST dataset has been saved!")

        if D_score:
            self.D_A = discriminator(self.npcs).to(self.device)
            self.D_B = discriminator(self.npcs).to(self.device)
            self.D_A.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['D_A'])
            self.D_B.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['D_B'])

            score_D_A_A = self.D_A(x_A)
            score_D_B_A = self.D_B(x_AtoB)
            score_D_B_B = self.D_B(x_B)
            score_D_A_B = self.D_A(x_BtoA)

            self.score_Aspace = np.concatenate((score_D_A_A.detach().cpu().numpy(), score_D_A_B.detach().cpu().numpy()), axis=0)
            self.score_Bspace = np.concatenate((score_D_B_A.detach().cpu().numpy(), score_D_B_B.detach().cpu().numpy()), axis=0)

        if save_embedding:
            np.save(os.path.join(self.result_path, "latent_A.npy"), z_A.detach().cpu().numpy())
            np.save(os.path.join(self.result_path, "latent_B.npy"), z_B.detach().cpu().numpy())
            np.save(os.path.join(self.result_path, "x_AtoB.npy"), x_AtoB.detach().cpu().numpy())
            np.save(os.path.join(self.result_path, "x_BtoA.npy"), x_BtoA.detach().cpu().numpy())
            np.save(os.path.join(self.result_path, "map_A.npy"), m_A.detach().cpu().numpy())
            np.save(os.path.join(self.result_path, "map_B.npy"), m_B.detach().cpu().numpy())
        return m_B
    def eval2(self, D_score=False, save_embedding=False, hvg_num=4000, retain_prop=1, detect_aberrant=False, alpha=0.05):
        self.E_A = encoder(self.npcs, self.n_latent).to(self.device)
        self.E_B = encoder(self.npcs, self.n_latent).to(self.device)
        self.G_A = generator(self.npcs, self.n_latent).to(self.device)
        self.G_B = generator(self.npcs, self.n_latent).to(self.device)
        self.E_s = encoder_site(self.n_latent, self.n_coord).to(self.device)
        self.D_s = decoder_site(self.n_latent, self.n_coord).to(self.device)
        self.E_A.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['E_A'])
        self.E_B.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['E_B'])
        self.G_A.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['G_A'])
        self.G_B.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['G_B'])
        self.E_s.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['E_s'])
        self.D_s.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['D_s'])

        x_A = torch.from_numpy(self.emb_A).float().to(self.device)
        x_B = torch.from_numpy(self.emb_B).float().to(self.device)

        z_A = self.E_A(x_A)
        z_B = self.E_B(x_B)

        m_A = self.E_s(z_A)
        m_B = self.E_s(z_B)

        x_AtoB = self.G_B(z_A)
        x_BtoA = self.G_A(z_B)

        if self.resolution == "high":
            retain_cell = retain_prop * self.plan.T.shape[1]
            if retain_cell < 5:
                raise ValueError(
                    "The retained proportion is smaller than 5 (%d). Try to set a larger retain_prop." % retain_cell)

        self.latent = np.concatenate((z_A.detach().cpu().numpy(), z_B.detach().cpu().numpy()), axis=0)
        self.adata_total.obsm['latent'] = self.latent
        self.data_Aspace = np.concatenate((self.emb_A, x_BtoA.detach().cpu().numpy()), axis=0)
        self.data_Bspace = np.concatenate((x_AtoB.detach().cpu().numpy(), self.emb_B), axis=0)

        if self.resolution == "low":
            self.map = np.concatenate((m_A.detach().cpu().numpy(), m_B.detach().cpu().numpy()), axis=0)*self.sf_coord
            self.map_A = m_A.detach().cpu().numpy()*self.sf_coord
            self.map_B = m_B.detach().cpu().numpy()*self.sf_coord

        self.adata_total.obs["batch"] = self.adata_total.obs["batch"].replace(["0", "1"], ["scRNA-seq", "ST"])

        if not os.path.exists(self.result_path):
            os.makedirs(self.result_path)

        # Add map results, save adata_total
        if self.resolution == "low":
            self.adata_total.obsm['loc'] = self.map
            self.adata_B_input.obsm["loc"] = self.map_B
            self.adata_A_input.obsm["loc"] = self.map_A
        self.adata_total.write(os.path.join(self.result_path, "adata_total.h5ad"))

        # Normalization, save adata_sc and adata_ST
        sc.pp.highly_variable_genes(self.adata_A_input, flavor="seurat_v3", n_top_genes=hvg_num)
        sc.pp.normalize_total(self.adata_A_input, target_sum=1e4)
        sc.pp.log1p(self.adata_A_input)
        self.adata_A_input.write(os.path.join(self.result_path, "adata_sc.h5ad"))

        sc.pp.highly_variable_genes(self.adata_B_input, flavor="seurat_v3", n_top_genes=hvg_num)
        sc.pp.normalize_total(self.adata_B_input, target_sum=1e4)
        sc.pp.log1p(self.adata_B_input)
        self.adata_B_input.write(os.path.join(self.result_path, "adata_ST.h5ad"))

        # keep cells inside section, save adata_sc_keep
        if self.resolution == "low":
            dist_with_spot = cdist(self.adata_A_input.obsm["loc"], self.adata_B_input.obsm["spatial"])
            min_dist = np.min(dist_with_spot, axis=1)
            self.adata_A_keep = self.adata_A_input[min_dist <= self.rad_cutoff]
            if 'spatial' in self.adata_B_input.uns:
                self.adata_A_keep.uns["spatial"] = self.adata_B_input.uns["spatial"]
                self.adata_A_keep.obsm["spatial"] = self.adata_A_keep.obsm["loc"]
            self.adata_A_keep.write(os.path.join(self.result_path, "adata_sc_keep.h5ad"))
            if self.verbose:
                print("Localized scRNA-seq dataset has been saved!")

        # save OT plan
        if self.ot:
            self.plan_df = pd.DataFrame(self.plan, index=self.adata_A_input.obs.index,
                                        columns=self.adata_B_input.obs.index)
            self.plan_df.index = self.plan_df.index.values
            self.plan_df.to_csv(os.path.join(self.result_path, "trans_plan.csv"))

        # save latent with batch and celltype
        self.latent_df = pd.DataFrame(self.latent, index=self.adata_total.obs.index)
        self.latent_df.columns = ["latent_" + str(x) for x in range(1, self.n_latent + 1)]
        self.latent_df["batch"] = self.adata_total.obs["batch"]
        self.latent_df["cell_type"] = self.adata_total.obs["celltype"]
        self.latent_df.to_csv(os.path.join(self.result_path, "latent.csv"))

        # save cell type scores
        if self.ot:
            sc_celltype = self.latent_df[self.latent_df["batch"] == "scRNA-seq"]
            sc_celltype = sc_celltype["cell_type"].astype("str")
            cluster_name = sc_celltype.unique()
            self.cluster_score = np.zeros(shape=(len(cluster_name), self.adata_B_input.shape[0]))
            self.cluster_score = pd.DataFrame(self.cluster_score, index=cluster_name,
                                              columns=self.adata_B_input.obs.index)
            for i in cluster_name:
                self.cluster_score.loc[i, :] = np.mean(self.plan[np.where(sc_celltype == i), :][0], axis=0)
            self.cluster_score = self.cluster_score.T
            self.cluster_score.to_csv(os.path.join(self.result_path, "cluster_score.csv"))

        # save transfer label, maximum probability
        if self.resolution == "high":
            self.trans_label = self.cluster_score.apply(lambda x: get_max_index(x), axis=1)
            self.trans_label = self.trans_label.replace(range(len(self.trans_label.unique())), self.cluster_score.columns)
            self.trans_label = pd.DataFrame(self.trans_label)
            self.trans_label.columns = ["transfer_label"]
            self.trans_label.index = self.adata_B_input.obs.index
            self.trans_label.to_csv(os.path.join(self.result_path, "trans_label.csv"))

        # save transfer data, probability mapping
        if self.resolution == "high":
            retain_cell = retain_prop * self.plan.shape[1]
            plan_filt = self.plan * (np.argsort(np.argsort(self.plan)) >= self.plan.shape[1] - retain_cell)
            self.plan_norm = plan_filt.T / np.sum(plan_filt, 1)
            self.plan_norm = sp.csr_matrix(self.plan_norm)
            self.data_pm = self.plan_norm @ self.adata_A_input.X
            self.adata_ST_pm = sc.AnnData(X=self.data_pm, obs=self.adata_B_input.obs, var=self.adata_A_input.var, obsm=self.adata_B_input.obsm)
            self.adata_ST_pm.write(os.path.join(self.result_path, "adata_ST_pm.h5ad"))
            if self.verbose:
                print("Enhanced ST dataset has been saved!")

        if D_score:
            self.D_A = discriminator(self.npcs).to(self.device)
            self.D_B = discriminator(self.npcs).to(self.device)
            self.D_A.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['D_A'])
            self.D_B.load_state_dict(torch.load(os.path.join(self.model_path, "ckpt.pth"))['D_B'])

            score_D_A_A = self.D_A(x_A)
            score_D_B_A = self.D_B(x_AtoB)
            score_D_B_B = self.D_B(x_B)
            score_D_A_B = self.D_A(x_BtoA)

            self.score_Aspace = np.concatenate((score_D_A_A.detach().cpu().numpy(), score_D_A_B.detach().cpu().numpy()), axis=0)
            self.score_Bspace = np.concatenate((score_D_B_A.detach().cpu().numpy(), score_D_B_B.detach().cpu().numpy()), axis=0)

        if save_embedding:
            np.save(os.path.join(self.result_path, "latent_A.npy"), z_A.detach().cpu().numpy())
            np.save(os.path.join(self.result_path, "latent_B.npy"), z_B.detach().cpu().numpy())
            np.save(os.path.join(self.result_path, "x_AtoB.npy"), x_AtoB.detach().cpu().numpy())
            np.save(os.path.join(self.result_path, "x_BtoA.npy"), x_BtoA.detach().cpu().numpy())
            np.save(os.path.join(self.result_path, "map_A.npy"), m_A.detach().cpu().numpy())
            np.save(os.path.join(self.result_path, "map_B.npy"), m_B.detach().cpu().numpy())
            
            
        if detect_aberrant:
            # 执行Conformal Prediction异常检测
            aberrant, confidence,lambda_calib,pred_coords,true_coords = self.conformal_prediction(alpha=alpha)
            
            # 将结果保存到adata对象中
            self.adata_B_input.obs['aberrant'] = aberrant
            self.adata_B_input.obs['confidence_radius'] = confidence
            self.adata_B_input.obs['pred_x'] = m_B.detach().cpu().numpy()[:, 0] * self.sf_coord
            self.adata_B_input.obs['pred_y'] = m_B.detach().cpu().numpy()[:, 1] * self.sf_coord
            
            # 保存结果
            #if not os.path.exists(self.result_path):
                #os.makedirs(self.result_path)
                
            #aberrant_df = pd.DataFrame({
                #'spot_id': self.adata_B_input.obs.index,
                #'aberrant': aberrant,
                #'confidence_radius': confidence,
                #'true_x': self.adata_B_input.obsm['spatial'][:, 0],
                #'true_y': self.adata_B_input.obsm['spatial'][:, 1],
                #'pred_x': self.adata_B_input.obs['pred_x'],
                #'pred_y': self.adata_B_input.obs['pred_y']
            #})
            #aberrant_df.to_csv(os.path.join(self.result_path, "aberrant_spots.csv"), index=False)
            return aberrant, confidence,lambda_calib,pred_coords,true_coords
            #if self.verbose:
                #print(f"Aberrant detection results saved to {os.path.join(self.result_path, 'aberrant_spots.csv')}")
    def conformal_prediction(self, alpha=0.05, k_neighbors=10):
        """
        执行Conformal Prediction检测空间异常细胞
        参数:
            alpha: 显著性水平 (默认0.05对应95%置信区间)
            k_neighbors: 计算λ时的最近邻数量
        返回:
            aberrant: 布尔数组，标记哪些spot是异常的
            confidence_intervals: 每个spot的置信区间半径
        """
        # 获取ST数据的预测坐标和真实坐标
        x_B = torch.from_numpy(self.emb_B).float().to(self.device)
        z_B = self.E_B(x_B)
        print(z_B.shape)
        m_B = self.E_s(z_B)
        pred_coords = m_B.detach().cpu().numpy() * self.sf_coord
        true_coords = self.coord_B * self.sf_coord
        
        # 转换为tensor
        pred_coords = torch.from_numpy(pred_coords).float()
        true_coords = torch.from_numpy(true_coords).float()
        
        # 随机划分测试集和校准集 (50-50)
        n = len(true_coords)
        idx = torch.randperm(n)
        test_idx, calib_idx = idx[:n//2], idx[n//2:]
    
        # 计算测试集的非一致性分数
        dist_matrix = torch.cdist(z_B[test_idx,:], z_B[test_idx,:])
        lambda_test = dist_matrix.topk(k_neighbors+1, largest=False).values[:, 1:].mean(dim=1)# 因为有自己
        print(lambda_test.shape)
        v_test = torch.norm(true_coords[test_idx] - pred_coords[test_idx], dim=1) / lambda_test
        
        # 计算分位数
        q = np.quantile(v_test.detach().numpy(), 1 - alpha)
        
        # 为校准集计算λ和置信区间
        dist_calib_to_test = torch.cdist(z_B[calib_idx,:], z_B[test_idx,:])
        lambda_calib = dist_calib_to_test.topk(k_neighbors, largest=False).values.mean(dim=1)
        print(dist_calib_to_test.topk(k_neighbors, largest=False).values)
        print(lambda_calib.shape)
        confidence_intervals = q * lambda_calib
        
        # 检测异常点
        pred_errors = torch.norm(true_coords[calib_idx,:] - pred_coords[calib_idx], dim=1)
        aberrant = pred_errors > confidence_intervals
        
        # 交换测试集和校准集再计算一次 (论文中的完整流程)
        dist_matrix = torch.cdist(true_coords[calib_idx], true_coords[calib_idx])
        lambda_calib = dist_matrix.topk(k_neighbors+1, largest=False).values[:, 1:].mean(dim=1)
        
        v_calib = torch.norm(true_coords[calib_idx] - pred_coords[calib_idx], dim=1) / lambda_calib
        q = np.quantile(v_calib.numpy(), 1 - alpha)
        
        dist_test_to_calib = torch.cdist(true_coords[test_idx], true_coords[calib_idx])
        lambda_test = dist_test_to_calib.topk(k_neighbors, largest=False).values.mean(dim=1)
        confidence_intervals_test = q * lambda_test
        
        pred_errors_test = torch.norm(true_coords[test_idx] - pred_coords[test_idx], dim=1)
        aberrant_test = pred_errors_test > confidence_intervals_test
        
        # 合并两次结果
        final_aberrant = torch.zeros(n, dtype=torch.bool)
        final_aberrant[calib_idx] = aberrant
        final_aberrant[test_idx] = aberrant_test
        final_lambda_calib=torch.zeros(n)
        final_confidence = torch.zeros(n)
        final_lambda_calib[calib_idx]=lambda_calib
        final_lambda_calib[test_idx]=lambda_test
        final_confidence[calib_idx] = confidence_intervals
        final_confidence[test_idx] = confidence_intervals_test 
        
        if self.verbose:
            print(f"Detected {final_aberrant.sum().item()} aberrant spots (alpha={alpha})")       
        return final_aberrant.numpy(), final_confidence.detach().numpy(),final_lambda_calib,pred_coords,true_coords

In [16]:
import warnings
warnings.filterwarnings("ignore")
import os
import numpy as np
import pandas as pd
import scanpy as sc
#import STALocator
import scipy.sparse as sp
import torch
from datetime import datetime
# 创建标准化目录结构
experiment_path = "SpatialVG"
model_path = os.path.join(experiment_path, "models")
data_path = os.path.join(experiment_path, "data")
result_path = os.path.join(experiment_path, "results")
os.makedirs(model_path, exist_ok=True)
os.makedirs(data_path, exist_ok=True)
os.makedirs(result_path, exist_ok=True)
# 读取数据（保持原始代码不变）

adata_ST=sc.read('/home/qyyuan/project/ST_GRN/data/SCC/adata_SCC_ST.h5ad')
adata_sc=sc.read('/home/qyyuan/project/ST_GRN/data/SCC/adata_SCC_sc.h5ad')

In [17]:
adata_ST_Ori=sc.read('/home/qyyuan/project/ST_GRN/data/SCC/adata_SCC_ST.h5ad')
adata_sc_Ori=sc.read('/home/qyyuan/project/ST_GRN/data/SCC/adata_SCC_sc.h5ad')

In [18]:
adata_ST_Ori

AnnData object with n_obs × n_vars = 666 × 14809
    obs: 'x', 'y', 'new_x', 'new_y', 'pixel_x', 'pixel_y'
    var: 'n_cells'
    obsm: 'spatial'

In [19]:
adata_sc_Ori

AnnData object with n_obs × n_vars = 26299 × 23458
    obs: 'nCount_RNA', 'nFeature_RNA', 'patient', 'tum.norm', 'level1_celltype', 'level2_celltype', 'level3_celltype', 'celltype'
    var: 'n_cells'

In [33]:

adata_ST=sc.read('/home/qyyuan/project/ST_GRN/CeLEry/tutorial/data/MousePosteriorToy.h5ad')
adata_sc=sc.read('/home/qyyuan/project/ST_GRN/CeLEry/tutorial/data/MouseSCToy.h5ad')

In [21]:
adata_ST

AnnData object with n_obs × n_vars = 5824 × 356
    obs: 'x', 'y', 'inner'
    var: 'genename'

In [22]:
adata_ST
adata_sc

AnnData object with n_obs × n_vars = 3000 × 356
    obs: 'exp_component_name', 'platform_label', 'cluster_color', 'cluster_order', 'cluster_label', 'class_color', 'class_order', 'class_label', 'subclass_color', 'subclass_order', 'subclass_label', 'full_genotype_color', 'full_genotype_id', 'full_genotype_label', 'sex_color', 'sex_id', 'donor_sex_label', 'region_color', 'region_id', 'region_label', 'cell_type_accession_color', 'cell_type_accession_id', 'cell_type_accession_label', 'cell_type_alias_color', 'cell_type_alias_id', 'cell_type_alias_label', 'cell_type_alt_alias_color', 'cell_type_alt_alias_id', 'cell_type_alt_alias_label', 'cell_type_designation_color', 'cell_type_designation_id', 'cell_type_designation_label', 'neighborhood_label', 'neighborhood_id', 'neighborhood_color', 'external_donor_name_color', 'external_donor_name_id', 'external_donor_name_label', 'facs_population_plan_color', 'facs_population_plan_id', 'facs_population_plan_label', 'injection_materials_color', 'inject

In [34]:
adata_ST.obsm['spatial']= np.column_stack((adata_ST.obs['x'].values, adata_ST.obs['y'].values))

In [35]:
model_path = os.path.join(experiment_path, "models")
data_path = os.path.join(experiment_path, "data")
result_path = os.path.join(experiment_path, "results")
os.makedirs(model_path, exist_ok=True)
os.makedirs(data_path, exist_ok=True)
os.makedirs(result_path, exist_ok=True)
model = Model3(
    resolution="low",
    batch_size=200,
    train_epoch=1000,
    cut_steps=0.5,
    sf_coord=50,
    rad_cutoff=1.2,
    seed=1234,
    lambdacos=10,
    lambdaSWD=5,
    lambdalat=10,
    lambdarec=0.1,
    model_path=model_path,
    data_path=data_path,
    result_path=result_path,
    ot=False,
    device="cpu"
)

model.preprocess(adata_sc, adata_ST)

Finding highly variable genes...
# overlap highly variable genes is: 356
Normalizing and scaling...
Dimensionality reduction via PCA...


In [36]:
Y=adata_ST.obs[['x','y']].values
Y = Y.astype(np.float32)
N_st=adata_ST.X.shape[0]
N_rna=adata_sc.X.shape[0]
Y=adata_ST.obs[['x','y']].values
Y = Y.astype(np.float32)
N_st=adata_ST.X.shape[0]
N_rna=adata_sc.X.shape[0]
training_idx_rna=np.array(range(N_rna))
training_idx_st=np.array(range(N_st))
model.train(training_idx_rna,training_idx_st)


Begining time:  Sat Aug 23 20:57:10 2025
step 0, total_loss=351.1396, loss_D=6.5583, loss_GAN=4.3003, loss_AE=14.3325, loss_cos=2.0493, loss_LA=15.7307, loss_SWD=4.8383
step 500, loss_lat=3.1307, loss_rec=1.7792
Ending time:  Sat Aug 23 21:02:54 2025
Training takes 343.06 seconds


In [37]:
y=model.eval()
final_aberrant, final_confidence,lambda_calib,pred_coords,true_coords=model.eval2(D_score=True, save_embedding=True, detect_aberrant=True, alpha=0.05)

Localized scRNA-seq dataset has been saved!


KeyError: 'celltype'

### predict the location use linear model

### imputate data

In [1]:
import pandas as pd
import scanpy as sc



In [2]:
import numpy as np
import pandas as pd
import random
import scanpy as sc
#from LingerGRN.immupute_dis import immupute_dis
def pseudo_bulk(adata_RNA):
    K = 20
    #sc.tl.pca(adata_RNA, svd_solver="arpack")
    sc.pp.normalize_total(adata_RNA, target_sum=1e4)
    sc.pp.log1p(adata_RNA)
    sc.pp.filter_genes(adata_RNA, min_cells=3) 
    sc.pp.highly_variable_genes(adata_RNA, min_mean=0.0125, max_mean=3, min_disp=0.5)
    adata_RNA.raw = adata_RNA
    adata_RNA = adata_RNA[:, adata_RNA.var.highly_variable]
    sc.pp.scale(adata_RNA, max_value=10)
    sc.tl.pca(adata_RNA, n_comps=15,svd_solver="arpack")
    sc.pp.neighbors(adata_RNA, n_neighbors=K, n_pcs=30,use_rep='X_pca')
    connectivities=(adata_RNA.obsp['distances']>0)
    connectivities=pd.DataFrame(connectivities.toarray(),index=adata_RNA.obs['barcode'].tolist())
    connectivities=connectivities.values
    A=(connectivities @ adata_RNA.raw.X.toarray())
    TG_filter1=A/(K-1)
    TG_filter1=pd.DataFrame(TG_filter1.T,columns=adata_RNA.obs['barcode'].values,index=adata_RNA.raw.var['gene_ids'].tolist())
    return TG_filter1

In [3]:
import pandas as pd
adata_ST=sc.read_h5ad('/home/qyyuan/project/ST_GRN/data/SCC/adata_SCC_ST.h5ad')



In [4]:
from scipy.sparse import csc_matrix
import anndata
matrix = csc_matrix(adata_ST.X)
matrix.data=matrix.data.astype(np.float32)
adata_RNA=anndata.AnnData(X= csc_matrix(matrix))

In [5]:

adata_RNA.var['gene_ids']=adata_ST.var.index
adata_RNA.obs['barcode']=adata_ST.obs.index
TG_filter1 = pseudo_bulk(adata_RNA)

  view_to_actual(adata)


### svg raw data to spaGCN

In [6]:
import umap.umap_ as umap
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
svg_data = pd.read_csv("svg.csv",header=0)  # 假设 csv 文件没有表头


In [None]:
TG_filter2=TG_filter1.loc[svg_data['genes'],:]
from sklearn.preprocessing import StandardScaler
import pandas as pd

# 初始化标准化器
scaler = StandardScaler()

# 拟合（计算每列的μ和σ）并转换训练数据
df_zscore = pd.DataFrame(scaler.fit_transform(TG_filter2.T), columns=TG_filter2.index)
X=df_zscore.values

In [None]:

#X = scaler.fit_transform(z_B)
Y=(adata_ST.obsm['spatial']-adata_ST.obsm['spatial'].min(axis=0))/(adata_ST.obsm['spatial'].max(axis=0)-adata_ST.obsm['spatial'].min(axis=0))
from sklearn.model_selection import train_test_split
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, 
    test_size=0.333,  # 1/3 for testing, 2/3 for training
    random_state=42   # For reproducibility
)

In [12]:
print(X.shape,Y.shape)

(666, 410) (666, 2)


In [16]:
from sklearn.linear_model import Lasso
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
# 训练模型（alpha 是正则化强度，越大惩罚越强）
model = Lasso(alpha=0.01, random_state=42)  # 可调整 alpha
model.fit(X_train, Y_train)
# 预测和评估（代码和 LinearRegression 相同）
Y_train_pred = model.predict(X_train)
Y_test_pred = model.predict(X_test)
# 输出结果
print("Lasso Regression:")
print(f"  Train MAE: {mean_absolute_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test MAE: {mean_absolute_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")
print(f"  Train MSE: {mean_squared_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test MSE: {mean_squared_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")

Lasso Regression:
  Train MAE: 0.1256, R²: 0.5557
  Test MAE: 0.1343, R²: 0.5054
  Train MSE: 0.0256, R²: 0.5557
  Test MSE: 0.0300, R²: 0.5054


In [17]:
!pip install xgboost



In [17]:
import numpy as np
from xgboost import XGBRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import mean_absolute_error

# 1. 定义参数分布（包含XGBoost特有参数）
param_dist = {
    'n_estimators': np.arange(800, 2500, 100),
    'max_depth': [3, 4, 5,6],
    'learning_rate': [0.005, 0.02, 0.05],
    'subsample': [ 0.7,0.8, 0.9, 1],
    'colsample_bytree': [0.6,0.65,0.7, 0.75, 0.8,0.85, 0.9],
    'reg_alpha': [ 0.1],
    'reg_lambda': [0],
    'min_child_weight': [1, 3, 5],
    'gamma': [0, 0.1],
    'eval_metric': ['mae'],  # 作为模型参数
    'early_stopping_rounds': [20]  # 作为模型参数
}

# 2. 初始化基础模型
base_model = XGBRegressor(
    random_state=42,
    tree_method='hist'
)

# 3. 初始化RandomizedSearchCV
random_search = RandomizedSearchCV(
    estimator=base_model,
    param_distributions=param_dist,
    n_iter=50,
    scoring='neg_mean_absolute_error',
    cv=5,
    verbose=2,
    n_jobs=-1,
    random_state=42,
    refit=True  # 自动用最佳参数重新训练
)

# 4. 执行搜索（只传递eval_set给fit）
random_search.fit(
    X_train, Y_train,
    eval_set=[(X_test, Y_test)],  # 这是唯一需要传递给fit的XGBoost特有参数
    verbose=False
)

# 5. 获取最佳模型
best_model = random_search.best_estimator_
print("\nBest Parameters:", random_search.best_params_)
print("Best MAE:", -random_search.best_score_)

# 6. 最终评估
Y_pred = best_model.predict(X_test)
print("Final Test MAE:", mean_absolute_error(Y_test, Y_pred))

Fitting 5 folds for each of 50 candidates, totalling 250 fits
[CV] END colsample_bytree=0.75, early_stopping_rounds=20, eval_metric=mae, gamma=0, learning_rate=0.02, max_depth=4, min_child_weight=3, n_estimators=1200, reg_alpha=0.1, reg_lambda=0, subsample=1; total time=   8.9s
[CV] END colsample_bytree=0.75, early_stopping_rounds=20, eval_metric=mae, gamma=0, learning_rate=0.02, max_depth=4, min_child_weight=3, n_estimators=1200, reg_alpha=0.1, reg_lambda=0, subsample=1; total time=  10.3s
[CV] END colsample_bytree=0.7, early_stopping_rounds=20, eval_metric=mae, gamma=0, learning_rate=0.02, max_depth=6, min_child_weight=5, n_estimators=1700, reg_alpha=0.1, reg_lambda=0, subsample=0.9; total time=  13.8s
[CV] END colsample_bytree=0.6, early_stopping_rounds=20, eval_metric=mae, gamma=0.1, learning_rate=0.02, max_depth=4, min_child_weight=3, n_estimators=1300, reg_alpha=0.1, reg_lambda=0, subsample=0.7; total time=   3.9s
[CV] END colsample_bytree=0.7, early_stopping_rounds=20, eval_metr

71 fits failed out of a total of 250.
The score on these train-test partitions for these parameters will be set to nan.
If these failures are not expected, you can try to debug them by setting error_score='raise'.

Below are more details about the failures:
--------------------------------------------------------------------------------
1 fits failed with the following error:
Traceback (most recent call last):
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/sklearn/model_selection/_validation.py", line 729, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/core.py", line 726, in inner_f
    return func(**kwargs)
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/sklearn.py", line 1143, in fit
    train_dmatrix, evals = _wrap_evaluation_matrices(
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/sklearn.py", 

XGBoostError: [23:22:41] /workspace/src/data/array_interface.cu:44: Check failed: err == cudaGetLastError() (0 vs. 2) : 
Stack trace:
  [bt] (0) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/lib/libxgboost.so(+0x25c1ac) [0x7e800ca5c1ac]
  [bt] (1) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/lib/libxgboost.so(+0xa3f4bc) [0x7e800d23f4bc]
  [bt] (2) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/lib/libxgboost.so(+0x4e3a2e) [0x7e800cce3a2e]
  [bt] (3) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/lib/libxgboost.so(XGDMatrixSetInfoFromInterface+0xb2) [0x7e800c963522]
  [bt] (4) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/lib-dynload/../../libffi.so.8(+0xa052) [0x7e8516170052]
  [bt] (5) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/lib-dynload/../../libffi.so.8(+0x8925) [0x7e851616e925]
  [bt] (6) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/lib-dynload/../../libffi.so.8(ffi_call+0xde) [0x7e851616f06e]
  [bt] (7) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/lib-dynload/_ctypes.cpython-38-x86_64-linux-gnu.so(_ctypes_callproc+0x31f) [0x7e8516cb14af]
  [bt] (8) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/lib-dynload/_ctypes.cpython-38-x86_64-linux-gnu.so(+0x9013) [0x7e8516ca7013]



### 126 genes

In [18]:
svg_list=pd.read_csv('/home/zzhang/SPADE/SpaGCN/tutorial/svg-enhanced-scc-new-0830.csv',header=0,sep=',',index_col=0)
svg_list=svg_list.index
svg_list=list(set(svg_list)&set(adata_ST.var.index))
TG_filter2=TG_filter1.loc[svg_list,:]
from sklearn.preprocessing import StandardScaler
import pandas as pd

# 初始化标准化器
scaler = StandardScaler()

# 拟合（计算每列的μ和σ）并转换训练数据
df_zscore = pd.DataFrame(scaler.fit_transform(TG_filter2.T), columns=TG_filter2.index)
X=df_zscore.values
#X = scaler.fit_transform(z_B)
adata_ST.obsm['spatial']= np.column_stack((adata_ST.obs['x'].values, adata_ST.obs['y'].values))
Y=(adata_ST.obsm['spatial']-(adata_ST.obsm['spatial'].min()))/((adata_ST.obsm['spatial'].max())-(adata_ST.obsm['spatial'].min()))
from sklearn.model_selection import train_test_split
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, 
    test_size=0.333,  # 1/3 for testing, 2/3 for training
    random_state=42   # For reproducibility
)

In [21]:
from sklearn.linear_model import Lasso
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
# 训练模型（alpha 是正则化强度，越大惩罚越强）
model = Lasso(alpha=0.005, random_state=42)  # 可调整 alpha
model.fit(X_train, Y_train)
# 预测和评估（代码和 LinearRegression 相同）
Y_train_pred = model.predict(X_train)
Y_test_pred = model.predict(X_test)
# 输出结果
print("Lasso Regression:")
print(f"  Train MAE: {mean_absolute_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test MAE: {mean_absolute_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")
print(f"  Train MSE: {mean_squared_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test MSE: {mean_squared_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")

Lasso Regression:
  Train MAE: 0.1112, R²: 0.5556
  Test MAE: 0.1191, R²: 0.4931
  Train MSE: 0.0200, R²: 0.5556
  Test MSE: 0.0239, R²: 0.4931


In [23]:
import numpy as np
from xgboost import XGBRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import mean_absolute_error

# 1. 定义参数分布（包含XGBoost特有参数）
param_dist = {
    'n_estimators': np.arange(800, 2500, 100),
    'max_depth': [3, 4, 5,6],
    'learning_rate': [0.005, 0.02, 0.05],
    'subsample': [ 0.7,0.8, 0.9, 1],
    'colsample_bytree': [0.6,0.65,0.7, 0.75, 0.8,0.85, 0.9],
    'reg_alpha': [0, 0.1, 0.5],
    'reg_lambda': [0, 0.1, 0.5],
    'min_child_weight': [1, 3, 5],
    'gamma': [0, 0.1, 0.2],
    'eval_metric': ['mae'],  # 作为模型参数
    'early_stopping_rounds': [20]  # 作为模型参数
}

# 2. 初始化基础模型
base_model = XGBRegressor(
    random_state=42,
    tree_method='hist'
)

# 3. 初始化RandomizedSearchCV
random_search = RandomizedSearchCV(
    estimator=base_model,
    param_distributions=param_dist,
    n_iter=50,
    scoring='neg_mean_absolute_error',
    cv=5,
    verbose=2,
    n_jobs=-1,
    random_state=42,
    refit=True  # 自动用最佳参数重新训练
)

# 4. 执行搜索（只传递eval_set给fit）
random_search.fit(
    X_train, Y_train,
    eval_set=[(X_test, Y_test)],  # 这是唯一需要传递给fit的XGBoost特有参数
    verbose=False
)

# 5. 获取最佳模型
best_model = random_search.best_estimator_
print("\nBest Parameters:", random_search.best_params_)
print("Best MAE:", -random_search.best_score_)

# 6. 最终评估
Y_pred = best_model.predict(X_test)
print("Final Test MAE:", mean_absolute_error(Y_test, Y_pred))

Fitting 5 folds for each of 50 candidates, totalling 250 fits
[CV] END colsample_bytree=0.65, early_stopping_rounds=20, eval_metric=mae, gamma=0.2, learning_rate=0.02, max_depth=5, min_child_weight=3, n_estimators=1200, reg_alpha=0.5, reg_lambda=0, subsample=0.9; total time=   0.2s
[CV] END colsample_bytree=0.75, early_stopping_rounds=20, eval_metric=mae, gamma=0.2, learning_rate=0.05, max_depth=4, min_child_weight=1, n_estimators=1600, reg_alpha=0.1, reg_lambda=0, subsample=0.9; total time=   0.0s
[CV] END colsample_bytree=0.7, early_stopping_rounds=20, eval_metric=mae, gamma=0, learning_rate=0.005, max_depth=5, min_child_weight=5, n_estimators=1400, reg_alpha=0.5, reg_lambda=0.5, subsample=0.8; total time=   0.0s
[CV] END colsample_bytree=0.65, early_stopping_rounds=20, eval_metric=mae, gamma=0, learning_rate=0.05, max_depth=6, min_child_weight=3, n_estimators=2400, reg_alpha=0.1, reg_lambda=0.1, subsample=0.9; total time=   5.7s
[CV] END colsample_bytree=0.75, early_stopping_rounds=

125 fits failed out of a total of 250.
The score on these train-test partitions for these parameters will be set to nan.
If these failures are not expected, you can try to debug them by setting error_score='raise'.

Below are more details about the failures:
--------------------------------------------------------------------------------
1 fits failed with the following error:
Traceback (most recent call last):
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/sklearn/model_selection/_validation.py", line 729, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/core.py", line 726, in inner_f
    return func(**kwargs)
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/sklearn.py", line 1143, in fit
    train_dmatrix, evals = _wrap_evaluation_matrices(
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/sklearn.py",


Best Parameters: {'subsample': 0.9, 'reg_lambda': 0.1, 'reg_alpha': 0.1, 'n_estimators': 1300, 'min_child_weight': 1, 'max_depth': 5, 'learning_rate': 0.05, 'gamma': 0, 'eval_metric': 'mae', 'early_stopping_rounds': 20, 'colsample_bytree': 0.65}
Best MAE: 0.11588412002019062
Final Test MAE: 0.11786239706685295


### 60+ genes

In [24]:
svg_list=pd.read_csv('/home/zzhang/SPADE/SpaGCN/tutorial/svg-enhanced-scc-new.csv',header=0,sep=',',index_col=0)
svg_list=svg_list.index
svg_list=list(set(svg_list)&set(adata_ST.var.index))
TG_filter2=TG_filter1.loc[svg_list,:]
X=TG_filter2.values.T
#X = scaler.fit_transform(z_B)
adata_ST.obsm['spatial']= np.column_stack((adata_ST.obs['x'].values, adata_ST.obs['y'].values))
Y=(adata_ST.obsm['spatial']-(adata_ST.obsm['spatial'].min()))/((adata_ST.obsm['spatial'].max())-(adata_ST.obsm['spatial'].min()))
from sklearn.model_selection import train_test_split
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, 
    test_size=0.333,  # 1/3 for testing, 2/3 for training
    random_state=42   # For reproducibility
)

In [25]:
import numpy as np
from xgboost import XGBRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import mean_absolute_error

# 1. 定义参数分布（包含XGBoost特有参数）
param_dist = {
    'n_estimators': np.arange(800, 2500, 100),
    'max_depth': [3, 4, 5,6],
    'learning_rate': [0.005, 0.02, 0.05],
    'subsample': [ 0.7,0.8, 0.9, 1],
    'colsample_bytree': [0.6,0.65,0.7, 0.75, 0.8,0.85, 0.9],
    'reg_alpha': [0, 0.1, 0.5],
    'reg_lambda': [0, 0.1, 0.5],
    'min_child_weight': [1, 3, 5],
    'gamma': [0, 0.1, 0.2],
    'eval_metric': ['mae'],  # 作为模型参数
    'early_stopping_rounds': [20]  # 作为模型参数
}

# 2. 初始化基础模型
base_model = XGBRegressor(
    random_state=42,
    tree_method='hist'
)

# 3. 初始化RandomizedSearchCV
random_search = RandomizedSearchCV(
    estimator=base_model,
    param_distributions=param_dist,
    n_iter=50,
    scoring='neg_mean_absolute_error',
    cv=5,
    verbose=2,
    n_jobs=-1,
    random_state=42,
    refit=True  # 自动用最佳参数重新训练
)

# 4. 执行搜索（只传递eval_set给fit）
random_search.fit(
    X_train, Y_train,
    eval_set=[(X_test, Y_test)],  # 这是唯一需要传递给fit的XGBoost特有参数
    verbose=False
)

# 5. 获取最佳模型
best_model = random_search.best_estimator_
print("\nBest Parameters:", random_search.best_params_)
print("Best MAE:", -random_search.best_score_)

# 6. 最终评估
Y_pred = best_model.predict(X_test)
print("Final Test MAE:", mean_absolute_error(Y_test, Y_pred))

Fitting 5 folds for each of 50 candidates, totalling 250 fits
[CV] END colsample_bytree=0.65, early_stopping_rounds=20, eval_metric=mae, gamma=0.2, learning_rate=0.02, max_depth=5, min_child_weight=3, n_estimators=1200, reg_alpha=0.5, reg_lambda=0, subsample=0.9; total time=   0.8s
[CV] END colsample_bytree=0.65, early_stopping_rounds=20, eval_metric=mae, gamma=0.2, learning_rate=0.02, max_depth=4, min_child_weight=1, n_estimators=2200, reg_alpha=0.5, reg_lambda=0.5, subsample=1; total time=   1.2s
[CV] END colsample_bytree=0.75, early_stopping_rounds=20, eval_metric=mae, gamma=0, learning_rate=0.005, max_depth=3, min_child_weight=3, n_estimators=1100, reg_alpha=0.5, reg_lambda=0.5, subsample=0.8; total time=   4.3s
[CV] END colsample_bytree=0.75, early_stopping_rounds=20, eval_metric=mae, gamma=0, learning_rate=0.005, max_depth=3, min_child_weight=3, n_estimators=1100, reg_alpha=0.5, reg_lambda=0.5, subsample=0.8; total time=   5.8s
[CV] END colsample_bytree=0.8, early_stopping_rounds

71 fits failed out of a total of 250.
The score on these train-test partitions for these parameters will be set to nan.
If these failures are not expected, you can try to debug them by setting error_score='raise'.

Below are more details about the failures:
--------------------------------------------------------------------------------
1 fits failed with the following error:
Traceback (most recent call last):
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/sklearn/model_selection/_validation.py", line 729, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/core.py", line 726, in inner_f
    return func(**kwargs)
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/sklearn.py", line 1143, in fit
    train_dmatrix, evals = _wrap_evaluation_matrices(
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/sklearn.py", 


Best Parameters: {'subsample': 0.7, 'reg_lambda': 0, 'reg_alpha': 0.1, 'n_estimators': 2400, 'min_child_weight': 3, 'max_depth': 3, 'learning_rate': 0.05, 'gamma': 0, 'eval_metric': 'mae', 'early_stopping_rounds': 20, 'colsample_bytree': 0.75}
Best MAE: 0.1148755526900966
Final Test MAE: 0.1099526233399894


### enhhanced data

In [2]:
adata_ST=sc.read_h5ad('/home/qyyuan/project/ST_GRN/TESLA/tutorial/results/enhanced_exp.h5ad')




In [3]:
svg_list=pd.read_csv('/home/zzhang/SPADE/SpaGCN/tutorial/svg-enhanced-scc-new.csv',header=0,sep=',',index_col=0)
svg_list=svg_list.index
svg_list=list(set(svg_list)&set(adata_ST.var.index))

In [7]:
X=adata_ST[:,svg_list].X
import numpy as np
#X = scaler.fit_transform(z_B)
adata_ST.obsm['spatial']= np.column_stack((adata_ST.obs['x'].values, adata_ST.obs['y'].values))
Y=(adata_ST.obsm['spatial']-(adata_ST.obsm['spatial'].min()))/((adata_ST.obsm['spatial'].max())-(adata_ST.obsm['spatial'].min()))
from sklearn.model_selection import train_test_split
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, 
    test_size=0.333,  # 1/3 for testing, 2/3 for training
    random_state=42   # For reproducibility
)

In [8]:
import numpy as np
from xgboost import XGBRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import mean_absolute_error

# 1. 定义参数分布（包含XGBoost特有参数）
param_dist = {
    'n_estimators': np.arange(800, 2500, 100),
    'max_depth': [3, 4, 5,6],
    'learning_rate': [0.005, 0.02, 0.05],
    'subsample': [ 0.7,0.8, 0.9, 1],
    'colsample_bytree': [0.6,0.65,0.7, 0.75, 0.8,0.85, 0.9],
    'reg_alpha': [0, 0.1, 0.5],
    'reg_lambda': [0, 0.1, 0.5],
    'min_child_weight': [1, 3, 5],
    'gamma': [0, 0.1, 0.2],
    'eval_metric': ['mae'],  # 作为模型参数
    'early_stopping_rounds': [20]  # 作为模型参数
}

# 2. 初始化基础模型
base_model = XGBRegressor(
    random_state=42,
    tree_method='hist'
)

# 3. 初始化RandomizedSearchCV
random_search = RandomizedSearchCV(
    estimator=base_model,
    param_distributions=param_dist,
    n_iter=50,
    scoring='neg_mean_absolute_error',
    cv=5,
    verbose=2,
    n_jobs=-1,
    random_state=42,
    refit=True  # 自动用最佳参数重新训练
)

# 4. 执行搜索（只传递eval_set给fit）
random_search.fit(
    X_train, Y_train,
    eval_set=[(X_test, Y_test)],  # 这是唯一需要传递给fit的XGBoost特有参数
    verbose=False
)

# 5. 获取最佳模型
best_model = random_search.best_estimator_
print("\nBest Parameters:", random_search.best_params_)
print("Best MAE:", -random_search.best_score_)

# 6. 最终评估
Y_pred = best_model.predict(X_test)
print("Final Test MAE:", mean_absolute_error(Y_test, Y_pred))

Fitting 5 folds for each of 50 candidates, totalling 250 fits
[CV] END colsample_bytree=0.65, early_stopping_rounds=20, eval_metric=mae, gamma=0.2, learning_rate=0.02, max_depth=5, min_child_weight=3, n_estimators=1200, reg_alpha=0.5, reg_lambda=0, subsample=0.9; total time=   9.7s
[CV] END colsample_bytree=0.65, early_stopping_rounds=20, eval_metric=mae, gamma=0.2, learning_rate=0.02, max_depth=5, min_child_weight=3, n_estimators=1200, reg_alpha=0.5, reg_lambda=0, subsample=0.9; total time=  10.0s
[CV] END colsample_bytree=0.65, early_stopping_rounds=20, eval_metric=mae, gamma=0.2, learning_rate=0.02, max_depth=5, min_child_weight=3, n_estimators=1200, reg_alpha=0.5, reg_lambda=0, subsample=0.9; total time=  10.0s
[CV] END colsample_bytree=0.65, early_stopping_rounds=20, eval_metric=mae, gamma=0.2, learning_rate=0.02, max_depth=5, min_child_weight=3, n_estimators=1200, reg_alpha=0.5, reg_lambda=0, subsample=0.9; total time=  10.8s
[CV] END colsample_bytree=0.65, early_stopping_rounds=

74 fits failed out of a total of 250.
The score on these train-test partitions for these parameters will be set to nan.
If these failures are not expected, you can try to debug them by setting error_score='raise'.

Below are more details about the failures:
--------------------------------------------------------------------------------
1 fits failed with the following error:
Traceback (most recent call last):
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/sklearn/model_selection/_validation.py", line 729, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/core.py", line 726, in inner_f
    return func(**kwargs)
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/sklearn.py", line 1143, in fit
    train_dmatrix, evals = _wrap_evaluation_matrices(
  File "/home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/sklearn.py", 

XGBoostError: [23:06:51] /workspace/src/data/array_interface.cu:44: Check failed: err == cudaGetLastError() (0 vs. 2) : 
Stack trace:
  [bt] (0) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/lib/libxgboost.so(+0x25c1ac) [0x7b6254a5c1ac]
  [bt] (1) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/lib/libxgboost.so(+0xa3f4bc) [0x7b625523f4bc]
  [bt] (2) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/lib/libxgboost.so(+0x4e3a2e) [0x7b6254ce3a2e]
  [bt] (3) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/site-packages/xgboost/lib/libxgboost.so(XGDMatrixSetInfoFromInterface+0xb2) [0x7b6254963522]
  [bt] (4) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/lib-dynload/../../libffi.so.8(+0xa052) [0x7b64e0482052]
  [bt] (5) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/lib-dynload/../../libffi.so.8(+0x8925) [0x7b64e0480925]
  [bt] (6) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/lib-dynload/../../libffi.so.8(ffi_call+0xde) [0x7b64e048106e]
  [bt] (7) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/lib-dynload/_ctypes.cpython-38-x86_64-linux-gnu.so(_ctypes_callproc+0x31f) [0x7b64e05314af]
  [bt] (8) /home/qyyuan/.conda/envs/STALocator/lib/python3.8/lib-dynload/_ctypes.cpython-38-x86_64-linux-gnu.so(+0x9013) [0x7b64e0527013]



#调参

In [None]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV

# 定义参数网格
param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [5, 10, 15, None],
    'min_samples_split': [2, 5, 10],
    'max_features': ['auto', 0.8]
}

# 初始化模型和搜索器
rf = RandomForestRegressor(random_state=42)
grid_search = GridSearchCV(
    estimator=rf,
    param_grid=param_grid,
    scoring='neg_mean_absolute_error',  # 或 'neg_mean_squared_error'
    cv=5,  # 5折交叉验证
    verbose=2,
    n_jobs=-1  # 并行计算
)

# 执行搜索
grid_search.fit(X_train, Y_train)

# 输出最优参数和性能
print("Best parameters:", grid_search.best_params_)
print("Best MAE:", -grid_search.best_score_)  # 注意：sklearn返回负值

Fitting 5 folds for each of 72 candidates, totalling 360 fits
[CV] END max_depth=5, max_features=auto, min_samples_split=5, n_estimators=100; total time=   0.0s
[CV] END max_depth=5, max_features=auto, min_samples_split=2, n_estimators=200; total time=   0.0s
[CV] END max_depth=5, max_features=auto, min_samples_split=5, n_estimators=100; total time=   0.0s
[CV] END max_depth=5, max_features=auto, min_samples_split=10, n_estimators=300; total time=   0.0s
[CV] END max_depth=5, max_features=auto, min_samples_split=5, n_estimators=300; total time=   0.0s
[CV] END max_depth=5, max_features=auto, min_samples_split=2, n_estimators=300; total time=   0.0s
[CV] END max_depth=5, max_features=auto, min_samples_split=2, n_estimators=200; total time=   0.0s
[CV] END max_depth=10, max_features=auto, min_samples_split=10, n_estimators=200; total time=   0.0s
[CV] END max_depth=5, max_features=auto, min_samples_split=5, n_estimators=300; total time=   0.0s
[CV] END max_depth=5, max_features=auto, min

In [None]:
from xgboost import XGBRegressor
from sklearn.model_selection import RandomizedSearchCV
import numpy as np

# 定义参数分布
param_dist = {
    'n_estimators': np.arange(100, 500, 50),
    'max_depth': np.arange(3, 10),
    'learning_rate': [0.01, 0.05, 0.1, 0.2],
    'subsample': [0.6, 0.8, 1.0],
    'colsample_bytree': [0.6, 0.8, 1.0],
    'reg_alpha': [0, 1, 2],
    'reg_lambda': [0, 1, 2]
}
param_dist = {
    'n_estimators': np.arange(400, 2000, 50),  # 增加树的数量
    'max_depth': np.arange(3, 20),             # 限制深度防止过拟合
    'learning_rate': [0.0001, 0.001, 0.002, 0.005, 0.01, 0.05,0.1],     # 更小的学习率
    'subsample': [0.5, 0.6, 0.7,0.8,0.9,1.0],             # 样本采样比例
    'colsample_bytree': [0.7, 0.8, 0.9],      # 特征采样比例
    'reg_alpha': [0, 0.1, 0.5],               # L1 正则化
    'reg_lambda': [0, 0.1, 0.5],              # L2 正则化
    'min_child_weight': [1, 3, 5]             # 叶子节点最小样本权重
}


# 初始化模型和搜索器
xgb = XGBRegressor(random_state=42)
random_search = RandomizedSearchCV(
    estimator=xgb,
    param_distributions=param_dist,
    n_iter=1500,  # 随机采样50组参数
    scoring='neg_mean_absolute_error',
    cv=5,
    verbose=2,
    n_jobs=-1
)

# 执行搜索
random_search.fit(X_train, Y_train, eval_set=[(X_test, Y_test)], verbose=False)

# 输出最优参数和性能
print("Best parameters:", random_search.best_params_)
print("Best MAE:", -random_search.best_score_)

Fitting 5 folds for each of 1500 candidates, totalling 7500 fits
[CV] END colsample_bytree=0.9, learning_rate=0.005, max_depth=3, min_child_weight=1, n_estimators=500, reg_alpha=0.1, reg_lambda=0, subsample=0.5; total time=  28.1s
[CV] END colsample_bytree=0.9, learning_rate=0.1, max_depth=4, min_child_weight=3, n_estimators=1350, reg_alpha=0.1, reg_lambda=0, subsample=1.0; total time=  28.9s
[CV] END colsample_bytree=0.9, learning_rate=0.1, max_depth=4, min_child_weight=3, n_estimators=1350, reg_alpha=0.1, reg_lambda=0, subsample=1.0; total time=  29.2s
[CV] END colsample_bytree=0.7, learning_rate=0.1, max_depth=3, min_child_weight=1, n_estimators=1000, reg_alpha=0.5, reg_lambda=0.5, subsample=0.8; total time=  30.4s
[CV] END colsample_bytree=0.9, learning_rate=0.005, max_depth=3, min_child_weight=1, n_estimators=500, reg_alpha=0.1, reg_lambda=0, subsample=0.5; total time=  31.1s
[CV] END colsample_bytree=0.9, learning_rate=0.1, max_depth=4, min_child_weight=3, n_estimators=1350, reg_

In [None]:
import numpy as np
from xgboost import XGBRegressor
from sklearn.model_selection import RandomizedSearchCV

# 第一阶段：粗调主要参数
phase1_params = {
    'n_estimators': np.arange(800, 2000, 100),  # 大幅增加树的数量
    'max_depth': [3, 4, 5],                    # 严格控制深度
    'learning_rate': [0.003, 0.005, 0.01],     # 更小的学习率
    'subsample': [0.4, 0.5, 0.6],              # 更激进的采样
    'colsample_bytree': [0.6, 0.7, 0.8],
    'reg_alpha': [0, 0.01, 0.05],              # 更精细的正则化
    'reg_lambda': [0, 0.01, 0.05],
    'min_child_weight': [5, 7, 10],            # 更高的节点权重
    'gamma': [0, 0.1, 0.2]                     # 新增分裂阈值参数
}

# 第二阶段：微调关键参数
phase2_params = {
    'n_estimators': np.arange(1500, 2500, 50),
    'learning_rate': np.linspace(0.003, 0.01, 5),
    'subsample': np.linspace(0.4, 0.6, 5),
    'colsample_bytree': np.linspace(0.6, 0.8, 5),
    'reg_alpha': np.linspace(0, 0.05, 5),
    'reg_lambda': np.linspace(0, 0.05, 5)
}

def optimize_xgb(params, n_iter=100):
    xgb = XGBRegressor(
        random_state=42,
        tree_method='hist',  # 使用hist加速
        enable_categorical=True,
        early_stopping_rounds=30
    )
    search = RandomizedSearchCV(
        estimator=xgb,
        param_distributions=params,
        n_iter=n_iter,
        scoring='neg_mean_absolute_error',
        cv=5,
        verbose=2,
        n_jobs=-1,
        random_state=42
    )
    search.fit(
        X_train, Y_train,
        eval_set=[(X_test, Y_test)],
        verbose=0
    )
    return search

# 执行两阶段优化
phase1_result = optimize_xgb(phase1_params, n_iter=150)
phase2_result = optimize_xgb(phase2_params, n_iter=50)

best_params = phase2_result.best_params_
print("Optimized Parameters:", best_params)
print("Best MAE:", -phase2_result.best_score_)

Fitting 5 folds for each of 150 candidates, totalling 750 fits
[CV] END colsample_bytree=0.6, gamma=0, learning_rate=0.01, max_depth=3, min_child_weight=7, n_estimators=1200, reg_alpha=0, reg_lambda=0, subsample=0.5; total time=   0.0s
[CV] END colsample_bytree=0.8, gamma=0.1, learning_rate=0.005, max_depth=4, min_child_weight=7, n_estimators=1800, reg_alpha=0, reg_lambda=0.05, subsample=0.5; total time=   3.2s
[CV] END colsample_bytree=0.6, gamma=0.1, learning_rate=0.01, max_depth=4, min_child_weight=5, n_estimators=1700, reg_alpha=0, reg_lambda=0, subsample=0.4; total time=   6.1s
[CV] END colsample_bytree=0.6, gamma=0.1, learning_rate=0.01, max_depth=4, min_child_weight=5, n_estimators=1700, reg_alpha=0, reg_lambda=0, subsample=0.4; total time=   6.2s
[CV] END colsample_bytree=0.6, gamma=0, learning_rate=0.01, max_depth=3, min_child_weight=7, n_estimators=1200, reg_alpha=0, reg_lambda=0, subsample=0.5; total time=   6.4s
[CV] END colsample_bytree=0.8, gamma=0.2, learning_rate=0.01, 

In [None]:
import numpy as np
from xgboost import XGBRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import mean_absolute_error

# 超参数空间（极限优化版）
param_dist = {
    'n_estimators': np.arange(2000, 5000, 100),  # 极大量树
    'max_depth': [2, 3],                       # 更浅的树
    'learning_rate': np.linspace(0.001, 0.01, 10),
    'subsample': np.linspace(0.3, 0.6, 5),     # 更激进采样
    'colsample_bytree': np.linspace(0.5, 0.7, 5),
    'reg_alpha': np.linspace(0, 0.1, 5),
    'reg_lambda': np.linspace(0.1, 1, 5),
    'min_child_weight': [5, 10, 15],           # 更高限制
    'gamma': [0.5, 1, 2]                       # 更强分裂控制
}

xgb = XGBRegressor(
    random_state=42,
    tree_method='gpu_hist',  # 使用GPU加速
    enable_categorical=True,
    eval_metric='mae'
)

search = RandomizedSearchCV(
    estimator=xgb,
    param_distributions=param_dist,
    n_iter=200,              # 大幅增加尝试次数
    scoring='neg_mean_absolute_error',
    cv=5,
    verbose=2,
    n_jobs=-1,
    random_state=42
)

search.fit(
    X_train, Y_train,
    eval_set=[(X_test, Y_test)],
    early_stopping_rounds=50,
    verbose=0
)

best_params = search.best_params_
print("Optimized Parameters:", best_params)
print("Best MAE:", -search.best_score_)

In [None]:
import numpy as np
import pandas as pd
from xgboost import XGBRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import mean_absolute_error
from sklearn.preprocessing import StandardScaler
from lightgbm import LGBMRegressor
from sklearn.ensemble import VotingRegressor

# 1. 数据准备（假设 X_train, Y_train, X_test, Y_test 已定义）
# 标准化数据（可选，对线性模型更有效）
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 2. XGBoost 参数优化
param_dist = {
    'n_estimators': np.arange(400, 1000, 50),  # 增加树的数量
    'max_depth': np.arange(3, 6),             # 限制深度防止过拟合
    'learning_rate': [0.005, 0.01, 0.05],     # 更小的学习率
    'subsample': [0.5, 0.6, 0.7],             # 样本采样比例
    'colsample_bytree': [0.7, 0.8, 0.9],      # 特征采样比例
    'reg_alpha': [0, 0.1, 0.5],               # L1 正则化
    'reg_lambda': [0, 0.1, 0.5],              # L2 正则化
    'min_child_weight': [1, 3, 5]             # 叶子节点最小样本权重
}

xgb = XGBRegressor(random_state=42, tree_method='hist')  # 使用hist加速训练
random_search = RandomizedSearchCV(
    estimator=xgb,
    param_distributions=param_dist,
    n_iter=100,                               # 增加搜索次数
    scoring='neg_mean_absolute_error',
    cv=5,
    verbose=2,
    n_jobs=-1
)

# 启用早停和评估
random_search.fit(
    X_train, Y_train,
    eval_set=[(X_test, Y_test)],
    early_stopping_rounds=20,
    eval_metric='mae',
    verbose=False
)

# 输出最优参数
print("\nBest Parameters:", random_search.best_params_)
print("Best MAE:", -random_search.best_score_)

# 3. 用最优参数训练最终模型
best_xgb = random_search.best_estimator_
best_xgb.fit(
    X_train, Y_train,
    eval_set=[(X_test, Y_test)],
    early_stopping_rounds=20,
    verbose=False
)

# 预测和评估
Y_test_pred_xgb = best_xgb.predict(X_test)
mae_xgb = mean_absolute_error(Y_test, Y_test_pred_xgb)
print("\nXGBoost Test MAE:", mae_xgb)

# 4. 特征重要性分析
importance = best_xgb.feature_importances_
feature_importance = pd.DataFrame({
    'feature': X_train.columns,
    'importance': importance
}).sort_values('importance', ascending=False)
print("\nTop 10 Important Features:")
print(feature_importance.head(10))

# 5. 尝试 LightGBM（对比）
lgbm = LGBMRegressor(
    n_estimators=1000,
    learning_rate=0.01,
    max_depth=5,
    reg_alpha=0.1,
    reg_lambda=0.1,
    random_state=42
)
lgbm.fit(
    X_train, Y_train,
    eval_set=[(X_test, Y_test)],
    early_stopping_rounds=20,
    verbose=False
)
Y_test_pred_lgbm = lgbm.predict(X_test)
mae_lgbm = mean_absolute_error(Y_test, Y_test_pred_lgbm)
print("\nLightGBM Test MAE:", mae_lgbm)

# 6. 模型集成（XGBoost + LightGBM）
ensemble = VotingRegressor([
    ('xgb', best_xgb),
    ('lgbm', lgbm)
])
ensemble.fit(X_train, Y_train)
Y_test_pred_ensemble = ensemble.predict(X_test)
mae_ensemble = mean_absolute_error(Y_test, Y_test_pred_ensemble)
print("\nEnsemble Test MAE:", mae_ensemble)

Fitting 5 folds for each of 100 candidates, totalling 500 fits
[CV] END colsample_bytree=0.8, learning_rate=0.01, max_depth=4, min_child_weight=5, n_estimators=450, reg_alpha=0.1, reg_lambda=0.5, subsample=0.5; total time=   0.0s
[CV] END colsample_bytree=0.8, learning_rate=0.01, max_depth=4, min_child_weight=5, n_estimators=450, reg_alpha=0.1, reg_lambda=0.5, subsample=0.5; total time=   0.0s
[CV] END colsample_bytree=0.8, learning_rate=0.01, max_depth=4, min_child_weight=5, n_estimators=450, reg_alpha=0.1, reg_lambda=0.5, subsample=0.5; total time=   0.0s
[CV] END colsample_bytree=0.8, learning_rate=0.01, max_depth=4, min_child_weight=5, n_estimators=450, reg_alpha=0.1, reg_lambda=0.5, subsample=0.5; total time=   0.0s
[CV] END colsample_bytree=0.8, learning_rate=0.01, max_depth=4, min_child_weight=5, n_estimators=450, reg_alpha=0.1, reg_lambda=0.5, subsample=0.5; total time=   0.0s
[CV] END colsample_bytree=0.8, learning_rate=0.01, max_depth=5, min_child_weight=5, n_estimators=500, 

ValueError: 
All the 500 fits failed.
It is very likely that your model is misconfigured.
You can try to debug the error by setting error_score='raise'.

Below are more details about the failures:
--------------------------------------------------------------------------------
500 fits failed with the following error:
Traceback (most recent call last):
  File "/home/qyyuan/.conda/envs/STALocator-zzqyy/lib/python3.9/site-packages/sklearn/model_selection/_validation.py", line 866, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "/home/qyyuan/.conda/envs/STALocator-zzqyy/lib/python3.9/site-packages/xgboost/core.py", line 726, in inner_f
    return func(**kwargs)
TypeError: fit() got an unexpected keyword argument 'early_stopping_rounds'


In [None]:
import numpy as np
from xgboost import XGBRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import mean_absolute_error

# 定义参数分布
param_dist = {
    'n_estimators': np.arange(400, 1000, 50),
    'max_depth': np.arange(3, 6),
    'learning_rate': [0.005, 0.01, 0.05],
    'subsample': [0.5, 0.6, 0.7],
    'colsample_bytree': [0.7, 0.8, 0.9],
    'reg_alpha': [0, 0.1, 0.5],
    'reg_lambda': [0, 0.1, 0.5],
    'min_child_weight': [1, 3, 5],
    'eval_metric': ['mae'],  # 将eval_metric作为模型参数
    'early_stopping_rounds': [20]  # 将early_stopping_rounds作为模型参数
}

# 初始化模型
xgb = XGBRegressor(random_state=42)

# 初始化搜索器
random_search = RandomizedSearchCV(
    estimator=xgb,
    param_distributions=param_dist,
    n_iter=50,
    scoring='neg_mean_absolute_error',
    cv=5,
    verbose=2,
    n_jobs=-1,
    random_state=42,
    refit=True  # 自动用最佳参数重新训练
)

# 执行搜索
random_search.fit(
    X_train, Y_train,
    eval_set=[(X_test, Y_test)],  # 这是唯一需要传递给fit的参数
    verbose=False
)

# 输出最优参数和性能
print("\nBest Parameters:", random_search.best_params_)
print("Best MAE:", -random_search.best_score_)

# 最终评估（使用自动refit的模型）
best_model = random_search.best_estimator_
Y_test_pred = best_model.predict(X_test)
print("\nFinal Test MAE:", mean_absolute_error(Y_test, Y_test_pred))

Fitting 5 folds for each of 50 candidates, totalling 250 fits
[CV] END colsample_bytree=0.8, early_stopping_rounds=20, eval_metric=mae, learning_rate=0.05, max_depth=4, min_child_weight=3, n_estimators=650, reg_alpha=0.1, reg_lambda=0.1, subsample=0.5; total time=  26.5s
[CV] END colsample_bytree=0.8, early_stopping_rounds=20, eval_metric=mae, learning_rate=0.05, max_depth=4, min_child_weight=1, n_estimators=850, reg_alpha=0, reg_lambda=0, subsample=0.5; total time=  27.4s
[CV] END colsample_bytree=0.9, early_stopping_rounds=20, eval_metric=mae, learning_rate=0.05, max_depth=3, min_child_weight=3, n_estimators=400, reg_alpha=0, reg_lambda=0, subsample=0.7; total time=  28.0s
[CV] END colsample_bytree=0.8, early_stopping_rounds=20, eval_metric=mae, learning_rate=0.05, max_depth=5, min_child_weight=3, n_estimators=400, reg_alpha=0, reg_lambda=0, subsample=0.7; total time=  28.0s
[CV] END colsample_bytree=0.7, early_stopping_rounds=20, eval_metric=mae, learning_rate=0.05, max_depth=3, min

In [None]:
import numpy as np
from xgboost import XGBRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import mean_absolute_error

# 1. 定义参数分布（包含XGBoost特有参数）
param_dist = {
    'n_estimators': np.arange(800, 1500, 100),
    'max_depth': [3, 4, 5],
    'learning_rate': [0.005, 0.01, 0.05],
    'subsample': [0.5, 0.6, 0.7],
    'colsample_bytree': [0.7, 0.8, 0.9],
    'reg_alpha': [0, 0.1, 0.5],
    'reg_lambda': [0, 0.1, 0.5],
    'min_child_weight': [1, 3, 5],
    'gamma': [0, 0.1, 0.2],
    'eval_metric': ['mae'],  # 作为模型参数
    'early_stopping_rounds': [20]  # 作为模型参数
}

# 2. 初始化基础模型
base_model = XGBRegressor(
    random_state=42,
    tree_method='hist'
)

# 3. 初始化RandomizedSearchCV
random_search = RandomizedSearchCV(
    estimator=base_model,
    param_distributions=param_dist,
    n_iter=50,
    scoring='neg_mean_absolute_error',
    cv=5,
    verbose=2,
    n_jobs=-1,
    random_state=42,
    refit=True  # 自动用最佳参数重新训练
)

# 4. 执行搜索（只传递eval_set给fit）
random_search.fit(
    X_train, Y_train,
    eval_set=[(X_test, Y_test)],  # 这是唯一需要传递给fit的XGBoost特有参数
    verbose=False
)

# 5. 获取最佳模型
best_model = random_search.best_estimator_
print("\nBest Parameters:", random_search.best_params_)
print("Best MAE:", -random_search.best_score_)

# 6. 最终评估
Y_pred = best_model.predict(X_test)
print("Final Test MAE:", mean_absolute_error(Y_test, Y_pred))

Fitting 5 folds for each of 50 candidates, totalling 250 fits
[CV] END colsample_bytree=0.9, early_stopping_rounds=20, eval_metric=mae, gamma=0.1, learning_rate=0.05, max_depth=3, min_child_weight=1, n_estimators=1000, reg_alpha=0.1, reg_lambda=0, subsample=0.7; total time=  26.5s
[CV] END colsample_bytree=0.9, early_stopping_rounds=20, eval_metric=mae, gamma=0.2, learning_rate=0.05, max_depth=3, min_child_weight=5, n_estimators=1200, reg_alpha=0.5, reg_lambda=0, subsample=0.7; total time=  27.1s
[CV] END colsample_bytree=0.9, early_stopping_rounds=20, eval_metric=mae, gamma=0.2, learning_rate=0.05, max_depth=3, min_child_weight=5, n_estimators=1200, reg_alpha=0.5, reg_lambda=0, subsample=0.7; total time=  27.8s
[CV] END colsample_bytree=0.8, early_stopping_rounds=20, eval_metric=mae, gamma=0, learning_rate=0.05, max_depth=3, min_child_weight=3, n_estimators=900, reg_alpha=0.1, reg_lambda=0.5, subsample=0.5; total time=  27.7s
[CV] END colsample_bytree=0.8, early_stopping_rounds=20, ev

In [None]:
from sklearn.linear_model import Lasso
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# 定义参数网格
param_grid = {'alpha': [0.001, 0.01, 0.1, 1, 10, 100]}

# 初始化模型和搜索器
lasso = Lasso(random_state=42)
grid_search = GridSearchCV(
    estimator=lasso,
    param_grid=param_grid,
    scoring='neg_mean_absolute_error',  # 优化MAE（也可用 'neg_mean_squared_error'）
    cv=5,  # 5折交叉验证
    verbose=1,
    n_jobs=-1
)

# 执行搜索
grid_search.fit(X_train, Y_train)

# 输出最优参数
print("Best alpha:", grid_search.best_params_['alpha'])

# 用最优参数重新训练模型
best_lasso = Lasso(alpha=grid_search.best_params_['alpha'], random_state=42)
best_lasso.fit(X_train, Y_train)

# 预测和评估
Y_train_pred = best_lasso.predict(X_train)
Y_test_pred = best_lasso.predict(X_test)

print("\nLasso Regression (Optimized):")
print(f"  Train MAE: {mean_absolute_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test  MAE: {mean_absolute_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")
print(f"  Train MSE: {mean_squared_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test  MSE: {mean_squared_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")

Fitting 5 folds for each of 6 candidates, totalling 30 fits


  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(
  model = cd_fast.enet_coordinate_descent(


Best alpha: 0.01

Lasso Regression (Optimized):
  Train MAE: 0.1002, R²: 0.6019
  Test  MAE: 0.1332, R²: 0.3463
  Train MSE: 0.0174, R²: 0.6019
  Test  MSE: 0.0289, R²: 0.3463


In [None]:
import numpy as np
from sklearn.linear_model import Lasso
from sklearn.metrics import mean_absolute_error, mean_squared_error

# 候选的 alpha 值
alphas = [0.001, 0.01, 0.1, 1, 10, 100]

# 存储结果
results = []

for alpha in alphas:
    # 训练模型
    model = Lasso(alpha=alpha, random_state=42)
    model.fit(X_train, Y_train)
    
    # 预测测试集
    Y_test_pred = model.predict(X_test)
    
    # 计算指标
    mae = mean_absolute_error(Y_test, Y_test_pred)
    mse = mean_squared_error(Y_test, Y_test_pred)
    
    # 记录结果
    results.append({
        'alpha': alpha,
        'MAE': mae,
        'MSE': mse,
        'coef': model.coef_  # 查看系数稀疏性
    })

# 转换为DataFrame并打印
import pandas as pd
results_df = pd.DataFrame(results)
print(results_df.sort_values('MAE'))  # 按MAE排序

# 最优参数（MAE最小）
best_alpha = results_df.loc[results_df['MAE'].idxmin(), 'alpha']
print(f"\nBest alpha (minimize MAE): {best_alpha}")

     alpha       MAE       MSE  \
1    0.010  0.133188  0.028878   
2    0.100  0.145437  0.031886   
3    1.000  0.160688  0.038259   
4   10.000  0.178547  0.044311   
5  100.000  0.178547  0.044311   
0    0.001  0.220601  0.080892   

                                                coef  
1  [[-0.0, 0.001936920368848395, -0.0, -0.0005856...  
2  [[-0.0, 0.0, -0.0, -0.0, -0.0, -0.0, -0.0, 0.0...  
3  [[-0.0, -0.0, -0.0, -0.0, -0.0, -0.0, -0.0, -0...  
4  [[-0.0, -0.0, -0.0, -0.0, -0.0, -0.0, -0.0, -0...  
5  [[-0.0, -0.0, -0.0, -0.0, -0.0, -0.0, -0.0, -0...  
0  [[-0.004353598409109726, 0.011958376109296015,...  

Best alpha (minimize MAE): 0.01


In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score

# 创建管道：标准化 + 线性回归
pipeline = Pipeline([
    ('scaler', StandardScaler()),  # 标准化特征
    ('linear', LinearRegression())
])

# 交叉验证评估
scores = cross_val_score(
    pipeline, X_train, Y_train,
    cv=5,
    scoring='neg_mean_absolute_error'  # 也可用 'neg_mean_squared_error'
)
print("Cross-validated MAE scores:", -scores)
print("Mean MAE:", -scores.mean())

# 训练最终模型
pipeline.fit(X_train, Y_train)
Y_test_pred = pipeline.predict(X_test)
print("\nLinear Regression (Standardized):")
print(f"  Test MAE: {mean_absolute_error(Y_test, Y_test_pred):.4f}")
print(f"  Test MSE: {mean_squared_error(Y_test, Y_test_pred):.4f}")

Cross-validated MAE scores: [3.21102025 2.66971784 4.0147376  1.63623377 3.2493282 ]
Mean MAE: 2.9562075327474213

Linear Regression (Standardized):
  Test MAE: 0.4066
  Test MSE: 0.3236


In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression

# 标准化特征
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 训练模型
model = LinearRegression()
model.fit(X_train_scaled, Y_train)

# 评估
Y_test_pred = model.predict(X_test_scaled)
print("Test MAE:", mean_absolute_error(Y_test, Y_test_pred))
print("Test MSE:", mean_squared_error(Y_test, Y_test_pred))

In [None]:
!pip install lightgbm

Collecting lightgbm
  Downloading lightgbm-4.6.0-py3-none-manylinux_2_28_x86_64.whl.metadata (17 kB)
Downloading lightgbm-4.6.0-py3-none-manylinux_2_28_x86_64.whl (3.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.6/3.6 MB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: lightgbm
Successfully installed lightgbm-4.6.0


In [None]:
from sklearn.ensemble import RandomForestRegressor

model = RandomForestRegressor(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_train, Y_train)
# 预测和评估（代码和 LinearRegression 相同）
Y_train_pred = model.predict(X_train)
Y_test_pred = model.predict(X_test)

# 输出结果
print("Lasso Regression:")

print(f"  Train MAE: {mean_absolute_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test MAE: {mean_absolute_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")
print(f"  Train MSE: {mean_squared_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test MSE: {mean_squared_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")

Lasso Regression:
  Train MAE: 0.0426, R²: 0.9253
  Test MAE: 0.1060, R²: 0.5617
  Train MSE: 0.0032, R²: 0.9253
  Test MSE: 0.0197, R²: 0.5617


In [None]:
from sklearn.linear_model import ElasticNet

model = ElasticNet(alpha=0.1, l1_ratio=0.5, random_state=42)  # l1_ratio 控制 L1/L2 比例
model.fit(X_train, Y_train)
# 预测和评估（代码和 LinearRegression 相同）
Y_train_pred = model.predict(X_train)
Y_test_pred = model.predict(X_test)

# 输出结果
print("Lasso Regression:")

print(f"  Train MAE: {mean_absolute_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test MAE: {mean_absolute_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")
print(f"  Train MSE: {mean_squared_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test MSE: {mean_squared_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")

Lasso Regression:
  Train MAE: 0.1269, R²: 0.4087
  Test MAE: 0.1347, R²: 0.3469
  Train MSE: 0.0258, R²: 0.4087
  Test MSE: 0.0288, R²: 0.3469


In [None]:
from sklearn.linear_model import Ridge

model = Ridge(alpha=1.0, random_state=42)  # alpha 控制正则化强度
model.fit(X_train, Y_train)
# 其余代码同上

# 预测和评估（代码和 LinearRegression 相同）
Y_train_pred = model.predict(X_train)
Y_test_pred = model.predict(X_test)

# 输出结果
print("Lasso Regression:")

print(f"  Train MAE: {mean_absolute_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test MAE: {mean_absolute_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")
print(f"  Train MSE: {mean_squared_error(Y_train, Y_train_pred):.4f}, R²: {r2_score(Y_train, Y_train_pred):.4f}")
print(f"  Test MSE: {mean_squared_error(Y_test, Y_test_pred):.4f}, R²: {r2_score(Y_test, Y_test_pred):.4f}")

Lasso Regression:
  Train MAE: 0.0533, R²: 0.8628
  Test MAE: 0.3781, R²: -5.4023
  Train MSE: 0.0060, R²: 0.8628
  Test MSE: 0.2773, R²: -5.4023


### reduce overfitting

#### load data

In [None]:
import pandas as pd
data0=pd.read_csv( "T3/results/latent.csv",index_col=0)
z_A=data0.values[0:(data0['batch'].values=='scRNA-seq').sum(),:20].astype(float)
z_B=data0.values[0:(data0['batch'].values=='ST').sum(),:20].astype(float)
adata_ST=sc.read_h5ad('T3/results/adata_ST.h5ad')
import umap.umap_ as umap
from sklearn.preprocessing import StandardScaler
#scaler = StandardScaler()
#X = scaler.fit_transform(z_B)
Y=adata_ST.obsm['spatial']/50
z_A0=z_A.copy()
z_B0=z_B.copy()

def split_into_k_folds(n_samples, k=5, seed=None):
    if seed is not None:
        np.random.seed(seed)
    indices = np.random.permutation(n_samples)  # 打乱索引
    return np.array_split(indices, k)           # 均匀切分为 k 份

In [None]:
Y = Y.astype(np.float32)
N_st=adata_ST.X.shape[0]
N_rna=z_A.shape[0]
cross_k=3
sample_indices_st = split_into_k_folds(N_st, k=cross_k, seed=42)
sample_indices_rna = split_into_k_folds(N_rna, k=cross_k, seed=42)
i=0
training_idx_rna=np.array(range(N_rna))
training_idx_st=np.array(list(set(range(N_st))-set(sample_indices_st[i])))
test_idx_st=np.array(sample_indices_st[i])
z_B=z_B[training_idx_st,:]
c_B=Y[training_idx_st,:]

In [None]:
# 修改后的隐藏层维度（示例）
n_hiddens_encoder = [256, 128, 64]  # 逐步降低维度
n_hiddens_decoder = [64, 128, 256]  # 对称结构

class encoder_site(nn.Module):
    def __init__(self, n_input, n_latent):
        super(encoder_site, self).__init__()
        self.n_input = n_input
        self.n_latent = n_latent
        s#elf.device='cpu'
        n_hidd_1 = n_hiddens_encoder[0]
        n_hidd_2 = n_hiddens_encoder[1]
        n_hidd_3 = n_hiddens_encoder[2]
        #n_hidd_4 = 10

        self.fc1 = nn.Linear(n_input, n_hidd_1)
        self.fc1_bn = nn.BatchNorm1d(n_hidd_1)
        self.fc2 = nn.Linear(n_hidd_1, n_hidd_2)
        self.fc2_bn = nn.BatchNorm1d(n_hidd_2)
        self.fc3 = nn.Linear(n_hidd_2, n_hidd_3)
        self.fc3_bn = nn.BatchNorm1d(n_hidd_3)
        #self.fc4 = nn.Linear(n_hidd_3, n_hidd_4)
        #self.fc4_bn = nn.BatchNorm1d(n_hidd_4)
        self.fc5 = nn.Linear(n_hidd_3, n_latent)

    def forward(self, input):
        h1 = F.relu(self.fc1_bn(self.fc1(input)))
        h2 = F.relu(self.fc2_bn(self.fc2(h1)))
        h3 = F.relu(self.fc3_bn(self.fc3(h2)))
        #h4 = F.relu(self.fc4_bn(self.fc4(h3)))
        #return F.relu(self.fc5(h4))
        return self.fc5(h3)

class decoder_site(nn.Module):
    def __init__(self, n_input, n_latent):
        super(decoder_site, self).__init__()
        self.n_input = n_input
        self.n_latent = n_latent
        #self.device='cpu'
        n_hidd_6 =  n_hiddens_decoder[0]
        n_hidd_7 = n_hiddens_decoder[1]
        n_hidd_8 = n_hiddens_decoder[2]
        #n_hidd_9 = n_hiddens_decoder[2]

        self.fc6 = nn.Linear(n_latent, n_hidd_6)
        self.fc6_bn = nn.BatchNorm1d(n_hidd_6)
        self.fc7 = nn.Linear(n_hidd_6, n_hidd_7)
        self.fc7_bn = nn.BatchNorm1d(n_hidd_7)
        self.fc8 = nn.Linear(n_hidd_7, n_hidd_8)
        self.fc8_bn = nn.BatchNorm1d(n_hidd_8)
        #self.fc9 = nn.Linear(n_hidd_8, n_hidd_9)
        #self.fc9_bn = nn.BatchNorm1d(n_hidd_9)
        self.fc10 = nn.Linear(n_hidd_8, n_input)

    def forward(self, z):
        h6 = F.relu(self.fc6_bn(self.fc6(z)))
        h7 = F.relu(self.fc7_bn(self.fc7(h6)))
        h8 = F.relu(self.fc8_bn(self.fc8(h7)))
        #h9 = F.relu(self.fc9_bn(self.fc9(h8)))
        return self.fc10(h8)

In [None]:
        n_latent=20
        lambdalat=10
        lambdarec = 0.1
        num_projections=50
        E_s = encoder_site(20, 2).to('cpu')
        D_s = decoder_site(20, 2).to('cpu')
        params_S = list(E_s.parameters()) + list(D_s.parameters())
        optimizer_S = optim.Adam(params_S, lr=0.001, weight_decay=0.)
        z_A=torch.tensor(z_A).float()
        z_B=torch.tensor(z_B).float()
        c_B = torch.tensor(c_B).float()
        
        

  z_A=torch.tensor(z_A).float()
  z_B=torch.tensor(z_B).float()
  c_B = torch.tensor(c_B).float()


In [None]:
            for j in range(5000):
                m_A = E_s(z_A)
                m_B = E_s(z_B)
                
                s_Brecon = D_s(m_B)
                optimizer_S.zero_grad()
                loss_lat = loss1(m_B, c_B) + 0.1 * sliced_wasserstein_distance(m_B, c_B, num_projections=num_projections, device='cpu')
                loss_rec = loss2(s_Brecon, z_B) + 0.1 * loss1(s_Brecon, z_B)
                loss_S = lambdalat * loss_lat + lambdarec * loss_rec
                loss_S.backward()
                optimizer_S.step()

                if not j % 500:
                    print("step %d, loss_lat=%.4f, loss_rec=%.4f" % (j, loss_lat, loss_rec))

step 0, loss_lat=0.8578, loss_rec=0.8722
step 500, loss_lat=0.0111, loss_rec=0.3196
step 2500, loss_lat=0.0065, loss_rec=0.2121
step 3000, loss_lat=0.0059, loss_rec=0.1732
step 3500, loss_lat=0.0063, loss_rec=0.1801
step 4000, loss_lat=0.0038, loss_rec=0.1265
step 4500, loss_lat=0.0036, loss_rec=0.1079


#### test

In [None]:
y_pred=E_s(torch.tensor(z_B0).float())

## simulation

In [None]:
N_s=adata_ST.X.shape[0]


33

In [None]:
import random

# Randomly select 33 unique numbers between 1 and 666
selected_numbers = random.sample(range(N_s), int(np.floor(N_s*0.05)))
# Sort the numbers for better readability
selected_numbers_spot = sorted(selected_numbers)

selected_numbers = random.sample(range(adata_ST.X.shape[1]), 100)
# Sort the numbers for better readability
selected_numbers_gene = sorted(selected_numbers)

real_exp=adata_ST.X[selected_numbers_spot,:][:,selected_numbers_gene]

real_exp_0=(real_exp==0)

real_exp[real_exp_0]=1
real_exp[~real_exp_0]=0

adata_ST.X[np.ix_(selected_numbers_spot, selected_numbers_gene)]=real_exp

In [None]:
experiment_path = "/project/zduren/durenlab/palmetto/Kaya/ST/STALocator/ours/T3"
model_path = os.path.join(experiment_path, "models")
data_path = os.path.join(experiment_path, "data")
result_path = os.path.join(experiment_path, "results")
os.makedirs(model_path, exist_ok=True)
os.makedirs(data_path, exist_ok=True)
os.makedirs(result_path, exist_ok=True)
model = Model3(
    resolution="low",
    batch_size=200,
    train_epoch=10000,
    cut_steps=0.5,
    sf_coord=50,
    rad_cutoff=1.2,
    seed=1234,
    lambdacos=10,
    lambdaSWD=5,
    lambdalat=10,
    lambdarec=0.1,
    model_path=model_path,
    data_path=data_path,
    result_path=result_path,
    ot=False,
    device="cpu"
)

model.preprocess(adata_sc, adata_ST)

Finding highly variable genes...
# overlap highly variable genes is: 1609
Normalizing and scaling...
Dimensionality reduction via PCA...


In [None]:
Y=adata_ST.obs[['x','y']].values
Y = Y.astype(np.float32)
N_st=adata_ST.X.shape[0]
N_rna=adata_sc.X.shape[0]
Y=adata_ST.obs[['x','y']].values
Y = Y.astype(np.float32)
N_st=adata_ST.X.shape[0]
N_rna=adata_sc.X.shape[0]
training_idx_rna=np.array(range(N_rna))
training_idx_st=np.array(range(N_st))
model.train(training_idx_rna,training_idx_st)
y=model.eval()
final_aberrant, final_confidence,lambda_calib,pred_coords,true_coords=model.eval2(D_score=True, save_embedding=True, detect_aberrant=True, alpha=0.05)

Begining time:  Fri May 30 12:04:46 2025
step 0, total_loss=773.3005, loss_D=6.8151, loss_GAN=1.2465, loss_AE=34.5450, loss_cos=2.0370, loss_LA=33.1132, loss_SWD=12.0449
