In [1]:
import osmnx as ox
import numpy as np
from collections import defaultdict
import networkx as nx
from shapely.geometry import Polygon, MultiPolygon
import pickle
import matplotlib.pyplot as plt
from random import randint, random, randrange, choice

In [2]:
class Name:
    """
    A graph bound described by a place name.

    The name is geocoded with ``ox.gdf_from_place``; the first row of the
    returned GeoDataFrame supplies the official place name and geometry.

    Attributes set by ``__init__``:
        name           -- the string the caller passed in
        gdf            -- the GeoDataFrame returned by osmnx
        official_name  -- ``place_name`` of the first row
        geometry       -- geometry of the first row
    """
    def __init__(self, name: str):
        """
        :param name: free-form place name to geocode
        :raises TypeError: if the geocoded geometry is neither a Polygon
            nor a MultiPolygon
        """
        self.name = name

        self.gdf = ox.gdf_from_place(name)
        self.official_name = self.gdf.place_name.values[0]
        self.geometry = self.gdf.geometry[0]
        # Test membership in one step: `not a or b` binds as `(not a) or b`,
        # which would reject every MultiPolygon.
        if not isinstance(self.geometry, (Polygon, MultiPolygon)):
            raise TypeError("Location geometry was not a Polygon or a MultiPolygon")

    def summary(self):
        """Print the input name, the official name and the geometry type."""
        print(f"Input Name: {self.name}")
        print(f"Official Name: {self.official_name}")
        print(type(self.geometry))
        

In [3]:
class Bbox:
    """
    A rectangular graph bound stored as ``(north, south, east, west)``.

    Iterating a Bbox yields the four edges in that order, so it can be
    splatted into a call such as ``ox.graph_from_bbox(*bbox)``.
    """
    def __init__(self, north, south, east, west):
        # presumably latitude/longitude in degrees -- TODO confirm units
        self.bbox = (north, south, east, west)

    def __iter__(self):
        """Yield north, south, east, west in that order."""
        return iter(self.bbox)

    def summary(self):
        """Print the box's width, height and area (in the units of the inputs)."""
        north, south, east, west = self.bbox
        # north-south is the vertical extent, east-west the horizontal one.
        height = north - south
        width = east - west
        area = width * height
        print(f"Width: {width}")
        print(f"Height: {height}")
        print(f"Area: {area}")

