# 

In [1]:
from shapely.geometry import LineString, MultiPolygon, Polygon
from shapely.geometry.base import CAP_STYLE, JOIN_STYLE


def buffered_shape(shape, width: float = 1.0) -> Polygon:
    """Return the band of total width `width` centred on the polyline `shape`.

    The points in `shape` are turned into a `LineString`, which is buffered by
    `width / 2` with flat caps and round joins.

    Args:
        shape: coordinate sequence accepted by `LineString`.
        width: total width of the resulting band.

    Returns:
        The buffered polygon. When the buffer comes back as a `MultiPolygon`,
        its convex hull is returned instead.

    Raises:
        RuntimeError: if the buffer is neither a `Polygon` nor a `MultiPolygon`.
    """
    centerline = LineString(shape)
    outline = centerline.buffer(
        width / 2,
        1,  # second positional buffer argument (arc resolution in shapely) - kept positional
        cap_style=CAP_STYLE.flat,
        join_style=JOIN_STYLE.round,
        mitre_limit=5.0,
    )

    if isinstance(outline, Polygon):
        return outline
    if isinstance(outline, MultiPolygon):
        # Buffering occasionally yields several pieces; collapse them into a
        # single geometry by taking the convex hull.
        return outline.convex_hull
    raise RuntimeError("Shapely `object.buffer` behavior may have changed.")

 

In [17]:
class LaneShape:
    """Polygon footprint of a single lane.

    The footprint is stored in `self.shape` and is produced by
    `buffered_shape` from the lane's centreline.
    """

    def __init__(
            self,
            shape,
            width: float,
    ):
        """Build the lane polygon.

        Args:
            shape: lane-like object exposing `getShape()` (centreline points)
                and `getWidth()`.
            width: total lane width used for the buffer. When None, the width
                reported by `shape.getWidth()` is used.
        """
        # Previously `width` was accepted but never used; honour the caller's
        # value and only fall back to the lane's own width when none is given.
        if width is None:
            width = shape.getWidth()
        self.shape = buffered_shape(shape.getShape(), width)

@dataclass
class RoadShape:
    """Outline of a road described by its left and right border polylines.

    After initialisation `polygon` holds the left border points followed by
    the right border points in reverse order, i.e. one ring around the road.
    """

    # Border polylines: sequences of points (lists or arrays of coordinates).
    left_border: np.ndarray
    right_border: np.ndarray

    def __post_init__(self):
        # Convert both borders to lists before joining them: with ndarray
        # inputs `left_border + [...]` would add the coordinates element-wise
        # instead of concatenating the two point sequences.
        self.polygon = list(self.left_border) + list(reversed(self.right_border))

class JunctionNode:
    """Graph node wrapping one junction object of the network.

    Identity and geometry (`name`, `type`, `shape`) are copied from the
    wrapped object; the connectivity lists start out empty.
    """

    def __init__(self, sumolib_obj):
        """Wrap `sumolib_obj`, which must provide `getID()`, `getType()` and `getShape()`."""
        node = sumolib_obj
        self.sumolib_obj: sumolib.net.node = node

        # Values read straight from the wrapped junction.
        self.name = node.getID()
        self.type = node.getType()
        self.shape = node.getShape()

        # Numeric placeholders, initialised to zero.
        self.area: float = 0.0
        self.route_dist: float = 0.0

        # Connectivity: roads entering / leaving the junction, plus the roads
        # and lanes associated with the junction itself.
        self.incoming: List[RoadNode] = []
        self.outgoing: List[RoadNode] = []
        self.roads: List[RoadNode] = []
        self.lanes: List[LaneNode] = []

