In [1]:
from typing import List, Tuple, Optional, Union
import json
import itertools
import graphviz
import os

# Class definitions

In [2]:
class User:
    """
    A member of the network, identified by a username.

    Instances keep Python's default identity-based equality and hashing: two
    `User` objects built from the same username are different objects and are
    treated as different dictionary keys.

    Attributes:
    ----------
    username: Name of the user; also the value returned by `str(user)`.
    """
    def __init__(self, username: str) -> None:
        """
        Parameters:
        ----------
        username: Name used to label (and sort) this user.
        """
        self.username = username

    def __str__(self) -> str:
        """Returns the bare username."""
        return self.username

    def __repr__(self) -> str:
        """Returns an unambiguous form so collections of users print readably."""
        return f"{type(self).__name__}({self.username!r})"

In [3]:
class Graph(dict):
    """
    An undirected graph stored as a dictionary of adjacency lists.

    Every key is a vertex and every value is the list of that vertex's
    neighbours, kept sorted by `username` (or by `str(vertex)` for vertices
    that have no `username` attribute).

    Attributes:
    ----------
    sps: n*n (2-dimensional) matrix to store shortest paths between all nodes,
         indexed in key-insertion order. It is `None` until `compute_sps()` has
         run, and is reset to `None` by every mutating method of this class.

    Methods:
    ----------
    add_vertex(), add_edge(), remove_edge(), remove_vertex()
    dfs(), get_subgraphs(), shortest_path(), most_influential()
    edge_in_sp(), compute_sps(), edge_to_remove(), girvan_newman_algorithm()
    parse_data(), show()
    """
    def __init__(self) -> None:
        """
        Initialize the Graph object as an empty dictionary.
        """
        super().__init__()
        self.sps = None  # Shortest path matrix, filled in by compute_sps()

    @staticmethod
    def _sort_key(vertex: object) -> object:
        """Returns the label used to order neighbours inside an adjacency list."""
        return vertex.username if hasattr(vertex, "username") else str(vertex)

    def add_vertex(self, user: object) -> None:
        """
        Adds a vertex to the graph.

        Adding a vertex that already exists is a no-op, so its edges are kept.

        Parameters:
        ----------
        user: The user object or identifier to be added as a vertex.
        """
        self.setdefault(user, [])
        self.sps = None  # any cached shortest paths no longer cover the graph

    def add_edge(self, origin: object, target: object) -> None:
        """
        Adds an undirected edge to the graph between `origin` and `target`.

        Missing endpoints are created on the fly. Adding an edge that already
        exists does not create a duplicate entry.

        Parameters:
        ----------
        origin: The originating vertex.
        target: The target vertex.
        """
        # Register the edge in both directions; each adjacency list is re-sorted
        # only when it actually changed.
        for vertex, neighbor in ((origin, target), (target, origin)):
            neighbors = self.setdefault(vertex, [])
            if neighbor not in neighbors:
                neighbors.append(neighbor)
                neighbors.sort(key=self._sort_key)
        self.sps = None

    def remove_edge(self, edge: Tuple[object, object]) -> None:
        """
        Removes an edge from the graph.

        Parameters:
        ----------
        edge: Tuple containing the vertices that form the edge.

        Raises:
        ----------
        KeyError: If one of the vertices is not in the graph.
        ValueError: If the two vertices are not connected.
        """
        origin, target = edge[0], edge[1]
        self[origin].remove(target)
        # A self-loop is stored only once, so there is no second entry to remove.
        if origin != target:
            self[target].remove(origin)
        self.sps = None

    def remove_vertex(self, user: object) -> None:
        """
        Removes a vertex and all its edges from the graph.

        Parameters:
        ----------
        user: The user object or identifier to be removed.

        Raises:
        ----------
        KeyError: If the vertex is not in the graph.
        """
        del self[user]
        # Purge the vertex from every remaining adjacency list so that no
        # neighbour keeps pointing at a vertex that no longer exists.
        for neighbors in self.values():
            neighbors[:] = [vertex for vertex in neighbors if vertex != user]
        self.sps = None

    def dfs(self, start: object) -> List[object]:
        """
        Depth-first search starting from `start` vertex.

        Parameters:
        ----------
        start: The starting vertex.

        Returns:
        ----------
        List of visited vertices, in visiting order.

        Raises:
        ----------
        KeyError: If `start` is not in the graph.
        """
        visited = set()
        result = []
        stack = [start]

        while stack:
            node = stack.pop()
            if node not in visited:
                visited.add(node)
                result.append(node)
                # Push in reverse so the first (smallest) neighbour is popped first.
                stack.extend(reversed(self[node]))

        return result

    def get_subgraphs(self) -> List[List[object]]:
        """
        Finds disconnected subgraphs (clusters) in the graph.

        Returns:
        ----------
        List of subgraphs, where each subgraph is a list of vertices.
        """
        visited = set()
        subgraphs = []

        for vertex in self.keys():
            if vertex not in visited:
                # DFS returns all nodes connected to this vertex
                component = self.dfs(vertex)

                # Mark all nodes in this component as visited
                visited.update(component)

                # Add the component to the list of subgraphs
                subgraphs.append(component)

        return subgraphs

    def shortest_path(self, start: object, end: object) -> List[object]:
        """
        Breadth-first search from `start` to `end` with path tracking to identify the shortest path.

        Parameters:
        ----------
        start: The starting vertex.
        end: The end vertex.

        Returns:
        ----------
        List of vertices forming the shortest path from start to end (both
        included), or an empty list if there is no path.

        Raises:
        ----------
        KeyError: If `start` differs from `end` and is not in the graph.
        """
        if start == end:
            return [start]

        visited = set([start])
        previous = {}          # child -> parent
        queue = [start]
        index = 0              # read cursor; avoids the O(n) cost of pop(0)

        while index < len(queue):
            current = queue[index]
            index += 1

            for neighbor in self[current]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    previous[neighbor] = current

                    # As soon as we reach `end`, walk the parent links back
                    # to `start` and return the reconstructed path.
                    if neighbor == end:
                        path = [end]
                        while path[-1] != start:
                            path.append(previous[path[-1]])
                        path.reverse()
                        return path

                    queue.append(neighbor)

        # No path found
        return []

    def most_influential(self) -> List[Tuple[object, float]]:
        """
        Identifies the most influential user(s) based on average shortest path length.

        The score of a vertex is the mean of `len(path)` over the shortest paths
        to every vertex it can reach, i.e. path length counted in vertices.
        Vertices that cannot reach any other vertex are ignored.

        NOTE(review): unreachable vertices are left out of the average, so a
        vertex in a small, tight component can outrank a hub of a larger one;
        confirm this is the intended ranking for disconnected graphs.

        Returns:
        ----------
        List of `(vertex, score)` tuples for every vertex sharing the lowest
        score; empty if no vertex can reach another one.
        """
        min_closeness = float("inf")
        list_influence = []
        for vertex_start in self.keys():
            list_shortest_path = []
            for vertex_end in self.keys():
                if vertex_start == vertex_end:
                    continue
                path = self.shortest_path(vertex_start, vertex_end)
                if path:
                    list_shortest_path.append(len(path))

            if not list_shortest_path:
                continue
            closeness = sum(list_shortest_path) / len(list_shortest_path)

            if closeness < min_closeness:
                list_influence = [(vertex_start, closeness)]
                min_closeness = closeness
            elif closeness == min_closeness:
                list_influence.append((vertex_start, closeness))

        return list_influence

    def edge_in_sp(self, pair: Tuple[object, object], sp: Optional[List[object]]) -> bool:
        """
        Checks if an edge exists in the given shortest path.

        The edge is undirected: it matches in either orientation.

        Parameters:
        ----------
        pair: Tuple containing the users that form the edge.
        sp: The shortest path, represented as a list of vertices.

        Returns:
        ----------
        Boolean value indicating the presence of the edge in the shortest path.
        """
        # A missing path, or one with fewer than two vertices, contains no edge
        if not sp or len(sp) < 2:
            return False

        first, second = pair[0], pair[1]
        # Compare the pair against every consecutive couple of vertices on the path
        return any(
            (left == first and right == second) or (left == second and right == first)
            for left, right in zip(sp, sp[1:])
        )

    def compute_sps(self) -> None:
        """
        Computes shortest paths between every pair of nodes and stores them in `self.sps`.

        `self.sps[i][j]` holds the path from the i-th to the j-th vertex (in
        key-insertion order) as a list of vertices, or `None` when the two
        vertices are not connected. The diagonal holds `[vertex]`.
        """
        nodes = list(self.keys())
        size = len(nodes)

        # Initialize the shortest paths matrix with 'None' values
        matrix = [[None] * size for _ in range(size)]

        for i, start in enumerate(nodes):
            matrix[i][i] = [start]
            # The graph is undirected, so one search per unordered pair is
            # enough: the reversed path is a shortest path the other way round.
            for j in range(i + 1, size):
                path = self.shortest_path(start, nodes[j])
                if path:
                    matrix[i][j] = path
                    matrix[j][i] = path[::-1]

        self.sps = matrix

    def edge_to_remove(self) -> Optional[Tuple[object, object]]:
        """
        Identifies the edge to remove based on edge betweenness.

        Betweenness of an edge is the share of stored shortest paths (one per
        connected pair of vertices, taken from `self.sps`) that run through it.
        `self.sps` is computed first if it is missing or has the wrong size.

        Returns:
        ----------
        Tuple containing the vertices of the edge to remove, or `None` if the
        graph has no edge left. Ties go to the edge encountered first.
        """
        if self.sps is None or len(self.sps) != len(self):
            self.compute_sps()

        appearances = {}   # edge (as a vertex tuple) -> number of paths using it
        total_paths = 0

        # Each unordered pair of vertices contributes at most one shortest path
        for i, j in itertools.combinations(range(len(self.sps)), 2):
            path = self.sps[i][j]
            if not path:
                continue
            total_paths += 1
            for left, right in zip(path, path[1:]):
                # Reuse the orientation under which the edge was first recorded
                edge = (right, left) if (right, left) in appearances else (left, right)
                appearances[edge] = appearances.get(edge, 0) + 1

        if not appearances:
            return None

        betweenness = {edge: count / total_paths for edge, count in appearances.items()}
        return max(betweenness, key=betweenness.get)

    def girvan_newman_algorithm(self, clusters: int) -> List[List[object]]:
        """
        Applies the Girvan-Newman algorithm to decompose the graph into specified
        number of clusters (disconnected subgraphs).

        The graph is modified in place: edges are removed until the target is
        reached or no edge is left.

        Pseudocode for the Girvan-Newman algorithm:
        -------------------------------------------
        1. Calculate the betweenness of all existing edges in the network.
        2. Remove the edge with the highest betweenness.
        3. Calculate the number of disconnected subgraphs.
        4. Repeat steps 1-3 until the number of disconnected subgraphs equals the predefined number of clusters.

        Parameters:
        ----------
        clusters: The number of clusters to decompose the graph into.

        Returns:
        ----------
        List of clusters, where each cluster is a list of vertices. It may hold
        fewer clusters than requested when the graph runs out of edges, and
        more when the graph already had more components to begin with.
        """
        # Get the initial set of disconnected subgraphs
        subgraphs = self.get_subgraphs()

        # Loop until we have the desired number of clusters
        while len(subgraphs) < clusters:
            # Compute shortest paths for all pairs of nodes
            self.compute_sps()

            # Identify the edge to be removed based on betweenness
            edge = self.edge_to_remove()
            if edge is None:
                # No edge left: the graph cannot be split any further
                break

            # Remove the identified edge
            self.remove_edge(edge)

            # Update the disconnected subgraphs
            subgraphs = self.get_subgraphs()

        # Return the final clusters
        return subgraphs

    def parse_data(self, filepath: str = 'ressources/graph_52n.json') -> None:
        """
        Parses graph data from a JSON file and populates the graph.

        The file is expected to hold an object mapping each username to a list
        of neighbour usernames. Exactly one `User` is created per username, so
        a name that appears several times always refers to the same vertex.

        Parameters:
        ----------
        filepath: Path to the JSON file containing the graph data.
        """
        # Open and read the JSON file
        with open(filepath, 'r', encoding='utf-8') as file:
            data = json.load(file)

        # Remove the first key-item pair from data
        # NOTE(review): presumably a header/metadata entry - confirm against the file
        if data:
            first_key = next(iter(data))
            del data[first_key]

        # Users hash by identity, so keep one object per name (seeded with the
        # vertices already in the graph) instead of creating one per mention.
        users_by_name = {str(vertex): vertex for vertex in self}

        def user_for(name):
            if name not in users_by_name:
                users_by_name[name] = User(name)
            return users_by_name[name]

        # Iterate over the data to populate vertices and edges
        for key, neighbors in data.items():
            key_user = user_for(key)

            # Add vertex for the user represented by 'key'
            self.add_vertex(key_user)

            # Add edges between 'key' and its neighbors
            for neighbor in neighbors:
                self.add_edge(key_user, user_for(neighbor))

    def show(self, filename: str = 'graph') -> None:
        """
        Generates a visual representation of the graph as a PNG file.

        Parameters:
        ----------
        filename: Base name of the output; the image is written to
                  `<filename>.png`.
        """
        # Initialize a Graphviz graph; strict=True merges the two directions
        # under which every undirected edge is stored
        graph = graphviz.Graph(format='png', strict=True, filename=filename)

        # Add nodes to the Graphviz graph
        for node in self.keys():
            graph.node(str(node), str(node))

        # Add edges to the Graphviz graph
        for node, neighbors in self.items():
            for target in neighbors:
                graph.edge(str(node), str(target))

        # Render the PNG and let Graphviz delete its intermediate source file
        graph.render(cleanup=True)


