In [None]:
import os
import subprocess

from glob import glob

import scanpy as sc
from keggtools import Pathway, Renderer, Resolver, Storage

# Prefix used in the names of all files written by this notebook.
PROJECT_NAME = "CropSeq-19"

# Every path below hangs off the working directory at the time this cell runs.
BASE_DIR = os.getcwd()
NOTEBOOK_DIR = os.path.join(BASE_DIR, "notebooks")
DATA_DIR = os.path.join(BASE_DIR, "data")

# Sub-folders of the data directory.
RAW_DATA_DIR = os.path.join(DATA_DIR, "raw")
CHECKPOINT_DIR = os.path.join(DATA_DIR, "checkpoints")
PROCESSED_DIR = os.path.join(DATA_DIR, "processed")

# Output folder used by sfile() for generated files.
PDF_DIR = os.path.join(PROCESSED_DIR, "pdf")

def sfile(filename):
    """Return the output path for ``filename`` inside ``PDF_DIR`` and print it.

    The file name is prefixed with ``"<PROJECT_NAME>_merged_"``. Nothing is
    created on disk; the function only builds the path and announces it.

    Args:
        filename: Trailing part of the output file name (including extension).

    Returns:
        The full path as a string.
    """
    out_path = os.path.join(PDF_DIR, f"{PROJECT_NAME}_merged_{filename}")
    print(f"File save at '{out_path}'")
    return out_path


# Checkpoint handling functions

def save_checkpoint(adata_obj, filename, overwrite=False):
    """Write ``adata_obj`` as an h5ad file into ``CHECKPOINT_DIR``.

    Args:
        adata_obj: Object providing a ``write_h5ad(path)`` method.
        filename: File name relative to ``CHECKPOINT_DIR``.
        overwrite: When false (the default), refuse to replace an existing file.

    Raises:
        FileExistsError: If the target file exists and ``overwrite`` is false.
    """
    checkpoint_path = os.path.join(CHECKPOINT_DIR, filename)
    already_present = os.path.isfile(checkpoint_path)
    if already_present and not overwrite:
        raise FileExistsError(f"File '{checkpoint_path}' already exists")
    adata_obj.write_h5ad(checkpoint_path)

def load_checkpoint(filename):
    """Read an h5ad checkpoint from ``CHECKPOINT_DIR``.

    Args:
        filename: File name relative to ``CHECKPOINT_DIR``.

    Returns:
        Whatever ``sc.read_h5ad`` returns for the resolved path.

    Raises:
        FileNotFoundError: If no regular file exists at the resolved path.
    """
    checkpoint_path = os.path.join(CHECKPOINT_DIR, filename)
    if os.path.isfile(checkpoint_path):
        return sc.read_h5ad(checkpoint_path)
    raise FileNotFoundError(f"Cant find file '{checkpoint_path}'")

def list_checkpoints():
    """List the names of all entries found directly in ``CHECKPOINT_DIR``.

    The glob pattern is ``*``, so every matching entry is reported, not only
    ``.h5ad`` files. The number of entries found is printed as a side effect.

    Returns:
        A list of entry names without the directory part.
    """
    pattern = os.path.join(CHECKPOINT_DIR, "*")
    checkpoint_names = [os.path.basename(path) for path in glob(pattern)]
    print(f"Found {len(checkpoint_names)} checkpoint files in dir '{CHECKPOINT_DIR}'")
    return checkpoint_names

### Load data

In [None]:
# Read the h5ad checkpoint from CHECKPOINT_DIR (raises FileNotFoundError if it is missing).
adata_concat = load_checkpoint("Cropseq_all_integrated_murine__gRNA_integrated_seuratObject.h5ad")
# Assign the freshly loaded object to its own `.raw` attribute.
# NOTE(review): presumably done so later steps can still reach the values as loaded — confirm.
adata_concat.raw = adata_concat

### Load annotations

In [None]:
# Gene-identifier lookup table from BioMart, indexed by MGI symbol so that gene
# symbols can be mapped to Entrez ids further down.
# `use_cache=True` is passed so scanpy may reuse an earlier query result.
annotation = sc.queries.biomart_annotations(
    org="mmusculus",
    attrs=["mgi_symbol", "entrezgene_id", "ensembl_gene_id"],
    use_cache=True,
).set_index("mgi_symbol")

### Differential expression

In [None]:
# Every distinct gRNA_group label except "control"; these are the groups that
# get compared against the control below.
guide_groups = list(
    adata_concat.obs.loc[adata_concat.obs["gRNA_group"] != "control", "gRNA_group"].unique()
)

# Rank genes for each guide group against the "control" group with a t-test.
# Results are stored on `adata_concat` and read back via sc.get.rank_genes_groups_df.
sc.tl.rank_genes_groups(
    adata_concat,
    groupby="gRNA_group",
    groups=guide_groups,
    reference="control",
    method="t-test",
)

### Keggtools enrichment analysis

In [None]:
# Resolver handed to the pathway Renderer further down; it is backed by a Storage("mmu_pathways") cache.
# NOTE(review): presumably "mmu_pathways" names the on-disk cache location for KEGG data — confirm against keggtools.
resolver = Resolver(cache=Storage("mmu_pathways"))

In [None]:
# Load the custom pathway definition that sits next to the notebook (current
# working directory) and hand its XML text to keggtools for parsing.
# NOTE(review): presumably the file is KGML, as expected by Pathway.parse — confirm.
custom_pathway_path = os.path.join(os.getcwd(), "mmu-regulatory-network.xml")
with open(custom_pathway_path, "r") as f:
    custom_pathway_xml = f.read()
