In [None]:
import scanpy as sc
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pathlib as pl
import squidpy as sq

from typing import Tuple
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
import infercnvpy as cnv

In [None]:
from anndata import AnnData

In [None]:
from statannotations.Annotator import Annotator

In [None]:
import pathlib as pl

In [None]:
from ncem.interpretation import InterpreterDeconvolution
from ncem.train import TrainModelLinearDeconvolution
from ncem.data import get_data_custom, customLoaderDeconvolution

In [None]:
def pretty_ax(ax):
    """Apply the notebook's plain axes style to ``ax`` in place.

    Hides the top and right spines, sets the tick/label options below, and
    draws the bottom and left spines with a line width of 1.5.

    Parameters
    ----------
    ax
        Object exposing ``spines`` (indexable by side name) and
        ``tick_params``; a matplotlib ``Axes`` is the expected caller input.

    Returns
    -------
    None
    """
    for hidden_side in ("right", "top"):
        ax.spines[hidden_side].set_visible(False)

    # Bottom tick marks stay, left tick marks are dropped; labels are kept on both.
    tick_options = {
        "axis": "both",
        "which": "both",
        "bottom": True,
        "top": False,
        "left": False,
        "labelbottom": True,
        "labelleft": True,
    }
    ax.tick_params(**tick_options)

    for framed_side in ("bottom", "left"):
        ax.spines[framed_side].set_linewidth(1.5)

In [None]:
def get_preprocessed_sample(sample_path: pl.Path, min_counts: int, pct_mt: int, min_cells: int) -> sc.AnnData:
    """Read a Visium sample and apply count, mitochondrial and gene filters.

    Steps, in order: read the sample with ``sc.read_visium``, make gene names
    unique, flag genes whose name starts with ``"MT-"`` and compute QC metrics
    with that flag, cast the spatial coordinates to ``int``, drop observations
    with fewer than ``min_counts`` counts, drop observations whose
    ``pct_counts_mt`` is not below ``pct_mt``, and finally drop genes detected
    in fewer than ``min_cells`` observations.

    Parameters
    ----------
    sample_path
        Directory passed to ``sc.read_visium``.
    min_counts
        Forwarded to ``sc.pp.filter_cells(min_counts=...)``.
    pct_mt
        Exclusive upper bound on ``adata.obs["pct_counts_mt"]``.
    min_cells
        Forwarded to ``sc.pp.filter_genes(min_cells=...)``.

    Returns
    -------
    sc.AnnData
        The filtered object (an independent copy, not a view).
    """
    adata = sc.read_visium(path=sample_path)

    adata.var_names_make_unique()
    adata.var["mt"] = adata.var_names.str.startswith("MT-")
    sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], inplace=True)

    adata.obsm["spatial"] = adata.obsm["spatial"].astype(int)

    sc.pp.filter_cells(adata, min_counts=min_counts)
    # Boolean indexing returns a view; copy it so the in-place gene filter
    # below operates on a real AnnData instead of implicitly converting a view.
    adata = adata[adata.obs["pct_counts_mt"] < pct_mt].copy()
    print(f"#cells after MT filter: {adata.n_obs}")
    sc.pp.filter_genes(adata, min_cells=min_cells)

    return adata

In [None]:
# Placeholder: root folder holding one sub-directory per sample, each with a
# "spatial/tissue_positions_list.csv" file. Set this before running.
spatial_dir = pl.Path("/add/path/here/")

In [None]:
# Placeholder: root folder holding one sub-directory per sample, each with a
# "cell2location_map" folder (sp.h5ad, celltype_abundance.csv). Set this before running.
cell2location_results_dir = pl.Path("/add/path/here/")

# EGSFR0074_A

In [None]:
# Sample identifier; names this sample's sub-directory under both
# `cell2location_results_dir` and `spatial_dir` in the cells below.
patient_name = "EGSFR0074_A"

In [None]:
# Locate this sample's cell2location output and load its AnnData.
resdir = cell2location_results_dir / patient_name
sample_path = resdir / patient_name
adata = sc.read_h5ad(resdir / "cell2location_map" / "sp.h5ad")

# Spot position table, restricted to and ordered like `adata.obs_names`.
tissue_path = spatial_dir / patient_name / "spatial/tissue_positions_list.csv"
tissue_position = pd.read_csv(tissue_path, index_col=0).loc[adata.obs_names]