## Test

In [4]:
# Create one User per person. Each keeps its own module-level name because
# later cells refer to some of them directly (e.g. `mark`, `jack`).
(
    marissa, sundar, tim, mark, elon, adam, jack,
    emanuel, olaf, joe, rishi,
    brittany, stephanie, serge, mary,
) = (
    User(name)
    for name in (
        "Marissa", "Sundar", "Tim", "Mark", "Elon", "Adam", "Jack",
        "Emanuel", "Olaf", "Joe", "Rishi",
        "Brittany", "Stephanie", "Serge", "Mary",
    )
)

g = Graph()

users = [
    marissa, sundar, tim, mark, elon, adam, jack,
    emanuel, olaf, joe, rishi,
    brittany, stephanie, serge, mary,
]

# Register every user as an (initially isolated) vertex
for user in users:
    g.add_vertex(user)

# Friendships, listed in the order in which they are inserted
edges = [
    # first community, centred on Sundar
    (sundar, marissa), (sundar, tim), (sundar, mark), (sundar, elon), (sundar, adam),
    (mark, elon), (marissa, elon),
    (elon, adam), (adam, jack),
    # bridge towards the second community
    (sundar, emanuel),
    (emanuel, olaf), (emanuel, joe), (emanuel, rishi),
    (olaf, rishi),
    # separate component
    (brittany, stephanie), (brittany, serge), (serge, mary),
]

