In [2]:
from pathlib import Path
import networkx as nx
import matplotlib.pyplot as plt
import pandas as pd

In [3]:
def get_graph_data_from_topo(filepath=None):
    """
    Reads a .topo file and returns:
    - A NetworkX directed graph with gene names as node labels and 'Type' as edge weight.
    - A mapping from gene names to integer indices (useful for ML models like PyG).
    
    :param filepath: path to the topology file
    :return: G_named (NetworkX DiGraph), gene_to_idx (dict)
    """
    df = pd.read_csv(filepath, sep=r"\s+")

    # Create gene-to-index mapping for optional ML use
    genes = sorted(set(df['Source']).union(df['Target']))
    gene_to_idx = {gene: idx for idx, gene in enumerate(genes)}

    # Build NetworkX DiGraph with weights
    edges_with_weights = list(zip(df['Source'], df['Target'], df['Type']))
    G = nx.DiGraph()
    G.add_weighted_edges_from(edges_with_weights)

    return G, gene_to_idx

In [4]:
def create_sample_topo_file(filepath, num_nodes=500, num_edges=1000, num_hubs=2):
    import random
    import string
    from itertools import product
    from pathlib import Path

    if filepath is None:
        raise ValueError("filepath must be provided")
    p = Path(filepath)

    if num_nodes <= 1:
        raise ValueError("num_nodes must be > 1")
    if not (1 <= num_hubs < num_nodes):
        raise ValueError("num_hubs must be >=1 and less than num_nodes")

    # maximum directed edges without self-loops
    max_edges = num_nodes * (num_nodes - 1)
    required_hub_edges = num_hubs * (num_nodes - 1)  # each hub -> every other node (excluding self)

    if num_edges < required_hub_edges:
        raise ValueError(
            f"num_edges ({num_edges}) is smaller than the number of edges required to connect "
            f"{num_hubs} hubs to all other nodes ({required_hub_edges})."
        )
    if num_edges > max_edges:
        raise ValueError(f"num_edges ({num_edges}) exceeds maximum possible directed edges without self-loops ({max_edges}).")

    # Generate alphabetic-only gene names (no digits). Use two-letter combos (26^2 = 676 >= 500).
    letters = string.ascii_uppercase
    gen_names = (''.join(t) for t in product(letters, repeat=2))
    nodes = [next(gen_names) for _ in range(num_nodes)]

    hubs = nodes[:num_hubs]

    # Use dict to avoid duplicate directed edges: mapping (src, tgt) -> weight
    edges_map = {}

    # Connect hubs to all other nodes (exclude self-loops)
    for hub in hubs:
        for node in nodes:
            if hub == node:
                continue
            edges_map[(hub, node)] = random.choice([1, 2])

    # Add random unique edges until reaching desired count
    attempts = 0
    while len(edges_map) < num_edges:
        attempts += 1
        if attempts > (num_edges * 100):  # safety to avoid infinite loop
            raise RuntimeError("Too many attempts to generate unique random edges; adjust parameters.")
        src = random.choice(nodes)
        tgt = random.choice(nodes)
        if src == tgt:
            continue
        if (src, tgt) in edges_map:
            continue
        edges_map[(src, tgt)] = random.choice([1, 2])

    # Write to file
    p.parent.mkdir(parents=True, exist_ok=True)
    with p.open('w') as f:
        f.write("Source Target Type\n")
        for (src, tgt), weight in edges_map.items():
            f.write(f"{src} {tgt} {weight}\n")

    print(f"Created {p} with {num_nodes} nodes, {len(edges_map)} edges, {num_hubs} hubs.")
    return p

def create_equal_topo_file(filepath, num_nodes=10715):
    #create a topo file where all nodes are arraged in a circle and each node points to the nodes on the left and right
    import string
    import random
    from itertools import product
    from pathlib import Path
    if filepath is None:
        raise ValueError("filepath must be provided")
    p = Path(filepath)
    
    # Generate alphabetic-only gene names (no digits). Use two-letter combos (26^2 = 676 >= 500).
    letters = string.ascii_uppercase
    # Keep gen_names as an iterator and prepend "GENE" when generating node names.
    gen_names = ("GENE" + ''.join(t) for t in product(letters, repeat=2))
    nodes = [next(gen_names) for _ in range(num_nodes)]
    # Create a circular topology
    with p.open('w') as f:
        f.write("Source Target Type\n")
        for i in range(num_nodes):
            left = (i - 1) % num_nodes
            right = (i + 1) % num_nodes
            weight = random.choice([1, 2])
            f.write(f"{nodes[i]} {nodes[left]} {weight}\n")
            f.write(f"{nodes[i]} {nodes[right]} {weight}\n")

    print(f"Created {p} with {num_nodes} nodes arranged in a circle.")
    return p