# Coordinate columns as plain Python lists.
x_array, y_array, x_pixel, y_pixel = (
    tissue_position[column].tolist()
    for column in ("array_row", "array_col", "pxl_row_in_fullres", "pxl_col_in_fullres")
)

# Bounding box of the full-resolution pixel coordinates.
x_min, y_min = np.min(x_pixel), np.min(y_pixel)
x_max, y_max = np.max(x_pixel), np.max(y_pixel)

In [None]:
# Abundance table from the cell2location_map folder: first CSV column becomes
# the index, the remaining column names are used as cell-type names below.
# NOTE(review): rows are combined positionally with `adata` downstream and are
# not re-aligned to `adata.obs_names` - confirm the CSV row order matches.
cell2loc_res = pd.read_csv(cell2location_results_dir / patient_name / "cell2location_map" / "celltype_abundance.csv",index_col=0)

In [None]:
# Cell-type names are the columns of the abundance table; each one must also
# be the name of a layer in `adata`.
cell_types = cell2loc_res.columns.to_numpy()

# Stack one block of rows per cell type. Block k holds every spot once, with
# the full abundance table, the expression layer of cell type k, a one-hot
# node type k, and the spot coordinates.
proportions = pd.DataFrame(
    np.concatenate([cell2loc_res for _ in cell_types]),
    columns=cell_types,
)
cell_expression = pd.DataFrame(
    np.concatenate([adata.layers[layer_name].toarray() for layer_name in cell_types]),
    columns=adata.var_names,
)
node_types = pd.DataFrame(
    # Each identity row repeated once per spot gives the stacked one-hot blocks.
    np.repeat(np.eye(len(cell_types)), cell2loc_res.shape[0], axis=0),
    columns=cell_types,
)
spatial = pd.DataFrame(
    np.concatenate([adata.obsm['spatial'] for _ in cell_types])
)

In [None]:
# Assemble the stacked (spot x cell type) table into the AnnData handed to NCEM.
ad = AnnData(cell_expression)
ad.obsm['proportions'] = np.array(proportions)
ad.obsm['node_types'] = np.array(node_types)
ad.obsm['spatial'] = np.array(spatial)

# Identity mapping: every cell-type name maps to itself.
ad.uns["node_type_names"] = {x: x for x in cell_types}

ad.var_names = adata.var_names

# Log-transform, then keep only the 2000 most highly variable genes.
sc.pp.log1p(ad)
sc.pp.highly_variable_genes(ad, n_top_genes=2000, subset=True,)

# Recover each row's cell type from its one-hot node type and use it as obs.
h_0 = pd.DataFrame(ad.obsm['node_types'], columns=list(ad.uns['node_type_names'].values()))
target_type = pd.DataFrame(np.array(h_0.idxmax(axis=1)), columns=["target_cell"]).reset_index()
ad.obs = target_type

# Rows are split arbitrarily into two pseudo-images ("image_0" / "image_1").
# The generator is seeded so the split is identical on every run; the previous
# unseeded draw made the downstream results change between executions.
random_library_id = np.random.default_rng(0).integers(2, size=ad.shape[0])
ad.obs['library_id'] = pd.Categorical([f"image_{i}" for i in random_library_id])

In [None]:
# Fresh NCEM deconvolution interpreter for this sample (replaces any previous one).
ncem_ip = InterpreterDeconvolution()

In [None]:
# Attach the stacked AnnData via NCEM's custom deconvolution loader, keyed by
# the 'library_id' obs column; no patient column and no radius are supplied.
# NOTE(review): how the loader treats radius=None is not visible here - confirm in ncem.
ncem_ip.data = customLoaderDeconvolution(
    adata=ad, patient=None, library_id='library_id', radius=None
)

In [None]:
# Hand the interpreter to NCEM's custom-data setup in deconvolution mode.
# NOTE(review): presumably populates `ncem_ip` in place (return value is unused) - confirm.
get_data_custom(interpreter=ncem_ip, deconvolution=True)

In [None]:
# Run NCEM's sender-receiver effect estimation on the prepared interpreter.
# NOTE(review): presumably stores its results on `ncem_ip` for the plot that follows - confirm.
ncem_ip.get_sender_receiver_effects()

