<a href="https://colab.research.google.com/github/vladlead5/Scooter/blob/main/Scooter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Система навигации курьеров по замене батарей

In [None]:
# !pip install osmnx networkx

Collecting osmnx
  Downloading osmnx-1.9.3-py3-none-any.whl (107 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m107.2/107.2 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: osmnx
Successfully installed osmnx-1.9.3


## Импорт библиотек

In [None]:
import json
import random
import numpy as np
import osmnx as ox
import networkx as nx
from tqdm import tqdm
import matplotlib.pyplot as plt

## Загрузка графа центрального района Москвы

In [None]:
district_name = "Central Administrative Okrug, Moscow, Russia"
G = ox.graph_from_place(district_name, network_type='walk')
G = G.to_undirected()
ox.save_graphml(G, filepath='central_moscow.graphml')

## Подсчёт расстояний

In [None]:
G = ox.load_graphml('central_moscow.graphml')

nodes = G.nodes

SCOOTERS_CNT = 150
STATIONS_CNT = 10

scooter_nodes = np.random.choice(G.nodes, SCOOTERS_CNT, replace=False)
station_nodes = np.random.choice(np.setdiff1d(G.nodes, scooter_nodes), STATIONS_CNT, replace=False)

np.savetxt('scooter_nodes.txt', scooter_nodes)
np.savetxt('station_nodes.txt', station_nodes)


all_nodes = list(station_nodes) + list(scooter_nodes)

graph_dist = [[float('inf') for i in range(len(all_nodes))] for j in range(len(all_nodes))]

for i in tqdm(range(len(all_nodes))):
    for j in range(i + 1, len(all_nodes)):
        dist = nx.shortest_path_length(G, source=all_nodes[i], target=all_nodes[j], weight='length')
        graph_dist[i][j] = dist
        graph_dist[j][i] = dist

with open('graph_dist.json', 'w') as file:
    data = dict()

    data['graph_dist'] = graph_dist

    json.dump(data, file)

## Реализация муравьиного алгоритма для нахождения приближённых решений задачи коммивояжёры

In [None]:


class Graph():
    def __init__(self, nodes, distance, default_pheromone_level = None):
        self.nodes = nodes
        self.distance = distance

        assert distance.shape[1] == distance.shape[0]
        if default_pheromone_level:
            self.intensity = np.full_like(distance, default_pheromone_level).astype('float64')
        else:
            self.intensity = np.full_like(distance, self.distance.mean()*10).astype('float64')


    def __str__(self):
        return f'nodes: {str(self.nodes)}\n{self.distance}\n{self.intensity}'

alpha = 0.9
beta = 1.5

def cycle_length(g, cycle):
    length = 0
    i = 0
    while i < len(cycle) -1:
        length += g.distance[cycle[i]][cycle[i+1]]
        i+=1
    length+= g.distance[cycle[i]][cycle[0]]
    return length


def ant_colony_optimization(g, verbose=True, iterations = 100, ants_per_iteration = 50, q = None, degradation_factor = .9, use_inertia = False, run_experiment_break=False, run_experiment_artificial_good_cycle=False, scooters_cnt=150, stations_cnt=10):
    total_ants = 0

    if q is None:
        q = g.distance.mean()

    scooters_power = np.random.randint(10, 50, scooters_cnt)

    best_area_power = sum(scooters_power) / len(scooters_power)

    best_cycle = None
    best_length = float('inf')

    for iteration in range(iterations):
        print(f'iteration {iteration} \n' if (verbose and iteration%50==0) else '', end='')
        print(f'best weight so far: {round(best_length,2)}\n' if (verbose and iteration%50==0) else '', end='')
        print(f'average intensity {g.intensity.mean()}\n' if (verbose and iteration%50==0) else '', end='')

        cycles = [traverse_graph(g, 11, scooters_power, stations_cnt) for _ in range(ants_per_iteration)]

        cycles.sort(key = lambda x: x[1])
        cycles = cycles[: ants_per_iteration//2]
        total_ants+=ants_per_iteration

        if best_cycle: #elitism
            cycles.append((best_cycle, best_length, best_area_power))

            if use_inertia:
                old_best = best_length

        for cycle, total_length, area_p in cycles:
            if total_length < best_length:
                best_length = total_length
                best_cycle = cycle
                best_area_power = area_p


            delta = q/total_length

            i = 0

            while i < len(cycle) -1:
                g.intensity[cycle[i]][cycle[i+1]]+= delta
                i+=1
            g.intensity[cycle[i]][cycle[0]] += delta
            g.intensity *= degradation_factor


    return best_cycle, sum(scooters_power) / len(scooters_power), best_area_power


def calculate_denominator(g, from_node, remaining_nodes):
    denominator = 0

    for node in remaining_nodes:
        denominator += max(g.intensity[from_node][node], 1e-5) ** alpha * (1 / g.distance[from_node][node]) ** beta

    return denominator

def traverse_graph(g, source_node, scooters_p, stations_cnt):
    visited = np.asarray([1 for _ in range(g.nodes)])
    visited[source_node] = 0

    scooters_power = [elem for elem in scooters_p]

    cycle = [source_node]
    steps = 0
    current = source_node
    remaining_nodes = set([i for i in range(stations_cnt, g.nodes) if i != source_node])
    remaining_stations = set([i for i in range(stations_cnt)])
    area_power = 0.4
    cur_accums = 20

    while area_power < 80:

        jumps_neighbors = []
        jumps_values = []

        if cur_accums > 0:


            for node in range(stations_cnt, g.nodes):
                if visited[node] != 0:
                    sediment = max(g.intensity[current][node], 1e-5)
                    v = (sediment**0.9 ) / (g.distance[current][node]**1.5)
                    jumps_neighbors.append(node)
                    jumps_values.append(v)
        else:


            for node in range(stations_cnt):
                if visited[node] != 0:
                    sediment = max(g.intensity[current][node], 1e-5)
                    v = (sediment**0.9 ) / (g.distance[current][node]**1.5)
                    jumps_neighbors.append(node)
                    jumps_values.append(v)


        next_node = random.choices(jumps_neighbors, weights = jumps_values)[0]


        visited[next_node] = 0
        current = next_node

        if cur_accums > 0:
            scooters_power[current - stations_cnt] = 100
            remaining_nodes.remove(current)
            area_power = sum(scooters_power) / len(scooters_power)
            cur_accums -= 1
        else:
            remaining_stations.remove(current)
            cur_accums = 20

        cycle.append(current)
        steps+=1

    total_length = cycle_length(g, cycle)
    assert len(list(set(cycle))) == len(cycle)
    return cycle, total_length, area_power


## Описание

In [None]:
distance = []

with open('graph_dist.json') as file:
    data = json.load(file)
    distance = data['graph_dist']


to_add_stations = 5
to_add_scooters = 10

distance = np.asarray(distance)

def add_scooters_and_stations(num_scooters, num_stations):
    res = distance

    res = np.delete(res, [el for el in range(10 + num_stations, 20)], axis=1)
    res = np.delete(res, [el for el in range(10 + num_stations, 20)], axis=0)

    res = np.delete(res, [el for el in range(170 + num_scooters - (20 - (10 + num_stations)), 190 - (20 - (10 + num_stations)))], axis=1)
    res = np.delete(res, [el for el in range(170 + num_scooters - (20 - (10 + num_stations)), 190 - (20 - (10 + num_stations)))], axis=0)

    return res


cur = add_scooters_and_stations(to_add_scooters, to_add_stations)

cur[np.isinf(cur)] = 0

graph = Graph(160 + to_add_scooters + to_add_stations, cur)

solution, prev_area_power, cur_area_power = ant_colony_optimization(graph, iterations=50, scooters_cnt=150+ to_add_scooters, stations_cnt=10+to_add_stations)

solution += [11]

G = ox.load_graphml('central_moscow.graphml')

stations_nodes = np.loadtxt('station_nodes.txt').astype(np.int64)
scooters_nodes = np.loadtxt('scooter_nodes.txt').astype(np.int64)

stations_nodes = stations_nodes[:10 + to_add_stations]
scooters_nodes = scooters_nodes[:150 + to_add_scooters]

nodes_to_calculate = list(stations_nodes) + list(scooters_nodes)

positions = {node: (G.nodes[node]['x'], G.nodes[node]['y']) for node in nodes_to_calculate}

plt.figure(figsize=(12, 10))

ox.plot_graph(G, node_size=30, edge_linewidth=1, show=False, close=False)

plt.scatter(
    [positions[node][0] for node in scooters_nodes],
    [positions[node][1] for node in scooters_nodes],
    c='blue', s=50, label='Самокаты'
)
plt.scatter(
    [positions[node][0] for node in stations_nodes],
    [positions[node][1] for node in stations_nodes],
    c='green', s=50, label='Зарядные станции'
)

plt.scatter(
    [positions[nodes_to_calculate[11]][0]],
    [positions[nodes_to_calculate[11]][1]],
    c='yellow', s=50, label='Точка старта'
)

for i in range(len(solution) - 1):
    plt.plot(
        [G.nodes[nodes_to_calculate[solution[i]]]['x'], G.nodes[nodes_to_calculate[solution[i + 1]]]['x']],
        [G.nodes[nodes_to_calculate[solution[i]]]['y'], G.nodes[nodes_to_calculate[solution[i + 1]]]['y']],
        c='red', alpha=0.6
    )

    plt.text((G.nodes[nodes_to_calculate[solution[i]]]['x'] + G.nodes[nodes_to_calculate[solution[i + 1]]]['x']) /2, (G.nodes[nodes_to_calculate[solution[i]]]['y'] + G.nodes[nodes_to_calculate[solution[i + 1]]]['y']) / 2, str(i + 1), fontsize=7, ha='center', va='center')

with open('result.txt', 'w') as file_out:
    file_out.write('Before area_power:\n')
    file_out.write(f'{prev_area_power}%\n')
    file_out.write('After area_power:\n')
    file_out.write(f'{cur_area_power}%\n')
    file_out.write('Best cycle:\n')
    file_out.write(f"[{','.join(map(str, solution))}]\n")
    file_out.write('Length:\n')
    file_out.write(str(cycle_length(graph, solution[:-1])))


plt.legend()
plt.title("Граф района с объектами")
plt.show()