class LaneNode:
    """Graph node wrapping one lane object of the network.

    The lane type is taken from the parent edge's type string, which is read
    as a '|'-separated list indexed by the lane index. Lanes without an entry
    in that list are typed 'driving'.
    """

    def __init__(self, sumolib_obj):
        """Wrap `sumolib_obj`.

        Args:
            sumolib_obj: lane object providing `getID()`, `getEdge()`,
                `getIndex()`, `getWidth()`, `getLength()` and `getShape()`.
        """
        self.sumolib_obj: sumolib.net.lane = sumolib_obj
        self.name: str = sumolib_obj.getID()
        self.edge_type: str = sumolib_obj.getEdge().getType()
        self.index: int = sumolib_obj.getIndex()

        # An edge type with fewer '|' parts than the lane index (for example a
        # single-token type on a multi-lane edge) used to raise IndexError;
        # such lanes now get the same default as lanes of untyped edges.
        lane_types = self.edge_type.split('|') if self.edge_type else []
        if self.index < len(lane_types):
            self.type = lane_types[self.index]
        else:
            self.type = 'driving'

        self.width: float = sumolib_obj.getWidth()
        self.length: float = sumolib_obj.getLength()
        self.shape: LaneShape = LaneShape(
            sumolib_obj,
            self.width
        )

        # Links to other graph nodes; all start unset / empty.
        self.road = None
        self.left_neigh: Optional[LaneNode] = None
        self.right_neigh: Optional[LaneNode] = None
        self.incoming: List[LaneNode] = []
        self.outgoing: List[LaneNode] = []
        self.function: Optional[str] = None

class RoadNode:
    """Graph node wrapping one edge object together with its lane nodes."""

    def __init__(
            self,
            sumolib_obj,
            lanes,
            from_junction,
            to_junction,
    ):
        """Create the road node and attach `lanes` to it.

        Args:
            sumolib_obj: edge object providing `getID()`, `getCrossingEdges()`,
                `getFunction()`, `getType()` and `getPriority()`.
            lanes: lane nodes belonging to this road; each gets its `road` and
                `function` attributes set here (and `type` for crossings).
            from_junction: junction node the road starts at.
            to_junction: junction node the road ends at.
        """
        self.sumolib_obj: sumolib.net.edge = sumolib_obj
        self.name: str = sumolib_obj.getID()
        self.cross_edges = sumolib_obj.getCrossingEdges()

        # Every lane of a pedestrian crossing edge is re-typed as 'crossing'.
        is_crossing = sumolib_obj.getFunction() == 'crossing'
        if is_crossing:
            for crossing_lane in lanes:
                crossing_lane.type = 'crossing'

        self.type = sumolib_obj.getType()
        self.lanes: List[LaneNode] = lanes
        self.from_junction: JunctionNode = from_junction
        self.to_junction: JunctionNode = to_junction

        # Road width is the total of all lane widths; road length is that of
        # the longest lane.
        self.width: float = sum(member.width for member in lanes)
        self.length: float = max(member.length for member in lanes)
        self.priority: int = sumolib_obj.getPriority()
        self.function: str = sumolib_obj.getFunction()

        # NOTE: a RoadShape built from the outermost lanes' borders is not
        # computed here (lane shapes expose no left/right borders):
        # leftmost_lane = list(filter(lambda l: l.left_neigh is None, lanes))[0]
        # rightmost_lane = list(filter(lambda l: l.right_neigh is None, lanes))[0]
        # self.shape = RoadShape(leftmost_lane.shape.left_border,
        #                        rightmost_lane.shape.right_border)

        # Connectivity starts empty.
        self.junction: Optional[JunctionNode] = None
        self.incoming: List[RoadNode] = []
        self.outgoing: List[RoadNode] = []

        # Back-link each lane to this road and copy the road function onto it.
        for member in self.lanes:
            member.road = self
            member.function = self.function