In [4]:
class NodesGeometry:
    """
    Assigns x/y coordinates to the expanded (per-direction) nodes.

    Every expanded node starts at the coordinates of the original graph node
    named by its first tuple element, then both endpoints of each segment are
    pulled slightly toward each other along the segment. Segments that have a
    complementary (opposite-direction) segment are additionally shifted
    sideways so the two directions do not sit on top of each other.

    After construction ``self.nodes`` is a list of ``(node, {'x': .., 'y': ..})``
    tuples and ``self.segments`` is the segment list that was passed in.
    """
    def __init__(self, G, nodes, segments):
        """
        :param G: graph whose ``G.nodes()[id]`` mapping carries 'x' and 'y'
        :param nodes: iterable of expanded node tuples; element 0 is the id
            of the original node the expanded node sits on
        :param segments: iterable of ``(n_start, n_end, data)`` where
            ``data['has_comp']`` tells whether a complementary segment exists
        """
        self.segments = segments
        nodes_xy = self.create_nodes_xy(G, nodes)
        nodes_xy = self.update_nodes_xy(nodes_xy, segments)
        self.nodes = self.create_nodes(nodes_xy)

    @staticmethod
    def xy_vec(nodes_xy, n1, n2):
        """
        Calculates the vector that takes you from n1 to n2
        """
        return nodes_xy[n2] - nodes_xy[n1]

    @staticmethod
    def segment_vec(nodes_xy, segment):
        """Vector from the segment's start node to its end node."""
        return NodesGeometry.xy_vec(nodes_xy, segment[0], segment[1])

    @staticmethod
    def segment_unit_vec(nodes_xy, segment):
        """Unit-length direction of the segment (nan if the endpoints coincide)."""
        arr = NodesGeometry.segment_vec(nodes_xy, segment)
        return arr / np.linalg.norm(arr)

    def create_nodes_xy(self, G, nodes):
        """Map every expanded node to a numpy ``(x, y)`` array taken from G."""
        nodes_xy = {}
        for node in nodes:
            xy = G.nodes()[node[0]]
            nodes_xy[node] = np.array((xy['x'], xy['y']))

        return nodes_xy

    def update_nodes_xy(self, nodes_xy, segments):
        """
        Shift segment endpoints inward (and sideways for two-way streets).

        ``nodes_xy`` is updated in place and also returned.
        """
        dist = .00001 #change to 10 feet once you figure out units
        for segment in segments:
            # The direction is needed by BOTH branches, so compute it up front;
            # computing it only for has_comp segments left it unbound (or stale
            # from a previous iteration) for one-way segments.
            unit_vec = NodesGeometry.segment_unit_vec(nodes_xy, segment)
            if segment[2]['has_comp']:
                # (x, y) -> (y, -x) is the direction rotated 90 degrees clockwise.
                perp_vec = np.array([unit_vec[1],unit_vec[0]*-1])
                nodes_xy[segment[0]] = nodes_xy[segment[0]] + ((unit_vec * dist) + (perp_vec * dist / 2))
                nodes_xy[segment[1]] = nodes_xy[segment[1]] - ((unit_vec * dist) - (perp_vec * dist / 2))
            else:
                nodes_xy[segment[0]] = nodes_xy[segment[0]] + (unit_vec * dist)
                nodes_xy[segment[1]] = nodes_xy[segment[1]] - (unit_vec * dist)

        return nodes_xy

    def create_nodes(self, nodes_xy):
        """Convert the coordinate arrays into ``(node, {'x': x, 'y': y})`` tuples."""
        return [(k, {'x': v[0], 'y': v[1]}) for k, v in nodes_xy.items()]

In [5]:
class IntersectionBuilder:
    """
    Builds the edges that cross an intersection.

    For every original node, each incoming approach is linked to each
    outgoing exit, except the exit that leads straight back to where the
    approach came from.
    """
    def __init__(self, in_out):
        """
        :param in_out: in out dictionary of the init_graph nodes
        """
        self.intersections = self.create_intersections(in_out)

    def create_intersections(self, in_out):
        """
        Return a list of ``(in_node, out_node, {'type': 'intersection'})`` edges.

        ``in_node`` is ``(hub, source, 'in')`` and ``out_node`` is
        ``(hub, target, 'out')``; pairs with source == target are skipped.
        """
        return [
            ((hub, source, 'in'), (hub, target, 'out'), {'type':'intersection'})
            for hub, links in in_out.items()
            for source in links['in']
            for target in links['out']
            if source != target
        ]


