In [1]:
import networkx as nx
import pandas as pd

In [321]:
import networkx as nx
import pandas as pd
import pygraphviz as pgv

class SignedPathSolver:
    """Find shortest sign-aware paths between sources and targets of a signed digraph.

    Every edge of ``G`` is expected to carry a numeric ``'weight'`` attribute.
    A weight > 0 is treated as a positive ('P') interaction, anything else as a
    negative ('N') one.

    Attributes (set directly by the caller after construction):
        G: the full network; indexed as ``G[u][v]['weight']``.
        H: ``nx.DiGraph`` holding the union of shortest paths, filled by
            ``get_subnetwork``.
        source_dict: maps each source node to its sign (> 0 or < 0).
        target_dict: maps each target node to its sign (> 0 or < 0).
        results: ``(per_pair_dict, sif_dataframe)`` written by
            ``shortest_signed_paths``, or ``None`` before it has run.
        sscp_results: same shape as ``results``, written by
            ``filter_signed_consistent_paths``, or ``None`` before it has run.
    """

    def __init__(self, G):
        self.G = G
        self.H = nx.DiGraph()
        self.results = None
        self.source_dict = {}
        self.target_dict = {}
        self.sscp_results = None  # Store the Signed-Consistent Shortest Path results

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _path_edges(path):
        """Return the consecutive (u, v) node pairs along ``path``."""
        return zip(path, path[1:])

    @staticmethod
    def _signed_distance(weights):
        """Return ``(magnitude, is_positive)`` for a list of edge weights.

        ``magnitude`` is the sum of absolute weights.  ``is_positive`` is True
        only when no weight is zero and the number of negative weights is even.
        The sign is derived from the individual signs rather than from the
        numeric product, so long paths cannot overflow or underflow.
        """
        magnitude = sum(abs(w) for w in weights)
        negatives = 0
        for w in weights:
            if w == 0:
                # A zero weight makes the product zero, i.e. not positive.
                return magnitude, False
            if w < 0:
                negatives += 1
        return magnitude, negatives % 2 == 0

    def _sif_rows(self, paths):
        """Return ``(source, 'P' | 'N', target)`` tuples for every edge of ``paths``."""
        return [
            (u, 'P' if self.G[u][v]['weight'] > 0 else 'N', v)
            for path in paths
            for u, v in self._path_edges(path)
        ]

    def _add_result_edges(self, graph, per_pair):
        """Copy every edge of the positive and negative paths in ``per_pair`` into ``graph``."""
        for entry in per_pair.values():
            for path in entry['positive_path'] + entry['negative_path']:
                for u, v in self._path_edges(path):
                    graph.add_edge(u, v, weight=self.G[u][v]['weight'])

    def _render(self, graph, filename, fill_by_sign=False):
        """Draw ``graph`` left-to-right with Graphviz ``dot`` and write it to ``filename``.

        Sources are blue boxes, targets green ellipses, all other nodes gray
        diamonds; positive edges are green, the rest red.  With
        ``fill_by_sign`` nodes are additionally filled: light blue for a
        positive source/target sign, light coral otherwise, white for
        intermediate nodes.
        """
        A = pgv.AGraph(directed=True, strict=True, rankdir='LR')

        for node in graph.nodes():
            if node in self.source_dict:
                attrs = {'shape': 'box', 'color': 'blue'}
                sign = self.source_dict[node]
            elif node in self.target_dict:
                attrs = {'shape': 'ellipse', 'color': 'green'}
                sign = self.target_dict[node]
            else:
                attrs = {'shape': 'diamond', 'color': 'gray'}
                sign = None

            if fill_by_sign:
                attrs['style'] = 'filled'
                if sign is None:
                    attrs['fillcolor'] = 'white'
                else:
                    attrs['fillcolor'] = 'lightblue' if sign > 0 else 'lightcoral'

            A.add_node(node, **attrs)

        for u, v, data in graph.edges(data=True):
            A.add_edge(u, v, color='green' if data['weight'] > 0 else 'red')

        A.layout(prog='dot')
        A.draw(filename)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def shortest_signed_paths(self, cutoff=None):
        """Compute the shortest positive and shortest negative paths per (source, target).

        All simple paths between each source in ``source_dict`` and each target
        in ``target_dict`` are enumerated (pairs with source == target are
        skipped).  A path's length is the sum of absolute edge weights and its
        sign is the product of the edge signs.  All ties for the minimum are
        kept.

        Args:
            cutoff: optional maximum number of edges per path, forwarded to
                ``nx.all_simple_paths``.  ``None`` (the default) enumerates
                every simple path, which can be extremely slow on large,
                densely connected networks.

        Side effects:
            Sets ``self.results`` to ``(per_pair, sif_df)``.  ``per_pair`` maps
            ``(source, target)`` to a dict with keys ``'positive_path'``,
            ``'positive_distance'``, ``'negative_path'`` and
            ``'negative_distance'``; both distances are non-negative
            magnitudes and are ``inf`` when no path of that sign exists.
            ``sif_df`` is a de-duplicated DataFrame with columns
            ``source``, ``interaction``, ``target``.
        """
        all_results = {}
        sif_tuples = []

        for source in list(self.source_dict):
            for target in list(self.target_dict):
                if source == target:  # Skip when source and target are the same
                    continue

                pos_paths, pos_dist = [], float('inf')
                neg_paths, neg_dist = [], float('inf')

                for path in nx.all_simple_paths(self.G, source, target, cutoff=cutoff):
                    weights = [self.G[u][v]['weight'] for u, v in self._path_edges(path)]
                    magnitude, is_positive = self._signed_distance(weights)

                    if is_positive:
                        if magnitude < pos_dist:
                            pos_paths, pos_dist = [path], magnitude
                        elif magnitude == pos_dist:
                            pos_paths.append(path)
                    else:
                        if magnitude < neg_dist:
                            neg_paths, neg_dist = [path], magnitude
                        elif magnitude == neg_dist:
                            neg_paths.append(path)

                sif_tuples.extend(self._sif_rows(pos_paths + neg_paths))

                all_results[(source, target)] = {
                    'positive_path': pos_paths,
                    'positive_distance': pos_dist,
                    'negative_path': neg_paths,
                    'negative_distance': neg_dist,
                }

        sif_df = pd.DataFrame(
            sif_tuples, columns=['source', 'interaction', 'target']
        ).drop_duplicates()

        self.results = (all_results, sif_df)

    def draw_graph(self):
        """Render the full network ``self.G`` to ``network.png``."""
        self._render(self.G, 'network.png')

    def get_subnetwork(self):
        """Build ``self.H`` from all shortest paths and render it to ``subnetwork.png``.

        Requires ``shortest_signed_paths`` to have run; otherwise a hint is
        printed and nothing happens.  ``self.H`` is emptied first so edges from
        an earlier run (e.g. with different sources or targets) do not leak
        into the new subnetwork.
        """
        if not self.results:
            print("Please run shortest_signed_paths first.")
            return

        self.H.clear()
        self._add_result_edges(self.H, self.results[0])
        self._render(self.H, 'subnetwork.png')

    def filter_signed_consistent_paths(self):
        """Keep only the shortest paths whose sign agrees with the source/target signs.

        For each pair, the positive paths are kept when the product of the
        source sign and the target sign is > 0, the negative paths when it is
        < 0 (a missing sign defaults to +1).  Distances are copied unchanged
        from ``self.results``.

        Requires ``shortest_signed_paths`` to have run; otherwise a hint is
        printed and nothing happens.

        Side effects:
            Sets ``self.sscp_results`` to ``(filtered_per_pair, sif_df)`` with
            the same layout as ``self.results``.
        """
        if not self.results:
            print("Please run shortest_signed_paths first.")
            return

        filtered_results = {}
        sif_tuples = []

        for (source, target), value in self.results[0].items():
            # Use the provided sign for the source and target, else default to positive
            expected_sign = self.source_dict.get(source, 1) * self.target_dict.get(target, 1)

            pos_paths = value['positive_path'] if expected_sign > 0 else []
            neg_paths = value['negative_path'] if expected_sign < 0 else []

            filtered_results[(source, target)] = {
                'positive_path': pos_paths,
                'positive_distance': value['positive_distance'],
                'negative_path': neg_paths,
                'negative_distance': value['negative_distance'],
            }

            sif_tuples.extend(self._sif_rows(pos_paths + neg_paths))

        sif_df = pd.DataFrame(
            sif_tuples, columns=['source', 'interaction', 'target']
        ).drop_duplicates()

        self.sscp_results = (filtered_results, sif_df)

    def get_filtered_subnetwork(self):
        """Render the sign-consistent subnetwork to ``filtered_subnetwork.png``.

        Requires ``filter_signed_consistent_paths`` to have run; otherwise a
        hint is printed and nothing happens.  Source and target nodes are
        filled according to their sign.
        """
        # The attribute always exists after __init__, so test its value rather
        # than its presence.
        if getattr(self, 'sscp_results', None) is None:
            print("Please run filter_signed_consistent_paths first.")
            return

        H = nx.DiGraph()
        self._add_result_edges(H, self.sscp_results[0])
        self._render(H, 'filtered_subnetwork.png', fill_by_sign=True)

    def find_negative_cycles(self):
        """Return (and print) every simple cycle of ``self.G`` whose weights sum to < 0.

        Returns:
            A list of cycles, each a list of nodes as produced by
            ``nx.simple_cycles``.
        """
        negative_cycles = []

        for cycle in nx.simple_cycles(self.G):
            # Pair each node with its successor, wrapping the last node back to the first.
            closed_edges = zip(cycle, cycle[1:] + cycle[:1])
            cycle_weight = sum(self.G[u][v]['weight'] for u, v in closed_edges)

            if cycle_weight < 0:
                print(cycle)
                negative_cycles.append(cycle)

        return negative_cycles



