# Domain Definitions

In [4]:
import pandas as pd
import random
from dataclasses import dataclass
from numpy.random import default_rng
from numpy.random import Generator
from pathlib import Path
from typing import Optional, List, Set, Dict, Tuple

In [5]:
@dataclass(frozen=True)
class Item:
    """Immutable record describing something that can be stolen.

    Attributes are plain data; ``tts`` is a read-only shorthand for
    ``time_to_steal``.
    """
    name: str
    value: float
    weight: int
    time_to_steal: int

    @property
    def tts(self) -> int:
        """Return ``time_to_steal`` under its abbreviated name."""
        stealing_time = self.time_to_steal
        return stealing_time

In [6]:
@dataclass(frozen=False)
class Node:
    """A named location that may hold at most one item.

    Equality is the dataclass-generated field comparison (``name`` and
    ``item``), while hashing uses only ``name`` so that a node stays usable as
    a dictionary key after an item is attached to it.
    """
    name: str
    item: Optional['Item']  # None until an item is attached

    def __init__(self, name: str):
        """Create an empty node called ``name``."""
        self.name = name
        self.item = None

    def with_item(self, name: str, value: float, weight: int, tts: int) -> 'Node':
        """Build an ``Item`` from the given fields, attach it and return ``self``."""
        return self._with_item(Item(name, value, weight, time_to_steal=tts))

    def _with_item(self, item: 'Item') -> 'Node':
        """Attach an already-built item and return ``self`` for chaining."""
        self.item = item
        return self

    def to_dict(self) -> dict:
        """Return a flat dictionary view of the node.

        Without an item only ``name`` and ``item`` (``None``) are present;
        with an item the dictionary also carries ``value``, ``weight`` and
        ``tts``.
        """
        # Test for the missing item explicitly instead of catching every
        # exception, so genuine errors (e.g. a malformed item) are not hidden.
        if self.item is None:
            return {
                'name': self.name,
                'item': None,
            }

        return {
            'name': self.name,
            'item': self.item.name,
            'value': self.item.value,
            'weight': self.item.weight,
            'tts': self.item.tts
        }

    def __hash__(self) -> int:
        """Hash by name only; see the class docstring."""
        return hash(self.name)

In [7]:
@dataclass(frozen=True)
class Edge:
    """Immutable one-way connection from ``source`` to ``destination``."""
    source: Node
    destination: Node
    travelling_time: int
    travelling_cost: float

    @property
    def time(self) -> int:
        """Short alias for ``travelling_time``."""
        return self.travelling_time

    @property
    def cost(self) -> float:
        """Short alias for ``travelling_cost``."""
        return self.travelling_cost

    def to_dict(self) -> dict:
        """Return the edge as a flat dictionary, with endpoints given by name."""
        endpoints = {
            'source': self.source.name,
            'destination': self.destination.name,
        }
        metrics = {
            'travelling_time': self.time,
            'travelling_cost': self.cost,
        }
        # Endpoints first, then metrics, to keep the original key order.
        return {**endpoints, **metrics}

In [8]:
class Graph:
    """Directed graph of named nodes.

    ``nodes`` maps a node name to its ``Node``; ``edges`` maps a
    ``(source, destination)`` pair to the ``Edge`` joining them; and
    ``connections`` maps a node to the list of edges leaving it.
    """
    nodes: Dict[str, 'Node']
    edges: Dict[Tuple['Node', 'Node'], 'Edge']
    connections: Dict['Node', List['Edge']]

    def __init__(self):
        """Create an empty graph."""
        self.nodes = {}
        self.edges = {}
        self.connections = {}

    def get_node(self, name: str) -> Optional['Node']:
        """Return the node called ``name``, or ``None`` if it is unknown."""
        return self.nodes.get(name)

    def get_all_nodes(self) -> List['Node']:
        """Return a new list with every registered node."""
        return list(self.nodes.values())

    def get_all_edges(self) -> List['Edge']:
        """Return a new list with every registered edge."""
        return list(self.edges.values())

    def get_connections_from_node(self, node: Optional['Node']) -> List['Edge']:
        """Return the edges leaving ``node``; empty list if there are none."""
        # Default to a list (not a dict) so the result type matches both the
        # annotation and the lists stored by ``add_edge``.
        return self.connections.get(node, [])

    def get_connections_from(self, node_name: str) -> List['Edge']:
        """Return the edges leaving the node called ``node_name``.

        An unknown name yields an empty list.
        """
        node: Optional['Node'] = self.get_node(node_name)
        return self.get_connections_from_node(node)

    def find_connection_between(self, src: 'Node', dst: 'Node') -> Optional['Edge']:
        """Return the edge from ``src`` to ``dst``, or ``None`` if absent."""
        return self.edges.get((src, dst), None)

    def add_edge(self, src: str, dst: str, tt: int, tc: float) -> 'Graph':
        """Add a one-way edge from ``src`` to ``dst`` and return ``self``.

        Missing endpoints are created on the fly. ``tt`` is the travelling
        time and ``tc`` the travelling cost stored on the edge.
        """
        # Resolve the endpoints one after the other so that a self-loop on a
        # not-yet-known name reuses a single Node object for both ends.
        source = self._get_or_create_node(src)
        destination = self._get_or_create_node(dst)
        edge = Edge(source, destination, travelling_time=tt, travelling_cost=tc)
        self.edges[(source, destination)] = edge
        # Rebind to a new list so lists handed out earlier are not mutated.
        self.connections[source] = self.connections.get(source, []) + [edge]
        return self

    def add_node(self, node_name: str) -> 'Graph':
        """Register a node called ``node_name`` (no-op if it already exists).

        An existing node is kept as is: replacing it would drop its item and
        leave previously added edges pointing at a stale object.
        """
        self._get_or_create_node(node_name)
        return self

    def _add_node(self, node: 'Node') -> 'Graph':
        """Register ``node`` under its own name and return ``self``."""
        self.nodes[node.name] = node
        return self

    def _get_or_create_node(self, name: str) -> 'Node':
        """Return the node called ``name``, creating and registering it if needed."""
        node = self.nodes.get(name)
        if node is None:
            node = Node(name)
            self.nodes[name] = node
        return node