class SegmentBuilder:
    """
    Builds the road-segment edges between neighbouring intersections.

    Each directed edge ``a -> b`` of the original graph becomes a segment from
    the expanded node ``(a, b, 'out')`` to ``(b, a, 'in')``. ``self.segments``
    holds the segments with their data dicts; ``self.nodes`` holds the set of
    every expanded node that appears as a segment endpoint.
    """
    def __init__(self, in_out):
        """
        :param in_out: in out dictionary of the init_graph nodes
        """
        self.segments = self.create_segments(in_out)
        self.nodes = self.extract_nodes(self.segments)

    def extract_nodes(self, segments):
        """Collect both endpoints of every segment into a set."""
        endpoints = set()
        for seg in segments:
            endpoints.update((seg[0], seg[1]))
        return endpoints

    def complement_dir(self, s: str):
        """
        Flip 'in' to 'out' and vice versa; any other value prints a
        message and yields None.

        TODO: Switch to True and False so I don't have to write this function
        """
        if s == 'out':
            return 'in'
        if s == 'in':
            return 'out'
        print("complement_dir failed")

    def complement_segment(self, segment):
        """
        Computes the complementary segment of the given segment. The
        complementary segment represents the other direction in a two
        way street.
        """
        start, end = segment[0], segment[1]

        # Flip the start node first, then the end node (same order as the
        # message output would appear in if a direction were invalid).
        comp_end = (start[0], start[1], self.complement_dir(start[2]))
        comp_start = (end[0], end[1], self.complement_dir(end[2]))

        return (comp_start, comp_end)

    def create_segments_set(self, in_out):
        """Return the set of ``(out_node, in_node)`` pairs, one per incoming link."""
        return {
            ((source, hub, 'out'), (hub, source, 'in'))
            for hub, links in in_out.items()
            for source in links['in']
        }

    def create_segments_list(self, segments):
        """
        Attach a data dict to every segment; ``has_comp`` is True when the
        opposite-direction segment is also present in ``segments``.
        """
        return [
            (seg[0], seg[1], {'type': 'segment', 'has_comp': self.complement_segment(seg) in segments})
            for seg in segments
        ]

    def create_segments(self, in_out):
        """Build the segment set, then decorate it into the final list."""
        return self.create_segments_list(self.create_segments_set(in_out))

