In [1]:
from heapq import heappush, heappop, heapify  
from math import radians, cos, sin, asin, sqrt 

class MinHeap:
    """Thin min-heap wrapper around a plain list managed with ``heapq``.

    Entries are expected to be mutable sequences whose first item is the key
    (e.g. ``[key, payload]``); ``decreaseKey`` assigns to ``entry[0]``.
    """

    # Constructor to initialize an empty heap
    def __init__(self):
        self.heap = []

    def parent(self, i):
        """Return the index of the parent of the entry at index ``i``."""
        return (i - 1) // 2

    # Inserts a new key 'k'
    def insertKey(self, k):
        """Push entry ``k`` onto the heap."""
        heappush(self.heap, k)

    # Decrease value of key at index 'i' to new_val
    # It is assumed that new_val is smaller than heap[i]
    def decreaseKey(self, i, new_val):
        """Set the key of the entry at index ``i`` to ``new_val`` and sift it up.

        The entry is swapped with its parent repeatedly, moving ``i`` up each
        time, until the parent is no larger or the root is reached.
        """
        self.heap[i][0] = new_val
        while i != 0:
            up = self.parent(i)
            # Compare whole entries so the ordering matches the one that
            # heappush/heappop maintain (key first, then the remaining items).
            if not self.heap[up] > self.heap[i]:
                break
            self.heap[i], self.heap[up] = self.heap[up], self.heap[i]
            # Continue from the parent's slot; without this step the entry
            # would rise by at most one level.
            i = up

    # Method to remove minimum element from min heap
    def extractMin(self):
        """Pop and return the smallest entry; raises IndexError when empty."""
        return heappop(self.heap)

    # This function deletes key at index i. It first reduces
    # value to minus infinity and then calls extractMin()
    def deleteKey(self, i):
        """Remove the entry at index ``i`` (nothing is returned).

        Assumes no other entry already has a key of minus infinity, otherwise
        the entry being deleted may not reach the root.
        """
        self.decreaseKey(i, float("-inf"))
        self.extractMin()

    # Get the minimum element from the heap
    def getMin(self):
        """Return the smallest entry without removing it; raises IndexError when empty."""
        return self.heap[0]

In [2]:
from typing import Tuple, Optional

def dijkstra(adj_list: dict, start_vertex: int, weights: Optional[dict] = None) -> Tuple[dict, dict]:
    """Single-source shortest paths with lazy deletion on a binary heap.

    Args:
        adj_list: nested mapping ``{vertex: {neighbour: {edge_key: attrs}}}``
            where every ``attrs`` has a ``'length'`` entry.
        start_vertex: vertex the search starts from.
        weights: optional per-vertex multiplier; the cost of entering
            ``neighbour`` is ``length * weights[neighbour]``. Vertices missing
            from the mapping (or all vertices when ``weights`` is None) use 1.

    Returns:
        ``(dist, preds)``: ``dist[v]`` is the shortest distance from
        ``start_vertex`` (``inf`` when unreachable) and ``preds[v]`` the
        previous vertex on that path (``None`` for the start vertex and for
        unreachable vertices). Both contain every key of ``adj_list``, plus
        any vertex that only appears as a neighbour and was reached.
    """
    infinity = float("inf")
    dist = {vertex: infinity for vertex in adj_list}
    preds = {vertex: None for vertex in adj_list}
    if weights is None:
        weights = {}
    if start_vertex not in dist:
        # Nothing is reachable from a vertex the graph does not know about.
        return (dist, preds)
    dist[start_vertex] = 0

    # Only the start vertex is queued up front; the others are pushed when
    # they are first reached.
    frontier = [(0, start_vertex)]
    while frontier:
        distance, vertex_id = heappop(frontier)
        if distance > dist[vertex_id]:
            # Stale entry: a shorter path to this vertex was already settled.
            continue
        neighs = adj_list[vertex_id] if vertex_id in adj_list else {}
        for neigh, parallel_edges in neighs.items():
            # Parallel edges between the same pair of vertices: the shortest
            # one is what matters for a shortest path, not just edge key 0.
            lengths = [attrs['length'] for attrs in parallel_edges.values()]
            if not lengths:
                continue
            candidate = distance + min(lengths) * weights.get(neigh, 1)
            if candidate < dist.get(neigh, infinity):
                dist[neigh] = candidate
                preds[neigh] = vertex_id
                heappush(frontier, (candidate, neigh))
    return (dist, preds)
        
        
    
        
    

