In [2]:
%run "../head.py"

In [3]:
from pathlib import Path

import numpy as np
import pandas as pd
import plotly.express as px
from hic_basic.plot.hic import _plot_mat
from hic_basic.hicio import load_json, read_meta

In [9]:
def parse_bowtie2_log(log):
    if not log.exists():
        return pd.NA
    with open(log) as f:
        for line in f:
            if "overall alignment rate" in line:
                return float(line.split()[0][:-1])
def pp_json(filepat, samples, key_dict, dtypes = None):
    data = {
        sample : load_json(str(filepat).format(sample = sample))
        for sample in samples
        }
    df = pd.DataFrame(data).T
    df = df.rename(
        columns = {
            value : key
            for key, value in key_dict.items()
        }
    )
    key_used = list(key_dict.keys())
    df = df[key_used].astype(dtypes)
    return df
def add_mapping(sample_table_fp, ana_home):
    if isinstance(sample_table_fp, str):
        sample_table = read_meta(sample_table_fp)
    elif isinstance(sample_table_fp, pd.DataFrame):
        sample_table = sample_table_fp
    else:
        raise ValueError("sample_table_fp should be a path or a DataFrame")
    ddir = Path(ana_home)
    meta = sample_table.assign(
        mapping_rate = sample_table.apply(
            lambda row: parse_bowtie2_log(
                ddir / "logs" / f"{row.name}.mapping.log"
            ),
            axis = 1
        )
    )
    dup_info = pp_json(
        ddir / "dedup" / "{sample}.json",
        meta.index,
        dict(zip(
            ["read_num", "dup_read_num"],
            ["READ", "DUPLICATE TOTAL"]
            )),
        dtypes = {
            "read_num" : int,
            "dup_read_num" : int
        }
    )
    meta = pd.concat(
        [meta, dup_info],
        axis = 1
    )
    meta = meta.assign(
        unique_mapped = meta["read_num"] - meta["dup_read_num"],
    )
    return meta
# add_mapping(
#     "/sharec/ychi/raw/sperm70_gam/sample_table.csv",
#     "/sharec/ychi/repo/sperm70_gam"
# )
def parse_gam(gam_file, pixel=False):
    """
    Input:
        gam_file: path to gam file
            temporarily, only support .npz file
        pixel: a cooler's joint pixel format
    Output:
        mat: pandas.DataFrame
            if not pixel, a DataFrame with 2-level-multiindex (chrom, start) index and columns
    """
    npz = np.load(gam_file)
    if not pixel:
        index = pd.MultiIndex.from_frame(
            pd.DataFrame(
                {
                    "chrom" : pd.Series(npz["windows_0"][:,0], dtype="string"),
                    "start" : pd.Series(npz["windows_0"][:,1], dtype="int")
                }
                )
        )
        columns = pd.MultiIndex.from_frame(
            pd.DataFrame(
                {
                    "chrom" : pd.Series(npz["windows_1"][:,0], dtype="string"),
                    "start" : pd.Series(npz["windows_1"][:,1], dtype="int")
                }
                )
        )
        mat = pd.DataFrame(
            npz["scores"],
            index=index,
            columns=columns,
            dtype = "float"
        )
    else:
        values = npz["windows_0"]
        pos1 = pd.DataFrame(
            {
                "chrom1" : pd.Series(values[:,0], dtype="string"),
                "start1" : pd.Series(values[:,1], dtype="int"),
                "end1" : pd.Series(values[:,2], dtype="int")
            }
        )
        values = npz["windows_1"]
        pos2 = pd.DataFrame(
            {
                "chrom2" : pd.Series(values[:,0], dtype="string"),
                "start2" : pd.Series(values[:,1], dtype="int"),
                "end2" : pd.Series(values[:,2], dtype="int")
            }
        )
        mat = pd.DataFrame(
            npz["scores"],
            dtype = "float"
            ).stack().reset_index()
        mat.columns = ["sub_id1","sub_id2","count"]
        mat = pd.merge(
            mat,
            pos1,
            left_on="sub_id1",
            right_index=True
        )
        mat = pd.merge(
            mat,
            pos2,
            left_on="sub_id2",
            right_index=True
        )
        mat = mat[
            ["chrom1","start1","end1","chrom2","start2","end2","count"]
            ]
    return mat
