# Quality control of Xenium data

Only one sample is anlysed in this notebook.

__original__ = "2024-11-05 Tue 13:16:29 GMT"

__created__ = "2025-02-04 Tue 15:17:52 GMT"

__updated__ = "2025-02-06 Thu 10:01:41 GMT"

__version__ = "0.0.9"

__status__ = "Prototype"

__license__ = "GPL"

__maintainer__ = "Ciro Ramírez-Suástegui"

__author__ = "Ciro Ramírez-Suástegui"

__email__ = "cs59@sanger.ac.uk, cramsuig@gmail.com"

### Structure <a class="anchor" id="menu"></a>

* [Global configuration](#bullet1)
* [Loading data](#bullet2)
* [Pre-processing](#bullet3)
* [Main](#bullet4)
* [Conclusions](#bullet5)
* [Save](#bullet6)

## Environment setup
---

### Jupyter extensions

In [4]:
import importlib.util as importlib_util
style = importlib_util.find_spec("lab_black")
try:
    is_pynb = is_pynb = "nteractive" in get_ipython().__class__.__name__
except NameError:
    is_pynb = None

In [5]:
if is_pynb is not None:
    print("Loading extension autoreload")
    %load_ext autoreload
    %autoreload 2
if is_pynb is not None and style is not None:
    print("Loading extension lab_black")
    %load_ext lab_black

Loading extension autoreload


### Basic modules

In [1]:
import os, sys, warnings, re
import datetime  # add current date to output

In [2]:
import pickle  # saving objects
import session_info  # show session information
from io import StringIO  # capture print output

### Working path

Needed to set the working directory to the project's root folder. This let's you
run the script from any location and have the correct paths (code, results).

In [None]:
try:
    __file__
except NameError:
    temp = ["__session__", "__vsc_ipynb_file__"]
    __file__ = [globals()[i] for i in temp if i in globals().keys()]
    if len(__file__) == 0:
        print("Fetching current directory")
        __file__ = os.path.abspath("")  # pathlib.Path().cwd(), globlals()['_dh']
        temp = ["", os.path.basename(os.path.expanduser("~"))]
        if os.path.basename(__file__) in temp:
            print("calling 'getcwd'")
            __file__ = os.path.abspath(os.getcwd())
        __file__ = __file__ + "/quality_control.ipynb"
    else:
        __file__ = __file__[0]
    __file__ = os.path.realpath(__file__)

In [None]:
os.chdir(os.path.dirname(os.path.realpath(__file__)))  # sit in file's path first
project_path = os.popen("git rev-parse --show-toplevel 2>&1").read().rstrip()
if project_path == "" or re.findall(r"fatal|bound", project_path):
    print("Not a git repository, using __file__")
    project_path = os.path.dirname(__file__)
# if where I am sitting is different from the expected project path
# find which is the shortest path
nchars = [len(i) for i in [os.getcwd(), project_path]]
if nchars[0] > nchars[1]:
    print("Changing to project_path")
    os.chdir(project_path)

### Logging configuration

In [3]:
import logging

In [None]:
logger_format = "[%(asctime)s] %(name)s %(levelname)-2s [%(filename)s:%(funcName)s:%(lineno)s] %(message)s"
logger_file = os.path.splitext(os.path.basename(__file__))[0]
logger_file = datetime.datetime.now().strftime(
    os.path.abspath(f".logs/{logger_file}_%Y%m%d%H%M%S")
)

In [None]:
logging.basicConfig(
    filename=logger_file,
    format=logger_format,
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logging.addLevelName(
    logging.INFO, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.INFO)
)
# set up logging to console too
console = logging.StreamHandler(sys.stdout)  # send to output (no red background)
console.setLevel(logging.DEBUG)
# set a format for console use
formatter = logging.Formatter(logger_format)
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger("").addHandler(console)
logger = logging.getLogger(__name__)

In [None]:
logging.info(f"Logging to:\n{logger_file}")

In [None]:
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=UserWarning)

### Tool (packaged) modules

In [5]:
import numpy as np  # often not necessary
import pandas as pd  # # check for DataFrame type
import scanpy as sc  # basic analyses
import matplotlib.pyplot as plt  # manage plotting styles and parameters
import matplotlib.figure as mplfig  # check for Figure type

In [6]:
import anndata as ad
import scipy.sparse as sp
import seaborn as sns
import squidpy as sq



### Presenting environment

In [None]:
# fmt: off
bpaths = ".*" + os.environ.get('USER', os.environ.get('USERNAME'))
bpaths = bpaths + "|.os.py|.*mamba|.*conda|.*projects"
logger.info(f'Environ: {re.sub(bpaths, "", os.__file__)}')
logger.info(f'Project: {re.sub(bpaths, "", os.getcwd())} (current path)')
logger.info(f'Routine: {re.sub(bpaths, "", __file__)}'); del bpaths
logger.info(os.popen("echo Machine: ${HOSTNAME} ${SLURMD_NODENAME} ${PBS_NODEFILE}").read().rstrip())
# fmt: on

### In-house/developing modules

In [None]:
sys_path_add = ["code/"]
[sys.path.append(i) for i in sys_path_add if i not in sys.path]
# import filename as shortname

### Session information

In [None]:
stdoutt = StringIO()  # issue with print, capture string and send to logger
stdout0 = sys.stdout
sys.stdout = stdoutt
session_info.show(
    dependencies=True,
    html=False,
    excludes=[
        "builtins",
        "stdlib_list",
        "importlib_metadata",
        # Special module present if test coverage being calculated
        # https://gitlab.com/joelostblom/session_info/-/issues/10
        "$coverage",
    ],
)
session_info_str = stdoutt.getvalue()
logger.info(f"Session info:\n{session_info_str}")
sys.stdout = stdout0
del session_info_str, stdoutt, stdout0

## [Global configuration](#menu) <a class="anchor" id="bullet1"></a>
---

### Variables and paths

In [None]:
action_name = "qc"
indata_name = "temp"
result_name = (
    datetime.datetime.now()
    .strftime("{name}_%Y%m%d-%H%M%S")
    .format(name=f"{indata_name}_{action_name}")
)

In [None]:
inputs_path = "/Users/vm11/SKIN_WARTS/data"
extent_strn = "h5ad"
inputs_file = sample_path

In [None]:
output_resu = os.path.join("./results", f"{result_name}")
output_figs = os.path.join("./results", f"{result_name}")
output_name = re.sub(f".{extent_strn}|anyelse", "", indata_name)
output_file = os.path.join(
    inputs_path, "processed/xenium", f"{output_name}_{action_name}.{extent_strn}"
)

In [None]:
OUTPUTS = dict()

In [None]:
%whos str dict

### Visualisation parameters

In [None]:
# https://github.com/scverse/scanpy/blob/be99b230fa84e077f5167979bc9f6dacc4ad0d41/src/scanpy/plotting/_rcmod.py#L12
# https://github.com/scverse/scanpy/blob/a91bb02b31a637caeee77c71dcd9fbce8437cb7d/src/scanpy/_settings.py#L419-L509
rcParams_dict = {
    "figure.autolayout": True,  # use `tight_layout`
    "figure.dpi": 80,  # scanpy:80
    "figure.figsize": (4, 4),  # scanpy:4x4
    "figure.frameon": False,
    "grid.linestyle": "dotted",
    "grid.color": "#f2f2f2",
    "font.size": 10,  # scanpy:14
    # https://matplotlib.org/stable/users/explain/colors/colormaps.html
    "image.cmap": "viridis",
    "lines.markersize": 2,  # dotplot size
    "savefig.dpi": 150,  # default is 'figure', scanpy:150
    "savefig.bbox": "tight",
    "savefig.transparent": True,
}

In [None]:
sc.settings.set_figure_params(dpi_save=rcParams_dict["savefig.dpi"])
sc.settings.set_figure_params(
    **{
        re.sub("figure|\.", "", k): rcParams_dict[k]
        for k in rcParams_dict.keys()
        if re.match("figure|font", k) and re.match("^((?!layout).)*$", k)
    }
)
sc.settings.figdir = output_figs

In [None]:
# plt.style.available or path (.config/rparams_analysis.mplstyle)
plt.style.use("seaborn-v0_8-colorblind")
rcmax = max([len(i) for i in list(rcParams_dict.keys())])
for i in rcParams_dict.keys():
    i_def = "def:" + str(plt.rcParamsDefault[i]).rjust(10, " ")
    i_new = str(plt.rcParams[i]).rjust(10, " ")
    temp = " ".join([i.ljust(rcmax, " "), i_def, "|", i_new, ">"])
    plt.rcParams[i] = rcParams_dict[i]
    temp = " ".join([temp, str(plt.rcParams[i])])
    logger.info(temp)

## [Loading data](#menu) <a class="anchor" id="bullet2"></a>
---

In [None]:
print(os.getcwd())
temp = os.path.abspath(f"{inputs_file}/cell_feature_matrix.h5")
#adata = sc.read_10x_h5(f"{inputs_file}/cell_feature_matrix.h5")
adata = sc.read_10x_h5(temp)


In [None]:
print(os.getcwd())

In [None]:
logger.info("\n" + adata.__str__())

In [None]:
mdata = pd.read_csv(f"{inputs_file}/cells.csv.gz")

In [None]:
logger.info("\n" + mdata.iloc[:5, :5].__str__())

## [Pre-processing](#menu) <a class="anchor" id="bullet3"></a>
---

In [None]:
mdata.set_index(adata.obs_names, inplace=True)
adata.obs = mdata.copy()

In [None]:
adata.obsm["spatial"] = adata.obs[["x_centroid", "y_centroid"]].copy().to_numpy()

In [None]:
logger.info("Object with medatata:\n" + adata.__str__())

## [Main](#menu) <a class="anchor" id="bullet4"></a>
---

In [None]:
sc.pp.calculate_qc_metrics(adata, percent_top=(10, 20, 50, 150), inplace=True)

In [None]:
cprobes = (
    adata.obs["control_probe_counts"].sum() / adata.obs["total_counts"].sum() * 100
)
cwords = (
    adata.obs["control_codeword_counts"].sum() / adata.obs["total_counts"].sum() * 100
)
logger.info(f"Negative DNA probe count % : {cprobes:.4f}")
logger.info(f"Negative decoding count % : {cwords:.4f}")

In [None]:
fig, axs = plt.subplots(1, 4, figsize=(16, 4))

axs[0].set_title("Total transcripts per cell")
sns.histplot(
    adata.obs["total_counts"],
    kde=False,
    ax=axs[0],
)

axs[1].set_title("Number of genes detected per cell")
sns.histplot(
    adata.obs["n_genes_by_counts"],
    kde=False,
    ax=axs[1],
)

axs[2].set_title("Area of segmented cells (μm^2)")
sns.histplot(
    adata.obs["cell_area"],
    kde=False,
    ax=axs[2],
)

axs[3].set_title("Nucleus ratio")
sns.histplot(
    adata.obs["nucleus_area"] / adata.obs["cell_area"],
    kde=False,
    ax=axs[3],
)
OUTPUTS["hist_qc-metrics"] = fig

Plot distribution of total cells per gene per segmentation method

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(15, 4))
sns.countplot(
    data=adata.obs, y="segmentation_method", hue="segmentation_method", ax=axs[0]
)
axs[1].set_title("Number of cells per gene")
sns.histplot(
    adata.var["n_cells_by_counts"], kde=False, bins=1000, ax=axs[1]
);  # fmt: skip
OUTPUTS["hist_segmentation"] = fig

In [None]:
%%time
plot_columns = ["total_counts", "n_genes_by_counts"]
fig, axs = plt.subplots(ncols=len(plot_columns), figsize=(8, 4))
for i, v in enumerate(plot_columns):
    col_var = re.sub("_", "-", v)
    fname = f"scatter_spatial_{col_var}"
    sq.pl.spatial_scatter(
        adata,
        library_id="spatial",
        shape=None,
        color=v,
        ax=axs[i],
    );
OUTPUTS[fname] = fig

### Necrosis

In [None]:
# Change the upper bound of the range to where you think makes sense.
hist_pct20 = round(max(adata.obs["n_genes_by_counts"]) / 5)
ax = plt.hist(
    adata.obs["n_genes_by_counts"], bins=round(hist_pct20 / 10), range=(0, hist_pct20)
)
fig = plt.gcf()
n, bins = ax[0], ax[1]
min_index = n.argmin()
necrosis_cutoff_genes = (bins[min_index] + bins[min_index + 1]) / 2
plt.axvline(x=necrosis_cutoff_genes, color="#800f0f");  # fmt: skip
OUTPUTS["necrosis-cutoff_n-genes-by-counts"] = fig

In [None]:
hist_median50pct = round(np.median(adata.obs["total_counts"]) / 2)
ax = plt.hist(
    adata.obs["total_counts"], bins=round(hist_pct20 / 10), range=(0, hist_median50pct)
)
fig = plt.gcf()
n, bins = ax[0], ax[1]
min_index = n.argmin()
necrosis_cutoff_transcripts = (bins[min_index] + bins[min_index + 1]) / 2
plt.axvline(x=necrosis_cutoff_transcripts, color="#800f0f");  # fmt: skip
OUTPUTS["necrosis-cutoff_total-counts"] = fig

It looks like necrosis doesn't show with this panel

In [None]:
adata.obs["necrotic"] = (adata.obs["n_genes_by_counts"] < necrosis_cutoff_genes) | (
    adata.obs["total_counts"] < necrosis_cutoff_transcripts
)

In [None]:
fname = f"scatter_spatial_necrosis"
OUTPUTS[fname] = sq.pl.spatial_scatter(
    adata,
    library_id="spatial",
    shape=None,
    color="necrotic",
    return_ax=True,
).get_figure()

### Minimal filters

In [None]:
filter_cells_by_genes = 3
logger.info(
    f"Filtering out cells with fewer than {filter_cells_by_genes} genes expressed."
)
sc.pp.filter_cells(adata, min_genes=3)

In [None]:
filter_genes_by_counts = 3
logger.info(f"Filtering out genes with less than {filter_genes_by_counts} counts.")
sc.pp.filter_genes(adata, min_counts=filter_genes_by_counts)

In [None]:
filter_genes_by_obs_n = 3  # round(0.0001 * len(adata.obs_names))
logger.info(
    f"Filtering out genes expressed in fewer than {filter_genes_by_obs_n} cells."
)
sc.pp.filter_genes(adata, min_cells=filter_genes_by_obs_n)

In [None]:
adata

### Processing counts

In [None]:
adata.layers["counts"] = adata.X.copy()

In [None]:
sc.pp.normalize_total(adata, target_sum=10000, inplace=True)

In [None]:
sc.pp.log1p(adata)

In [None]:
logger.info("\n" + adata.__str__())

## [Conclusions](#menu) <a class="anchor" id="bullet5"></a>
---

In [None]:
temp = list(OUTPUTS.keys())
logger.info(f"Outputs to write ({len(temp)}):\n " + "\n ".join(temp))

In [None]:
for i in temp:
    if len(re.findall(r"params|colours", i)) > 0:
        del OUTPUTS[i]

In [None]:
temp = list(OUTPUTS.keys())
logger.info(f"Outputs to write ({len(temp)}):\n " + "\n ".join(temp))

## [Save](#menu) <a class="anchor" id="bullet6"></a>
---

In [None]:
pflag = " \033[1;32m√\033[0m" # show file was successfully saved
rflag = 0; fflag = 0 # give each file a numerical prefix
for filename, item in OUTPUTS.items():
    eflag = " \033[1;31mX\033[0m" # show if file failed to save
    # output_resu vs output_figs
    if isinstance(item, (pd.DataFrame, dict)):
        fname = os.path.join(output_resu, f"{str(rflag).zfill(2)}_{filename}")
        rflag += 1
    else:
        fname = os.path.join(output_figs, f"{str(fflag).zfill(2)}_{filename}")
        fflag += 1
    oflag = "Storing" + str(type(item)) + "\n" + fname
    # create directory if it does not exist
    if not os.path.isdir(os.path.dirname(fname)):
        os.makedirs(os.path.dirname(fname))
    # add extension and save based on type() # ---------------------------------
    if isinstance(item, (mplfig.Figure, tuple)):
        (item[0] if isinstance(item, tuple) else item).savefig(f"{fname}.png")
        plt.close(); eflag = pflag
    elif isinstance(item, pd.DataFrame):
        item.to_csv(f"{fname}.csv"); eflag = pflag
    elif isinstance(item, dict):
        with open(f"{fname}.pickle", 'wb') as handle:
            pickle.dump(item, handle, protocol=pickle.HIGHEST_PROTOCOL)
        eflag = pflag
    elif item is not None:
        try:
            item.savefig(f"{fname}.png")
        except:
            pass
        plt.close(); eflag = pflag
    logger.info(f"{oflag}{eflag}")

In [None]:
logger.info(f"Saving to:\n{output_file}")

In [None]:
adata.write(output_file)

In [None]:
%%bash -s "$project_path" "$result_name"
echo "Notebook at '$1' completed '$2'" | mail -s 'Jupyter notebook' ${USER}@$(cat /etc/mailname)

Done.