# Experimental Counting Optimization

In [1]:
# Paths to the input data files (BAM, fragments BED, AnnData h5ad)
bamfile = "/mnt/workspace2/jdetlef/data/public_data/heart_left_ventricle_194_CB_tagged.bam"
fragments_file = "/mnt/workspace2/jdetlef/data/public_data/fragments_heart_left_ventricle_194_sorted.bed"
h5ad_file = "/mnt/workspace2/jdetlef/data/public_data/heart_lv_SM-JF1NY.h5ad"

In [2]:
import sctoolbox.tools as tools

Unable to determine R home: [Errno 2] No such file or directory: 'R'


In [120]:
# individual imports
import episcanpy as epi
import pandas as pd
import gzip
import datetime
import multiprocessing as mp
from multiprocessing import Manager, Lock, Pool

from beartype import beartype
from beartype.typing import Any, Optional

In [4]:
adata = epi.read_h5ad(h5ad_file)
adata

AnnData object with n_obs × n_vars = 9110 × 1154611
    obs: 'logUMI', 'tsse', 'tissue', 'cell type', 'Life stage', 'closest Cell Ontology term(s)', 'Cell Ontology ID'
    var: 'Chromosome', 'hg38_Start', 'hg38_End', 'Class', 'Present in fetal tissues', 'Present in adult tissues', 'CRE module'

In [6]:
adata_barcodes = adata.obs.index.tolist()

In [7]:
%%time
# keep the second '+'-separated field of every obs index entry as the cell barcode (CB)
barcodes = [entry.split('+')[1] for entry in adata_barcodes]

CPU times: user 2.3 ms, sys: 0 ns, total: 2.3 ms
Wall time: 2.31 ms


In [None]:
def count_lines(filename):
    """Return the number of lines in the text file `filename`."""
    n_lines = 0
    with open(filename, 'r') as handle:
        # iterate lazily so the file is never held in memory as a whole
        for _ in handle:
            n_lines += 1
    return n_lines

In [None]:
%%time
# Count the lines of the fragments file
number_of_lines = count_lines(fragments_file)
print(f"Total number of lines: {number_of_lines}")

In [8]:
small_fragments = '/mnt/workspace2/jdetlef/data/public_data/cropped_heart_fragments.bed'

In [127]:

class MPFragmentCounter():
    """
    """

    def __init__(self):
        """Init class variables."""
        
        self.m = Manager()
        self.d = self.m.dict()
        self.lock = Lock()


        
    def _check_in_list(element: Any, alist: list[Any] | set[Any]) -> bool:
        """
        Check if element is in list.

        TODO Do we need this function?

        Parameters
        ----------
        element : Any
            Element that is checked for.
        alist : list[Any] | set[Any]
            List or set in which the element is searched for.

        Returns
        -------
        bool
            True if element is in list else False
        """

        return element in alist


    
    def _check_true(element: Any, alist: Optional[list[Any]] = None) -> bool:  # true regardless of input
        """
        Return True regardless of input

        Parameters
        ----------
        element : Any
            Element that is checked for.
        alist: Optional[list[Any]]
            List or set in which the element is searched for.

        Returns
        -------
        bool
            True if element is in list else False
        """

        return True

    
    def insertsize_from_fragments(self, fragments: str,
                                  barcodes: Optional[list[str]] = None,
                                  n_threads: int = 8) -> pd.DataFrame:
        # Open fragments file
        if _is_gz_file(fragments):
            f = gzip.open(fragments, "rt")
        else:
            f = open(fragments, "r")

        # Prepare function for checking against barcodes list
        if barcodes is not None:
            barcodes = set(barcodes)
            check_in = self._check_in_list
        else:
            check_in = self._check_true

        iterator = pd.read_csv(fragments,
                               delimiter='\t',
                               header=None,
                               names=['chr', 'start', 'stop', 'barcode', 'count'],
                               iterator=True,
                               chunksize=1000)

        # start timer
        start_time = datetime.datetime.now()

        pool = Pool(n_threads, maxtasksperchild=48)
        jobs = []
        # split fragments into chunks
        for chunk in iterator:
            # apply async job wit callback function
            job = pool.apply_async(self._count_fragments_worker, args=(chunk, barcodes, check_in))
            jobs.append(job)
        # monitor progress
        # utils.monitor_jobs(jobs, description="Progress")
        # close pool
        pool.close()
        # wait for all jobs to finish
        pool.join()
        # reset settings
        count_dict = self.d
        print('what is going on')
        print(count_dict)
        # Fill missing sizes with 0
        max_fragment_size = 1001

        for barcode in count_dict:
            for size in range(max_fragment_size):
                if size not in count_dict[barcode]:
                    count_dict[barcode][size] = 0

        # Close file and print elapsed time
        end_time = datetime.datetime.now()
        f.close()

        elapsed = end_time - start_time
        print("Done reading file - elapsed time: {0}".format(str(elapsed).split(".")[0]))

        # Convert dict to pandas dataframe
        print("Converting counts to dataframe...")
        table = pd.DataFrame.from_dict(count_dict, orient="index")
        table = table[["insertsize_count", "mean_insertsize"] + sorted(table.columns[2:])]
        table["mean_insertsize"] = table["mean_insertsize"].round(2)

        print("Done getting insertsizes from fragments!")

        return table

    
    def _count_fragments_worker(self, chunk, barcodes, check_in):
        
        count_dict = {}
        
        for i in range(len(chunk)):
            row = chunk.iloc[i]
            start = int(row['start'])
            end = int(row['stop'])
            barcode = row['barcode']
            count = int(row['count'])
            size = end - start - 9  # length of insertion (-9 due to to shifted cutting of Tn5)

            # Only add fragment if check is true
            if check_in(barcode, barcodes) is True:
                count_dict = self._add_fragment(count_dict, barcode, size, count)
                
        with self.lock:
            self.d = update_count_dict(self.d, count_dict)


    def _add_fragment(count_dict: dict[str, int],
                      barcode: str,
                      size: int,
                      count: int = 1):
        """
        Add fragment of size 'size' to count_dict.

        Parameters
        ----------
        count_dict : dict[str, int]
            Dictionary containing the counts per insertsize.
        barcode : str
            Barcode of the read.
        size : int
            Insertsize to add to count_dict.
        count : int, default 1
            Number of reads to add to count_dict.
        """

        # Initialize if barcode is seen for the first time
        if barcode not in count_dict:
            count_dict[barcode] = {"mean_insertsize": 0, "insertsize_count": 0}

        # Add read to dict
        if size >= 0 and size <= 1000:  # do not save negative insertsize, and set a cap on the maximum insertsize to limit outlier effects

            count_dict[barcode]["insertsize_count"] += count

            # Update mean
            mu = count_dict[barcode]["mean_insertsize"]
            total_count = count_dict[barcode]["insertsize_count"]
            diff = (size - mu) / total_count
            count_dict[barcode]["mean_insertsize"] = mu + diff

            # Save to distribution
            if size not in count_dict[barcode]:  # first time size is seen
                count_dict[barcode][size] = 0
            count_dict[barcode][size] += count
            
        return count_dict
    

    def _log_result(self, result: Any) -> None:
        """Log results from mp_counter."""

        if self.merged_dict:
            self.merged_dict = dict(Counter(self.merged_dict) + Counter(result))
            # print('merging')
        else:
            self.merged_dict = result

In [128]:
 mpc = MPFragmentCounter()

In [129]:
%%time
counts = mpc.insertsize_from_fragments(small_fragments, barcodes)

what is going on
{}
Done reading file - elapsed time: 0:00:00
Converting counts to dataframe...


KeyError: "None of [Index(['insertsize_count', 'mean_insertsize'], dtype='object')] are in the [columns]"

In [43]:
count_dict={}

In [100]:
# Two toy count dicts with the same barcodes but different means and counts
count_dict_1 = {
    'AB': {"mean_insertsize": 10, "insertsize_count": 5},
    'BC': {"mean_insertsize": 10, "insertsize_count": 20},
}

count_dict_2 = {
    'AB': {"mean_insertsize": 20, "insertsize_count": 20},
    'BC': {"mean_insertsize": 20, "insertsize_count": 5},
}

