In [None]:
# imports
import json

import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import shapely
from shapely.ops import substring
from sklearn.cluster import DBSCAN


# Probe observations; later cells read its 'latitude', 'longitude' and 'speed' columns.
probe_data = pd.read_csv("23608577_probe_data.csv")

# GeoJSON layers, all sharing the "23608577" file-name prefix.
# NOTE(review): full_topology_data, combined_data and access_chars_data are not
# referenced again in the visible cells (the topology file is re-read further down).
full_topology_data = gpd.read_file(r"23608577_full_topology_data.geojson")
combined_data = gpd.read_file(r"23608577_combined.geojson")
access_chars_data = gpd.read_file(r"23608577_access_chars.geojson")
# Sign features; later cells read 'id', 'signType' and 'confidence'.
signs_data = gpd.read_file(r"23608577_signs.geojson")
# Validation records; later cells read 'Feature ID', 'Error Message' and the point geometry.
validations_data = gpd.read_file(r"23608577_validations.geojson")

In [None]:
# getting existence score for each sign

def get_existence_score(conf):
    """Return the EXISTENCE score stored in a sign's confidence payload.

    Parameters
    ----------
    conf : str | dict | Any
        Either a JSON string or an already-decoded dict that is expected to
        carry a ``"simpleScores"`` list of ``{"scoreType": ..., "score": ...}``
        entries.

    Returns
    -------
    The ``"score"`` value of the first entry whose ``"scoreType"`` is
    ``"EXISTENCE"``, or ``None`` when the payload is not valid JSON, is not a
    dict, has no usable ``"simpleScores"`` list, or has no EXISTENCE entry.
    """
    if isinstance(conf, str):
        try:
            conf = json.loads(conf)
        except json.JSONDecodeError:
            return None

    if not isinstance(conf, dict):
        return None

    # A JSON null or any non-list value here previously raised TypeError when iterated.
    score_entries = conf.get("simpleScores")
    if not isinstance(score_entries, (list, tuple)):
        return None

    for score_entry in score_entries:
        # Skip malformed entries instead of failing on `.get`.
        if isinstance(score_entry, dict) and score_entry.get("scoreType") == "EXISTENCE":
            return score_entry.get("score")
    return None

# Derive one existence score per sign from its 'confidence' payload
# (None where get_existence_score finds no EXISTENCE entry).
signs_data["existenceScore"] = signs_data["confidence"].apply(get_existence_score)
# print(signs_data["existenceScore"])
# Last expression of the cell: displays (rows, columns) after adding the new column.
signs_data.shape

(5662, 26)

In [None]:
# filtering for motorway signs

# Keep only the rows whose signType is exactly 'MOTORWAY', then show their scores.
motorway_signs = signs_data.loc[signs_data['signType'].eq('MOTORWAY')]
print(motorway_signs['existenceScore'])

5465    1.0
5647    1.0
Name: existenceScore, dtype: float64


In [None]:
# scenario 1

# Scenario 1: a violation whose motorway sign has an existence score <= 0.7 is
# treated as resolved; only violations with a score > 0.7 move on to scenario 2.
for _, vio in validations_data.iterrows():
    feature_id = vio.get("Feature ID")

    match = motorway_signs[motorway_signs["id"] == feature_id]
    if match.empty:
        print(f"Feature ID {feature_id} not found in motorway_signs")
        continue

    existence_score = match.iloc[0].get("existenceScore")

    # The column is float64, so a missing score is NaN rather than None.
    # `is not None` let NaN through and reported it as "<= 0.7 ----- RESOLVED".
    if pd.isna(existence_score):
        print(f"Feature ID {feature_id}: existenceScore is None")
    elif existence_score > 0.7:
        print(f"Feature ID {feature_id}: existenceScore {existence_score:.2f} > 0.7 ----- CHECKING SCENARIO 2")
    else:
        print(f"Feature ID {feature_id}: existenceScore {existence_score:.2f} <= 0.7 ----- RESOLVED") # drop this sign from validations dataset - it is not a violation

# Keep the violations that reference a motorway sign scoring above the threshold.
# NaN > 0.7 is False, so signs without a score are excluded here as well.
filtered_motorway_signs = motorway_signs[motorway_signs["existenceScore"] > 0.7]
feature_ids = filtered_motorway_signs["id"]
validations_filtered = validations_data[validations_data["Feature ID"].isin(feature_ids)]

