In [1]:
import pandas as pd
import seaborn as sns
import numpy as np
from scipy.spatial.distance import pdist, squareform

%load_ext autoreload
%autoreload 2

import pandas as pd
import numpy as np
import os
from dask.diagnostics import ProgressBar
import duckdb

sys.path.append("../scripts")
from initial_map import BarcodeMapper
from map_refiner import MapRefiner
import preprocess

In [2]:
import networkx as nx
from collections import defaultdict
import random
from joblib import Parallel, delayed
from tqdm import tqdm
import pandas as pd

def hamming_distance(s1, s2):
    return sum(c1 != c2 for c1, c2 in zip(s1, s2))

def build_graph_parallel(umi_counts, threshold=1, n_jobs=-1):
    """Build UMI graph in parallel using Hamming distance."""
    umis = list(umi_counts.keys())
    G = nx.Graph()
    for u in umis:
        G.add_node(u, count=umi_counts[u])

    def check_pair(i, j):
        if hamming_distance(umis[i], umis[j]) <= threshold:
            return (umis[i], umis[j])
        return None

    pairs = [(i, j) for i in range(len(umis)) for j in range(i+1, len(umis))]
    edges = Parallel(n_jobs=n_jobs)(
        delayed(check_pair)(i, j) for i, j in tqdm(pairs, desc="Computing edges")
    )
    for e in edges:
        if e is not None:
            G.add_edge(*e)
    return G

def directional_adjacency_clustering_df(umi_counts, threshold=1, seed=None, n_jobs=-1, ratio=0.1):
    """
    Adjacency clustering with directionality: only cluster a node into a higher-count node
    if its count < ratio * count of selected node.
    """
    if seed is not None:
        random.seed(seed)

    G = build_graph_parallel(umi_counts, threshold=threshold, n_jobs=n_jobs)
    cluster_records = []

    for component in nx.connected_components(G):
        subgraph = G.subgraph(component).copy()
        visited = set()
        selected = []

        while len(visited) < len(subgraph):
            candidates = [n for n in subgraph.nodes if n not in visited]
            if not candidates:
                break
            max_count = max(subgraph.nodes[n]["count"] for n in candidates)
            top_nodes = [n for n in candidates if subgraph.nodes[n]["count"] == max_count]
            rep = random.choice(top_nodes)
            selected.append(rep)
            visited |= {rep} | set(subgraph.neighbors(rep))

        # Assign each UMI to its representative based on abundance ratio
        for node in component:
            if node in selected:
                rep = node
            else:
                # Consider neighbors that are selected
                neighbors = set(G.neighbors(node)) & set(selected)
                # Only assign to neighbor if node count < ratio * neighbor count
                eligible_reps = [n for n in neighbors if umi_counts[node] < ratio * umi_counts[n]]
                if eligible_reps:
                    # choose highest count among eligible reps
                    rep = max(eligible_reps, key=lambda x: umi_counts[x])
                else:
                    # not eligible -> stays as its own representative
                    rep = node

            cluster_records.append({
                "umi": node,
                "count": umi_counts[node],
                "rep_umi": rep
            })

    df = pd.DataFrame(cluster_records)
    return df


# Example usage
umi_counts = {
    "ACGT": 456,
    "TCGT": 12,
    "CCGT": 11,
    "AAAT": 90,
    "ACAG": 75,
    "ACAT": 72
}

df = directional_adjacency_clustering_df(umi_counts, threshold=1, seed=42, n_jobs=-1, ratio=0.1)
df

Computing edges: 100%|██████████| 15/15 [00:00<00:00, 1082.90it/s]


Unnamed: 0,umi,count,rep_umi
0,AAAT,90,AAAT
1,CCGT,11,ACGT
2,ACGT,456,ACGT
3,ACAT,72,ACAT
4,TCGT,12,ACGT
5,ACAG,75,ACAG