In [330]:

# Toy signed network: every triple is (tail, head, sign), with +1 for an
# activating edge and -1 for an inhibiting one.
G = nx.DiGraph()
G.add_weighted_edges_from([
    ('A', 'B', 1),
    ('B', 'C', -1),
    ('A', 'D', -1),
    ('D', 'E', 1),
    ('E', 'F', -1),
    ('F', 'G', 1),
    ('G', 'H', 1),
    ('H', 'I', -1),
    ('A', 'I', 1),
    ('I', 'J', 1),
    ('B', 'J', -1),
    ('J', 'K', 1),
    ('A', 'K', 1),
    ('K', 'L', -1),
    ('C', 'L', 1),
    ('L', 'M', 1),
    ('D', 'M', -1),
    ('M', 'N', -1),
    ('F', 'N', 1),
    ('N', 'O', 1),
    # Dummy nodes, disconnected from the rest of the network
    ('X', 'Y', 1),
    ('Y', 'Z', -1),
])

# Three sources and three targets, all annotated with sign -1.
G_solver = SignedPathSolver(G)
G_solver.source_dict = dict.fromkeys(('A', 'B', 'D'), -1)
G_solver.target_dict = dict.fromkeys(('L', 'N', 'O'), -1)

# Full pipeline: draw, enumerate shortest signed paths, then filter by sign.
G_solver.draw_graph()
G_solver.shortest_signed_paths()
G_solver.get_subnetwork()
G_solver.filter_signed_consistent_paths()
G_solver.get_filtered_subnetwork()