class RoadLaneJunctionGraph:
    """Road / lane / junction graph built from a SUMO network file.

    After construction the instance exposes:
      * `sumo_net`: the network returned by `sumolib.net.readNet`,
      * `roads`, `lanes`, `junctions`: dictionaries keyed by element ID,
      * `lane_dividers`, `edge_dividers`: polylines computed by
        `_compute_traffic_dividers`.
    """

    def __init__(
            self,
            sumo_net_path,
    ):
        """Read the network at `sumo_net_path` and build the linked graph."""
        self.sumo_net = sumolib.net.readNet(
            sumo_net_path,
            withInternal=True,
            withPedestrianConnections=True,
            withPrograms=True,
        )

        # self.tls = self.sumo_net.getTrafficLights()

        self.roads: Dict[str, RoadNode] = {}
        self.lanes: Dict[str, LaneNode] = {}
        self.junctions: Dict[str, JunctionNode] = {}

        # Pass 1: create nodes for every edge (internal edges included), its
        # lanes and its two end junctions.
        for sumo_edge in self.sumo_net.getEdges(withInternal=True):
            self._register_edge(sumo_edge)

        # Pass 2: link roads and lanes through the junction connections.
        for junction in self.junctions.values():
            self._wire_junction(junction)

        self.lane_dividers, self.edge_dividers = self._compute_traffic_dividers()

    def _register_edge(self, sumo_edge):
        """Create the LaneNodes, end JunctionNodes and RoadNode for one edge."""
        lane_nodes = []
        node_by_index = {}
        for sumo_lane in sumo_edge.getLanes():
            lane_node = LaneNode(sumo_lane)
            self.lanes[lane_node.name] = lane_node
            lane_nodes.append(lane_node)
            node_by_index[sumo_lane.getIndex()] = lane_node

        # Neighbours within the edge: index - 1 is the right neighbour,
        # index + 1 the left one.
        for lane_node in lane_nodes:
            right = node_by_index.get(lane_node.index - 1)
            if right is not None:
                lane_node.right_neigh = right
            left = node_by_index.get(lane_node.index + 1)
            if left is not None:
                lane_node.left_neigh = left

        # Junction nodes are shared between edges, so reuse existing ones.
        end_junctions = []
        for sumo_node in (sumo_edge.getFromNode(), sumo_edge.getToNode()):
            junction_id = sumo_node.getID()
            junction_node = self.junctions.get(junction_id)
            if junction_node is None:
                junction_node = JunctionNode(sumo_node)
                self.junctions[junction_id] = junction_node
            end_junctions.append(junction_node)

        road_id = sumo_edge.getID()
        from_junction, to_junction = end_junctions
        self.roads[road_id] = RoadNode(
            sumo_edge,
            lane_nodes,
            from_junction,
            to_junction,
        )

    @staticmethod
    def _link_direct(pred, succ):
        """Record that `succ` directly follows `pred`."""
        pred.outgoing.append(succ)
        succ.incoming.append(pred)

    @staticmethod
    def _link_through(pred, via, succ):
        """Record that `pred` reaches `succ` by passing through `via`."""
        pred.outgoing.append(via)
        succ.incoming.append(via)
        via.incoming.append(pred)
        via.outgoing.append(succ)

    def _wire_junction(self, junction):
        """Fill in the incoming/outgoing links that pass through `junction`."""
        sumo_node = junction.sumolib_obj

        for sumo_edge in sumo_node.getIncoming():
            junction.incoming.append(self.roads[sumo_edge.getID()])
        for sumo_edge in sumo_node.getOutgoing():
            junction.outgoing.append(self.roads[sumo_edge.getID()])

        for conn in sumo_node.getConnections():
            from_lane = self.lanes[conn.getFromLane().getID()]
            to_lane = self.lanes[conn.getToLane().getID()]
            via_lane_id = conn.getViaLaneID()
            from_road = self.roads[conn.getFrom().getID()]
            to_road = self.roads[conn.getTo().getID()]

            if via_lane_id == '':
                # No via lane: connect source and target directly.
                # (Original note: maybe this case could be skipped, not sure.)
                self._link_direct(from_lane, to_lane)
                self._link_direct(from_road, to_road)
                continue

            # The connection runs over a via lane: insert that lane and its
            # edge between source and target.
            via_road_id = self.sumo_net.getLane(via_lane_id).getEdge().getID()
            via_lane = self.lanes[via_lane_id]
            via_road = self.roads[via_road_id]
            self._link_through(from_lane, via_lane, to_lane)
            self._link_through(from_road, via_road, to_road)

            # The via road/lane belong to this junction.
            junction.roads.append(via_road)
            junction.lanes.append(via_lane)
            via_road.junction = junction

    def _compute_traffic_dividers(self, threshold=1):
        """Collect the divider polylines of the network.

        Edges whose function is "internal", "walkingarea" or "crossing" are
        ignored. For each remaining lane, the centreline is offset by half the
        lane width to both sides via `sumolib.geomhelper.move2side`.

        Args:
            threshold: upper bound on the norm of the endpoint difference for
                two edge borders to be considered overlapping.

        Returns:
            `(lane_dividers, edge_dividers)`: dividers between lanes of the
            same edge, and edge borders matching another border that runs in
            the opposite direction.
        """
        skipped_functions = ["internal", "walkingarea", 'crossing']
        lane_dividers = []  # between lanes with the same traffic direction
        edge_dividers = []  # between lanes with opposite traffic direction
        edge_borders = []   # outer borders of every edge

        for sumo_edge in self.sumo_net.getEdges():
            if sumo_edge.getFunction() in skipped_functions:
                continue

            edge_lanes = sumo_edge.getLanes()
            last_index = len(edge_lanes) - 1
            for lane_index, sumo_lane in enumerate(edge_lanes):
                centerline = sumo_lane.getShape()
                half_width = sumo_lane.getWidth() / 2
                # Negative offset is treated as the left side, positive as the
                # right side (naming kept from the original code).
                left_side = sumolib.geomhelper.move2side(centerline, -half_width)
                right_side = sumolib.geomhelper.move2side(centerline, half_width)

                if lane_index == 0:
                    edge_borders.append(right_side)

                if lane_index == last_index:
                    edge_borders.append(left_side)
                else:
                    lane_dividers.append(left_side)

        # Two edge borders whose end points coincide, once one of them is
        # traversed backwards, form an edge divider. Borders rarely overlap
        # exactly, hence the tolerance `threshold`.
        for position, border in enumerate(edge_borders[:-1]):
            forward_ends = np.array([border[0], border[-1]])
            for other in edge_borders[position + 1:]:
                reversed_ends = np.array([other[-1], other[0]])
                if np.linalg.norm(forward_ends - reversed_ends) < threshold:
                    edge_dividers.append(border)

        return lane_dividers, edge_dividers

