In [1]:
import gfapy
import networkx as nx
import matplotlib.pyplot as plt

In [2]:
def add_nodes_to_graph(G: gfapy.Gfa):
    """
    Adds nodes from a graph G to a networkx graph G_nx.

    Parameters:
    - G: The original graph with segments containing name and sequence attributes.
    - G_nx: A networkx graph instance to which nodes will be added.

    Each node is added to G_nx with its sequence as an attribute.
    """

    G_nx = nx.DiGraph()
    nodes_dict = {int(segment.name): segment.sequence for segment in G.segments}
    for node in nodes_dict.keys():
        G_nx.add_node(node, sequence=nodes_dict[node])

    return G_nx

def add_oriented_edges(G, G_nx):
    """
    Adds edges to the graph G_nx based on the orientation of edges in graph G.
    For each edge in G, nodes are added or connected in G_nx based on the orientation
    of the 'from' and 'to' segments of the edge. New nodes are created with unique IDs
    and their sequences are fetched from nodes_dict.

    Parameters:
    - G: The original graph with edges that have orientations.
    - G_nx: The networkx graph to which the oriented edges will be added.
    """

    nodes_dict = {segment.name: segment.sequence for segment in G.segments}

    for edge in G.edges:
        if edge.from_orient == "-" and edge.to_orient == "-":
            new_from = max([int(n) for n in G_nx.nodes]) + 1
            new_to = new_from + 1
            G_nx.add_node(new_from, sequence=nodes_dict[edge.from_segment.name])
            G_nx.add_node(new_to, sequence=nodes_dict[edge.to_segment.name])
            G_nx.add_edge(new_from, new_to)

        elif edge.from_orient == "-" and edge.to_orient == "+":
            new_from = max([int(n) for n in G_nx.nodes]) + 1
            G_nx.add_node(new_from, sequence=nodes_dict[edge.from_segment.name])
            G_nx.add_edge(new_from, int(edge.to_segment.name))

        elif edge.from_orient == "+" and edge.to_orient == "-":
            new_to = max([int(n) for n in G_nx.nodes]) + 1
            G_nx.add_node(new_to, sequence=nodes_dict[edge.to_segment.name])
            G_nx.add_edge(int(edge.from_segment.name), new_to)

        else:
            G_nx.add_edge(int(edge.from_segment.name), int(edge.to_segment.name))

def process_graph(G):
    """
    Processes a GFA graph and returns a networkx graph with nodes and oriented edges.

    Parameters:
    - G: A GFA graph with segments and edges.

    Returns:
    - G_nx: A networkx graph with nodes and oriented edges.
    """

    G_nx = add_nodes_to_graph(G)
    add_oriented_edges(G, G_nx)

    return G_nx

def print_info(G):
    # print the name of the graph
    print(f"Graph name: {G.name}")
    # Number of nodes and edges
    print(f"Number of nodes: {len(G.nodes)}")
    print(f"Number of edges: {len(G.edges)}")
    # Check if is a DAG and print in case the number of cycles
    print(f"Is DAG: {nx.is_directed_acyclic_graph(G)}")
    # print the number of nodes with in-degree 1
    print(f"Number of nodes with in-degree 1: {len([node for node in G.nodes if G.in_degree(node) == 1])}")
    # print the average in-degree
    print(f"Average in-degree: {sum([G.in_degree(node) for node in G.nodes]) / len(G.nodes)}")
    # nodes with out-degree 1
    print(f"Number of nodes with out-degree 1: {len([node for node in G.nodes if G.out_degree(node) == 1])}")
    # average out-degree
    print(f"Average out-degree: {sum([G.out_degree(node) for node in G.nodes]) / len(G.nodes)}")


In [None]:
# G1 = gfapy.Gfa.from_file("../../data/pangenome-graphs/DRB1-3123_unsorted.gfa")
# G2 = gfapy.Gfa.from_file("../../data/pangenome-graphs/chrX.pan.fa.a2fb268.4030258.6a1ecc2.smooth.gfa")
# G3 = gfapy.Gfa.from_file("../../data/pangenome-graphs/chrY.pan.fa.a2fb268.4030258.6a1ecc2.smooth.gfa")

In [3]:
G1 = gfapy.Gfa.from_file("../../data/pangenome-graphs/DRB1-3123_unsorted.gfa")
G1_nx = process_graph(G1)
G1_nx.name = "DRB1-3123_unsorted"