topo_filepath = "10715_equal.topo"
create_equal_topo_file(topo_filepath, num_nodes=10715)

# # Create sample .topo file using existing topo_filepath variable
# create_sample_topo_file(topo_filepath)

StopIteration: 

In [None]:
import numpy as np
import networkx as nx

def add_noise_to_graph(G: nx.DiGraph, graph_type=None) -> nx.DiGraph:
    if graph_type == "barabasi":
        # create a random graph with the same number of nodes and edges
        G_random = nx.barabasi_albert_graph(G.number_of_nodes(), G.number_of_edges() // G.number_of_nodes())
        G_random = nx.DiGraph(G_random)  # Convert to directed graph
        # Assign random weight (1 or 2) to each edge
        for u, v in G_random.edges():
            G_random[u][v]['weight'] = np.random.choice([1, 2])
        print("Random graph created.")
        return G_random
    elif graph_type == "self_loop":
        print("Creating random graph...")
        # create a graph with G.number_of_nodes() and only self loops with weight 1 or 2
        G_random = nx.DiGraph()
        G_random.add_nodes_from(G.nodes(data=True))
        for node in G_random.nodes():
            G_random.add_edge(node, node, weight=np.random.choice([1, 2]))
        print("Random graph created.")
        return G_random
    elif graph_type == "equal":
        print("Creating circular graph with equal edges...")
        G_equal = nx.DiGraph()
        G_equal.add_nodes_from(G.nodes(data=True))
        nodes = list(G_equal.nodes())
        num_nodes = len(nodes)
        for i in range(num_nodes):
            left = (i - 1) % num_nodes
            right = (i + 1) % num_nodes
            G_equal.add_edge(nodes[i], nodes[left], weight=1)
            G_equal.add_edge(nodes[i], nodes[right], weight=1)
        print("Circular graph created.")
        return G_equal
    elif graph_type == "equal_dense":
        print("Creating circular dense graph with equal edges and self-loops...")
        G_equal = nx.DiGraph()
        G_equal.add_nodes_from(G.nodes(data=True))
        nodes = list(G_equal.nodes())
        num_nodes = len(nodes)
        assert num_nodes > 0, "Graph must contain at least one node."

        # Connect each node to three neighbors on both sides and add a self-loop
        for i in range(num_nodes):
            # self-loop
            G_equal.add_edge(nodes[i], nodes[i], weight=1)
            for k in range(1, 4):  # three neighbours on each side
                left = (i - k) % num_nodes
                right = (i + k) % num_nodes
                G_equal.add_edge(nodes[i], nodes[left], weight=1)
                G_equal.add_edge(nodes[i], nodes[right], weight=1)

        print("Dense circular graph created.")
        return G_equal
    elif graph_type == "equal_extradense":
        print("Creating circular dense graph with equal edges and self-loops...")
        G_equal = nx.DiGraph()
        G_equal.add_nodes_from(G.nodes(data=True))
        nodes = list(G_equal.nodes())
        num_nodes = len(nodes)
        assert num_nodes > 0, "Graph must contain at least one node."

        # Connect each node to three neighbors on both sides and add a self-loop
        for i in range(num_nodes):
            # self-loop
            G_equal.add_edge(nodes[i], nodes[i], weight=1)
            for k in range(1, 20):  # three neighbours on each side
                left = (i - k) % num_nodes
                right = (i + k) % num_nodes
                G_equal.add_edge(nodes[i], nodes[left], weight=1)
                G_equal.add_edge(nodes[i], nodes[right], weight=1)

        print("Dense circular graph created.")
        return G_equal
    elif graph_type == "fully_connected":
        G_fc = nx.DiGraph()
        G_fc.add_nodes_from(G.nodes(data=True))
        nodes = list(G_fc.nodes())
        num_nodes = len(nodes)
        assert num_nodes > 0, "Graph must contain at least one node."
        for n in nodes:
            for m in nodes:
                G_fc.add_edge(n, m, weight=1)
        print("Fully connected graph created.")
        return G_fc
    else:
        print("Creating hub graph...")
        G_hub = nx.DiGraph()
        G_hub.add_nodes_from(G.nodes(data=True))
        nodes = list(G_hub.nodes())
        num_nodes = len(nodes)
        # Create G.edges / G.nodes hubs. Every hub connects to all other nodes including itself
        num_hubs = max(1, G.number_of_edges() // G.number_of_nodes())
        for i in range(num_hubs):
            hub_node = nodes[i % num_nodes]
            for target_node in nodes:
                G_hub.add_edge(hub_node, target_node, weight=np.random.choice([1, 2]))
        print("Hub graph created.")
        return G_hub

In [None]:
def create_topo_file_from_graph(network_name, G: nx.DiGraph, dir):
    """
    Create a topo file as expected by racipe from a nx Graph
    and store it in the const.TOPO_PATH directory.
    :param G: nx Graph
    """
    new_file_path = Path(dir) / f"{network_name}.topo" 
    # save graph to a trrust.topo file with the header Source Target Type
    with open(new_file_path, "w") as f:
        f.write("Source Target Type\n")
        for u, v, d in G.edges(data='weight'):
            f.write(f"{u} {v} {d}\n")


In [None]:
#create a nx diGraph with 10716 nodes and no edges
G_empty = nx.DiGraph()
G_empty.add_nodes_from([i for i in range(10716)])
G = add_noise_to_graph(G_empty, graph_type="fully_connected")
# store graph as torch edge index 10716_equal_extradense.pt
# Map node labels (strings) to integer indices before creating a tensor.
import torch

edge_index = torch.tensor(list(G.edges())).t().contiguous()
torch.save(edge_index, "10716_fully_connected.pt")

: 

In [None]:
#create a nx diGraph with 10716 nodes and no edges
G_empty = nx.DiGraph()
G_empty.add_nodes_from([i for i in range(10716)])
# from the first node add edges to all other nodes including itself
for target_node in G_empty.nodes():
    G_empty.add_edge(0, target_node, weight=1)
import torch

edge_index = torch.tensor(list(G.edges())).t().contiguous()
torch.save(edge_index, "10716_hub.pt")

In [5]:
G, _ = get_graph_data_from_topo(filepath=Path("dorothea_1000.topo"))
nodes = list(G.nodes())


In [6]:
print(f"Graph has {G.number_of_nodes()} nodes and {G.number_of_edges()} edges.")

Graph has 1000 nodes and 2006 edges.


In [None]:
print("number of nodes with outgoing edges:", sum(1 for n in G.nodes() if G.out_degree(n) > 0))

number of nodes with outgoing edges: 279


In [None]:
import pandas as pd
df = pd.read_csv("XXL_run_gene_metrics.csv")
genes_XXL = df["gene"].tolist()

FileNotFoundError: [Errno 2] No such file or directory: 'XXL_run_gene_metrics.csv'

In [None]:
print(len(nodes), len(genes_XXL))
print(len(set(nodes)), len(set(genes_XXL)))  # Check if the sets are equal
# print the value counts for genes_XXL
print(df["gene"].value_counts())

150 228
150 125
gene
PGR      2
FOS      2
JUN      2
STAT1    2
HIF1A    2
        ..
NR1H2    1
HNF4A    1
MAFB     1
KLF13    1
HBP1     1
Name: count, Length: 125, dtype: int64


In [None]:
# read splits.pt

import torch
splits = torch.load("splits.pt")

In [None]:
print(len(splits["train_index_forward"]))
print(len(splits["train_index_backward"]))
print(len(splits["test_index_forward"]))
print(len(splits["test_index_backward"]))
print(len(splits["val_index_forward"]))
print(len(splits["val_index_backward"]))

3895
3895
1299
1299
1298
1298


In [None]:
import torch
# read graph f10716_self_loops.pt
G_self_loops = torch.load("10716_hub.pt")
G_self_loops
#get shape of G_self_loops
G_self_loops.shape

151,839

torch.Size([2, 417924])