In [3]:
def nearest_list_for_list(adj_list: dict, list1: list, list2: list, weights: Optional[dict] = None) -> dict:
    """For every vertex of ``list1`` find the closest vertex of ``list2``.

    One ``dijkstra`` search is run from each vertex of ``list1``.

    Returns:
        ``{source: (distance, target)}``; when no target has a finite
        distance the value is ``(inf, -1)``. Ties keep the earliest target
        in ``list2``.
    """
    result = {}
    for source in list1:
        from_source = dijkstra(adj_list, source, weights)[0]
        best_dist, best_target = float('inf'), -1
        for target in list2:
            candidate = from_source[target]
            if candidate < best_dist:
                best_dist, best_target = candidate, target
        result[source] = (best_dist, best_target)
    return result


def nearest_fwd_bwd_list_for_list(adj_list: dict, list1: list, list2: list, weights: Optional[dict] = None) -> dict:
    """For every vertex of ``list1`` find the ``list2`` vertex with the shortest round trip.

    The round trip is the distance ``list1 vertex -> list2 vertex`` plus the
    distance ``list2 vertex -> list1 vertex``; searches are run from every
    vertex of both lists.

    Returns:
        ``{source: (round_trip, target)}``, or ``(inf, -1)`` when no target
        has a finite round trip. Ties keep the earliest target in ``list2``.
    """
    # All searches from list1 run first, then all searches from list2.
    outbound = {src: dijkstra(adj_list, src, weights)[0] for src in list1}
    inbound = {dst: dijkstra(adj_list, dst, weights)[0] for dst in list2}

    result = {}
    for src in list1:
        best_total, best_target = float("inf"), -1
        for dst in list2:
            round_trip = outbound[src][dst] + inbound[dst][src]
            if round_trip < best_total:
                best_total, best_target = round_trip, dst
        result[src] = (best_total, best_target)
    return result

def nearest_bwd_list_for_list(adj_list: dict, list1: list, list2: list, weights: Optional[dict] = None) -> dict:
    """For every vertex of ``list1`` find the ``list2`` vertex from which it is reached soonest.

    Searches are run from the vertices of ``list2``; the distance compared is
    the one from the ``list2`` vertex to the ``list1`` vertex.

    Returns:
        ``{list1_vertex: (distance, list2_vertex)}``, or ``(inf, -1)`` when
        no ``list2`` vertex reaches it. Ties keep the earliest ``list2`` vertex.
    """
    from_origin = {origin: dijkstra(adj_list, origin, weights)[0] for origin in list2}

    result = {}
    for target in list1:
        best_dist, best_origin = float("inf"), -1
        for origin in list2:
            candidate = from_origin[origin][target]
            if candidate < best_dist:
                best_dist, best_origin = candidate, origin
        result[target] = (best_dist, best_origin)
    return result

def distances_fwd(adj_list: dict, list1: list, list2: list, weights: Optional[dict] = None) -> Tuple[dict]:
    """Run ``dijkstra`` from every vertex of ``list1``.

    ``list2`` is accepted but not used by this function.

    Returns:
        ``(distances, preds)`` where ``distances[source]`` and
        ``preds[source]`` are the two mappings returned by the search that
        started at ``source``.
    """
    dist_by_source, pred_by_source = {}, {}
    for source in list1:
        search_result = dijkstra(adj_list, source, weights)
        dist_by_source[source], pred_by_source[source] = search_result
    return (dist_by_source, pred_by_source)

