In [1]:
from pywgraph import *

In [2]:
grafo = WeightedDirectedGraph.from_dict(
    {
        "A": {"B": 1, "F": 1},
        "B": {"C": 1, "A":1},
        "C": {"A":1, "D":1},
        "D": {"A":1, "E":1},
        "E": {"B":1, "F": 1},
        "F":{"E":1, "A":1}
    }
)

In [3]:
from functools import cached_property


def _cycle_representations(cycle: list[str]) -> list[list[str]]:
    return [cycle[-i:] + cycle[:-i] for i in range(len(cycle))]


def _canonic_representation(cycle: list[str]) -> list[str]:
    first = min(cycle)
    canonic_representation = [
        representation
        for representation in _cycle_representations(cycle)
        if representation[0] == first
    ][0]
    return canonic_representation


class Path(list[str]):
    """Class that represents a path"""

    def __init__(self, path: list[str]) -> None:
        if not path:
            raise ValueError("A path can not be empty")
        if any(path[i] == path[i + 1] for i in range(len(path) - 1)):
            raise ValueError("A path can not contain equal consecutive elements")
        super().__init__(path)
        self._path = path

    @property
    def is_cycle(self) -> bool:
        return self[0] == self[-1]

    def __len__(self) -> int:
        return super().__len__() - 1

    def __hash__(self) -> int:  # type: ignore
        return hash(
            tuple(self)
        )  # Mypy don't allow to override hash for list since lists are not hashable

    def __bool__(self) -> bool:
        return True


class Cycle(Path):

    def __init__(self, cycle: list[str]) -> None:
        if cycle[0] != cycle[-1]:
            raise ValueError("A cycle must start and end with the same element")
        super().__init__(cycle)

        if len(cycle) == 1:
            self._clean_cycle = cycle
        else: 
            self._clean_cycle = cycle[:-1]

    @cached_property
    def canonic_representation(self) -> list[str]:
        return _canonic_representation(self._clean_cycle)

    @cached_property
    def equivalent_representations(self) -> list[list[str]]:
        return _cycle_representations(self._clean_cycle)

    @classmethod
    def from_path(cls, path: Path) -> "Cycle":
        if path.is_cycle:
            return cls(path._path)
        else:
            raise ValueError("The path is not a cycle")

    def __hash__(self) -> int:  # type: ignore
        return hash(
            tuple(self.canonic_representation)
        )  # Mypy don't allow to override hash for list since lists are not hashable

    def __eq__(self, other: object) -> bool:
        other_ = other
        if isinstance(other, Path):
            other_ = Cycle.from_path(other)
        if isinstance(other_, Cycle):
            if len(other_) != len(self):
                return False
            return other_.canonic_representation == self.canonic_representation
        return False

    def __repr__(self) -> str:
        return f"Cycle({self.canonic_representation})"


class PathExplorer:
    """Auxiliary object to help in the searching of paths on a graph"""

    def __init__(self, path: Path, visitations: dict[str, int] = {}) -> None:
        self._path = path
        self._visitations = visitations

    @property
    def path(self) -> Path:
        return self._path

    @property
    def visitations(self) -> dict[str, int]:
        return self._visitations

    def __hash__(self) -> int:
        hash_list = hash(tuple(self.path))
        hash_dict = hash(
            tuple(sorted(list(self.visitations.items()), key=lambda x: x[0]))
        )
        return hash_list ^ hash_dict

    def __eq__(self, other: object) -> bool:
        if isinstance(other, PathExplorer):
            return (self.path, self.visitations) == (other.path, other.visitations)
        return False

    def __len__(self) -> int:
        return len(self.path)

    def __repr__(self) -> str:
        return f"PathExplorer({self.path}, {self.visitations})"

In [4]:
def iter_aux(
    grafo: WeightedDirectedGraph,
    explorer: PathExplorer,
    target: str,
    general_max_visitations: int,
    specific_max_visitations: dict[str, int],
) -> tuple[Path, list[PathExplorer]]:
    current_node = explorer.path[-1]
    visitations = explorer.visitations.copy()
    visitations[current_node] = visitations.get(current_node, 0) + 1
    current_node_vistiations = visitations[current_node]
    current_node_max_vistiations = specific_max_visitations.get(
        current_node, general_max_visitations
    )
    found_path = [] 
    new_explorers = []

    if current_node_vistiations > current_node_max_vistiations:
        return found_path, new_explorers
    
    if current_node == target:
        found_path = [explorer.path]

    children = grafo.children(current_node)
    forbidden_nodes = {
        node
        for node, rep in visitations.items()
        if rep >= specific_max_visitations.get(node, general_max_visitations)
    }
    unexplored_nodes = children - forbidden_nodes
    if not unexplored_nodes:
        return found_path, new_explorers

    new_explorers = [
        PathExplorer(Path(explorer.path + [node]), visitations)
        for node in unexplored_nodes
    ]
    return found_path, new_explorers

In [6]:
bb = [Cycle.from_path(a) for a in aa]
bb

NameError: name 'aa' is not defined

In [None]:
grafo.get_node_cycles("A") - {Cycle.from_path(x) for x in aa}


In [None]:
sorted(list(grafo.get_node_cycles("A")), key=len)

In [None]:
class PathExplorer(list[str]):

    def __init__(self, path: list[str] = [], visited: set[str] = set()):
        self._path = path
        self._visited = visited

    @property
    def path(self) -> list[str]:
        return self._path
    
    @property
    def visited(self) -> set[str]:
        return self._visited
    
    def __hash__(self) -> int:
        return hash((tuple(self.path), tuple(self.visited)))
    
    def __eq__(self, other: object) -> bool:
        if isinstance(other, PathExplorer):
            return (self.path, self.visited) == (other.path, other.visited)
        return False
    
    def __repr__(self) -> str:
        return f"PathExplorer({self._path}, {self._visited})"
    
    def __len__(self) -> int:
        return len(self._path)