print(validations_filtered.shape)
print(validations_filtered['geometry'])

Feature ID urn:here::here:signs:1622369126135402107: existenceScore 1.00 > 0.7 ----- CHECKING SCENARIO 2
(1, 12)
0    POINT (10.0047 53.51555)
Name: geometry, dtype: geometry


In [None]:
# function to calculate tile bounds (latitudes and longitudes)
def calculate_tile_bounds(latitude, longitude, tile_size_meters=1000):
    """Return the lat/lon bounds of a square window centred on a point.

    Parameters
    ----------
    latitude, longitude : float
        Centre of the window, in degrees.
    tile_size_meters : float, default 1000
        Edge length of the window in metres; half of it is applied on each
        side of the centre.

    Returns
    -------
    dict
        ``{"north", "south", "east", "west"}`` in degrees.

    Notes
    -----
    Uses the flat approximation of 111,111 m per degree of latitude, scaled
    by ``cos(latitude)`` for longitude to account for the convergence of
    meridians. Near the poles ``cos(latitude)`` approaches zero and the
    longitude span grows without bound.
    """
    # 111,111 metres per degree of latitude is a good approximation.
    meters_per_degree = 111111.0
    half_tile_size = tile_size_meters / 2.0

    # North-south extent.
    lat_delta = half_tile_size / meters_per_degree

    # East-west extent: a degree of longitude shrinks with cos(latitude).
    lon_meters_per_degree = meters_per_degree * np.cos(np.radians(latitude))
    lon_delta = half_tile_size / lon_meters_per_degree

    return {
        "north": latitude + lat_delta,
        "south": latitude - lat_delta,
        "east": longitude + lon_delta,
        "west": longitude - lon_delta,
    }


In [None]:
#make geojson output file

import json
import geopandas as gpd
import pandas as pd
import shapely
from shapely.ops import substring

##########################################################
### LOAD FULL TOPOLOGY TILE
##########################################################
# NOTE(review): this is the same file already read into `full_topology_data`
# in the first cell; it is read a second time here.
topology_gdf = gpd.read_file(r"23608577_full_topology_data.geojson")

##########################################################
### FILTER TO THE SELECTED PARAMETERIZED ATTRIBUTE
##########################################################
# Work on a copy holding only the id, the two characteristics columns and the geometry,
# so the assignments below do not touch topology_gdf.
access_characteristics_gdf = topology_gdf[['id','accessCharacteristics', 'topologyCharacteristics', 'geometry']].copy()

##########################################################
### NORMALIZE THE COLUMN WITH JSON LOADS
##########################################################
def convert_data(x):
    """Decode a JSON-encoded cell value, falling back to an empty dict.

    Parameters
    ----------
    x : Any
        Cell value, expected to be a JSON string.

    Returns
    -------
    The decoded Python object, or ``{}`` when ``x`` is not a string/bytes
    value (``TypeError``) or is not valid JSON (``json.JSONDecodeError``,
    a ``ValueError`` subclass).
    """
    try:
        return json.loads(x)
    except (TypeError, ValueError):
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit are
        # no longer swallowed; the best-effort `{}` fallback is kept.
        return {}
# Decode both JSON-text columns in place; cells that cannot be decoded become {}.
access_characteristics_gdf['accessCharacteristics'] = access_characteristics_gdf['accessCharacteristics'].apply(convert_data)
access_characteristics_gdf['topologyCharacteristics'] = access_characteristics_gdf['topologyCharacteristics'].apply(convert_data)

def extract_is_motorway(top_char):
    """Return the 'value' of the first 'isMotorway' entry, or None.

    ``top_char`` is the decoded topologyCharacteristics object. None is
    returned when the key's list is empty, when the first entry has no
    'value', or when ``top_char`` / its entries are not of the expected
    dict/list shape.
    """
    try:
        # Missing key -> [{}], whose single empty dict yields None below.
        entries = top_char.get('isMotorway', [{}])
        first_entry = entries[0]
        return first_entry.get('value', None)
    except (TypeError, IndexError, AttributeError):
        return None

# One motorway flag per topology row, taken from its decoded topologyCharacteristics.
access_characteristics_gdf['isMotorway'] = access_characteristics_gdf['topologyCharacteristics'].apply(extract_is_motorway)