def distances_bwd(adj_list: dict, list1: list, list2: list, weights: Optional[dict] = None) -> Tuple[dict]:
    """Distances from every ``list2`` vertex to every ``list1`` vertex, keyed by the ``list1`` vertex.

    One ``dijkstra`` search is run from each vertex of ``list2``.

    Returns:
        ``(distances, preds)`` where ``distances[v1][v2]`` is the distance
        from ``v2`` to ``v1``.
        NOTE(review): ``preds[v1]`` holds only the predecessor map of the
        search from the first ``list2`` vertex; the maps of later searches
        are not stored - confirm this is intended.
    """
    dist_by_target = {}
    pred_by_target = {}
    for origin in list2:
        dist, pred = dijkstra(adj_list, origin, weights)
        for target in list1:
            if target in dist_by_target:
                dist_by_target[target][origin] = dist[target]
            else:
                # First search that touches this target: also record its predecessor map.
                dist_by_target[target] = {origin: dist[target]}
                pred_by_target[target] = pred
    return (dist_by_target, pred_by_target)

def distances_fwd_bwd(adj_list: dict, list1: list, list2: list, weights: Optional[dict] = None) -> Tuple[dict]:
    """Round-trip distances between every ``list1`` vertex and every ``list2`` vertex.

    Searches are run from every vertex of both lists (``list1`` first).

    Returns:
        ``(distances, preds)`` where ``distances[v1][v2]`` is the distance
        ``v1 -> v2`` plus the distance ``v2 -> v1``.
        NOTE(review): ``preds[v1]`` is ``{'fwd': ..., 'bwd': ...}`` built
        from the search out of ``v1`` and the search out of the first
        ``list2`` vertex only - confirm this is intended.
    """
    outbound_dist, outbound_pred = {}, {}
    for src in list1:
        outbound_dist[src], outbound_pred[src] = dijkstra(adj_list, src, weights)
    inbound_dist, inbound_pred = {}, {}
    for dst in list2:
        inbound_dist[dst], inbound_pred[dst] = dijkstra(adj_list, dst, weights)

    round_trips = {}
    pred_maps = {}
    for src in list1:
        for dst in list2:
            total = outbound_dist[src][dst] + inbound_dist[dst][src]
            if src in round_trips:
                round_trips[src][dst] = total
            else:
                paths = {'fwd': outbound_pred[src], 'bwd': inbound_pred[dst]}
                round_trips[src] = {dst: total}
                pred_maps[src] = paths
    return (round_trips, pred_maps)

In [4]:
# Smoke test on a two-vertex graph written in the nested
# {vertex: {neighbour: {edge_key: {'length': ...}}}} layout that dijkstra reads.
adj_list = {1:{2:{0: {'length': 5}}}, 2:{1:{0: {'length': 3}}}}
distances_fwd(adj_list, [1], [2])

({1: {1: 0, 2: 5}}, {1: {1: None, 2: 1}})

In [5]:
import xml.etree.ElementTree as ET
# Parse the OSM XML extract and collect the <node> and <way> elements that
# are direct children of the document root.
root = ET.parse('./Ekb.osm').getroot()
nodes = root.findall('./node')
ways = root.findall('./way')

In [6]:
import networkx as nx
import osmnx as ox
from functools import partial
import xml.etree.ElementTree as ET
import random

