In [23]:
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point,LineString
import networkx as nx
from shapely.ops import split, snap
from shapely.geometry import Point, LineString


In [24]:
# Read the road dataset
roads = gpd.read_file(r"C:\Users\bsf31\Documents\post-meds\data\signal\tl_2023_06083_roads_sbco.gpkg")
# Dissolve the road lines to merge connected segments
dissolved_roads = roads.dissolve(by='LINEARID') 


# Intersections & Deadends

In [25]:
# Extract the start and end points of each line
start_points = dissolved_roads.geometry.apply(lambda x: Point(x.coords[0]))
end_points = dissolved_roads.geometry.apply(lambda x: Point(x.coords[-1]))
# Concatenate the start and end points into a single GeoSeries using pd.concat()
all_points = gpd.GeoSeries(pd.concat([start_points, end_points]), crs=roads.crs)
# Count the occurrence of each point
point_counts = all_points.value_counts()


# Filter the points that appear only once (intersections & deadends)
intersections_deadends = point_counts[point_counts == 1]
# Create a dictionary with the dead end points and their counts
intersections_deadends_data = {'geometry': intersections_deadends.index, 'count': intersections_deadends.values}
 #Create a GeoDataFrame with the dead end points
intersections_deadends_data_points = gpd.GeoDataFrame(intersections_deadends_data, crs=roads.crs)
# Save the dead end points to a new shapefile
intersections_deadends_data_points.to_file(r"C:\Users\bsf31\Documents\post-meds\data\signal\sbco_intersection_deadends.gpkg", driver='GPKG')


# Deadends
## converts a dataset of roads into a graph representation and identifies dead-end points within that network.

* Graph Initialization: The process begins with the initialization of a graph. In this context, a graph is a mathematical structure used to model pairwise relations between objects. Here, roads and intersections are modeled as edges and nodes, respectively.
* Geometry Simplification and Conversion: Each road segment (line geometry) from the dataset is simplified to remove minor irregularities and potential complexities. This simplification helps in avoiding inaccuracies when converting these geometries into a graph format. The simplified line geometries are then converted into a series of points (nodes in the graph), and consecutive points are connected by edges. This effectively builds the network where paths (edges) between points (nodes) represent segments of the road.
* Identification of Dead Ends: Once the graph is built, the code identifies dead ends in the network. A dead end is recognized by having a node with only one connected edge (degree of 1). This means there is no alternate path from that node, signifying the end of a road without any junctions.

In [26]:
roads_dissolved = roads.dissolve(by='LINEARID') 
# Initialize a new graph. This will be used to store the road network where nodes represent junctions or endpoints of roads, and edges represent the roads themselves.
G = nx.Graph()

# Iterate over each row in the dissolved roads GeoDataFrame. Each row corresponds to a road segment.
for index, row in roads_dissolved.iterrows():
    # Simplify the geometry of the road segment to remove minor variations and potential self-intersections. This is important for accurately converting the geometries into a graph structure without unnecessary complexity.
    line = row.geometry.simplify(tolerance=0.01)
    
    # Check if the simplified geometry is a LineString (a simple line shape). We only want to process line shapes as these represent the roads.
    if isinstance(line, LineString):
        # Convert the line's coordinates into a list of tuples. Each tuple represents a point (node) on the line.
        points = list(map(tuple, line.coords))
        
        # Add these points as a path in the graph. This connects each consecutive pair of points with an edge, effectively building the road network in graph form.
        nx.add_path(G, points)

# Identify all nodes in the graph that have a degree of 1. A node with a degree of 1 only has one connection (edge) to another node, which means it is an endpoint without any further continuation - a dead end.
dead_ends = [node for node, degree in G.degree() if degree == 1]

# Convert the list of dead end nodes (points) back into a GeoDataFrame for further spatial analysis. This allows us to use geographic functions on the points.
dead_ends_gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([p[0] for p in dead_ends], [p[1] for p in dead_ends]), crs=roads.crs)


In [27]:
# Save the dead end points to a new shapefile
dead_ends_gdf.to_file(r"C:\Users\bsf31\Documents\post-meds\data\signal\sbco_deadends.gpkg", driver='GPKG')

# Intersections

In [29]:
# Perform a spatial difference
 #This operation returns the geometric difference of points in intersections_deadends that are not in dead_end_points.
true_intersections = gpd.overlay(intersections_deadends_data_points, dead_ends_gdf, how='difference')

# Now true_intersections contains only the intersection points that are not dead ends.
# Optionally, save to file
true_intersections.to_file(r"C:\Users\bsf31\Documents\post-meds\data\signal\sbco_intersections.gpkg", driver='GPKG')

# QGIS