In [None]:
def _cycle_aux(
    grafo: WeightedDirectedGraph, explorer: PathExplorer, target: str
) -> tuple[list, str] | tuple[Cycle, str]:
    current_node = explorer.path[-1]
    if current_node == target:
        return Cycle(explorer.path[:-1]), "cycle"
    
    children = grafo.children(current_node)
    unexplored_nodes = children - explorer.visited
    if not unexplored_nodes:
        return [], "dead explorer"
    
    updated_visited = explorer.visited | {current_node}
    new_explorers = [
        PathExplorer(explorer.path + [node], updated_visited)
        for node in unexplored_nodes
    ]
    return new_explorers, "continue"

In [None]:
def _get_node_cycles(grafo: WeightedDirectedGraph, start: str) -> list[list[str]]:
    first_children = grafo.children(start)
    if not first_children:
        return []

    explorers: list[PathExplorer] = [
        PathExplorer([start, child]) for child in first_children
    ]
    cycles: list[Cycle] = []

    while explorers: 
        result, state = _cycle_aux(grafo, explorers.pop(0), start)                              
        if state == "cycle":
            cycles.append(result)
        elif state == "continue":
            new_explorers = list(set(result) - set(explorers))
            explorers.extend(new_explorers)

    return cycles

In [None]:
def _path_aux(
    grafo: WeightedDirectedGraph, explorer: PathExplorer, target: str
) -> tuple[list, str] | tuple[Cycle, str]:
    current_node = explorer.path[-1]
    if current_node == target:
        return explorer.path, "path"
    
    children = grafo.children(current_node)
    unexplored_nodes = children - explorer.visited
    if not unexplored_nodes:
        return [], "dead explorer"
    
    updated_visited = explorer.visited | {current_node}
    new_explorers = [
        PathExplorer(explorer.path + [node], updated_visited)
        for node in unexplored_nodes
    ]
    return new_explorers, "continue"

In [None]:
def _find_all_paths(grafo: WeightedDirectedGraph, start: str, end: str) -> list[list[str]]:
    first_children = grafo.children(start)
    if not first_children:
        return []

    explorers: list[PathExplorer] = [
        PathExplorer([start, child]) for child in first_children
    ]
    all_paths: list[list[str]] = []

    while explorers:
        result, state = _path_aux(grafo, explorers.pop(0), end)                  
        if state == "path":
            all_paths.append(result)
        elif state == "continue":
            new_explorers = list(set(result) - set(explorers))
            explorers.extend(new_explorers)

    return all_paths

In [None]:
grafo.get_node_cycles("A")

In [None]:
_find_all_paths(grafo, "A", "F")

In [None]:
grafo.get_node_cycles("B")

In [None]:
grafo.get_node_cycles("C")

In [None]:
grafo.get_node_cycles("D")

In [None]:
grafo.get_node_cycles("E")

In [None]:
grafo.get_node_cycles("F")

In [None]:
grafo.cycles

In [None]:
def vector_group_multiplication() -> Group:
    group = Group(
        name="Vectors of dimension 2 with multiplication",
        identity=np.ones(2),
        operation=lambda x, y: x * y,
        inverse_function=lambda x: 1 / x,
        hash_function=lambda x: hash(tuple(x)),
    )
    return group


def vector_group_addition() -> Group:
    group = Group(
        name="Vectors of dimension 2 with addition",
        identity=np.zeros(2),
        operation=lambda x, y: x + y,
        inverse_function=lambda x: -x,
        hash_function=lambda x: hash(tuple(x)),
    )
    return group


_array_dict_graph: dict[str, dict[str, Any]] = {
    "A": {
        "B": np.array([1.0, 2.0]),
        "C": np.array([3.0, 4.0]),
    },
    "B": {
        "C": np.array([-5.0, 1.3]),
        "D": np.array([2.0, 1.0]),
    },
    "C": {
        "D": np.array([-1.0, 1.0]),
    },
    "D": {},
    "Z": {},
}


def multiplication_graph() -> WeightedDirectedGraph:
    graph = WeightedDirectedGraph.from_dict(
        _array_dict_graph, vector_group_multiplication()
    )
    return graph


def addition_graph() -> WeightedDirectedGraph:
    graph = WeightedDirectedGraph.from_dict(_array_dict_graph, vector_group_addition())
    return graph

In [None]:
addition_graph().add_reverse_edges().is_conmutative

In [None]:
complete_graph = addition_graph().add_reverse_edges()
group = complete_graph.group
identity = group.identity
bad = [
    cycle
    for cycle in complete_graph.cycles
    if not group.equal(complete_graph.path_weight(cycle), identity)
]
bad 

In [None]:
[
    complete_graph.path_weight(cycle)
    for cycle in bad
]

In [None]:
complete_graph

In [None]:
todos = set()
for node in complete_graph.nodes: 
    todos.update(complete_graph.get_node_cycles(node))

todos

In [None]:
[cycle for cycle in complete_graph.cycles]

In [None]:
grafo = WeightedDirectedGraph.from_dict(
    {
        "A": {"B": np.array([1, 2.0]), "C": np.array([3.0, 4.0])},
        "B": {"C": np.array([2.0, 2.0])},
        "C": {},
    },
    vector_group_addition(),
)

In [None]:
grafo.add_reverse_edges().cycles