In [11]:
def search_linear_path(G: nx.DiGraph, start_node: int):
    path = []
    visited = set()
    current_node = start_node
    while G.in_degree(current_node) == 1 and G.out_degree(current_node) == 1:
        visited.add(current_node)
        path.insert(0, current_node)
        current_node = list(G.predecessors(current_node))[0]
        if current_node in visited:
            break

    current_node = start_node
    while G.in_degree(current_node) == 1 and G.out_degree(current_node) == 1:
        visited.add(current_node)
        if current_node not in path:
            path.append(current_node) # avoid adding the same node twice
        current_node = list(G.successors(current_node))[0]
        if current_node in visited:
            break

    return path


def compress_edges(G: nx.DiGraph) -> nx.DiGraph:
    G_copy = G.copy()
    visited = set()
    nodes = list(G_copy.nodes)
    for node in nodes:
        if node not in visited and node in G_copy.nodes:
            path = search_linear_path(G_copy, node)
            if len(path) > 1:
                new_segment = "".join([G_copy.nodes[node]["sequence"] for node in path])
                node_to_attach = list(G_copy.successors(path[::-1][0]))[0]
                for n in path[1:]:
                    visited.add(n)
                    G_copy.remove_node(n)
                G_copy.nodes[path[0]]["sequence"] = new_segment
                G_copy.add_edge(path[0], node_to_attach)

    return G_copy

def compact(G: nx.DiGraph) -> nx.DiGraph:
    G_new = G.copy()
    while True:
        found = False
        for u, v in G_new.edges:
            if G_new.in_degree(v) == 1 and G_new.out_degree(u) == 1:
                new_segment = G_new.nodes[u]["sequence"] + G_new.nodes[v]["sequence"]
                G_new.nodes[u]["sequence"] = new_segment
                successors = list(G_new.successors(v))
                G_new.remove_node(v)
                for s in successors:
                    G_new.add_edge(u, s)
                found = True
                break
        if not found:
            break

    return G_new

def histogram_in_degree(G: nx.DiGraph):
    plt.hist([G.in_degree(node) for node in G.nodes], bins=range(int(min([G.in_degree(node) for node in G.nodes])), int(max([G.in_degree(node) for node in G.nodes])) + 2))
    plt.xticks(range(int(min([G.in_degree(node) for node in G.nodes])), int(max([G.in_degree(node) for node in G.nodes])) + 1))
    plt.xlabel("In-degree")
    plt.ylabel("Frequency")
    plt.title("Histogram of in-degrees")
    plt.show()


In [5]:
print_info(G1_nx)
# histogram_in_degree(G1_nx)

Graph name: DRB1-3123_unsorted
Number of nodes: 3845
Number of edges: 4380
Is DAG: False
Number of nodes with in-degree 1: 2690
Average in-degree: 1.1391417425227568
Number of nodes with out-degree 1: 2166
Average out-degree: 1.1391417425227568


In [12]:
G_grossi = compress_grossi(G1_nx)
G_compact = compress_edges(G1_nx)

print_info(G_grossi)
print("----")
print_info(G_compact)

Graph name: DRB1-3123_unsorted
Number of nodes: 3262
Number of edges: 3797
Is DAG: False
Number of nodes with in-degree 1: 2107
Average in-degree: 1.1640098099325566
Number of nodes with out-degree 1: 1583
Average out-degree: 1.1640098099325566
----
Graph name: DRB1-3123_unsorted
Number of nodes: 3782
Number of edges: 4317
Is DAG: False
Number of nodes with in-degree 1: 2627
Average in-degree: 1.1414595452141725
Number of nodes with out-degree 1: 2103
Average out-degree: 1.1414595452141725


In [None]:
# print all the nodes with in degree 1 and out degree 1
for node in G_compact.nodes:
    if G_compact.in_degree(node) == 1 and G_compact.out_degree(node) == 1:
        print(node)