* using  '***Extract specific vertices***' with 0, -1 as input on original roads layer, produces 'Vertices'
* Take difference between 'Vertices' and 'sbco_deadends' produced from this notebook, produces 'Difference'
* Using 'Difference' again take difference using 'true_intersection' produced from this notebook, this produces another 'Differece' layer which is the remaining intersections not captured in this code.

Result is dependent on quality of original road layer. REQUIRES manual inspection, especially in long isolated roads and in urban areas with many intersections. In long roads a random vertix can be sometimes found that ties one road together. In urban areas with high number of intersections the many vertices are harder to capture. 

If 100% coverage of intersections is desired, — this analysis doesn't absolutely require it as the roads that are missing intersections reach an intersection or deadend — then the reccomendation is that after manual inspection and removal of unwanted points, merge the three layers of sbco_deadends, sbco_intersections, and the final difference layer. In qgis use the extract all vertices tool and take the difference with the merged layer. This will produce an extreme amount of points, create a blank shapefile, cut and paste the intersection points desired manually.

Using simplify on roads  may help reduce when extracting all vertices.

# Single Egress Points

In [None]:
# add intersection points as nodes in your graph 
for index, point in true_intersections.iterrows():
    G.add_node((point.geometry.x, point.geometry.y))

# Assuming you have a method to associate roads to these nodes
# (edges would already be added in the graph construction phase).

# Now, let's analyze single egress by removing intersection nodes temporarily
single_egress_points = []
for node in G.nodes():
    if G.degree(node) > 1:  # Ensure it's an intersection node
        G_temp = G.copy()
        G_temp.remove_node(node)
        if nx.is_connected(G_temp) == False:
            # Check how many components are formed
            components = list(nx.connected_components(G_temp))
            if len(components) > 1:
                # If more than one component, this node is a single egress point
                single_egress_points.append(node)

# Convert single egress points back to GeoDataFrame for use in GIS
single_egress_points_gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([p[0] for p in single_egress_points], [p[1] for p in single_egress_points]), crs=roads.crs)

# Optionally, save to file
single_egress_points_gdf.to_file(r"C:\Users\bsf31\Documents\post-meds\data\signal\single_egress_points.gpkg", driver='GPKG')

In [None]:
# # Identify nodes that, when removed, increase the number of connected components - these nodes represent single egress points.
# cut_points = list(nx.articulation_points(G))

# # Convert these nodes back to a GeoDataFrame for spatial analysis
# cut_points_gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([p[0] for p in cut_points], [p[1] for p in cut_points]))

In [None]:
# # Assuming roads_dissolved is your GeoDataFrame of roads
# G = nx.Graph()

# # Convert line geometries to graph, focusing only on true endpoints
# for line in roads_dissolved.geometry:
#     if isinstance(line, LineString):
#         # Simplify the line to minimize unnecessary intermediate points
#         simplified_line = line.simplify(tolerance=0.01)
#         endpoints = [simplified_line.coords[0], simplified_line.coords[-1]]
#         nx.add_path(G, endpoints)

# # Use DBSCAN or similar to cluster endpoints to mitigate multiple close points issue
# coords = [point for point, degree in G.nodes(data=True) if G.degree(point) == 1]
# db = DBSCAN(eps=10, min_samples=1).fit(coords)  # Adjust eps based on your spatial resolution needs
# clusters = db.labels_

# # Convert clustered endpoints back to a GeoDataFrame
# clustered_points = [MultiPoint([coords[i] for i in range(len(coords)) if clusters[i] == k]).centroid for k in set(clusters)]
# dead_ends_gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([p.x for p in clustered_points], [p.y for p in clustered_points]))


In [None]:
# Identifying articulation points and analyzing their impact
articulation_points = list(nx.articulation_points(G))
impact_dict = {}

# Analyzing the size of components when an articulation point is removed
for point in articulation_points:
    G_temp = G.copy()
    G_temp.remove_node(point)
    components = list(nx.connected_components(G_temp))
    # Find the largest component size when the point is removed
    largest_component_size = max([len(comp) for comp in components])
    total_size = len(G_temp.nodes())
    isolated_size = total_size - largest_component_size
    # Store the impact information
    impact_dict[point] = isolated_size

# Filtering to find significant single egress points
significant_points = {k: v for k, v in impact_dict.items() if v > 50}  # adjust 50 to your specific threshold

# Convert significant articulation points back to GeoDataFrame
significant_points_gdf = gpd.GeoDataFrame(
    geometry=gpd.points_from_xy([p[0] for p in significant_points.keys()], [p[1] for p in significant_points.keys()]),
    crs=roads.crs
)