##########################################################
### EXPLODE DATASET BASED ON THE PARAMETERIZED ATTRIBUTION COLUMN (essentially duplicate some geoms before cutting)
##########################################################
# Each element of the accessCharacteristics list becomes its own row; the id,
# geometry and isMotorway values are repeated on every resulting row.
# (The former `gdf['id'] = gdf['id']` self-assignment was a no-op and the
# commented-out adasTopology draft referenced a column that is never selected;
# both were removed.)
access_characteristics_gdf = access_characteristics_gdf.explode('accessCharacteristics')

##########################################################
### GET START AND END OFFSET
##########################################################
# `.str[...]` is used as an element-wise key lookup on the per-row dicts:
# accessCharacteristics -> 'range' -> 'startOffset' / 'endOffset'.
access_characteristics_gdf['startOffset'] = access_characteristics_gdf['accessCharacteristics'].str['range'].str['startOffset']
access_characteristics_gdf['endOffset'] = access_characteristics_gdf['accessCharacteristics'].str['range'].str['endOffset']

##########################################################
### SPLIT SHAPELY GEOMETRY
##########################################################
def split_shapely_string(geom_column, start_offset, end_offset):
    """Cut the part of a line between two normalized offsets.

    Parameters
    ----------
    geom_column : shapely geometry
        Line to cut (passed straight to ``shapely.ops.substring``).
    start_offset, end_offset : float
        Start and end positions, interpreted as fractions of the line length
        because ``normalized=True`` is used.

    Returns
    -------
    The cut geometry after an identity ``shapely.transform`` with
    ``include_z=False``, or ``None`` if any step raises (the error is
    printed, not re-raised).
    """
    try:
        piece = substring(geom_column, start_dist=start_offset, end_dist=end_offset, normalized=True)
        # Identity transform; include_z=False is what matters here
        # (presumably to drop any Z ordinate - confirm against shapely docs).
        return shapely.transform(piece, lambda coords: coords, include_z=False)
    except Exception as err:
        print("SKIPPED GEOMETRY SUBSTRING SPLIT", err)
        return None
# Replace each row's geometry with the sub-line covered by its own
# startOffset/endOffset (None where the split failed).
access_characteristics_gdf['geometry'] = access_characteristics_gdf.apply(lambda x: split_shapely_string(x.geometry, x['startOffset'], x['endOffset']), axis=1)

##########################################################
### JSON NORMALIZE NEW GDF TO SPLIT OUT COLUMNS
##########################################################
# Round-trip through a list of records so json_normalize can flatten the nested
# dict columns into dotted column names (e.g. 'accessCharacteristics.pedestrian',
# which the next cell reads).
access_characteristics_gdf = pd.json_normalize(data=pd.DataFrame.to_dict(access_characteristics_gdf, orient='records'))
# json_normalize returns a plain DataFrame, so the GeoDataFrame is rebuilt here.
# NOTE(review): no crs is passed, so the rebuilt frame carries no CRS - confirm this is intended.
access_characteristics_gdf = gpd.GeoDataFrame(data=access_characteristics_gdf, geometry=access_characteristics_gdf['geometry'])