In [6]:
class GraphBuilder:
    """
    Runs the whole pipeline that turns a bound into an expanded routing graph.

    After construction the instance exposes, among others:
        init_graph  -- the graph returned by osmnx for the bound
        DG          -- networkx DiGraph of expanded nodes, relabelled to ints
        node_map    -- original node id -> list of expanded (int) node ids
    """
    def __init__(self, bound):
        """
        The "run" function to make Graph objects
        
        :param bound: user desired bounds of the graph 
        :type bound: Name or Bbox
        
        TODO: Make into callable function that returns a Graph object
        TODO: Figure out what should be saved as an attribute and what should be temp
        """
        self.bound = bound
        self.init_graph = self.initialize_map(self.bound)
        self.in_out = self.create_in_out_dict(self.init_graph)
        segmentBuilder = SegmentBuilder(self.in_out)
        self.segments = segmentBuilder.segments
        nodes = segmentBuilder.nodes
        self.nodes = NodesGeometry(self.init_graph, nodes, self.segments).nodes
        self.intersections = IntersectionBuilder(self.in_out).intersections
        self.edges = self.create_edges(self.segments, self.intersections)
        self.DG = self.create_dg()
        # node_map must be built while DG still has tuple nodes;
        # convert_to_int_graph then rewrites both DG and node_map to int ids.
        self.node_map = self.create_node_map()
        self.convert_to_int_graph()
    
    def initialize_map(self, bound):
        """
        initialize_map takes in a bound and uses osmnx to create an initial
        map of the desired area.
        
        :param bound: user desired bounds of the graph 
        :type bound: Name or Bbox
        :raises RuntimeError: if bound is neither a Name nor a Bbox
        """
        if isinstance(bound, Name):
            return ox.graph_from_place(bound.official_name)
        if isinstance(bound, Bbox):
            # Unpack the bound that was passed in, not a module-level
            # variable that merely happens to be called `bbox`.
            return ox.graph_from_bbox(*bound)
        raise RuntimeError("Could not create graph from specified bound")
                
    def create_in_out_dict(self, G):
        """
        Creates a dictionary where each key is a node of the graph and each
        value is another dictionary listing which nodes can be reached by
        following edges "out" of the key node and which nodes lead "in" to the
        key node along directed edges. Self-loops are ignored.
        
        :param G: input graph whose nodes and edges are used to create in_out dict
        :type G: MultiDiGraph
        """
        def make_dict():
            return {'in':[],'out':[]}

        in_out = defaultdict(make_dict)
        for start, end in G.edges():
            if start == end:
                continue
            in_out[end]['in'].append(start)
            in_out[start]['out'].append(end)
        return in_out
    
    def create_node_map(self):
        """
        Creates a node map associating the initial nodes with the corresponding
        expanded nodes. This will allow us to add data to an entire intersection by
        just locating the closest node in the initial graph.

        :raises Exception: if an expanded node's direction is neither 'in' nor 'out'
        """
        node_map = {x:[] for x in self.init_graph.nodes}
        for node in self.DG.nodes:
            if node[2] == 'in':
                node_map[node[0]].append(node)
            elif node[2] == 'out':
                # NOTE(review): both builders put the id of the node an 'out'
                # node sits on in position 0, yet it is filed under node[1]
                # here -- confirm this is intended.
                node_map[node[1]].append(node)
            else:
                raise Exception(f"Found bad node: {node}")
        return node_map
                        
    def create_edges(self, segments, intersections):
        """Concatenate segment and intersection edges as ``(u, v, 0, data)`` tuples."""
        return [(u, v, 0, d) for u, v, d in segments + intersections]
    
    def create_dg(self):
        """Build a DiGraph from self.nodes and self.edges (the edge key is dropped)."""
        G = nx.DiGraph()
        G.add_nodes_from(self.nodes)

        G.add_edges_from([(e[0], e[1], e[3]) for e in self.edges])
        return G
    
    def convert_to_int_graph(self):
        """
        Replace self.DG with a copy whose nodes are consecutive integers and
        rewrite self.node_map to use those integers.
        """
        node_to_int = {node:i for i,node in enumerate(self.DG.nodes)}

        int_nodes = [(node_to_int[node], data) for node, data in self.DG.nodes(data=True)]
        int_edges = [
            (node_to_int[n1], node_to_int[n2], data)
            for n1, n2, data in list(self.DG.edges(data=True))
        ]
            
        G = nx.DiGraph()
        G.add_nodes_from(int_nodes)
        G.add_edges_from(int_edges)
        self.DG = G
        
        self.node_map = {k:[node_to_int[n] for n in ns] for k,ns in self.node_map.items()}
    
    def plot_graph(self, fig_height=10):
        """
        Plot the expanded graph.

        The builder never stores a ``G`` attribute, so the plot is made from
        ``self.DG`` wrapped in a MultiDiGraph carrying the same graph-level
        metadata that Graph.create_mdg uses.
        """
        mdg = nx.MultiDiGraph()
        mdg.graph = {'name': 'Test Graph','crs': {'init': 'epsg:4326'},'simplified': True}
        mdg.add_nodes_from(self.DG.nodes(data=True))
        mdg.add_edges_from([(n1, n2, 0, data) for n1, n2, data in self.DG.edges(data=True)])
        ox.plot_graph(mdg, fig_height=fig_height)
        
    def plot_map(self, fig_height=10):
        """
        Helper function to plot the initial (unexpanded) osmnx graph.
        """
        ox.plot_graph(self.init_graph, fig_height=fig_height)

