In [1]:
import sys
from pathlib import Path

# add parent folder to the path
module_path = str(Path.cwd().parents[0])
if module_path not in sys.path:
    sys.path.append(module_path)

# PanRicci

In [2]:
import networkx as nx
import numpy as np
import math
import ot

## Load GFA

In [3]:
from panricci.utils.gfa_loader import GFALoader

In [4]:
gfa_loader = GFALoader(undirected=False)
nodes, edges, G = gfa_loader("../data/test1.gfa")

In [5]:
nodes

{'1': {'label': 'AC', 'len': 2, 'node_depth': 1.0},
 '2': {'label': 'AA', 'len': 2, 'node_depth': 0.5},
 '3': {'label': 'CCTAAA', 'len': 6, 'node_depth': 1.0}}

In [6]:
edges

[('1', '3'), ('1', '2'), ('2', '3')]

In [7]:
F = nx.DiGraph()
F.add_nodes_from([(node, attrs) for node, attrs in nodes.items()])
F.add_edges_from(edges, distance=1)

In [8]:
# access and modify attributes of nodes as a dictionary
F.nodes["1"] ,F.nodes["1"]["node_depth"]

({'label': 'AC', 'len': 2, 'node_depth': 1.0}, 1.0)

In [9]:
# access and modify attributes of edges as a dictionary
F.edges["1","3"]["curvature"] = 5
F.edges["1","3"], F.edges["1","3"].get("curvature")

({'distance': 1, 'curvature': 5}, 5)

In [10]:
F.edges["1","3"]

{'distance': 1, 'curvature': 5}

## Probability distribution for a node

In [11]:
G.nodes["2"]

{'label': 'AA', 'len': 2, 'node_depth': 0.5}

In [12]:
from panricci.distributions.variation_graph import DistributionNodes
distribution_nodes = DistributionNodes(G, alpha=0.5)

In [13]:
distribution_nodes(node="2")

{'2': 0.5, '3': 0.5}

## Ricci Flow

In [14]:
from panricci.ricci_flow import RicciFlow

ricci = RicciFlow(G, dist_nodes=distribution_nodes, dirsave_graphs="../output/test3/ricci-flow")


In [15]:
ricci(iterations=5)

Ricci-Flow: 100%|██████████| 5/5 [00:00<00:00, 383.95it/s]


In [None]:
# def curvature(edge):
edge=["1","2"]
node1, node2 = edge
distance = G.edges[edge]["distance"]
distance

In [None]:
neighbors1 = distribution_nodes(node1)
neighbors2 = distribution_nodes(node2)

neighbors1, neighbors2

In [None]:
import numpy as np

def wasserstein(dist1, dist2):
    nodes_subgraph = list(dist1.keys()) + list(dist2.keys())
    subgraph = G.subgraph(nodes_subgraph)
    distances_subgraph = nx.shortest_path(subgraph, weight="distance", method="dijkstra")#, subgraph.edges()
    
    # distance matrix for OT
    M = np.zeros((len(neighbors1),len(neighbors2)))

    for i,source in enumerate(neighbors1.keys()):
        for j,target in enumerate(neighbors2.keys()):
            try:
                nodes = distances_subgraph[source][target]
                edges = [(n1,n2) for n1,n2 in zip(nodes[:-1], nodes[1])]
                M[i,j] = np.sum([G.edges[e]["distance"] for e in edges])
            except:
                continue
    
    M /= M.max()

    a,b=list(neighbors1.values()), list(neighbors2.values())
    return ot.emd2(a,b,M)


def curvature(edge):
    node1, node2 = edge
    neighbors1 = distribution_nodes(node1)
    neighbors2 = distribution_nodes(node2)
    wass = wasserstein(neighbors1, neighbors2)
    distance = G.edges[edge]["distance"]
    curvature = 1 - wass/distance

    return curvature


In [None]:

wass = wasserstein(neighbors1, neighbors2)

In [None]:
curvature(["2","3"])

In [None]:
nodes_subgraph = list(neighbors1.keys()) + list(neighbors2.keys())
subgraph = G.subgraph(nodes_subgraph)


In [None]:
distances_subgraph = nx.shortest_path(subgraph, weight="distance", method="dijkstra")#, subgraph.edges()
distances_subgraph["2"]["3"]

In [None]:
import numpy as np
M = np.zeros((len(neighbors1),len(neighbors2)))

for i,source in enumerate(neighbors1.keys()):
    for j,target in enumerate(neighbors2.keys()):
        try:
            nodes = distances_subgraph[source][target]
            edges = [(n1,n2) for n1,n2 in zip(nodes[:-1], nodes[1])]
            M[i,j] = np.sum([G.edges[e]["distance"] for e in edges])
        except:
            continue
 
M /= M.max()
M

In [None]:
a,b=list(neighbors1.values()), list(neighbors2.values())
ot.emd2(a,b,M)

## Alignment with 