# Output consumed by the next cell.
access_characteristics_gdf.to_file("output577_new.geojson")

  write(


In [None]:
#make csv file from data which has id pedestrian and start and end and motorway
import geopandas as gpd
import pandas as pd

# Load GeoJSON data
geojson_path = 'output577_new.geojson'
gdf = gpd.read_file(geojson_path)

# Flatten every segment to: trailing id token, pedestrian flag,
# first/last vertex of its geometry, and motorway flag.
processed_data = []
for _, row in gdf.iterrows():
    geometry = row.geometry

    # Keep only the last colon-separated token of the id.
    # A missing id (NaN) is not a string and must not reach `.split`.
    id_full = row.get('id', '')
    id_ = id_full.split(':')[-1] if isinstance(id_full, str) and id_full else None

    pedestrian = row.get('accessCharacteristics.pedestrian')
    # Default to None rather than {}: an empty dict would be written to the CSV
    # as the text "{}", which reads back as a truthy string.
    motorway = row.get('isMotorway')

    # Start and end points of geometry (None when the geometry is missing or empty).
    if geometry is not None and not geometry.is_empty:
        coords = list(geometry.coords)
        startpt, endpt = coords[0], coords[-1]
    else:
        startpt = endpt = None

    processed_data.append({
        "id": id_,
        "Pedestrian": pedestrian,
        "Startpt": startpt,
        "Endpt": endpt,
        "isMotorway": motorway
    })

# Convert to DataFrame and save as CSV
df = pd.DataFrame(processed_data)
df.to_csv('output577_motorway.csv', index=False)

print("CSV file created")


CSV file created


In [None]:
#make math csv

# import pandas as pd
# import numpy as np
from ast import literal_eval

# Load the CSV
df = pd.read_csv("output577_motorway.csv")  # Replace with your path if needed

# Segments exported without a geometry have blank Startpt/Endpt cells, which
# read back as NaN. literal_eval raises ValueError on NaN, and such rows cannot
# be projected onto anyway, so they are dropped before parsing.
df = df.dropna(subset=["Startpt", "Endpt"]).reset_index(drop=True)

# Convert Startpt and Endpt string values (e.g. "(10.0, 53.5)") back to tuples
df["Startpt"] = df["Startpt"].apply(literal_eval)
df["Endpt"] = df["Endpt"].apply(literal_eval)

# The point to test against is built per violation below as (latitude, longitude)
# test_point = (53.5, 10.0)  # <- example value in (latitude, longitude) order

# Function to compute perpendicular distance and intersection coordinate
def perpendicular_distance_and_intersection(p, a, b):
    """Project a point onto a segment and return (distance, closest point).

    Coordinate order follows how the notebook calls this function:

    * ``p`` is ``(latitude, longitude)`` - the violation point.
    * ``a`` and ``b`` are ``(longitude, latitude)`` - segment end points as
      produced by shapely ``coords`` (x, y). Extra ordinates are ignored.

    Returns
    -------
    (dist, intersect)
        ``intersect`` is the closest point on the segment as
        ``(latitude, longitude)``, the order the bounds check in scenario 2
        indexes it with. ``dist`` is the planar distance between ``p`` and
        ``intersect`` in degrees; longitude is not scaled by cos(latitude),
        so it is only suitable for ranking nearby segments.

    The previous version unpacked all three points as ``(y, x)``, which
    swapped the axes of ``a``/``b`` relative to ``p`` and made the projection
    parameter meaningless.
    """
    lat0, lon0 = p
    lon1, lat1 = a[0], a[1]
    lon2, lat2 = b[0], b[1]

    # Work in (x=lon, y=lat) for all three points.
    AB = np.array([lon2 - lon1, lat2 - lat1], dtype=float)
    AP = np.array([lon0 - lon1, lat0 - lat1], dtype=float)

    ab_squared = np.dot(AB, AB)
    if ab_squared == 0:
        t = 0.0  # Line segment is a point
    else:
        t = np.dot(AP, AB) / ab_squared
        t = max(0.0, min(1.0, t))  # Clamp to segment

    foot_lon = float(lon1 + t * AB[0])
    foot_lat = float(lat1 + t * AB[1])
    intersect = (foot_lat, foot_lon)

    dist = float(np.linalg.norm(np.array(intersect) - np.array([lat0, lon0], dtype=float)))
    return dist, intersect

# For each remaining violation, project its point onto every segment.
# NOTE: the two result columns are overwritten on every iteration, so after the
# loop `df` (and the CSV below) describes the LAST violation only.
for _, vio in validations_filtered.iterrows():
    # Use the current row; the previous code always read `.iloc[0]`, i.e. the
    # first violation, no matter which row was being iterated.
    latitude = vio['geometry'].y
    longitude = vio['geometry'].x

    test_point = (latitude, longitude)
    print(test_point)

    # One projection per segment (previously computed twice, once per column).
    projections = [
        perpendicular_distance_and_intersection(test_point, start, end)
        for start, end in zip(df["Startpt"], df["Endpt"])
    ]
    df["perpendicular_dist"] = [dist for dist, _ in projections]
    df["intersect_cord"] = [point for _, point in projections]

# Save the updated CSV (optional)
df.to_csv("output_with_distances.csv", index=False)


(53.51555, 10.0047)


In [None]:
import re

def extract_topology_id(error_message):
    """Return the numeric id of the first Topology URN in an error message.

    Parameters
    ----------
    error_message : str | Any
        Validation message expected to contain
        ``urn:here::here:Topology:<digits>``.

    Returns
    -------
    str | None
        The digits following ``urn:here::here:Topology:`` as a string, or
        ``None`` when there is no match or ``error_message`` is not a string
        (e.g. NaN/None from a missing cell, which ``re.search`` would reject
        with TypeError).
    """
    if not isinstance(error_message, str):
        return None

    match = re.search(r"urn:here::here:Topology:(\d+)", error_message)
    return match.group(1) if match else None

# Example usage
# error_message = "Motorway Sign urn:here::here:signs:1622369126135402107 at Lat 10.00470037883686 Lon 53.515550601801934 is associated to a Topology urn:here::here:Topology:89894481 that has a range for Pedestrian = TRUE within 20m distance."

# topology_id = extract_topology_id(error_message)
# print(topology_id)  # Should print '89894481'


In [None]:
# scenario 2
# import numpy as np
# for _, vio in validations_filtered.iterrows():
#   latitude = vio['geometry'].y
#   longitude = vio['geometry'].x
#   bounds = calculate_tile_bounds(latitude, longitude)
#   # print(latitude)
#   # print(longitude)
#   # print(bounds)
#   temp_df = pd.DataFrame(columns = df.columns)
#   # TODO
#   # FOREACH SEGMENT
#   for _, seg in df.iterrows():
#     if (seg['intersect_cord'][0] < bounds['north'] and seg['intersect_cord'][0] > bounds['south'] and
#         seg['intersect_cord'][1] > bounds['west'] and seg['intersect_cord'][1] < bounds['east']):
#       temp_df = pd.concat([temp_df, pd.DataFrame([seg])], ignore_index=True)
#     else:
#       continue

#     if temp_df.empty:
#         print("No segments found in bounds.")
#         continue
#     temp_df.sort_values(by='perpendicular_dist')
#     flag = 0

#     for _, row in temp_df.iterrows():
#       if (row['isMotorway']):
#         flag = 1
#         top_id = extract_topology_id(validations_filtered['Error Message'])
#         if (temp_df['id'] != top_id):
#           print(f"Feature ID {feature_id}: needs to be assigned to Topological Segment {top_id} ----- RESOLVED")
#         else:
#           print(f"Feature ID {feature_id}: already assigned to Topological Segment {top_id} ----- CHECKING SCENARIO 3")
#         break
#     if (flag == 0):
#       print("No Motorways found in this area - confirm for sign existence")

    # if intersection point is in bounds
      # append segment to temp_df
    # else
      # continue
  # sort temp_df by perpendicular distance
  # set flag = 0
  # FOREACH ROW IN TEMP_DF
    # if (isMotorway == true):
      # set flag = 1
      # if (already assigned to sign):
        # fallthrough to scenario 3 ---- CHECKING SCENARIO 3
      # else
        # change assignment to current row
        # remove from validations_data_filtered ---- RESOLVED
      # BREAK
  # if flag == 0
    # print no mways found in area



In [None]:
import numpy as np
# Scenario 2: for each remaining violation, find the nearest motorway segment
# inside the tile around the sign. If that segment is not the topology named in
# the error message, the sign should be re-assigned to it (resolved); otherwise
# the violation moves on to scenario 3.
print(validations_filtered.shape)

# Index labels resolved here; dropped in one call after the loop.
resolved_labels = []

for i, vio in validations_filtered.iterrows():
    feature_id = vio['Feature ID']
    latitude = vio['geometry'].y
    longitude = vio['geometry'].x
    bounds = calculate_tile_bounds(latitude, longitude)
    print(bounds)

    # Project THIS violation's point onto every segment. The precomputed
    # df['perpendicular_dist'] / df['intersect_cord'] columns describe a single
    # violation only, so they are not reused here. Candidates are collected in
    # a list instead of growing a DataFrame with pd.concat inside the loop.
    candidates = []
    for _, seg in df.iterrows():
        dist, (foot_lat, foot_lon) = perpendicular_distance_and_intersection(
            (latitude, longitude), seg['Startpt'], seg['Endpt']
        )
        if (bounds['south'] < foot_lat < bounds['north']
                and bounds['west'] < foot_lon < bounds['east']):
            candidates.append((dist, seg))

    if not candidates:
        print("No segments found in bounds.")
        continue

    print(len(candidates))

    candidates.sort(key=lambda item: item[0])  # nearest segment first

    # A missing flag is NaN, which is truthy; require a real, true value.
    nearest_motorway = next(
        (seg for _, seg in candidates
         if pd.notna(seg['isMotorway']) and bool(seg['isMotorway'])),
        None,
    )

    if nearest_motorway is None:
        print(f"Feature ID {feature_id}: No Motorways found in this area - confirm for sign existence")
        continue

    message = vio['Error Message']
    top_id = extract_topology_id(message) if isinstance(message, str) else None

    # The CSV round-trip turns segment ids into numbers while the regex yields a
    # string, so the two never compared equal. Normalise to text first.
    segment_id = nearest_motorway['id']
    if isinstance(segment_id, float) and segment_id.is_integer():
        segment_id = int(segment_id)
    segment_id = str(segment_id)

    if segment_id != top_id:
        # Report the segment the sign should move TO (previously printed top_id,
        # i.e. the topology it is already associated with).
        print(f"Feature ID {feature_id}: needs to be assigned to Topological Segment {segment_id} ----- RESOLVED")
        resolved_labels.append(i)
    else:
        print(f"Feature ID {feature_id}: already assigned to Topological Segment {top_id} ----- CHECKING SCENARIO 3")

validations_filtered = validations_filtered.drop(index=resolved_labels)

print(validations_filtered.shape)


(1, 12)
{'north': 53.5200500045, 'south': 53.511049995499995, 'east': np.float64(10.012268062047452), 'west': np.float64(9.997131937952547)}


  temp_df = pd.concat([temp_df, pd.DataFrame([seg])], ignore_index=True)


66
Feature ID urn:here::here:signs:1622369126135402107: No Motorways found in this area - confirm for sign existence
(1, 12)


In [None]:
# scenario 3
# Scenario 3: a violation is resolved when the mean probe speed inside the tile
# around the sign exceeds 70 (units as stored in probe_data['speed']).
print(validations_filtered.shape)
if len(validations_filtered) > 0:
    for i, vio in validations_filtered.iterrows():
        latitude = vio['geometry'].y
        longitude = vio['geometry'].x
        bounds = calculate_tile_bounds(latitude, longitude)

        # Probe points inside the tile; `between` is inclusive on both ends.
        probe_filtered = probe_data[
            probe_data['latitude'].between(bounds['south'], bounds['north'])
            & probe_data['longitude'].between(bounds['west'], bounds['east'])
        ]
        # print(probe_filtered)
        # With no probe points the mean is NaN and the comparison is False.
        if not (probe_filtered['speed'].mean() > 70):
            continue

        print(f"Feature ID {vio['Feature ID']}: Average speed is too high for pedestrians. Pedestrian Flag needs to be set to False. ---- RESOLVED")
        validations_filtered = validations_filtered.drop(i)

print(validations_filtered.shape)


(1, 12)
(1, 12)


In [None]:
# scenario 4

# Whatever survived scenarios 1-3 is reported as a legitimate exception.
if len(validations_filtered) == 0:
    print("All violations resolved")
else:
    for remaining_feature_id in validations_filtered['Feature ID']:
        print(f"Feature ID {remaining_feature_id}: Represents legitimate exception. ---- ALL CASES RESOLVED")

Feature ID urn:here::here:signs:1622369126135402107: Represents legitimate exception. ---- ALL CASES RESOLVED


In [None]:
# bounds output checker

import math

def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two (lat, lon) points in degrees.

    Uses the haversine formula with an Earth radius of 6,371,000 m.
    """
    earth_radius_m = 6371000

    # Degrees -> radians for all four inputs.
    phi1, lam1, phi2, lam2 = (math.radians(deg) for deg in (lat1, lon1, lat2, lon2))

    dphi = phi2 - phi1
    dlam = lam2 - lam1

    # Haversine of the central angle.
    hav = math.sin(dphi / 2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2)**2
    central_angle = 2 * math.atan2(math.sqrt(hav), math.sqrt(1 - hav))

    return earth_radius_m * central_angle

# Hard-coded bounds to sanity-check (copied from a calculate_tile_bounds result).
north, south = 53.51573000018, 53.515369999819995
east, west = 10.005002722481898, 10.004397277518102

# Edge lengths of the tile: along the east edge (N-S) and along the north edge (E-W).
distance_north_south = haversine(north, east, south, east)  # North-South distance
distance_east_west = haversine(north, east, north, west)  # East-West distance

print(f"North-South distance: {distance_north_south} meters")
print(f"East-West distance: {distance_east_west} meters")


North-South distance: 40.03021362283044 meters
East-West distance: 40.030043572189854 meters