In [7]:
class Graph:
    """
    Thin wrapper around the networkx graphs produced by GraphBuilder.

    Instances created through ``from_bound`` carry three attributes:
    ``DiGraph`` (expanded routing graph), ``init_graph`` (original osmnx
    graph) and ``node_map`` (original node id -> expanded node ids).

    Allows conversions between MultiDiGraphs (visualization) and
    DiGraphs (A*).
    """
    def __init__(self):
        """Create an empty wrapper; attributes are attached by ``from_bound``."""

    @staticmethod
    def from_bound(bound):
        """
        Run GraphBuilder on ``bound`` and wrap its results.

        :param bound: bound handed unchanged to GraphBuilder
        :rtype: Graph
        """
        builder = GraphBuilder(bound)
        wrapped = Graph()
        wrapped.DiGraph = builder.DG
        wrapped.init_graph = builder.init_graph
        wrapped.node_map = builder.node_map
        return wrapped

    @staticmethod
    def from_file(filepath):
        """
        Unpickle a graph object

        :param filepath: filepath to the object
        :type filepath: string
        :rtype: whatever object was pickled in the file (a Graph when the
            file was written by ``save``)
        """
        # Caution: unpickling executes code embedded in the file, so only
        # load files you created yourself.
        with open(filepath, 'rb') as handle:
            loaded = pickle.load(handle)
        return loaded

    def save(self, filepath):
        """
        Pickle graph object to a file

        :param filepath: filepath to save the object
        :type filepath: string
        :rtype: None
        """
        with open(filepath, 'wb') as handle:
            pickle.dump(self, handle)

    def create_mdg(self):
        """
        Return a MultiDiGraph copy of ``self.DiGraph`` (every edge gets key 0)
        with the graph-level metadata set below.
        """
        multi = nx.MultiDiGraph()
        multi.graph = {'name': 'Test Graph','crs': {'init': 'epsg:4326'},'simplified': True}
        multi.add_nodes_from(self.DiGraph.nodes(data=True))
        keyed_edges = [(u, v, 0, attrs) for u, v, attrs in self.DiGraph.edges(data=True)]
        multi.add_edges_from(keyed_edges)
        return multi

    def plot_graph(self, fig_height=10):
        """Plot the expanded graph through osmnx."""
        ox.plot_graph(self.create_mdg(), fig_height=fig_height)

In [8]:
# Study area as a bounding box (values look like latitude/longitude degrees -- TODO confirm).
bbox = Bbox(
    north=38.88300016,
    south=38.878726840000006,
    east=-77.09939832,
    west=-77.10500768,
)

In [14]:
# Build the expanded routing graph for the bounding box; Graph.from_bound runs
# GraphBuilder, which fetches the base map through ox.graph_from_bbox.
G = Graph.from_bound(bbox)

In [207]:
def random_intersection1():
    return {'turn':random()*160,
            'bike_lane':choice([True, False]),
            'crosswalk':choice([True, False]),
            'separate_path':choice([True, False]),
            'speed_limit':randrange(25,36,5),
            'signalized':choice(['stop_sign','traffic_light','no_signal']),
            'traffic_volume':random()*1000}

In [208]:
def random_intersection2():
    stop_sign, traffic_light = choice([(0,0),(0,1),(1,0)])
    return {'turn':random()*160,
            'bike_lane':randint(0,1),
            'crosswalk':randint(0,1),
            'separate_path':randint(0,1),
            'speed_limit':randrange(25,36,5),
            'stop_sign':stop_sign,
            'traffic_light':traffic_light,
            'traffic_volume':random()*1000}

In [209]:
def random_segment1():
    return {'bike_lane':choice([True, False]),
            'separate_path':choice([True, False]),
            'speed_limit':randrange(25,36,5),
            'traffic_volume':random()*1000}

In [210]:
def random_segment2():
    return {'bike_lane':randint(0,1),
            'separate_path':randint(0,1),
            'speed_limit':randrange(25,36,5),
            'traffic_volume':random()*1000}

In [211]:
# Build {(u, v): attrs} with one random attribute dict per edge, chosen by the
# edge's 'type'. No random seed is set, so the values differ on every run.
attr_factories = {'intersection': random_intersection1, 'segment': random_segment1}

random_data = {}
for e in G.DiGraph.edges(data=True):
    # .get() so an edge with no 'type' key reaches the descriptive error
    # below instead of failing earlier with a bare KeyError.
    make_attrs = attr_factories.get(e[2].get('type'))
    if make_attrs is None:
        raise Exception(f"Edge ({e}) does not have a road 'type'")
    random_data[(e[0], e[1])] = make_attrs()

In [213]:
# Merge each per-edge dict from random_data (keyed by (u, v)) into that edge's data in G.DiGraph.
nx.set_edge_attributes(G=G.DiGraph, values=random_data)