In [117]:
def update_count_dict(count_dict_1, count_dict_2):
    """
    Merge two insertsize count dictionaries into a new one.

    Both inputs map a barcode to a dictionary holding 'mean_insertsize', 'insertsize_count'
    and optionally one entry per insertsize. The result uses the same layout, so it can be
    merged again with further count dictionaries.

    Parameters
    ----------
    count_dict_1 : dict
        First count dictionary. May be empty.
    count_dict_2 : dict
        Second count dictionary. May be empty.

    Returns
    -------
    dict
        New dictionary with all barcodes of both inputs. Counts are summed and the means are
        combined weighted by their counts. The inputs are not modified.
    """

    stat_keys = ("mean_insertsize", "insertsize_count")

    # Keep the barcode order of the first dict, followed by barcodes only found in the second
    all_barcodes = list(count_dict_1) + [barcode for barcode in count_dict_2 if barcode not in count_dict_1]

    updated_dict = {}
    for barcode in all_barcodes:
        first = count_dict_1.get(barcode, {})
        second = count_dict_2.get(barcode, {})

        count_1 = first.get("insertsize_count", 0)
        count_2 = second.get("insertsize_count", 0)
        total_count = count_1 + count_2

        # Scale each mean by its share of the total count; avoid dividing by zero
        if total_count > 0:
            updated_mean = (first.get("mean_insertsize", 0) * count_1
                            + second.get("mean_insertsize", 0) * count_2) / total_count
        else:
            updated_mean = 0

        merged = {"mean_insertsize": updated_mean, "insertsize_count": total_count}

        # Sum the per-insertsize counts of both inputs
        for source in (first, second):
            for key, value in source.items():
                if key not in stat_keys:
                    merged[key] = merged.get(key, 0) + value

        updated_dict[barcode] = merged

    return updated_dict

In [113]:
pd.DataFrame({'mean_insertsizes': updated_means, 'insertsize_counts' : updated_counts})

Unnamed: 0,mean_insertsizes,insertsize_counts
AB,18.0,25
BC,12.0,25


In [102]:
# merge the counts of both frames on the barcode index and display the result
merged_insertsizes = pd.merge(df1["insertsize_count"], df2["insertsize_count"], left_index=True, right_index=True)
merged_insertsizes

Unnamed: 0,insertsize_count_x,insertsize_count_y
AB,4,2
BC,5,3


In [103]:
x_scaling_factor = merged_insertsizes["insertsize_count_x"] / merged_insertsizes.sum(axis=1)
y_scaling_factor = merged_insertsizes["insertsize_count_y"] / merged_insertsizes.sum(axis=1)

In [104]:
merged_mean_insertsizes = pd.merge(df1["mean_insertsize"], df2["mean_insertsize"], left_index=True, right_index=True)
merged_mean_insertsizes

Unnamed: 0,mean_insertsize_x,mean_insertsize_y
AB,10,20
BC,10,20


In [105]:
merged_mean_insertsizes["mean_insertsize_x"] = merged_mean_insertsizes["mean_insertsize_x"] * x_scaling_factor
merged_mean_insertsizes["mean_insertsize_y"] = merged_mean_insertsizes["mean_insertsize_y"] * y_scaling_factor

In [106]:
merged_mean_insertsizes.sum(axis=1)

AB    18.0
BC    12.0
dtype: float64

In [None]:
merged_mean_insertsizes * 

In [72]:
import pandas as pd

# Build two example frames that share the same index and column name
df1 = pd.DataFrame({'Werte1': [1, 2, 3]}, index=list('abc'))
df2 = pd.DataFrame({'Werte1': [4, 5, 6]}, index=list('abc'))

# Join the frames on their index; the clashing column gets the suffixes _x and _y
merged_df = df1.merge(df2, left_index=True, right_index=True)

# Add up the values of every row
summed_df = merged_df.sum(axis='columns')

print(summed_df)


a    5
b    7
c    9
dtype: int64


In [73]:
merged_df

Unnamed: 0,Werte1_x,Werte1_y
a,1,4
b,2,5
c,3,6


In [14]:
%%time
count_table = tools._insertsize_from_fragments(small_fragments, barcodes)

[INFO] Counting fragment lengths from fragments file...
[INFO] Done reading file - elapsed time: 0:00:00
[INFO] Converting counts to dataframe...
[INFO] Done getting insertsizes from fragments!
CPU times: user 2.64 s, sys: 377 ms, total: 3.02 s
Wall time: 2.95 s