# ddir = Path("/sharec/ychi/repo/sperm71_gam")
# npz = np.load(ddir / "matrix" / "1000000" / "chr1.npz")

In [4]:
# ddir = Path("/sharec/ychi/repo/sperm71_gam")
# npz = np.load(ddir / "matrix" / "1000000" / "chr1.npz")

In [1]:
def plot_gam_heatmap(ana_home, task, chrom, gam_file=None, title="TITLE", binsize=500000, qmax=0.85, qmin=0.05, cmap="RdBu_r"):
    if gam_file is None:
        gam_file = Path(ana_home) / "matrix" / f"{binsize}" / "npmi" / f"{task}.{chrom}__{chrom}.npz"
    mat = parse_gam(gam_file, pixel=False)

    data = pd.DataFrame(mat.values).stack()
    print(data.quantile(qmin),data.quantile(qmax))
    fig = _plot_mat(
        mat,
        title=title,
        cmap=cmap,
        donorm = False,
        ignore_diags = False,
        fillna = False,
        zmax = data.quantile(qmax),
        zmin = data.quantile(qmin)
    )
    fig.update_xaxes(
        visible=False
    )
    fig.update_yaxes(
        visible=False
    )
    fig.update_layout(
        height = 800,
        width = 800,
    )
    #fig.show(renderer="png")
    return fig

### compartment functions

In [None]:
import cooler
from hic_basic.compartment import compartments
from hic_basic.coolstuff import cool2mat
def ab_norm(data, a, b):
    return (data - a) / (b - a)
def normTo01(data):
    return ab_norm(data, np.min(data), np.max(data))
def norm_PC_for_gam(orig_data):
    data = orig_data.copy()
    positive = data[data>0]
    data[data>0] = normTo01(positive)
    negative = data[data<0]
    data[data<0] = -normTo01(negative)
    return data
def GAM_compartment(gam_mat, ref_track, give_corr=False, debug=False):
    ref_track = ref_track.rename("GC")
    PCs = compartments(
        gam_mat,
        normalize=False,
        nPCs=3
    )
    PCs.index = gam_mat.index
    PCs_e = pd.concat([PCs, ref_track], axis=1)
    corrs = []
    for PC in PCs:
        corr = PCs_e[PC].corr(PCs_e["GC"])
        if debug:
            print(corr)
        corrs.append(abs(corr))
        sig = -1 if corr < 0 else 1
        PCs[PC] = sig * PCs[PC]
    max_corr_PC = PCs.columns[np.argmax(corrs)]
    if debug:
        print(max_corr_PC)
    normed_PC = norm_PC_for_gam(PCs[max_corr_PC])
    if not give_corr:
        return normed_PC
    else:
        return normed_PC, max(corrs)
def GAM_compartments(gam_file, ref_track, min_nz=10, bad_bins=None):
    """
    Calculate compartmentalization from GAM file
    Input:
        gam_file: str, path to GAM cool file
        ref_track: pd.Series, reference track
    Output:
    """
    chroms = cooler.Cooler(str(gam_file)).chromnames
    eigen_list = []
    for chrom in chroms:
        mat = cool2mat(gam_file, chrom, balance=False, min_nz=10, bad_bins=bad_bins)
        eigen = GAM_compartment(mat, ref_track)
        eigen.name = "eigen"
        eigen = eigen.reset_index()
        eigen["AB"] = "A"
        eigen.loc[eigen["eigen"] < 0, "AB"] = "B"
        eigen_list.append(eigen)
    eigen_all = pd.concat(eigen_list)
    return eigen_all

# gam_file = h.ddir / "Sperm_hg.gam.1m.cool"
# ref_track = pd.read_table(
#     h.ddir / "GRCh38.gc.1m.tsv",
#     index_col=["chrom","start"]
# )["GC"]
# eigen = GAM_compartments(gam_file, ref_track)