In [214]:
# Peek at the first three edges to check that the random attributes were attached.
list(G.DiGraph.edges(data=True))[:3]

[(1,
  46,
  {'type': 'intersection',
   'turn': 132.93712368411803,
   'bike_lane': False,
   'crosswalk': True,
   'separate_path': True,
   'speed_limit': 35,
   'signalized': 'traffic_light',
   'traffic_volume': 386.7640248756685}),
 (1,
  173,
  {'type': 'intersection',
   'turn': 142.82747792213928,
   'bike_lane': False,
   'crosswalk': False,
   'separate_path': True,
   'speed_limit': 25,
   'signalized': 'stop_sign',
   'traffic_volume': 136.24429074961986}),
 (2,
  221,
  {'type': 'intersection',
   'turn': 43.49395287414899,
   'bike_lane': True,
   'crosswalk': False,
   'separate_path': True,
   'speed_limit': 30,
   'signalized': 'traffic_light',
   'traffic_volume': 564.2766285079763})]

In [202]:
# First five edges of the original osmnx graph, to see which attributes it carries.
list(G.init_graph.edges(data=True))[:5]

[(63339530,
  63327688,
  {'osmid': 8795890,
   'name': 'North Lincoln Street',
   'highway': 'residential',
   'oneway': False,
   'length': 174.14600000000002,
   'geometry': <shapely.geometry.linestring.LineString at 0x7f69ae164b70>}),
 (2219272722,
  5662408421,
  {'osmid': 586076545,
   'highway': 'footway',
   'oneway': False,
   'length': 19.297}),
 (63349011,
  63339546,
  {'osmid': 354443669,
   'lanes': '4',
   'name': 'Wilson Boulevard',
   'highway': 'secondary',
   'oneway': False,
   'length': 43.388}),
 (63349011,
  5441299875,
  {'osmid': 354443669,
   'lanes': '4',
   'name': 'Wilson Boulevard',
   'highway': 'secondary',
   'oneway': False,
   'length': 15.933}),
 (63339539,
  63327688,
  {'osmid': 8795890,
   'name': 'North Lincoln Street',
   'highway': 'residential',
   'oneway': False,
   'length': 79.437})]

In [None]:
# NOTE(review): `keys` is not assigned in any visible cell, so this presumably
# raises NameError on a fresh kernel -- leftover scratch cell; consider deleting.
keys

In [None]:
# NOTE(review): called with no arguments, whereas the working call in this
# notebook passes G= and values=; as written this should raise TypeError --
# leftover scratch cell; consider deleting.
nx.set_edge_attributes()

In [30]:
# Hand-written example of one all-numeric edge attribute record.
raw_dict = dict(
    stop_sign=1,
    traffic_light=0,
    bike_lane=0,
    separate_path=0,
    crosswalk=1,
    traffic_volume=4,
    speed_limit=35,
)

In [33]:
# The set of attribute names present in the example record.
set(raw_dict)

{'bike_lane',
 'crosswalk',
 'separate_path',
 'speed_limit',
 'stop_sign',
 'traffic_light',
 'traffic_volume'}

In [None]:
# Expanded node ids belonging to the sixth node of the original graph.
# The map lives on the Graph wrapper (Graph.from_bound sets G.node_map);
# no top-level `node_map` variable is defined in this notebook.
G.node_map[list(G.init_graph.nodes)[5]]

In [None]:
# Round-trip the graph through pickle: write it to disk, then rebind G to the reloaded copy.
filename = 'test.pkl'  # relative path, resolved against the current working directory
G.save(filename)
G = Graph.from_file(filename)

In [None]:
# Build a graph from a place name instead of a bounding box.
# Note: this rebinds G, replacing whichever graph G referred to before.
name = Name("Needham, MA")
G = Graph.from_bound(name)

In [None]:
# Switch matplotlib to interactive mode.
plt.ion()