# Graph Construction

In [17]:
# Locate the CSV inputs inside the "datasets" folder of the working directory.
datasets_dir = Path.cwd() / 'datasets'
travelling_map = "{}/travelling_map.csv".format(datasets_dir)
treasure_map = "{}/treasure_map.csv".format(datasets_dir)

In [20]:
# Load both CSV datasets into DataFrames; the graph-construction cell reads
# the columns source/destination/travelling_time/travelling_cost from
# `travels` and city/name/value/weight/tts from `treasures`.
travels = pd.read_csv(travelling_map)
treasures = pd.read_csv(treasure_map)

In [21]:
# Empty graph; it is populated from the two DataFrames in the next cell.
g = Graph()

In [22]:
# Build the graph with plain loops rather than DataFrame.apply: the calls are
# made purely for their side effects, and pandas releases before 1.1 invoke
# the applied function twice on the first row, which would register that
# road's edges twice.

# Every road is usable in both directions, so add one edge per direction.
for _, row in travels.iterrows():
    g.add_edge(src=row['source'],
               dst=row['destination'],
               tt=row['travelling_time'],
               tc=row['travelling_cost'])
    g.add_edge(src=row['destination'],
               dst=row['source'],
               tt=row['travelling_time'],
               tc=row['travelling_cost'])

# Attach each treasure to the city that holds it.
for _, row in treasures.iterrows():
    city = g.get_node(row['city'])
    if city is None:
        # Fail with a readable message instead of an AttributeError on None.
        raise ValueError(
            f"treasure {row['name']!r} is located in unknown city {row['city']!r}")
    city.with_item(row['name'],
                   row['value'],
                   row['weight'],
                   row['tts'])

# Problem Solving 

In [24]:
@dataclass(frozen=False)
class Individual:
    """A candidate route together with its pre-computed totals.

    ``genes`` is the ordered list of visited nodes and ``extra_genes`` holds
    nodes kept aside from the route. Two individuals are equal when their
    ``genes`` lists are equal; the other fields do not take part in
    equality or hashing.
    """
    genes: List['Node']
    extra_genes: List['Node']
    total_income: float
    travelling_cost: float
    traveling_time: int
    time_to_steal: int
    carry_weight: int
    generation: int

    @property
    def profit(self) -> float:
        """Income minus travelling cost."""
        return self.total_income - self.travelling_cost

    @property
    def total_time(self) -> int:
        """Travelling time plus time spent stealing."""
        return self.traveling_time + self.time_to_steal

    @property
    def score(self) -> float:
        """Profit per unit of total time.

        Raises ``ZeroDivisionError`` when ``total_time`` is zero.
        """
        return self.profit/self.total_time

    def pretty_print(self):
        """Print the route and its key figures."""
        names: List[str] = [gene.name for gene in self.genes]
        path: str = " -> ".join(names)
        print(f"""
        Path: {path}
        Profit: {self.profit}
        Total time: {self.total_time}
        Score: {self.score}
        Generation: {self.generation}""")

    def __eq__(self, other):
        """Compare by route; defer to Python for unrelated types."""
        if not isinstance(other, Individual):
            # Let Python fall back to its default handling (which yields
            # False for ``==``) instead of swallowing arbitrary exceptions.
            return NotImplemented
        return self.genes == other.genes

    def __hash__(self):
        """Hash the ordered route.

        Hashing the tuple keeps the hash consistent with ``__eq__`` while
        distinguishing permutations of the same nodes, which a plain sum of
        node hashes cannot do.
        """
        return hash(tuple(self.genes))