In [None]:
# Circular type-coupling analysis for this sample, using the 'magnitude' edge attribute.
# NOTE(review): de_genes_threshold=25 presumably sets the minimum number of
# differentially expressed genes for an edge to be shown - confirm in ncem.
type_coupling = ncem_ip.type_coupling_analysis_circular(
    edge_attr='magnitude', edge_width_scale=0.2, figsize=(5,5), text_space=1.28, 
    de_genes_threshold=25,)

# EGSFR1938_A

In [None]:
# Sample identifier; names this sample's sub-directory under both
# `cell2location_results_dir` and `spatial_dir` in the cells below.
patient_name = "EGSFR1938_A"

In [None]:
# Locate this sample's cell2location output and load its AnnData.
resdir = cell2location_results_dir / patient_name
sample_path = resdir / patient_name
adata = sc.read_h5ad(resdir / "cell2location_map" / "sp.h5ad")

# Spot position table, restricted to and ordered like `adata.obs_names`.
tissue_path = spatial_dir / patient_name / "spatial/tissue_positions_list.csv"
tissue_position = pd.read_csv(tissue_path, index_col=0).loc[adata.obs_names]

# Coordinate columns as plain Python lists.
x_array, y_array, x_pixel, y_pixel = (
    tissue_position[column].tolist()
    for column in ("array_row", "array_col", "pxl_row_in_fullres", "pxl_col_in_fullres")
)

# Bounding box of the full-resolution pixel coordinates.
x_min, y_min = np.min(x_pixel), np.min(y_pixel)
x_max, y_max = np.max(x_pixel), np.max(y_pixel)

In [None]:
# Abundance table from the cell2location_map folder: first CSV column becomes
# the index, the remaining column names are used as cell-type names below.
# NOTE(review): rows are combined positionally with `adata` downstream and are
# not re-aligned to `adata.obs_names` - confirm the CSV row order matches.
cell2loc_res = pd.read_csv(cell2location_results_dir / patient_name / "cell2location_map" / "celltype_abundance.csv",index_col=0)

In [None]:
# Cell-type names are the columns of the abundance table; each one must also
# be the name of a layer in `adata`.
cell_types = cell2loc_res.columns.to_numpy()

# Stack one block of rows per cell type. Block k holds every spot once, with
# the full abundance table, the expression layer of cell type k, a one-hot
# node type k, and the spot coordinates.
proportions = pd.DataFrame(
    np.concatenate([cell2loc_res for _ in cell_types]),
    columns=cell_types,
)
cell_expression = pd.DataFrame(
    np.concatenate([adata.layers[layer_name].toarray() for layer_name in cell_types]),
    columns=adata.var_names,
)
node_types = pd.DataFrame(
    # Each identity row repeated once per spot gives the stacked one-hot blocks.
    np.repeat(np.eye(len(cell_types)), cell2loc_res.shape[0], axis=0),
    columns=cell_types,
)
spatial = pd.DataFrame(
    np.concatenate([adata.obsm['spatial'] for _ in cell_types])
)

In [None]:
# Assemble the stacked (spot x cell type) table into the AnnData handed to NCEM.
ad = AnnData(cell_expression)
ad.obsm['proportions'] = np.array(proportions)
ad.obsm['node_types'] = np.array(node_types)
ad.obsm['spatial'] = np.array(spatial)

# Identity mapping: every cell-type name maps to itself.
ad.uns["node_type_names"] = {x: x for x in cell_types}

ad.var_names = adata.var_names

# Log-transform, then keep only the 2000 most highly variable genes.
sc.pp.log1p(ad)
sc.pp.highly_variable_genes(ad, n_top_genes=2000, subset=True,)

# Recover each row's cell type from its one-hot node type and use it as obs.
h_0 = pd.DataFrame(ad.obsm['node_types'], columns=list(ad.uns['node_type_names'].values()))
target_type = pd.DataFrame(np.array(h_0.idxmax(axis=1)), columns=["target_cell"]).reset_index()
ad.obs = target_type

# Rows are split arbitrarily into two pseudo-images ("image_0" / "image_1").
# The generator is seeded so the split is identical on every run; the previous
# unseeded draw made the downstream results change between executions.
random_library_id = np.random.default_rng(0).integers(2, size=ad.shape[0])
ad.obs['library_id'] = pd.Categorical([f"image_{i}" for i in random_library_id])

In [None]:
# Fresh NCEM deconvolution interpreter for this sample (replaces any previous one).
ncem_ip = InterpreterDeconvolution()