class MetricsCalculator():
    """Distance metrics between two groups of vertices of a graph loaded from an OSM file.

    ``objs`` and ``inf_objs`` are the two vertex groups (presumably ordinary
    objects and infrastructure objects - confirm with the caller). Metrics are
    looked up by a ``"<mode>_<start>"`` key, where ``mode`` is ``fwd``,
    ``bwd`` or ``fwd_bwd`` and ``start`` is ``node`` (``objs`` is passed as
    the first list) or ``inf`` (``inf_objs`` is passed as the first list).
    """

    # Factor applied to the `distance` argument of closer_than_x before it is
    # compared with path lengths (presumably kilometres -> metres - confirm).
    _DISTANCE_SCALE = 1000

    def __init__(self, path_to_osm: str, objects_list: list, inf_objects_list: list) -> None:
        """Load the graph, assign vertex weights and prepare the metric lookups.

        Args:
            path_to_osm: path of the OSM file; it is loaded as a graph and
                also parsed as XML into ``self.root``.
            objects_list: vertex ids stored as ``self.objs``.
            inf_objects_list: vertex ids stored as ``self.inf_objs``.
        """
        self.graph = ox.core.graph_from_file(path_to_osm, simplify=False, retain_all=True)
        self.objs = objects_list
        self.inf_objs = inf_objects_list
        self.root = ET.parse(path_to_osm).getroot()

        # Vertices of inf_objs get a random weight in [1, 2); every other
        # graph vertex weighs 1. No seed is set here, so weights differ per run.
        self.weights = {}
        for obj in self.inf_objs:
            self.weights[obj] = random.random() + 1
        for obj in self.graph.adj.keys():
            if obj not in self.weights:
                self.weights[obj] = 1

        adj = self.graph.adj
        from_nodes = (adj, self.objs, self.inf_objs, self.weights)
        from_infs = (adj, self.inf_objs, self.objs, self.weights)

        self.func_dict_nearest = {
            "fwd_node": partial(nearest_list_for_list, *from_nodes),
            "fwd_inf": partial(nearest_list_for_list, *from_infs),
            "fwd_bwd_node": partial(nearest_fwd_bwd_list_for_list, *from_nodes),
            "fwd_bwd_inf": partial(nearest_fwd_bwd_list_for_list, *from_infs),
            "bwd_node": partial(nearest_bwd_list_for_list, *from_nodes),
            # Uses the backward-only search, matching "bwd_node" (this entry
            # previously pointed at the round-trip variant).
            "bwd_inf": partial(nearest_bwd_list_for_list, *from_infs),
        }
        self.func_dict_distances = {
            "fwd_node": partial(distances_fwd, *from_nodes),
            "fwd_inf": partial(distances_fwd, *from_infs),
            "bwd_node": partial(distances_bwd, *from_nodes),
            "bwd_inf": partial(distances_bwd, *from_infs),
            "fwd_bwd_node": partial(distances_fwd_bwd, *from_nodes),
            "fwd_bwd_inf": partial(distances_fwd_bwd, *from_infs),
        }

    def nearest(self, mode: str, start: str) -> dict:
        """Return the nearest-vertex mapping for ``mode`` and ``start``.

        Raises:
            KeyError: if ``mode + "_" + start`` is not a known combination.
        """
        return self.func_dict_nearest[mode + "_" + start]()

    def closer_than_x(self, distance: int, mode: str, start: str) -> dict:
        """List, for each starting vertex, the vertices of the other group within ``distance``.

        Args:
            distance: threshold; it is multiplied by 1000 before being
                compared with the computed path lengths.
            mode: ``fwd``, ``bwd`` or ``fwd_bwd``.
            start: ``node`` to start from ``objs`` or ``inf`` to start from
                ``inf_objs``.

        Returns:
            ``{start_vertex: [other_vertex, ...]}``; starting vertices with
            nothing in range are omitted.

        Raises:
            KeyError: if ``mode + "_" + start`` is not a known combination.
            ValueError: if ``start`` is neither ``"node"`` nor ``"inf"``.
        """
        compute = self.func_dict_distances[mode + "_" + start]
        if start == "node":
            sources, targets = self.objs, self.inf_objs
        elif start == "inf":
            sources, targets = self.inf_objs, self.objs
        else:
            raise ValueError("start must be 'node' or 'inf', got %r" % (start,))

        distances, _ = compute()
        limit = distance * self._DISTANCE_SCALE
        closer_than_x = {}
        for obj in sources:
            for obj2 in targets:
                if distances[obj][obj2] <= limit:
                    closer_than_x.setdefault(obj, []).append(obj2)
        return closer_than_x

    def closest_inf_in_summary(self, mode: str, start: str) -> tuple:
        """Find the ``inf_objs`` vertex with the smallest summed distance to all ``objs``.

        NOTE(review): ``mode`` and ``start`` are accepted but ignored; the
        forward distances from ``inf_objs`` are always used - confirm.

        Returns:
            ``(total_distance, vertex)``, or ``(inf, -1)`` when no vertex has
            a finite total.
        """
        distances, _ = self.func_dict_distances["fwd_inf"]()
        min_ = float("inf")
        min_id = -1
        for obj in self.inf_objs:
            dist = 0
            for obj2 in self.objs:
                dist += distances[obj][obj2]
            if dist < min_:
                min_ = dist
                min_id = obj
        return (min_, min_id)

    def min_furthest_for_inf(self, mode: str) -> tuple:
        """Find the ``inf_objs`` vertex whose furthest ``objs`` vertex is closest.

        Returns:
            ``(furthest_distance, vertex)``, or ``(inf, -1)`` when every
            vertex has an infinite furthest distance.

        Raises:
            KeyError: if ``mode + "_inf"`` is not a known combination.
            ValueError: if ``self.objs`` is empty (``max`` of nothing).
        """
        distances, _ = self.func_dict_distances[mode + "_" + "inf"]()
        min_ = float("inf")
        min_id = -1
        for obj in self.inf_objs:
            furthest = max(distances[obj][obj2] for obj2 in self.objs)
            if min_ > furthest:
                min_, min_id = furthest, obj
        return (min_, min_id)

    def min_weight_tree(self) -> tuple:
        """Find the ``inf_objs`` vertex whose shortest-path tree to all ``objs`` is lightest.

        For each candidate root the forward shortest paths to every ``objs``
        vertex are merged and each distinct edge is counted once. A candidate
        that cannot reach some ``objs`` vertex has infinite weight and never
        wins.

        Side effect: prints the distance from the winning root to every
        ``objs`` vertex (nothing is printed when there is no winner).

        Returns:
            ``(tree_weight, root)``, or ``(inf, -1)`` when there is no winner.
        """
        distances, preds = self.func_dict_distances['fwd_inf']()
        min_ = float("inf")
        min_id = -1
        found = False
        for obj in self.inf_objs:
            sum_ = self._shortest_path_tree_weight(distances[obj], preds[obj], obj)
            if sum_ < min_:
                min_ = sum_
                min_id = obj
                found = True
        if found:
            for obj in self.objs:
                print(distances[min_id][obj])
        return (min_, min_id)

    def _shortest_path_tree_weight(self, dist_from_root: dict, pred: dict, root) -> float:
        """Sum the distinct edges on the shortest paths from ``root`` to every ``objs`` vertex."""
        infinity = float("inf")
        edges = set()
        total = 0
        for target in self.objs:
            if dist_from_root[target] == infinity:
                # Unreachable target: there is no predecessor chain to walk.
                return infinity
            curr = target
            while curr != root:
                parent = pred[curr]
                if (curr, parent) not in edges:
                    edges.add((curr, parent))
                    # Edge cost recovered as the difference of the two distances.
                    total += dist_from_root[curr] - dist_from_root[parent]
                curr = parent
        return total
            
                
        