In [None]:
import plotly.graph_objs as go
from plotly.subplots import make_subplots

from hic_basic.compartment import compartments
from hic_basic.plot.utils import filling_l2r_plotly
# def add_eig_track(fig, orig_data, eig_col=None, row=None, col=None):
#     assert (row is None) == (col is None), "row and col must be both None or not None"
#     data = orig_data.copy()
#     if row is not None:
#         subplot_kwargs = dict(row=row, col=col)
#     else:
#         subplot_kwargs = {}
#     if isinstance(data, pd.Series):
#         data = data.rename("eig").reset_index()
#         eig_col = "eig"
#     else:
#         if eig_col is None:
#             eig_col = "eig"
#     bar_width = data["start"].diff().mode()[0]
#     if "AB" not in data.columns:
#         data = data.assign(
#             AB = "A"
#         )
#         data.loc[data[eig_col] < 0, "AB"] = "B"
#     eig_track_fig = px.bar(
#         data,
#         x = "start",
#         y = eig_col,
#         color = "AB",
#         color_discrete_map={"B":'rgb(33,102,172)',"A":'rgb(178,24,43)'},
#     )
#     eig_track_fig.update_traces(
#         showlegend=False,
#         width = bar_width,
#         marker=dict(line=dict(
#             width=0,
#             color = "purple"
#             ))
#     )
#     for trace in eig_track_fig.data:
#         fig.add_trace(trace, **subplot_kwargs)
#     return fig

In [None]:
import xarray as xr
from hic_basic.plot.utils import plotly_fig2array

def tiling_mat_png(lower_fig, upper_fig, rot=True, add_xy=False):
    """
    Tiling two heatmap together using a direct png tiling.
    Input:
        lower: Figure to be placed at the bottom
        upper: Figure to be placed at the top
        rot: whether to rotate 90 degree anti-clockwise, tiling and rotate back
            use this when your figure is plot by go.Heatmap, which start from lower left corner
        add_xy: whether to add x and y axis
    Output:
        img: np.array of the tiling image, if add_xy, give xarray with proper coordinates
    TODO:
        1.use a better interpolation method for x and y to suit non-linear x and y
        2. don't just give array, consider direct plotly figure, Image for more consistent downstream processing
    """
    half_fig_list = [lower_fig, upper_fig]
    for i, half_fig in enumerate(half_fig_list):
        #print(type(data))
        #print(data[0]["x"])
        width = half_fig.data[0]["x"].shape[0]
        height = half_fig.data[0]["y"].shape[0]
        # change to pure heatmap
        half_fig = half_fig.update_layout(
            plot_bgcolor = "white",
            height = width*5,
            width = height*5,
            margin = dict(
                l = 0,
                r = 0,
                b = 0,
                t = 0,
            ),
            xaxis = dict(
                showgrid=False,
                zeroline=False,
                showticklabels=False
            ),
            yaxis = dict(
                showgrid=False,
                zeroline=False,
                showticklabels=False
            )
        )
        half_fig_list[i] = half_fig
        #half_fig.show(renderer="png")
    lower_img, upper_img = [plotly_fig2array(half_fig) for half_fig in half_fig_list]
    # assert same channel
    assert lower_img.shape[2] == upper_img.shape[2]
    # concat arrays against diagonal
    channels = []
    for i in range(lower_img.shape[2]):
        if rot:
            channel = np.triu(np.rot90(lower_img[:,:,i], k=1)) + np.tril(np.rot90(upper_img[:,:,i], k=1))
        else:
            channel = np.triu(lower_img[:,:,i]) + np.tril(upper_img[:,:,i])
        channels.append(channel)
    img = np.stack(channels, axis=2)
    img = np.rot90(img, k=-1) # didn't rotate back at this stage?
    if add_xy:
        x0, x1 = lower_fig.data[0]["x"][0], lower_fig.data[0]["x"][-1]
        x = np.linspace(x0, x1, img.shape[1]) # col/axis2 for np array
        # y is reversed when transformed from plotly to np array
        y0, y1 = upper_fig.data[0]["y"][-1], upper_fig.data[0]["y"][0]
        y = np.linspace(y0, y1, img.shape[0])
        img = xr.DataArray(
            img,
            dims = ("y", "x", "channel"),
            coords = {"x": x, "y": y, "channel" : ["r", "g", "b", "a"]}
        )
    return img