In [25]:
class GeneticAlgorithm:
    """Genetic search for a profitable round trip over a ``Graph``.

    A route starts and ends at ``starting_point``. A candidate is feasible
    when its carried weight does not exceed ``max_weight`` and its total time
    does not exceed ``max_hours``; its fitness is then its score, otherwise 0.
    """
    graph: 'Graph'
    genes: List['Node']
    max_hours: int
    max_weight: int
    rng: Generator
    starting_point: str

    def __init__(self, graph: 'Graph', with_max_hours: int, with_max_weight: int,
                 starting_point: str = "Escondidos"):
        """Store the limits and snapshot the graph's nodes as the gene pool.

        ``starting_point`` is the name of the node every route starts from and
        returns to.
        """
        self.graph = graph
        self.genes = list(graph.get_all_nodes())
        self.max_hours = with_max_hours
        self.max_weight = with_max_weight
        self.starting_point = starting_point
        self.rng = default_rng()

    def generate_initial_batch(self, size: int = 10) -> List['Individual']:
        """Return ``size`` randomly generated feasible individuals."""
        length = len(self.genes)
        return [self._generate_individual(length) for _ in range(size)]

    def _generate_individual(self, number_of_genes: int) -> 'Individual':
        """Keep sampling random routes until one has positive fitness.

        Loops forever if no sampled route is ever feasible.
        """
        while True:
            genes, extra_genes = self._generate_individual_genes(number_of_genes)
            indv = self.generate_individual_with(genes=genes, extra_genes=extra_genes)

            # fitness() is 0 for None, so an unbuildable route is retried too.
            if self.fitness(indv) > 0:
                return indv

    def _generate_individual_genes(self, number: int) -> Tuple[List['Node'], List['Node']]:
        """Shuffle the gene pool and cut it at the starting node.

        Returns ``(route, extras)``: the route is the starting node followed
        by the shuffled nodes up to and including the starting node; the
        extras are the nodes that come after it in the shuffle.
        """
        starting_point = self._start_node()
        indexes = self.rng.choice(number, size=number, replace=False)
        genes: List['Node'] = [self.genes[idx] for idx in indexes]

        endpoint_idx: int = genes.index(starting_point)

        individual_genes: List['Node'] = [starting_point] + genes[:endpoint_idx + 1]
        extra_genes: List['Node'] = genes[endpoint_idx + 1:]
        return individual_genes, extra_genes

    def generate_individual_with(self, genes, extra_genes, generation: int = 0) -> Optional['Individual']:
        """Build an ``Individual`` for the route ``genes``.

        Returns ``None`` when the route has no real hop (fewer than two
        distinct consecutive nodes) or when two consecutive nodes are not
        connected in the graph.
        """
        # Consecutive pairs of the route; staying in place is not a hop.
        hops = [(src, dest) for src, dest in zip(genes, genes[1:]) if src != dest]
        if not hops:
            return None

        connections = [self.graph.find_connection_between(src, dest) for src, dest in hops]
        if any(cx is None for cx in connections):
            # The route uses a road that does not exist: not a valid candidate.
            return None

        travelling_cost: float = 0.0
        travelling_time: int = 0
        total_income: float = 0.0
        time_to_steal: int = 0
        total_weight: int = 0
        looted = set()  # names of nodes whose item has already been taken

        for cx in connections:
            travelling_cost += cx.travelling_cost
            travelling_time += cx.travelling_time

            stop = cx.destination
            if stop.name in looted:
                # Passing through again costs travel but yields nothing new.
                continue
            looted.add(stop.name)

            loot = stop.to_dict()  # no value/weight/tts keys when the node is empty
            total_income += loot.get('value', 0)
            time_to_steal += loot.get('tts', 0)
            total_weight += loot.get('weight', 0)

        return Individual(genes, total_income=total_income, time_to_steal=time_to_steal,
                          travelling_cost=travelling_cost, traveling_time=travelling_time,
                          carry_weight=total_weight, extra_genes=extra_genes,
                          generation=generation)

    def fitness(self, indv: Optional['Individual']) -> float:
        """Return the individual's score, or 0 if it is ``None`` or infeasible."""
        if indv is None:
            return 0

        if indv.carry_weight > self.max_weight or indv.total_time > self.max_hours:
            return 0

        return indv.score

    def mutate(self, indv: Optional['Individual']) -> Optional['Individual']:
        """Swap one inner route node with one extra node.

        Returns the mutated individual when it is feasible, else ``None``.
        ``None`` is also returned when there is nothing to swap (no input, no
        inner route node, or no extra node).
        """
        if indv is None:
            return None

        left: List['Node'] = list(indv.genes)
        right: List['Node'] = list(indv.extra_genes)

        # The first and last route nodes stay fixed, so a swap needs at least
        # one inner node and at least one extra node.
        if len(left) < 3 or not right:
            return None

        left_idx: int = random.randint(1, len(left) - 2)
        right_idx: int = random.randint(0, len(right) - 1)
        left[left_idx], right[right_idx] = right[right_idx], left[left_idx]

        new_indv = self.generate_individual_with(genes=left,
                                                 extra_genes=right,
                                                 generation=indv.generation + 1)
        return new_indv if self.fitness(new_indv) > 0 else None

    def cross_over(self, indvA: Optional['Individual'], indvB: Optional['Individual']) -> List['Individual']:
        """Derive up to two feasible children from the parents' extra genes.

        Each child's route is the starting node, one parent's extra genes, and
        the starting node again. Returns an empty list if either parent is
        ``None``.
        """
        if indvA is None or indvB is None:
            return []

        gene = [self._start_node()]
        next_gen = max(indvA.generation, indvB.generation) + 1

        # NOTE(review): both children take their extras from indvA's route.
        # For indvC (built from indvB's extras) this may be a copy-paste slip
        # and indvB.genes[1:-1] may have been intended -- confirm before
        # changing, as it alters which nodes later mutations can swap in.
        indvC = self.generate_individual_with(genes=gene + list(indvB.extra_genes) + gene,
                                              extra_genes=list(indvA.genes[1:-1]),
                                              generation=next_gen)

        indvD = self.generate_individual_with(genes=gene + list(indvA.extra_genes) + gene,
                                              extra_genes=list(indvA.genes[1:-1]),
                                              generation=next_gen)

        # Keep each child on its own merits (indvD used to be replaced by a
        # second copy of indvC).
        return [child for child in (indvC, indvD) if self.fitness(child) > 0]

    def _start_node(self) -> 'Node':
        """Return the node every route starts from; raise if it is unknown."""
        node = self.graph.get_node(self.starting_point)
        if node is None:
            raise ValueError(f"starting point {self.starting_point!r} is not in the graph")
        return node