custom_pathway = Pathway.parse(custom_pathway_xml)

In [None]:
# Log fold changes below -MAX_LOG2FC are clipped to -MAX_LOG2FC before rendering.
MAX_LOG2FC = 2

# sfile() only builds paths inside PDF_DIR; make sure the folder exists before
# anything is written, so the cell also works on a fresh checkout.
os.makedirs(PDF_DIR, exist_ok=True)

# For every guide group: collect the significant genes that are part of the
# custom pathway, render the pathway coloured by fold change, and write
#   <target>-KEGG-custom-pathway-4.dot         (graphviz source)
#   <target>-KEGG-custom-pathway-4.svg         (rendered by `dot`)
#   <target>-KEGG-custom-pathway-legend-4.svg  (colour-bar legend)
for target in guide_groups:

    df = sc.get.rank_genes_groups_df(adata_concat, group=target)

    # Keep genes with p < 0.05.
    # NOTE(review): this filters on the "pvals" column; confirm that the
    # unadjusted p-value (rather than an adjusted one) is intended.
    df = df[df["pvals"] < 0.05]

    print(target)

    # Gene symbol -> Entrez id (int) for the significant genes that have an
    # Entrez id in the annotation table.
    anno_dict = (
        annotation[annotation.index.isin(df["names"]) & (~annotation["entrezgene_id"].isna())]
        ["entrezgene_id"]
        .astype(int)
        .to_dict()
    )

    # Ids of the genes in the custom pathway. Looked up once per guide instead
    # of once per gene; a set gives constant-time membership tests.
    pathway_genes = set(custom_pathway.get_genes())

    # Entrez id (as string) -> log fold change, restricted to pathway genes.
    diff_expr_dict = {}
    for gene_symbol, logfold in dict(zip(df["names"], df["logfoldchanges"])).items():
        if gene_symbol not in anno_dict:
            continue
        entrez_id = str(anno_dict[gene_symbol])
        if entrez_id in pathway_genes:
            diff_expr_dict[entrez_id] = logfold

    # Clip strongly negative fold changes. Only existing keys are reassigned,
    # so mutating the dict while iterating over it is safe.
    # NOTE(review): only the lower bound is clipped; values above +MAX_LOG2FC
    # pass through unchanged. Confirm whether a symmetric clip was intended.
    for entrez_id, logfold in diff_expr_dict.items():
        if logfold < -MAX_LOG2FC:
            diff_expr_dict[entrez_id] = -MAX_LOG2FC

    renderer = Renderer(kegg_pathway=custom_pathway, gene_dict=diff_expr_dict, cache_or_resolver=resolver)

    renderer.render()

    # NOTE(review): the PNG bytes are not used anywhere in this cell; kept in
    # case a later cell displays `binary_data`.
    binary_data = renderer.to_binary("png")

    # Resolve each output path once (sfile() prints on every call).
    dot_path = sfile(f"{target}-KEGG-custom-pathway-4.dot")
    svg_path = sfile(f"{target}-KEGG-custom-pathway-4.svg")

    # Save the dot source. The context manager guarantees the file is flushed
    # and closed before graphviz reads it.
    with open(dot_path, "w") as dot_file:
        dot_file.write(renderer.to_string())

    # dot to SVG. An argument list (no shell) keeps group names containing
    # spaces or shell metacharacters from breaking or altering the command;
    # check=True surfaces a failing `dot` run instead of leaving a broken SVG.
    subprocess.run(["dot", "-Tsvg", dot_path, "-o", svg_path], check=True)

    # Render legends: the bottom label shows the smallest and the top label
    # the largest value in diff_expr_dict (0 for both when it is empty).
    legend_min = min(diff_expr_dict.values(), default=0)
    legend_max = max(diff_expr_dict.values(), default=0)

    legend_string = f"""<?xml version="1.0" standalone="no"?>
    <svg height="200" width="300" version="1.1" baseProfile="full" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:ev="http://www.w3.org/2001/xml-events">
    <defs>
        <linearGradient id="cmap" x1="0%" y1="0%" x2="0%" y2="100%">
            <stop offset="0%" style="stop-color:rgb(255,0,0);stop-opacity:1" />
            <stop offset="50%" style="stop-color:rgb(255,255,255);stop-opacity:1" />
            <stop offset="100%" style="stop-color:rgb(0,0,255);stop-opacity:1" />
        </linearGradient>
    </defs>
    <g>
        <rect x="20" y="50" width="20" height="100" fill="url(#cmap)" />
        <rect x="20" y="50" width="20" height="100" style="stroke:black;stroke-width:2;fill-opacity:0;stroke-opacity:1" />
        <text x="55" y="150" fill="black" alignment-baseline="central">{legend_min}</text>
        <text x="55" y="100" fill="black" alignment-baseline="central">0</text>
        <text x="55" y="50" fill="black" alignment-baseline="central">{legend_max}</text>

        <line x1="40" y1="50" x2="50" y2="50" style="stroke:rgb(0,0,0);stroke-width:2" />
        <line x1="40" y1="100" x2="50" y2="100" style="stroke:rgb(0,0,0);stroke-width:2" />
        <line x1="40" y1="150" x2="50" y2="150" style="stroke:rgb(0,0,0);stroke-width:2" />
    </g>
    </svg>"""

    with open(sfile(f"{target}-KEGG-custom-pathway-legend-4.svg"), "w") as legend_file:
        legend_file.write(legend_string)