In [None]:
@beartype
def _insertsize_from_fragments(fragments: str,
                               barcodes: Optional[list[str]] = None) -> pd.DataFrame:
    """
    Get fragment insertsize distributions per barcode from fragments file.

    Parameters
    ----------
    fragments : str
        Path to fragments.bed(.gz) file. Tab-separated; columns 2-5 are read as
        start, end, barcode and count.
    barcodes : Optional[list[str]], default None
        Only collect fragment sizes for the barcodes in barcodes

    Returns
    -------
    pd.DataFrame
        DataFrame with insertsize distributions per barcode: the columns 'insertsize_count',
        'mean_insertsize' and one column per insertsize from 0 to 1000. Empty (but with all
        columns) if no barcode matched.
    """

    # A set gives constant-time membership checks; None means "keep every barcode"
    barcode_set = set(barcodes) if barcodes is not None else None

    # Pick the opener from the file content using the _is_gz_file helper of this file
    opener = gzip.open if _is_gz_file(fragments) else open

    # Read fragments file and add to dict
    print("Counting fragment lengths from fragments file...")
    start_time = datetime.datetime.now()
    count_dict = {}
    with opener(fragments, "rt") as f:  # closed even if a line cannot be parsed
        for line in f:
            # Skip blank lines and '#' comment lines instead of failing on them
            if not line.strip() or line.startswith("#"):
                continue

            columns = line.rstrip().split("\t")
            start = int(columns[1])
            end = int(columns[2])
            barcode = columns[3]
            count = int(columns[4])
            size = end - start - 9  # length of insertion (-9 due to shifted cutting of Tn5)

            # Only add fragment if the barcode is requested
            if barcode_set is None or barcode in barcode_set:
                count_dict = _add_fragment(count_dict, barcode, size, count)

    # Print elapsed time
    elapsed = datetime.datetime.now() - start_time
    print("Done reading file - elapsed time: {0}".format(str(elapsed).split(".")[0]))

    # Convert dict to pandas dataframe; sizes that were never seen are filled with 0
    print("Converting counts to dataframe...")
    max_fragment_size = 1001
    sizes = list(range(max_fragment_size))
    rows = [[counts["insertsize_count"], counts["mean_insertsize"]]
            + [counts.get(size, 0) for size in sizes]
            for counts in count_dict.values()]
    table = pd.DataFrame(rows,
                         index=list(count_dict),
                         columns=["insertsize_count", "mean_insertsize"] + sizes)
    table["mean_insertsize"] = table["mean_insertsize"].astype(float).round(2)

    print("Done getting insertsizes from fragments!")

    return table

In [None]:
@beartype
def _add_fragment(count_dict: dict[str, dict],
                  barcode: str,
                  size: int,
                  count: int = 1) -> dict[str, dict]:
    """
    Add fragment of size 'size' to count_dict.

    Parameters
    ----------
    count_dict : dict[str, dict]
        Dictionary mapping each barcode to its 'mean_insertsize', 'insertsize_count'
        and the counts per insertsize. Updated in place.
    barcode : str
        Barcode of the read.
    size : int
        Insertsize to add to count_dict.
    count : int, default 1
        Number of reads to add to count_dict.

    Returns
    -------
    dict[str, dict]
        Updated count_dict
    """

    # Initialize if barcode is seen for the first time
    if barcode not in count_dict:
        count_dict[barcode] = {"mean_insertsize": 0, "insertsize_count": 0}

    # Do not save negative insertsizes, cap the maximum insertsize to limit outlier effects
    # and skip empty counts (they would divide by zero for a new barcode)
    if 0 <= size <= 1000 and count > 0:
        entry = count_dict[barcode]
        entry["insertsize_count"] += count

        # Update the running mean; the step is weighted by count so that the mean
        # agrees with the stored distribution when a fragment stands for several reads
        mu = entry["mean_insertsize"]
        entry["mean_insertsize"] = mu + count * (size - mu) / entry["insertsize_count"]

        # Save to distribution
        entry[size] = entry.get(size, 0) + count

    return count_dict

# HELPERS

In [12]:
@beartype
def _is_gz_file(filepath: str) -> bool:
    """
    Check whether a file is gzip-compressed by looking at its first two bytes.

    Parameters
    ----------
    filepath : str
        Path to file.

    Returns
    -------
    bool
        True if the file starts with the gzip magic number.
    """

    gzip_magic = b'\x1f\x8b'

    with open(filepath, 'rb') as handle:
        leading_bytes = handle.read(2)

    return leading_bytes == gzip_magic

In [None]:
@beartype
def gunzip_file(f_in: str, f_out: str) -> None:
    """
    Decompress file.

    Parameters
    ----------
    f_in : str
        Path to compressed input file.
    f_out : str
        Destination to decompressed output file.
    """

    # Imported locally because the import cell of this notebook does not provide shutil
    import shutil

    # Stream the decompressed bytes to the output file
    with gzip.open(f_in, 'rb') as h_in, open(f_out, 'wb') as h_out:
        shutil.copyfileobj(h_in, h_out)