In [None]:
# Attach the stacked AnnData via NCEM's custom deconvolution loader, keyed by
# the 'library_id' obs column; no patient column and no radius are supplied.
# NOTE(review): how the loader treats radius=None is not visible here - confirm in ncem.
ncem_ip.data = customLoaderDeconvolution(
    adata=ad, patient=None, library_id='library_id', radius=None
)

In [None]:
# Hand the interpreter to NCEM's custom-data setup in deconvolution mode.
# NOTE(review): presumably populates `ncem_ip` in place (return value is unused) - confirm.
get_data_custom(interpreter=ncem_ip, deconvolution=True)

In [None]:
# Run NCEM's sender-receiver effect estimation on the prepared interpreter.
# NOTE(review): presumably stores its results on `ncem_ip` for the plot that follows - confirm.
ncem_ip.get_sender_receiver_effects()

In [None]:
# Circular type-coupling analysis for this sample, using the 'magnitude' edge attribute.
# NOTE(review): de_genes_threshold=25 presumably sets the minimum number of
# differentially expressed genes for an edge to be shown - confirm in ncem.
type_coupling = ncem_ip.type_coupling_analysis_circular(
    edge_attr='magnitude', edge_width_scale=0.2, figsize=(5,5), text_space=1.28, de_genes_threshold=25)

# EGSFR0148

In [None]:
# Sample identifier; names this sample's sub-directory under both
# `cell2location_results_dir` and `spatial_dir` in the cells below.
patient_name = "EGSFR0148"

In [None]:
# Locate this sample's cell2location output and load its AnnData.
resdir = cell2location_results_dir / patient_name
sample_path = resdir / patient_name
adata = sc.read_h5ad(resdir / "cell2location_map" / "sp.h5ad")

# Spot position table, restricted to and ordered like `adata.obs_names`.
tissue_path = spatial_dir / patient_name / "spatial/tissue_positions_list.csv"
tissue_position = pd.read_csv(tissue_path, index_col=0).loc[adata.obs_names]

# Coordinate columns as plain Python lists.
x_array, y_array, x_pixel, y_pixel = (
    tissue_position[column].tolist()
    for column in ("array_row", "array_col", "pxl_row_in_fullres", "pxl_col_in_fullres")
)

# Bounding box of the full-resolution pixel coordinates.
x_min, y_min = np.min(x_pixel), np.min(y_pixel)
x_max, y_max = np.max(x_pixel), np.max(y_pixel)

In [None]:
# Abundance table from the cell2location_map folder: first CSV column becomes
# the index, the remaining column names are used as cell-type names below.
# NOTE(review): rows are combined positionally with `adata` downstream and are
# not re-aligned to `adata.obs_names` - confirm the CSV row order matches.
cell2loc_res = pd.read_csv(cell2location_results_dir / patient_name / "cell2location_map" / "celltype_abundance.csv",index_col=0)

In [None]:
# Cell-type names are the columns of the abundance table; each one must also
# be the name of a layer in `adata`.
cell_types = cell2loc_res.columns.to_numpy()

# Stack one block of rows per cell type. Block k holds every spot once, with
# the full abundance table, the expression layer of cell type k, a one-hot
# node type k, and the spot coordinates.
proportions = pd.DataFrame(
    np.concatenate([cell2loc_res for _ in cell_types]),
    columns=cell_types,
)
cell_expression = pd.DataFrame(
    np.concatenate([adata.layers[layer_name].toarray() for layer_name in cell_types]),
    columns=adata.var_names,
)
node_types = pd.DataFrame(
    # Each identity row repeated once per spot gives the stacked one-hot blocks.
    np.repeat(np.eye(len(cell_types)), cell2loc_res.shape[0], axis=0),
    columns=cell_types,
)
spatial = pd.DataFrame(
    np.concatenate([adata.obsm['spatial'] for _ in cell_types])
)

In [None]:
# Assemble the stacked (spot x cell type) table into the AnnData handed to NCEM.
ad = AnnData(cell_expression)
ad.obsm['proportions'] = np.array(proportions)
ad.obsm['node_types'] = np.array(node_types)
ad.obsm['spatial'] = np.array(spatial)

# Identity mapping: every cell-type name maps to itself.
ad.uns["node_type_names"] = {x: x for x in cell_types}

ad.var_names = adata.var_names