In [None]:
class Graph:
	# instance variables
	def __init__(self, v, e):
		# v is the number of nodes/vertices
		self.time = 0
		self.traversal_array = []
		self.v = v
		# e is the number of edge
		self.e = e
		# adj. list for graph
		self.graph_list = [[] for _ in range(v)]
		self.edge_types = {"Back": [], "Forward": [], "Cross": [], "Tree": []}

	# function to print adj list
	def print_graph_list(self):
		print("Adjacency List Representation:")
		for i in range(self.v):
			print(i, "-->", *self.graph_list[i])
		print()

	# function the get number of edges
	def number_of_edges(self):
		return self.e

	# function for dfs
	def dfs(self):
		self.visited = [False]*self.v
		self.start_time = [0]*self.v
		self.end_time = [0]*self.v

		for node in range(self.v):
			if not self.visited[node]:
				self.traverse_dfs(node)
		print()
		# print("DFS Traversal: ", self.traversal_array)
		print()

	def traverse_dfs(self, node):
		# mark the node visited
		self.visited[node] = True
		# add the node to traversal
		self.traversal_array.append(node)
		# get the starting time
		self.start_time[node] = self.time
		# increment the time by 1
		self.time += 1
		# traverse through the neighbours
		for neighbour in self.graph_list[node]:
			# if a node is not visited
			if not self.visited[neighbour]:
				# marks the edge as tree edge
				# print('Tree Edge:', str(node)+'-->'+str(neighbour))
				self.edge_types["Tree"].append((node, neighbour))
				# dfs from that node
				self.traverse_dfs(neighbour)
			else:
				# when the parent node is traversed after the neighbour node
				if self.start_time[node] > self.start_time[neighbour] and self.end_time[node] < self.end_time[neighbour]:
					# print('Back Edge:', str(node)+'-->'+str(neighbour))
					self.edge_types["Back"].append((node, neighbour))
				# when the neighbour node is a descendant but not a part of tree
				elif self.start_time[node] < self.start_time[neighbour] and self.end_time[node] > self.end_time[neighbour]:
					# print('Forward Edge:', str(node)+'-->'+str(neighbour))
					self.edge_types["Forward"].append((node, neighbour))
				# when parent and neighbour node do not have any ancestor and a descendant relationship between them
				elif self.start_time[node] > self.start_time[neighbour] and self.end_time[node] > self.end_time[neighbour]:
					# print('Cross Edge:', str(node)+'-->'+str(neighbour))
					self.edge_types["Cross"].append((node, neighbour))
			self.end_time[node] = self.time
			self.time += 1

In [None]:
def compute_edge_types(G1_nx):
	# Create a new Graph instance
	G = Graph(max([int(node) for node in G1_nx.nodes]) + 1, len(G1_nx.edges))
	# Add edges to the adjacency list
	for edge in G1_nx.edges:
		G.graph_list[int(edge[0])].append(int(edge[1]))
	# Perform a DFS traversal
	G.dfs()
	return G.edge_types

def invert_back_edges(G: nx.DiGraph):
	# check if there are back edges
	if len(compute_edge_types(G)["Back"]) == 0:
		print("No back edges to invert")

	back_edges_exist = True

	while back_edges_exist:
		edge_types = compute_edge_types(G)  # type: ignore
		if not edge_types["Back"]:
			back_edges_exist = False
		else:
			for edge in edge_types["Back"]:
				G.remove_edge(edge[0], edge[1])
				G.add_edge(edge[1], edge[0])

def remove_back_edges(G: nx.DiGraph):
	# check if there are back edges
	if len(compute_edge_types(G)["Back"]) == 0:
		print("No back edges to remove")

	edge_types = compute_edge_types(G)  # type: ignore
	for edge in edge_types["Back"]:
		G.remove_edge(edge[0], edge[1])

def remove_cross_edges(G: nx.DiGraph):
	# check if there are cross edges
	if len(compute_edge_types(G)["Cross"]) == 0:
		print("No cross edges to remove")

	edge_types = compute_edge_types(G)  # type: ignore
	for edge in edge_types["Cross"]:
		G.remove_edge(edge[0], edge[1])


def print_edge_types(G: nx.DiGraph, just_len: bool):
	if just_len:
		edge_types = compute_edge_types(G)
		for key in edge_types:
			print(f"{key} edges: {len(edge_types[key])}")
	else:
		edge_types = compute_edge_types(G)
		for key in edge_types:
			print(f"{key} edges: {edge_types[key]}")

In [None]:
G_tmp = G1_nx.copy()
print_edge_types(G_tmp, True)
print(f"Is DAG: {nx.is_directed_acyclic_graph(G_tmp)}")

# remove back and cross edges
remove_back_edges(G_tmp)
remove_cross_edges(G_tmp)
print_edge_types(G_tmp, True)
print(f"Is DAG: {nx.is_directed_acyclic_graph(G_tmp)}")


# remove self loops and check if the graph is a DAG
G_tmp.remove_edges_from(nx.selfloop_edges(G_tmp))
print(f"Is DAG: {nx.is_directed_acyclic_graph(G_tmp)}")

In [None]:
def print_cycles(G: nx.DiGraph):
    cycles = nx.simple_cycles(G)
    for cycle in cycles:
        print(cycle,(list(G.successors(cycle[-1])))[:1])

print_cycles(G_tmp)