In [7]:
# Build the calculator from the OSM extract: the first list is passed as
# objects_list, the second as inf_objects_list (both are graph vertex ids).
m = MetricsCalculator('./Ekb.osm', [458889254, 176677232, 1130816321], [647048670, 360178196, 5093647004, 5093647006, 419751370])

In [281]:
# Inspect the adjacency entry of the first object vertex to see the edge
# attribute layout (neighbour -> edge key -> attributes incl. 'length').
m.graph.adj[458889254]

AdjacencyView({1037480168: {0: {'osmid': 17037375, 'lanes': '2', 'name': 'Проезжая улица', 'highway': 'primary', 'maxspeed': '60', 'oneway': False, 'length': 47.014}}, 1037480174: {0: {'osmid': 38715108, 'highway': 'residential', 'oneway': False, 'length': 11.149}}, 176677187: {0: {'osmid': 141915725, 'lanes': '2', 'name': 'Проезжая улица', 'highway': 'primary', 'maxspeed': '60', 'oneway': False, 'length': 20.323}}})

In [8]:
%%time
print(m.min_weight_tree())
# Disabled scratch code kept from an earlier experiment. NOTE(review): it
# refers to a `graph` name that is not defined in the visible cells and treats
# the dijkstra result as a single dict - consider deleting it.
#a = dijkstra(graph.adj, 458889254)
#for key, value in a.items():
#    if value <= 5000:
#        print(key)

7278.957000000001
11583.120000000003
13148.554999999993
(24731.675, 360178196)
CPU times: user 2.3 s, sys: 31.2 ms, total: 2.33 s
Wall time: 2.44 s