class StreetMap:
    """Holder for the graph of the currently loaded SUMO network.

    Both `sumo_net_path` and `graph` are None until `reset` has been called.
    """

    def __init__(self):
        # Defined up front so that reading the path on a fresh instance
        # yields None rather than raising AttributeError.
        self.sumo_net_path = None
        self.graph = None

    def reset(self, sumo_net_path: str):
        """Load the network at `sumo_net_path` and replace the current graph.

        Args:
            sumo_net_path: path of the SUMO network file to load.
        """
        # Build first and assign afterwards: if loading fails, the previous
        # path and graph stay in place and remain consistent with each other.
        graph = RoadLaneJunctionGraph(sumo_net_path)
        self.sumo_net_path = sumo_net_path
        self.graph = graph

In [18]:
# Create the street map and load the example single-intersection network.
street_map = StreetMap()

# The path is relative, so it is resolved against the current working directory.
street_map.reset('../examples/net/single-intersection.net.xml')
# Alternative network, currently disabled:
# street_map.reset('./net.net.xml')



# tlss = street_map.graph.tls
# for tls in tlss:
#     print(tls.getID())
#     tlPrograms = tls.getPrograms()
#     print(tlPrograms)
    #     for tlProgram in tlPrograms.values():
    #         tlLogic = TlLogic.createFromTLSProgram(tls, tlProgram, net=net, debug=options.debug)
    #         tlLogic.createSignalGroupsFromPhases(group=options.group)
    #         tlLogics.append(tlLogic)
    # # check for same signal groups
    # if options.group:
    #     if not len(set([len(tll._signalGroups) for tll in tlLogics])) == 1:
    #         print("Signal states of TL %s cannot be grouped unambiguously. "
    #               "Please remove the group option or the contradictory tll file." % tls.getID())
    #         return
    # tlLogicsTotal.extend(tlLogics)

POLYGON ((-6.391562499523163 6.410000610351283, -6.388437500476837 10.409999389648718, 6.411562499523163 10.399999389648718, 6.408437500476837 6.400000610351284, -6.391562499523163 6.410000610351283))
POLYGON ((6.41 6.4, 10.41 6.4, 10.41 -6.4, 6.41 -6.4, 6.41 6.4))
POLYGON ((6.4 -6.4, 6.4 -10.4, -6.4 -10.4, -6.4 -6.4, 6.4 -6.4))
POLYGON ((-6.4 -6.4, -10.4 -6.4, -10.4 6.4, -6.4 6.4, -6.4 -6.4))


In [372]:
import logging
import os
import tempfile
from dataclasses import dataclass
from os.path import join
from typing import Union

import numpy as np
from metadrive.scenario import ScenarioDescription as SD
from metadrive.type import MetaDriveType
from shapely.geometry.linestring import LineString
from shapely.geometry.multilinestring import MultiLineString


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
import geopandas as gpd
from shapely.ops import unary_union