In [26]:
# Search the graph `g` with limits of 72 for total time and 20 for carried weight.
ga = GeneticAlgorithm(g, with_max_hours=72, with_max_weight=20)

In [27]:
# Seed the search with 100 random individuals of positive fitness.
population = ga.generate_initial_batch(size=100)

In [28]:
# Run ten rounds: pair each individual with its successor in the ranking,
# mutate both, and breed every parent/mutant combination. Survivors are the
# distinct individuals, ranked by score (best first).
for _ in range(10):
    new_population = []

    for a, b in zip(population, population[1:]):
        a_mut = ga.mutate(a)
        b_mut = ga.mutate(b)

        # cross_over returns [] when either parent is None (failed mutation).
        for parents in ((a, b), (a, b_mut), (a_mut, b), (a_mut, b_mut)):
            new_population.extend(ga.cross_over(*parents))

    population.extend(new_population)
    population = sorted(set(population), key=lambda indv: indv.score, reverse=True)

In [29]:
# Number of distinct individuals left after the evolution loop.
len(population)

785

In [30]:
# Show the first 100 individuals; `population` was sorted by score, best first.
for p in population[0:100]:
    p.pretty_print()


        Path: Escondidos -> Riacho de Fevereiro -> Escondidos
        Profit: 6870.0
        Total time: 8
        Score: 858.75
        Generation: 0

        Path: Escondidos -> Leão -> Guardião -> Ponte-do-Sol -> Campos -> Granada -> Porto -> Riacho de Fevereiro -> Escondidos
        Profit: 25472.0
        Total time: 41
        Score: 621.2682926829268
        Generation: 9

        Path: Escondidos -> Leão -> Guardião -> Santa Paula -> Campos -> Granada -> Porto -> Riacho de Fevereiro -> Escondidos
        Profit: 33201.0
        Total time: 56
        Score: 592.875
        Generation: 4

        Path: Escondidos -> Riacho de Fevereiro -> Lagos -> Campos -> Escondidos
        Profit: 15716.0
        Total time: 29
        Score: 541.9310344827586
        Generation: 12

        Path: Escondidos -> Limões -> Limões -> Riacho de Fevereiro -> Porto -> Granada -> Campos -> Lagos -> Leão -> Escondidos
        Profit: 27517.0
        Total time: 51
        Score: 539.5490196078431
  