# Log-transform, then keep only the 2000 most highly variable genes.
sc.pp.log1p(ad)
sc.pp.highly_variable_genes(ad, n_top_genes=2000, subset=True,)

# Recover each row's cell type from its one-hot node type and use it as obs.
h_0 = pd.DataFrame(ad.obsm['node_types'], columns=list(ad.uns['node_type_names'].values()))
target_type = pd.DataFrame(np.array(h_0.idxmax(axis=1)), columns=["target_cell"]).reset_index()
ad.obs = target_type

# Rows are split arbitrarily into two pseudo-images ("image_0" / "image_1").
# The generator is seeded so the split is identical on every run; the previous
# unseeded draw made the downstream results change between executions.
random_library_id = np.random.default_rng(0).integers(2, size=ad.shape[0])
ad.obs['library_id'] = pd.Categorical([f"image_{i}" for i in random_library_id])

In [None]:
# Fresh NCEM deconvolution interpreter for this sample (replaces any previous one).
ncem_ip = InterpreterDeconvolution()

In [None]:
# Attach the stacked AnnData via NCEM's custom deconvolution loader, keyed by
# the 'library_id' obs column; no patient column and no radius are supplied.
# NOTE(review): how the loader treats radius=None is not visible here - confirm in ncem.
ncem_ip.data = customLoaderDeconvolution(
    adata=ad, patient=None, library_id='library_id', radius=None
)

In [None]:
# Hand the interpreter to NCEM's custom-data setup in deconvolution mode.
# NOTE(review): presumably populates `ncem_ip` in place (return value is unused) - confirm.
get_data_custom(interpreter=ncem_ip, deconvolution=True)

In [None]:
# Run NCEM's sender-receiver effect estimation on the prepared interpreter.
# NOTE(review): presumably stores its results on `ncem_ip` for the plot that follows - confirm.
ncem_ip.get_sender_receiver_effects()

In [None]:
# Circular type-coupling analysis for this sample, using the 'magnitude' edge attribute.
# NOTE(review): de_genes_threshold=25 presumably sets the minimum number of
# differentially expressed genes for an edge to be shown - confirm in ncem.
type_coupling = ncem_ip.type_coupling_analysis_circular(
    edge_attr='magnitude', edge_width_scale=0.2, figsize=(5,5), text_space=1.28, de_genes_threshold=25)

# EGSFR1938_B

In [None]:
# Sample identifier; names this sample's sub-directory under both
# `cell2location_results_dir` and `spatial_dir` in the cells below.
patient_name = "EGSFR1938_B"

In [None]:
# Locate this sample's cell2location output and load its AnnData.
resdir = cell2location_results_dir / patient_name
sample_path = resdir / patient_name
adata = sc.read_h5ad(resdir / "cell2location_map" / "sp.h5ad")

# Spot position table, restricted to and ordered like `adata.obs_names`.
tissue_path = spatial_dir / patient_name / "spatial/tissue_positions_list.csv"
tissue_position = pd.read_csv(tissue_path, index_col=0).loc[adata.obs_names]

# Coordinate columns as plain Python lists.
x_array, y_array, x_pixel, y_pixel = (
    tissue_position[column].tolist()
    for column in ("array_row", "array_col", "pxl_row_in_fullres", "pxl_col_in_fullres")
)

# Bounding box of the full-resolution pixel coordinates.
x_min, y_min = np.min(x_pixel), np.min(y_pixel)
x_max, y_max = np.max(x_pixel), np.max(y_pixel)

In [None]:
# Abundance table from the cell2location_map folder: first CSV column becomes
# the index, the remaining column names are used as cell-type names below.
# NOTE(review): rows are combined positionally with `adata` downstream and are
# not re-aligned to `adata.obs_names` - confirm the CSV row order matches.
cell2loc_res = pd.read_csv(cell2location_results_dir / patient_name / "cell2location_map" / "celltype_abundance.csv",index_col=0)

In [None]:
# Cell-type names are the columns of the abundance table; each one must also
# be the name of a layer in `adata`.
cell_types = cell2loc_res.columns.to_numpy()