In [None]:
# NOTE(review): called with no arguments, whereas Graph.save passes both the
# object and an open file; as written this should raise TypeError --
# leftover scratch cell; consider deleting.
pickle.dump()

In [None]:
# Re-create the bounding box (same four values as the earlier bbox cell).
bbox = Bbox(38.88300016, 38.878726840000006, -77.09939832, -77.10500768)

# Run GraphBuilder directly rather than through Graph.from_bound; later cells read GB.nodes.
GB = GraphBuilder(bbox)

class MapLoader:
    """Placeholder class: it defines no behaviour and stores no state."""

    def __init__(self):
        """Create an empty MapLoader."""
        

# Plot the original osmnx graph for the same area as a visual sanity check.
# Bbox iterates as (north, south, east, west), so it can be unpacked directly.
# `bbox` is deliberately NOT rebound to a plain tuple here: GraphBuilder only
# accepts Name or Bbox instances, and a later cell passes `bbox` to it.
bbox_graph = ox.graph_from_bbox(*bbox)

ox.plot_graph(bbox_graph, fig_height=3);



In [None]:
# Rebuild the GraphBuilder. `bbox` must be a Bbox instance at this point:
# GraphBuilder.initialize_map raises RuntimeError for anything that is not a Name or Bbox.
GB = GraphBuilder(bbox)

In [None]:
# Coordinate lookups {expanded node: {'x': .., 'y': ..}}, split by node direction.
# GB.nodes holds (node, attrs) pairs, and node[2] is the 'in'/'out' marker.
in_d = {node: attrs for node, attrs in GB.nodes if node[2] == 'in'}
out_d = {node: attrs for node, attrs in GB.nodes if node[2] == 'out'}

In [None]:
# Pair every 'out' node with the node that follows it in GB.MDG (presumably the
# 'in' node at the far end of its segment).
# NOTE(review): GraphBuilder as defined in this notebook never sets an `MDG`
# attribute, and its DG is relabelled to integer ids, so these lookups by tuple
# node presumably rely on an earlier version of the class -- confirm before running.
out_nodes = [n for n in GB.nodes if n[0][2] == 'out']
in_nodes = [list(GB.MDG[n[0]])[0] for n in out_nodes]  # first neighbour listed for each out node
in_nodes = [(n, in_d[n]) for n in in_nodes]  # re-attach coordinates
out_in = list(zip(out_nodes, in_nodes))

In [None]:
# For each in node, list its neighbours in GB.MDG (presumably the out nodes of that intersection).
# NOTE(review): relies on GB.MDG, which GraphBuilder in this notebook does not define.
outs_nodes = [list(GB.MDG[n[0]]) for n in in_nodes]

In [None]:
# Attach the coordinates from out_d to every listed out node.
# Note: this rebinds outs_nodes, so re-running the cell on its own output will fail.
outs_nodes = [[(n, out_d[n]) for n in ns] for ns in outs_nodes]

In [None]:
# Pair each in node with its list of candidate out nodes.
in_outs = list(zip(in_nodes, outs_nodes))

In [None]:
def calc_angle(v1, v2):
    """
    Calculate the signed angle, in degrees, that rotates v1 onto v2.

    Each argument is a ``(start, end)`` pair in which every node is a
    ``(node_id, {'x': ..., 'y': ...})`` tuple.

    :param v1: the nodes that construct an out->in segment edge
    :param v2: the nodes that construct an in->out intersection edge
    :returns: angle in [-180, 180]; positive means v2 points
        counter-clockwise of v1 in the x/y plane. A zero-length vector
        gives 0.

    TODO: visualize the angle to verify that the function is actually working reasonably
    """
    # direction vectors end - start
    d1 = np.array((v1[1][1]['x'] - v1[0][1]['x'], v1[1][1]['y'] - v1[0][1]['y']))
    d2 = np.array((v2[1][1]['x'] - v2[0][1]['x'], v2[1][1]['y'] - v2[0][1]['y']))

    # Subtracting two separate atan2 results can land anywhere in (-360, 360)
    # (e.g. -315 instead of 45). Taking atan2 of the 2-D cross product and the
    # dot product yields the same signed angle already wrapped to [-180, 180],
    # and needs no normalisation because both terms scale by the same factor.
    cross = d1[0] * d2[1] - d1[1] * d2[0]
    dot = d1[0] * d2[0] + d1[1] * d2[1]
    angle = np.arctan2(cross, dot)

    return angle * (180 / np.pi)