# chrom = "chr1"
# hic_file = coolps["Sperm_hg"]
# gam_file = coolps["Sperm_hg_gam"]

# # --- extract hic and gam matrices --- #
# hic_mat = cool2mat(coolps["Sperm_hg"],chrom,balance=True)
# clr = cooler.Cooler(coolps["Sperm_hg"])
# bins = clr.bins()[:]
# bad_bins = bins.loc[bins["weight"].isna(),:]
# gam_mat = cool2mat(coolps["Sperm_hg_gam"],chrom,balance=False, bad_bins=bad_bins)
# # --- plot --- #
# mat = hic_mat
# hic_fig = _plot_mat(
#     mat,
#     ignore_diags = False,
#     donorm = True,
#     cmap = "Turbo",
#     range_for_balance = True,
#     zmin = 0,
#     zmax = 0.6
# )
# mat = gam_mat
# gam_fig = _plot_mat(
#     mat,
#     title="",
#     cmap="Turbo",
#     donorm = True,
#     range_for_balance = True,
#     ignore_diags = False,
#     fillna = False,
#     zmin = 0,
#     zmax = 0.8
# )
# # --- tiling heatmaps --- #
# img = tiling_mat_png(hic_fig, gam_fig, rot=True, add_xy=True)
# fig = px.imshow(img)
# fig.show(renderer="png")
def plot_tilling_gam_hic(hic_file, gam_file, region, add_xy=True, matrix_only=False, hic_balance=True, hic_kwargs={}, gam_kwargs={}):
    # --- extract hic and gam matrices --- #
    # use hic file to get bad bins
    clr = cooler.Cooler(hic_file)
    bins = clr.bins()[:]
    bad_bins = bins.loc[bins["weight"].isna(),:]
    gam_mat = cool2mat(gam_file,region,balance=False, bad_bins=bad_bins)
    #print(hic_file)
    hic_mat = cool2mat(hic_file,region,balance=hic_balance, bad_bins=bad_bins)
    # --- plot --- #
    mat = hic_mat
    default_hic_kwargs = {
        "cmap" : "Turbo",
        "donorm" : True,
        "range_for_balance" : True, # use balacend matrix
        "zmin" : 0,
        "zmax" : 0.6
    }
    hic_fig = _plot_mat(
        mat,
        ignore_diags = False,
        **{**default_hic_kwargs, **hic_kwargs}
    )
    mat = gam_mat
    default_gam_kwargs = {
        "cmap" : "Turbo",
        "donorm" : True,
        "range_for_balance" : True, # use balanced matrix
        "zmin" : 0,
        "zmax" : 0.8
    }
    gam_fig = _plot_mat(
        mat,
        title="",
        ignore_diags = False,
        fillna = False,
        **{**default_gam_kwargs, **gam_kwargs}
    )
    # --- tiling heatmaps --- #
    img = tiling_mat_png(hic_fig, gam_fig, rot=True, add_xy=add_xy)
    if matrix_only:
        return img
    fig = px.imshow(
        img,
        # if img is xarray, "origin" of imshow will be locked at lower left
        labels = {"x": "Hi-C", "y": "GAM"},
        )
    fig.update_xaxes(
        showline=False, # temporary hide lines, look bad for telomere regions
        linewidth=1,
        linecolor='black',
        mirror=True,
    )
    fig.update_yaxes(
        showline=False, # temporary hide lines
        linewidth=1,
        linecolor='black',
        mirror=True,
    )
    return fig
# chrom = "chr1"
# hic_file = coolps["Sperm_hg"]
# gam_file = coolps["Sperm_hg_gam"]
# fig = plot_tilling_gam_hic(hic_file, gam_file, chrom, add_xy=True)
# fig.show(renderer="png")