In [373]:
# Quick inspection: look only at the first junction and print the state of its
# first connection. Both loops stop after one iteration; nothing is printed if
# that junction has no connections.
for junction_id, junction in street_map.graph.junctions.items():
    conns = junction.sumolib_obj.getConnections()
    for conn in conns:
        print(conn.getState())
        break
    break

M


In [376]:
def extract_map_features(street_map: StreetMap):
    """Convert the street-map graph into a dictionary of map features.

    The result maps a feature ID to a feature dictionary:
      * "junction_<name>": junction area, typed LANE_SURFACE_STREET, with the
        junction shape as polyline and its closed outline as polygon,
      * "lane_<name>": every lane whose type is 'driving', typed
        LANE_SURFACE_STREET, with the lane centreline as polyline and the
        buffered lane shape as polygon,
      * "lane_divider_<i>": LINE_BROKEN_SINGLE_WHITE polylines,
      * "edge_divider_<i>": LINE_SOLID_SINGLE_YELLOW polylines.

    Lanes of any other type (sidewalks, shoulders, crossings, ...) are not
    exported.

    Args:
        street_map: street map whose `graph` has been built via `reset`.

    Returns:
        dict: feature ID -> feature dictionary keyed by `SD.TYPE`,
        `SD.POLYLINE` and, for areas, `SD.POLYGON`.
    """
    from shapely.geometry import Polygon

    ret = {}

    # Junction areas.
    for junction in street_map.graph.junctions.values():
        outline = junction.shape
        # A polygon needs at least three points; junctions whose shape is a
        # single point or a line have no area to export and would make the
        # Polygon constructor raise.
        if outline is None or len(outline) < 3:
            continue
        ring = Polygon(outline).exterior.coords
        ret[f"junction_{junction.name}"] = {
            SD.TYPE: MetaDriveType.LANE_SURFACE_STREET,
            SD.POLYLINE: outline,
            SD.POLYGON: [(x, y) for x, y in ring],
        }

    # Drivable lanes; the polygon is only built for lanes that are exported.
    for road in street_map.graph.roads.values():
        for lane in road.lanes:
            if lane.type != 'driving':
                continue
            ring = lane.shape.shape.exterior.coords
            ret[f"lane_{lane.name}"] = {
                SD.TYPE: MetaDriveType.LANE_SURFACE_STREET,
                SD.POLYLINE: lane.sumolib_obj.getShape(),
                SD.POLYGON: [(x, y) for x, y in ring],
            }

    # Dividers between lanes of the same edge.
    for index, lane_divider in enumerate(street_map.graph.lane_dividers):
        ret[f"lane_divider_{index}"] = {
            SD.TYPE: MetaDriveType.LINE_BROKEN_SINGLE_WHITE,
            SD.POLYLINE: lane_divider,
        }

    # Dividers between edges with opposite traffic direction.
    for index, edge_divider in enumerate(street_map.graph.edge_dividers):
        ret[f"edge_divider_{index}"] = {
            SD.TYPE: MetaDriveType.LINE_SOLID_SINGLE_YELLOW,
            SD.POLYLINE: edge_divider,
        }

    return ret
        
    

    # if len(road.lanes) > 1:
    #     print(road.shape.polygon)
    #     for lane in road.lanes:
    #         print(lane.index, lane.name)
    #         print(lane.shape.left_border)
    #         print(lane.shape.right_border)
            
    #     #     print(lane.shape.right_border)
    #     print("====")
    #     break
        


  

In [377]:
import ast
import copy
import inspect
import logging
import math
import multiprocessing
import os
import pickle
import shutil
from functools import partial
from metadrive.scenario.scenario_description import ScenarioDescription

from scenarionet.builder.utils import merge_database
from scenarionet.common_utils import save_summary_anda_mapping
from scenarionet.converter.pg.utils import convert_pg_scenario, make_env