# Stack one block of rows per cell type. Block k holds every spot once, with
# the full abundance table, the expression layer of cell type k, a one-hot
# node type k, and the spot coordinates.
proportions = pd.DataFrame(
    np.concatenate([cell2loc_res for _ in cell_types]),
    columns=cell_types,
)
cell_expression = pd.DataFrame(
    np.concatenate([adata.layers[layer_name].toarray() for layer_name in cell_types]),
    columns=adata.var_names,
)
node_types = pd.DataFrame(
    # Each identity row repeated once per spot gives the stacked one-hot blocks.
    np.repeat(np.eye(len(cell_types)), cell2loc_res.shape[0], axis=0),
    columns=cell_types,
)
spatial = pd.DataFrame(
    np.concatenate([adata.obsm['spatial'] for _ in cell_types])
)

In [None]:
# Assemble the stacked (spot x cell type) table into the AnnData handed to NCEM.
ad = AnnData(cell_expression)
ad.obsm['proportions'] = np.array(proportions)
ad.obsm['node_types'] = np.array(node_types)
ad.obsm['spatial'] = np.array(spatial)

# Identity mapping: every cell-type name maps to itself.
ad.uns["node_type_names"] = {x: x for x in cell_types}

ad.var_names = adata.var_names

# Log-transform, then keep only the 2000 most highly variable genes.
sc.pp.log1p(ad)
sc.pp.highly_variable_genes(ad, n_top_genes=2000, subset=True,)

# Recover each row's cell type from its one-hot node type and use it as obs.
h_0 = pd.DataFrame(ad.obsm['node_types'], columns=list(ad.uns['node_type_names'].values()))
target_type = pd.DataFrame(np.array(h_0.idxmax(axis=1)), columns=["target_cell"]).reset_index()
ad.obs = target_type

# Rows are split arbitrarily into two pseudo-images ("image_0" / "image_1").
# The generator is seeded so the split is identical on every run; the previous
# unseeded draw made the downstream results change between executions.
random_library_id = np.random.default_rng(0).integers(2, size=ad.shape[0])
ad.obs['library_id'] = pd.Categorical([f"image_{i}" for i in random_library_id])

In [None]:
# Fresh NCEM deconvolution interpreter for this sample (replaces any previous one).
ncem_ip = InterpreterDeconvolution()

In [None]:
# Attach the stacked AnnData via NCEM's custom deconvolution loader, keyed by
# the 'library_id' obs column; no patient column and no radius are supplied.
# NOTE(review): how the loader treats radius=None is not visible here - confirm in ncem.
ncem_ip.data = customLoaderDeconvolution(
    adata=ad, patient=None, library_id='library_id', radius=None
)

In [None]:
# Hand the interpreter to NCEM's custom-data setup in deconvolution mode.
# NOTE(review): presumably populates `ncem_ip` in place (return value is unused) - confirm.
get_data_custom(interpreter=ncem_ip, deconvolution=True)

In [None]:
# Run NCEM's sender-receiver effect estimation on the prepared interpreter.
# NOTE(review): presumably stores its results on `ncem_ip` for the plot that follows - confirm.
ncem_ip.get_sender_receiver_effects()

In [None]:
# Circular type-coupling analysis for this sample, using the 'magnitude' edge attribute.
# NOTE(review): de_genes_threshold=25 presumably sets the minimum number of
# differentially expressed genes for an edge to be shown - confirm in ncem.
type_coupling = ncem_ip.type_coupling_analysis_circular(
    edge_attr='magnitude', edge_width_scale=0.2, figsize=(5,5), text_space=1.28, de_genes_threshold=25)

# EGSFR1938_C

In [None]:
# Sample identifier; names this sample's sub-directory under both
# `cell2location_results_dir` and `spatial_dir` in the cells below.
patient_name = "EGSFR1938_C"

In [None]:
# Locate this sample's cell2location output and load its AnnData.
resdir = cell2location_results_dir / patient_name
sample_path = resdir / patient_name
adata = sc.read_h5ad(resdir / "cell2location_map" / "sp.h5ad")

# Spot position table, restricted to and ordered like `adata.obs_names`.
tissue_path = spatial_dir / patient_name / "spatial/tissue_positions_list.csv"
tissue_position = pd.read_csv(tissue_path, index_col=0).loc[adata.obs_names]

# Coordinate columns as plain Python lists.
x_array, y_array, x_pixel, y_pixel = (
    tissue_position[column].tolist()
    for column in ("array_row", "array_col", "pxl_row_in_fullres", "pxl_col_in_fullres")
)