In [None]:
def calc_angles(v1, v2s):
    """
    Calculate the angle between one segment and every edge of the
    intersection it leads into.

    :param v1: the nodes that construct an out->in edge segment
    :param v2s: ``(in_node, [out_node, ...])`` -- the shared in node and all
        out nodes it connects to within the intersection
    :returns: list with one ``calc_angle`` result per out node, in order
    """
    entry, exits = v2s[0], v2s[1]
    return [calc_angle(v1, (entry, exit_node)) for exit_node in exits]

In [None]:
def angle_turn(angle):
    """
    Classify a single angle (degrees) as a 'left', 'right' or "straight" turn.

    Note that the angle is currently measured between the roadway segment and
    the path through the intersection, so an angle of 45 degrees actually
    corresponds to a turn of 90 degrees. Angles within +/-22.5 degrees
    (inclusive) count as straight.
    """
    if angle < -22.5:
        return 'right'
    if angle > 22.5:
        return 'left'
    return "straight"

In [None]:
# Grab the first (out node, in node) pair for ad-hoc inspection.
v1 = out_in[0]

In [None]:
# Spot-check: angles between segment i and every exit listed for the matching in node.
i = 3
calc_angles(out_in[i], in_outs[i])

In [None]:
# Angles for every (segment, intersection) pair, their left/right/straight labels,
# and the matching (in node, out node, 0) edge identifiers (0 is the edge key).
all_angles = [calc_angles(a,b) for (a,b) in zip(out_in,in_outs)]
all_turns = [[angle_turn(angle) for angle in angles] for angles in all_angles]
all_edges = [[(in_out[0][0], o[0], 0) for o in in_out[1]] for in_out in in_outs]

In [None]:
# Look up the data of one intersection edge, set its 'turn' label, and wrap it
# as a single (u, v, key, data) tuple in a list.
# NOTE(review): relies on GB.MDG, which GraphBuilder in this notebook does not define.
d = GB.MDG.edges[all_edges[1][0]]
d['turn'] = all_turns[1][0]
edge = [(all_edges[1][0][0], all_edges[1][0][1], all_edges[1][0][2], d)]

In [None]:
# Write the tagged edge back into the graph.
# NOTE(review): relies on GB.MDG, which GraphBuilder in this notebook does not define.
GB.MDG.add_edges_from(edge)

In [None]:
# Inspect that edge's data after the update.
# NOTE(review): relies on GB.MDG, which GraphBuilder in this notebook does not define.
GB.MDG.edges[all_edges[1][0]]

In [None]:
# Distinct 'turn' values across all edges (None stands for edges without one).
# NOTE(review): relies on GB.MDG, which GraphBuilder in this notebook does not define.
set([o[2].get('turn') for o in list(GB.MDG.edges(data=True))])

In [None]:
# Display the full nested list of angles (output may be long).
all_angles

In [None]:
# These two lists are zipped pairwise when computing all_angles, so their lengths should match.
len(out_in), len(in_outs)

In [None]:
# Total number of (in node, out node) pairs across all in nodes.
sum([len(ns) for ns in outs_nodes])

In [None]:
# Number of nodes in the graph.
# NOTE(review): relies on GB.MDG, which GraphBuilder in this notebook does not define.
len(GB.MDG.nodes)

In [None]:
# Number of edges in the graph.
# NOTE(review): relies on GB.MDG, which GraphBuilder in this notebook does not define.
len(GB.MDG.edges)