# ---------------------------------------------------------------------------
# Build one scenario description from the loaded SUMO map and write it, plus
# the dataset summary/mapping files, into a fresh "./_tmp" directory.
# ---------------------------------------------------------------------------
result = SD()
result[SD.ID] = 0
result[SD.VERSION] = "sumo" 
# result[SD.LENGTH] = scenario.get_number_of_iterations()
# Top-level containers; tracks are replaced further below.
result[SD.METADATA] = {}
result[SD.DYNAMIC_MAP_STATES] = {}
result[SD.TRACKS] = {}
# Scenario length in steps; the placeholder track below uses the same number.
result[SD.LENGTH] = 10000
# Metadata: the network file path is used for both the dataset and map entries.
result[SD.METADATA]["dataset"] = street_map.sumo_net_path
result[SD.METADATA]["map"] = street_map.sumo_net_path
result[SD.METADATA][SD.METADRIVE_PROCESSED] = False
result[SD.METADATA]["map_version"] = "sumo-maps-v1.0"
result[SD.METADATA]["coordinate"] = "right-handed"
result[SD.METADATA]["scenario_token"] = 0
result[SD.METADATA]["scenario_id"] = 0
result[SD.METADATA][SD.ID] = 0
result[SD.METADATA]["scenario_type"] = "scenario_type"
# The only object in the scenario is a placeholder "ego".
all_objs = set()
all_objs.add("ego")
# One synthetic track per object, 10000 steps long:
#   position - row i is (0.001 * i, 0.001 * i, 0), giving a (10000, 3) array
#   velocity - constant 0.001 in both components
#   heading and length/width/height - all zeros; valid - all ones
tracks = {
    k: dict(
        type=MetaDriveType.UNSET,
        state=dict(
            position=np.concatenate([np.array([0.001, 0.001, 0]).reshape(-1,3)*i for i in range(10000)],axis=0),
            heading=np.zeros(shape=(10000, )),
            velocity=np.ones(shape=(10000, 2))*0.001,
            valid=np.ones(shape=(10000, )),
            length=np.zeros(shape=(10000, 1)),
            width=np.zeros(shape=(10000, 1)),
            height=np.zeros(shape=(10000, 1))
        ),
        metadata=dict(track_length=10000, type=None, object_id=k, original_id=k)
    )
    for k in list(all_objs)
}

result[SD.TRACKS] = tracks
# "ego" is registered as the SDC (self-driving car) ID.
result[SD.METADATA][SD.SDC_ID] = "ego"

# Map features extracted from the street-map graph.
result[SD.MAP_FEATURES] = extract_map_features(street_map)

sd_scenario = result

output_path = "./"
dataset_name = "sumo"
dataset_version = "v1.0"

# `save_path` keeps the original output path; it is not used again in this cell.
save_path = copy.deepcopy(output_path)
# Everything is written to "./_tmp".
output_path = output_path + "_tmp"
# Start from an empty output directory: an existing one is deleted first.
if os.path.exists(output_path):
    shutil.rmtree(output_path)
os.makedirs(output_path, exist_ok=False)

# File names of the dataset summary and mapping files.
summary_file = SD.DATASET.SUMMARY_FILE
mapping_file = SD.DATASET.MAPPING_FILE

summary_file_path = os.path.join(output_path, summary_file)
mapping_file_path = os.path.join(output_path, mapping_file)

# Both are keyed by the exported scenario file name.
summary = {}
mapping = {}

# Number of scenarios written.
count = 0

scenario_id = sd_scenario[SD.ID]
export_file_name = SD.get_export_file_name(dataset_name, dataset_version, scenario_id)


# Add a per-object summary for every track (the track ID is printed as it is processed).
summary_dict = {}
for track_id, track in sd_scenario[SD.TRACKS].items():
    print(track_id)
    summary_dict[track_id] = SD.get_object_summary(state_dict=track, id=track_id)
sd_scenario[SD.METADATA][SD.SUMMARY.OBJECT_SUMMARY] = summary_dict

# Count occurrences of objects in the scenario.
sd_scenario[SD.METADATA][SD.SUMMARY.NUMBER_SUMMARY] = SD.get_number_summary(sd_scenario)


# Update the summary/mapping dicts; the metadata is deep-copied into the summary.
summary[export_file_name] = copy.deepcopy(sd_scenario[SD.METADATA])
mapping[export_file_name] = ""  # in the same dir

# Convert the scenario description via `to_dict()` before pickling
# (labelled "sanity check" in the original; presumably validates as well - confirm).
sd_scenario = sd_scenario.to_dict()

# Dump the scenario as a pickle file inside the output directory.
p = os.path.join(output_path, export_file_name)
with open(p, "wb") as f:
    pickle.dump(sd_scenario, f)

count += 1

# Store the summary and mapping files.
save_summary_anda_mapping(summary_file_path, mapping_file_path, summary, mapping)



ego