# Bounding box of the full-resolution pixel coordinates.
x_min, y_min = np.min(x_pixel), np.min(y_pixel)
x_max, y_max = np.max(x_pixel), np.max(y_pixel)

In [None]:
# Abundance table from the cell2location_map folder: first CSV column becomes
# the index, the remaining column names are used as cell-type names below.
# NOTE(review): rows are combined positionally with `adata` downstream and are
# not re-aligned to `adata.obs_names` - confirm the CSV row order matches.
cell2loc_res = pd.read_csv(cell2location_results_dir / patient_name / "cell2location_map" / "celltype_abundance.csv",index_col=0)

In [None]:
# Cell-type names are the columns of the abundance table; each one must also
# be the name of a layer in `adata`.
cell_types = cell2loc_res.columns.to_numpy()

# Stack one block of rows per cell type. Block k holds every spot once, with
# the full abundance table, the expression layer of cell type k, a one-hot
# node type k, and the spot coordinates.
proportions = pd.DataFrame(
    np.concatenate([cell2loc_res for _ in cell_types]),
    columns=cell_types,
)
cell_expression = pd.DataFrame(
    np.concatenate([adata.layers[layer_name].toarray() for layer_name in cell_types]),
    columns=adata.var_names,
)
node_types = pd.DataFrame(
    # Each identity row repeated once per spot gives the stacked one-hot blocks.
    np.repeat(np.eye(len(cell_types)), cell2loc_res.shape[0], axis=0),
    columns=cell_types,
)
spatial = pd.DataFrame(
    np.concatenate([adata.obsm['spatial'] for _ in cell_types])
)

In [None]:
# Assemble the stacked (spot x cell type) table into the AnnData handed to NCEM.
ad = AnnData(cell_expression)
ad.obsm['proportions'] = np.array(proportions)
ad.obsm['node_types'] = np.array(node_types)
ad.obsm['spatial'] = np.array(spatial)

# Identity mapping: every cell-type name maps to itself.
ad.uns["node_type_names"] = {x: x for x in cell_types}

ad.var_names = adata.var_names

# Log-transform, then keep only the 2000 most highly variable genes.
sc.pp.log1p(ad)
sc.pp.highly_variable_genes(ad, n_top_genes=2000, subset=True,)

# Recover each row's cell type from its one-hot node type and use it as obs.
h_0 = pd.DataFrame(ad.obsm['node_types'], columns=list(ad.uns['node_type_names'].values()))
target_type = pd.DataFrame(np.array(h_0.idxmax(axis=1)), columns=["target_cell"]).reset_index()
ad.obs = target_type

# Rows are split arbitrarily into two pseudo-images ("image_0" / "image_1").
# The generator is seeded so the split is identical on every run; the previous
# unseeded draw made the downstream results change between executions.
random_library_id = np.random.default_rng(0).integers(2, size=ad.shape[0])
ad.obs['library_id'] = pd.Categorical([f"image_{i}" for i in random_library_id])

In [None]:
# Fresh NCEM deconvolution interpreter for this sample (replaces any previous one).
ncem_ip = InterpreterDeconvolution()

In [None]:
# Attach the stacked AnnData via NCEM's custom deconvolution loader, keyed by
# the 'library_id' obs column; no patient column and no radius are supplied.
# NOTE(review): how the loader treats radius=None is not visible here - confirm in ncem.
ncem_ip.data = customLoaderDeconvolution(
    adata=ad, patient=None, library_id='library_id', radius=None
)

In [None]:
# Hand the interpreter to NCEM's custom-data setup in deconvolution mode.
# NOTE(review): presumably populates `ncem_ip` in place (return value is unused) - confirm.
get_data_custom(interpreter=ncem_ip, deconvolution=True)

In [None]:
# Run NCEM's sender-receiver effect estimation on the prepared interpreter.
# NOTE(review): presumably stores its results on `ncem_ip` for the plot that follows - confirm.
ncem_ip.get_sender_receiver_effects()

In [None]:
# Circular type-coupling analysis for this sample, using the 'magnitude' edge attribute.
# NOTE(review): de_genes_threshold=25 presumably sets the minimum number of
# differentially expressed genes for an edge to be shown - confirm in ncem.
type_coupling = ncem_ip.type_coupling_analysis_circular(
    edge_attr='magnitude', edge_width_scale=0.2, figsize=(5,5), text_space=1.28, de_genes_threshold=25)