In [2]:
from progress_utils import run_with_progress_bar  # Import the function
import osm2gmns as og
import osmnx as ox
import geopandas as gpd
import pandas as pd
from shapely import wkt
import rasterio
import numpy as np
from shapely.geometry import Point
import os
import time
import ipywidgets as widgets
from IPython.display import display
from threading import Thread

osm2gmns, 0.7.5


# Getting the base dataset from osm using osm2gmns

Provide the number corresponding to the location you are interested in from OpenStreetMap. Check the example screenshot [here](https://osm2gmns.readthedocs.io/en/latest/_images/osm_id.png).


In [5]:
def get_osm():
    net2 = og.downloadOSMData(179032, './data/raw_data/map.osm')
run_with_progress_bar(get_osm)

IntProgress(value=0, description='0% Complete | Time: 0s', layout=Layout(width='100%'), style=ProgressStyle(de…

valid reponses got from API server.
receving data...
map data has been written to ./data/raw_data/map.osm


Extract the necessary files from the OSM file, specifically `link.csv`, `poi.csv`, and `node.csv`.

In [6]:
def get_all_datasets():
    net = og.getNetFromFile('./data/raw_data/map.osm',network_types=('auto'), POI = True, default_lanes= True, default_speed = True, default_capacity=True)
    og.generateNodeActivityInfo(net)
    og.connectPOIWithNet(net)
    og.outputNetToCSV(net, './data/raw_data/')
# Run the combined function with a single progress bar
run_with_progress_bar(get_all_datasets)

IntProgress(value=0, description='0% Complete | Time: 0s', layout=Layout(width='100%'), style=ProgressStyle(de…

arguments used for network parsing:
  filename: ./data/raw_data/map.osm
  network_types: auto
  link_types: all
  POI: True
  POI_sampling_ratio: 1.0
  strict_mode: True
  offset: no
  min_nodes: 1
  combine: False
  bbox: None
  default_lanes: True
  default_speed: True
  default_capacity: True
  start_node_id: 0
  start_link_id: 0

Building Network from OSM file
  reading osm file
  parsing osm network
    generating nodes and links
    generating POIs
  number of nodes: 5949, number of links: 14296, number of pois: 34402
Generating Node Activity Information
Outputting Network Files


#### Loading the exported dataset for further analysis

In [7]:
#load in the link datasets
df = pd.read_csv('./data/raw_data/link.csv', encoding='ISO-8859-1',low_memory=False)

# Step 2: Convert the 'geometry' column (containing LINESTRING in WKT format) to shapely geometries
df['geometry'] = df['geometry'].apply(wkt.loads)

# Step 3: Convert the DataFrame to a GeoDataFrame
link_gdf = gpd.GeoDataFrame(df, geometry='geometry')

# Optional: Set the Coordinate Reference System (CRS)
link_gdf.set_crs(epsg=4326, inplace=True)  # Example for WGS84 (change EPSG code if necessary)

# load the node datasets

df_node = pd.read_csv('./data/raw_data/node.csv', encoding='ISO-8859-1',low_memory=False)
df_node['activity_type'] = df_node['activity_type'].fillna('poi')

# Convert the DataFrame to a GeoDataFrame
nodes = gpd.GeoDataFrame(df_node, geometry=gpd.points_from_xy(df_node['x_coord'], df_node['y_coord']))

#Set the Coordinate Reference System (CRS) for the GeoDataFrame
nodes.set_crs(epsg=4326, inplace=True)

# load the node datasets
df_poi = pd.read_csv('./data/raw_data/poi.csv', encoding='ISO-8859-1',low_memory=False)

print(' You have successfully import all the needed dataset for this steps' )

 You have successfully import all the needed dataset for this steps


#### Rename and Calculate for Number of lanes, Road Type, Speed limit, Travel time, and width 

In [8]:
# Create a copy to avoid modifying the original joined_gdf directly
updated_gdf = link_gdf.copy()

# Rename columns
updated_gdf.rename(columns={'lanes': 'Number of Lanes', 'link_type_name': 'Road Type'}, inplace=True)

# Convert free_speed from km/h to m/s for travel_time calculation (1 km/h = 1000/3600 m/s)
updated_gdf['free_speed_m/s'] = updated_gdf['free_speed'] * (1000 / 3600)  # convert km/h to m/s

# Calculate travel_time (in seconds), using length (in meters) and free_speed (in m/s)
updated_gdf['travel_time'] = updated_gdf['length'] / updated_gdf['free_speed_m/s']

# Calculate Speed Limit in mph (1 km/h = 0.621371 mph)
updated_gdf['Speed Limit'] = updated_gdf['free_speed'] * 0.621371

# Define road width based on road type
def assign_road_width(road_type, num_lanes):
    road_widths = {
        'Residential': 10,
        'Motorway': 12,
        'Unclassified': 10,
        'Tertiary': 10,
        'Secondary': 11,
        'Primary': 12,
        'Trunk': 12,
        'Service': 9,
        'Living Street': 9,
        'Track': 8,
        'Footway': 5,
        'Connector': 10
    }
    base_width = road_widths.get(road_type, 10)  # Default to 10 feet if the road type isn't found
    return base_width * num_lanes  # Multiply by the number of lanes

# Apply the road width calculation and add it as a 'width' column
updated_gdf['width'] = updated_gdf.apply(lambda row: assign_road_width(row['Road Type'], row['Number of Lanes']), axis=1)

# Remove the 'free_speed_m/s' column since it's only for calculation purposes
updated_gdf.drop(columns=['free_speed_m/s'], inplace=True)

print('You have added Number of Lanes, Road type, Speed Limit, Travel time, and Road Width column to your datasets')


You have added Number of Lanes, Road type, Speed Limit, Travel time, and Road Width column to your datasets


#### Calculate the The Grade

In [11]:
# Load DEM (assuming you have a raster file containing elevation data)
dem_path = "./data/raw_data/Wilmington, NC_Copernicus_DSM_COG_10_N34_00_W078_00_HAND.tif"
dem = rasterio.open(dem_path)

# Function to extract elevation using rasterio
def extract_elevation(geometry, dem):
    coords = [(geometry.x, geometry.y)]
    for val in dem.sample(coords):
        return val[0]

# Function to calculate the grade
def calculate_grade(row, node_gdf):
    # Skip calculation for "Connector" road types
    if row['Road Type'] == 'Connector':
        return np.nan, np.nan
    
    # Extract node information using 'from_node_id' and 'to_node_id'
    start_node_elev = node_gdf.loc[node_gdf['node_id'] == row['from_node_id'], 'elevation'].values[0]
    end_node_elev = node_gdf.loc[node_gdf['node_id'] == row['to_node_id'], 'elevation'].values[0]
    
    # Calculate the elevation difference and horizontal distance (length)
    elevation_diff = end_node_elev - start_node_elev
    distance = row['length']
    
    # Calculate grade (elevation difference divided by length)
    if distance == 0:
        return np.nan, np.nan  # Avoid division by zero
    grade = elevation_diff / distance
    grade_abs = abs(grade)
    
    return grade, grade_abs

# Extract elevation for each point in nodes GeoDataFrame
nodes['elevation'] = nodes['geometry'].apply(lambda geom: extract_elevation(geom, dem))

# Calculate grade and grade_abs for updated_gdf (skipping 'Connector' road types)
updated_gdf['grade'], updated_gdf['grade_abs'] = zip(*updated_gdf.apply(calculate_grade, axis=1, node_gdf=nodes))

# Remove elevation calculation for activity_type = 'poi'
nodes.loc[nodes['activity_type'] == 'poi', 'elevation'] = np.nan

print('We now have the elevation information for our datasets')

We now have the elevation information for our datasets


#### Getting the elevation for the links

In [18]:
links = updated_gdf.copy()

# Ensure 'node_id' and 'elevation' columns exist in the nodes file
if 'node_id' not in nodes.columns or 'elevation' not in nodes.columns:
    raise ValueError("The nodes file must contain 'node_id' and 'elevation' columns.")

# Ensure 'from_node_id' and 'to_node_id' exist in the links file
if 'from_node_id' not in links.columns or 'to_node_id' not in links.columns:
    raise ValueError("The links file must contain 'from_node_id' and 'to_node_id' columns.")

# Create a mapping from node_id to elevation
node_elevation_map = dict(zip(nodes['node_id'], nodes['elevation']))

# Add elevation_from and elevation_to columns to the links file
def get_elevation(node_id):
    return node_elevation_map.get(node_id, None)

links['elevation_from'] = links['from_node_id'].map(get_elevation)
links['elevation_to'] = links['to_node_id'].map(get_elevation)

# Set elevation_to and elevation_from to null for connectors
links.loc[links['Road Type'] == 'Connector', ['elevation_from', 'elevation_to']] = None

print("Elevation columns have been successfully added to the links file.")


Elevation columns have been successfully added to the links file.


In [19]:
nodes.to_csv('./data/node.csv', index=False)
links.to_csv('./data/link.csv', index=False)
df_poi.to_csv('./data/poi.csv', index=False)

### Rencode to UTF-8

In [20]:
def convert_to_utf8(input_dir, file_name):
    input_file = os.path.join(input_dir, file_name)
    
    try:
        # Read the file with the current encoding (ISO-8859-1 or Windows-1252, etc.)
        with open(input_file, 'r', encoding='ISO-8859-1') as f:
            content = f.read()
        
        # Write the content back to the original file in UTF-8 encoding
        with open(input_file, 'w', encoding='utf-8') as f:
            f.write(content)
        
        print(f"File '{file_name}' successfully converted to UTF-8.")
    except Exception as e:
        print(f"Failed to convert file '{file_name}': {e}")

if __name__ == "__main__":
    input_dir = './data'
    
    # List of files to convert
    files_to_convert = ['poi.csv', 'node.csv', 'link.csv']
    
    # Loop over the files and convert each one
    for file_name in files_to_convert:
        convert_to_utf8(input_dir, file_name)


File 'poi.csv' successfully converted to UTF-8.
File 'node.csv' successfully converted to UTF-8.
File 'link.csv' successfully converted to UTF-8.