for origin, target in edges:
    g.add_edge(origin, target)


## DFS

In [5]:
# Depth-first traversal of the test graph, starting from Mark
list_dfs = g.dfs(mark)

In [6]:
# Usernames of the visited vertices, in DFS visiting order
list_user = [visited.username for visited in list_dfs]

In [7]:
# Display the usernames collected from the DFS result
list_user

['Mark',
 'Elon',
 'Adam',
 'Jack',
 'Sundar',
 'Emanuel',
 'Joe',
 'Olaf',
 'Rishi',
 'Marissa',
 'Tim']

## Subnetworks (connected components)

In [8]:
# Connected components of the test graph, one printed line per component
subgraphs = g.get_subgraphs()

for i, cluster in enumerate(subgraphs, start=1):
    usernames = []
    for member in cluster:
        usernames.append(member.username)
    print(f"Cluster {i}: {usernames}")


Cluster 1: ['Marissa', 'Elon', 'Adam', 'Jack', 'Sundar', 'Emanuel', 'Joe', 'Olaf', 'Rishi', 'Mark', 'Tim']
Cluster 2: ['Brittany', 'Serge', 'Mary', 'Stephanie']


## Shortest path

In [9]:
# Breadth-first shortest path between Mark and Jack (a list of User objects)
shortest_path = g.shortest_path(mark, jack)

In [10]:
# Usernames along the shortest path, from start to end
list_user = [step.username for step in shortest_path]

list_user

['Mark', 'Elon', 'Adam', 'Jack']

In [11]:
# Every user tied for the lowest average shortest-path score
list_influencers = g.most_influential()

for user, score in list_influencers:
    rounded_score = round(score, 2)
    print(user.username, rounded_score)

Brittany 2.33
Serge 2.33