In [13]:
# Save the dead end points to a new shapefile
significant_points_gdf.to_file(r"C:\Users\bsf31\Documents\post-meds\data\signal\sbco_single_egress_points2.gpkg", driver='GPKG')

In [None]:
# # Extract the start and end points of each line
# start_points = dissolved_roads.geometry.apply(lambda x: Point(x.coords[0]))
# end_points = dissolved_roads.geometry.apply(lambda x: Point(x.coords[-1]))
# # Concatenate the start and end points into a single GeoSeries using pd.concat()
# all_points = gpd.GeoSeries(pd.concat([start_points, end_points]), crs=roads.crs)
# # Count the occurrence of each point
# point_counts = all_points.value_counts()


# # Filter the points that appear only once (dead ends)
# dead_ends = point_counts[point_counts == 1]
# # Create a dictionary with the dead end points and their counts
# dead_end_data = {'geometry': dead_ends.index, 'count': dead_ends.values}
#  #Create a GeoDataFrame with the dead end points
# dead_end_points = gpd.GeoDataFrame(dead_end_data, crs=roads.crs)
# # Save the dead end points to a new shapefile
# dead_end_points.to_file(r"C:\Users\bsf31\Documents\post-meds\data\signal\sbco_deadends.gpkg", driver='GPKG')
# #2

# # Create a spatial index for the road lines
# spatial_index = dissolved_roads.sindex

# # Function to check if a point is a dead end
# def is_dead_end(point):
#     # Create a small buffer around the point
#     buffer_dist = 1e-5  # Adjust the buffer distance as needed
#     buffer = point.buffer(buffer_dist)
    
#     # Find the indices of the lines that intersect the buffer
#     possible_matches_index = list(spatial_index.intersection(buffer.bounds))
#     possible_matches = dissolved_roads.iloc[possible_matches_index]
    
#     # Check if the point touches exactly one line geometry
#     touching_lines = [line for line in possible_matches.geometry if point.touches(line)]
    
#     return len(touching_lines) == 1
# # Function to check if a point is a dead end
# def is_dead_end(point):
#     # Create a small buffer around the point
#     buffer_dist = 1e-8  # Adjust the buffer distance as needed
#     buffer = point.buffer(buffer_dist)
    
#     # Find the indices of the lines that intersect the buffer
#     possible_matches_index = list(spatial_index.intersection(buffer.bounds))
#     possible_matches = dissolved_roads.iloc[possible_matches_index]
    
#     # Check if the point touches any line geometries
#     touches_line = any(point.touches(line) for line in possible_matches.geometry)
    
#     return not touches_line
 

# # Identify the dead end points
# dead_ends2 = all_points[all_points.apply(is_dead_end)]
# # Create a GeoDataFrame with the dead end points
# dead_end_points2 = gpd.GeoDataFrame(geometry=dead_ends2, crs=roads.crs)


# # Read the road dataset
# roads = gpd.read_file(r"C:\Users\bsf31\Documents\post-meds\data\signal\tl_2023_06083_roads_sbco.gpkg")
# # Dissolve the road lines to merge connected segments
# dissolved_roads = roads.dissolve(by='LINEARID') 
# # Reset the index of dissolved_roads
# dissolved_roads = dissolved_roads.reset_index()
# # Extract the start and end points of each line
# start_points = dissolved_roads.geometry.apply(lambda x: Point(x.coords[0]))
# end_points = dissolved_roads.geometry.apply(lambda x: Point(x.coords[-1]))
# # Concatenate the start and end points into a single GeoSeries
# all_points = pd.concat([start_points, end_points])

# # Create a DataFrame where LINEARID is repeated for each start and end point
# repeated_linearid = dissolved_roads.loc[dissolved_roads.index.repeat(2), 'LINEARID'].reset_index(drop=True)
# # Create a GeoDataFrame with all points and their corresponding LINEARID
# all_points_gdf = gpd.GeoDataFrame({'geometry': all_points, 'LINEARID': repeated_linearid})

# # Count the number of occurrences of each LINEARID for each point
# linearid_counts = all_points_gdf.groupby(all_points_gdf.geometry)['LINEARID'].count()

# # Identify the dead end points (points with only one occurrence of LINEARID)
# dead_ends = linearid_counts[linearid_counts == 1].index

# # Create a GeoDataFrame with the dead end points
# dead_end_points = gpd.GeoDataFrame(geometry=dead_ends, crs=roads.crs)

# # Save the dead end points to a new shapefile
# dead_end_points.to_file(r"C:\Users\bsf31\Documents\post-meds\data\signal\sbco_deadends.gpkg", driver='GPKG')
# # Load the road data
# roads = gpd.read_file(r"C:\Users\bsf31\Documents\post-meds\data\signal\tl_2023_06083_roads_sbco.gpkg")