In [None]:
# Inspect the sign-filtered per-pair results: a dict keyed by (source, target).
G_solver.sscp_results[0]

{('A', 'L'): {'positive_path': [['A', 'B', 'J', 'K', 'L']],
  'positive_distance': 4,
  'negative_path': [],
  'negative_distance': 2},
 ('A', 'N'): {'positive_path': [],
  'positive_distance': 4,
  'negative_path': [['A', 'D', 'M', 'N']],
  'negative_distance': 3},
 ('A', 'O'): {'positive_path': [],
  'positive_distance': 5,
  'negative_path': [['A', 'D', 'M', 'N', 'O']],
  'negative_distance': 4},
 ('B', 'L'): {'positive_path': [['B', 'J', 'K', 'L']],
  'positive_distance': 3,
  'negative_path': [],
  'negative_distance': 2},
 ('B', 'N'): {'positive_path': [],
  'positive_distance': 4,
  'negative_path': [['B', 'J', 'K', 'L', 'M', 'N']],
  'negative_distance': 5},
 ('B', 'O'): {'positive_path': [],
  'positive_distance': 5,
  'negative_path': [['B', 'J', 'K', 'L', 'M', 'N', 'O']],
  'negative_distance': 6},
 ('D', 'L'): {'positive_path': [],
  'positive_distance': inf,
  'negative_path': [['D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L']],
  'negative_distance': 8},
 ('D', 'N'): {'positi

In [None]:

# Minimal chain a -| b -| c -> d: two negative edges followed by one positive
# edge, so the only a-to-d path has an overall positive sign.
I = nx.DiGraph()
I.add_weighted_edges_from([
    ('a', 'b', -1),
    ('b', 'c', -1),
    ('c', 'd', 1),
])

In [None]:
# Run the whole pipeline on toy graph I with a single source 'a' and a single
# target 'd', both annotated with sign -1.
# NOTE: the drawing steps write to fixed file names, so they overwrite the
# images produced by any earlier solver run.
I_solver = SignedPathSolver(I)
I_solver.source_dict = {'a': -1}
I_solver.target_dict = {'d': -1}
I_solver.draw_graph()                      # writes network.png
I_solver.shortest_signed_paths()           # fills I_solver.results
I_solver.get_subnetwork()                  # writes subnetwork.png
I_solver.filter_signed_consistent_paths()  # fills I_solver.sscp_results
I_solver.get_filtered_subnetwork()         # writes filtered_subnetwork.png

In [None]:
# Toy graph with a tie: one direct negative edge a -| b of magnitude 2, and two
# positive two-step routes (via c and via d) of total length 2.
H = nx.DiGraph()
H.add_weighted_edges_from([
    ('a', 'b', -2),
    ('a', 'c', 1),
    ('c', 'b', 1),
    ('a', 'd', 1),
    ('d', 'b', 1),
])

In [None]:
# Run the whole pipeline on toy graph H with source 'a' and target 'b', both
# annotated with sign +1.
# NOTE: the drawing steps write to fixed file names, so they overwrite the
# images produced by any earlier solver run.
H_solver = SignedPathSolver(H)
H_solver.source_dict = {'a': 1}
H_solver.target_dict = {'b': 1}
H_solver.draw_graph()                      # writes network.png
H_solver.shortest_signed_paths()           # fills H_solver.results
H_solver.get_subnetwork()                  # writes subnetwork.png
H_solver.filter_signed_consistent_paths()  # fills H_solver.sscp_results
H_solver.get_filtered_subnetwork()         # writes filtered_subnetwork.png

In [None]:
# Show the unfiltered output of shortest_signed_paths: a tuple of
# (per-pair results dict, SIF edge DataFrame).
H_solver.results

({('a', 'b'): {'positive_path': [['a', 'c', 'b'], ['a', 'd', 'b']],
   'positive_distance': 2,
   'negative_path': [['a', 'b']],
   'negative_distance': 2}},
   source interaction target
 0      a           P      c
 1      c           P      b
 2      a           P      d
 3      d           P      b
 4      a           N      b)

## Trynska

In [181]:
# Move the 'data' column to position 2 (third column) and save the reordered
# table as a tab-separated file without the row index.
collectri = pd.read_csv(
    'network_collectri.sif',
    sep='\t',
)
data_col = collectri.pop('data')
collectri.insert(loc=2, column='data', value=data_col)
collectri.to_csv(
    'collectri_network_sscp.sif',
    sep='\t',
    index=None,
)

In [192]:
# Load the header-less table of downstream hits and discard its second column
# (label 1), leaving the name in column 0 and the '+' / '-' mark in column 2.
target_df = pd.read_csv(
    'downstream_hits.tsv',
    sep='\t',
    header=None,
)
target_df.drop(columns=1, inplace=True)
target_df

Unnamed: 0,0,2
0,CINP,+
1,ATP1B1,+
2,DNAJC12,+
3,SCCPDH,+
4,IDS,+
...,...,...
95,IKZF1,-
96,SDHD,+
97,RASGRP3,+
98,BIRC6,-


In [200]:
# Encode the '+' / '-' marks in column 2 as +1 / -1.  The result is assigned
# back to the column: calling replace(..., inplace=True) on target_df[2] acts
# on an intermediate Series and does not update target_df under pandas
# Copy-on-Write.
target_df[2] = target_df[2].replace({'+': 1, '-': -1})

# Map each name in column 0 to its numeric sign.
target_dict = target_df.set_index(0)[2].to_dict()

In [326]:
# Load the reordered COLLECTRI table as a directed graph with a numeric
# 'weight' on every edge.
# NOTE: this rebinds H, replacing the toy graph H built earlier in the notebook.
# NOTE(review): the .sif file written earlier with to_csv still contains a
# header row -- confirm read_weighted_edgelist handles that line as intended.
H = nx.read_weighted_edgelist('collectri_network_sscp.sif', delimiter = '\t', create_using = nx.DiGraph)

In [331]:
# There are negative cycles in the PK that still need to be addressed.
# NOTE: shortest_signed_paths enumerates every simple path for each
# (source, target) pair, which can take very long on a network of this size.

# Sources: TGFB1, TGFB2, TGFB3 and IL2, all with sign +1; targets and their
# signs come from target_dict.
solver = SignedPathSolver(H)
solver.source_dict = {'TGFB1': 1, 'TGFB2': 1, 'TGFB3': 1, 'IL2': 1}
solver.target_dict = target_dict
solver.shortest_signed_paths()
solver.get_subnetwork()
solver.filter_signed_consistent_paths()
solver.get_filtered_subnetwork()

KeyboardInterrupt: 

In [328]:
# Look for a cycle in the loaded network.
# NOTE(review): nx.find_cycle returns a cycle found by traversal without
# looking at edge weights, so despite the variable name the result is not
# necessarily a negative cycle -- confirm this is what was intended.
negative_cycle = nx.find_cycle(solver.G)


In [329]:
# Display the edges of the cycle returned by nx.find_cycle.
negative_cycle

[('TRPC1', 'TRPC3'), ('TRPC3', 'TRPC1')]

In [240]:
# Inspect the attributes (weight) stored on the TRPC1 -> PKD2 edge.
H.get_edge_data('TRPC1', 'PKD2')

{'weight': 1.0}

In [241]:
# Inspect the attributes (weight) stored on the PKD2 -> TRPV4 edge.
H.get_edge_data('PKD2', 'TRPV4')

{'weight': 1.0}

In [245]:
# Delete the TRPV6 -> TRPV5 edge from H in place.
# NOTE(review): presumably done to break a cycle found above -- confirm.
H.remove_edge('TRPV6', 'TRPV5')