In [None]:
import pandas as pd  
import movingpandas as mpd 
import numpy 
from shapely.geometry import LineString
import matplotlib.pyplot as plt
from shapely import Point
import math

In [None]:
#for loading files in data
file_path = "trajectories.csv" 
df = pd.read_csv(file_path)

Data Cleaning and Preprocessing: 
add length filtering compared to closed road

In [None]:
def filter_data(df, closed_road, distance_threshold=0.02):
    """
    Filter data to only include points with non-zero speed and within a certain distance to a closed road.
    
    Args:
    - df: DataFrame with columns ['longitude', 'latitude', 'speed']
    - closed_road: Shapely LineString representing the closed road
    - distance_threshold: Maximum distance (in kilometers) from the LineString
    
    Returns:
    - Filtered DataFrame
    """
    def is_near_closed_road(row):
        point = Point(row['longitude'], row['latitude'])
        return closed_road.distance(point) * 111 <= distance_threshold  
    
    # Filter for non-zero speed and proximity to the closed road
    filtered_df = df[(df['speed'] != 0) & df.apply(is_near_closed_road, axis=1)]
    return filtered_df

def construct_trajectory_linestrings(df):
    """
    Construct trajectories as LineStrings from points grouped by traj_id.
    Args:
    - df: DataFrame with trajectory data

    Returns:
    - Dictionary of trajectories {traj_id: LineString}
    """
    trajectories = {}
    grouped = df.groupby('traj_id')
    for traj_id, group in grouped:
        points = [(lon, lat) for lon, lat in zip(group['longitude'], group['latitude'])]
        if len(points) > 1:  
            trajectories[traj_id] = LineString(points)
    return trajectories

trajectories = construct_trajectory_linestrings(df)

def get_orientation(linestring):
    """
    Compute the primary orientation (heading) of a road from a LineString.
    Args:
    - linestring: Shapely LineString representing the road

    Returns:
    - Heading (degrees) in [0, 360], measured clockwise from north
    """
    start = linestring.coords[0]  
    end = linestring.coords[-1]  
    dx = end[0] - start[0] 
    dy = end[1] - start[1]  
    
    angle = math.degrees(math.atan2(dx, dy))  
    heading = (angle + 360) % 360  
    return heading

def filter_deviating_trajectories(trajectories, road_heading, threshold=30):
    """
    Filter trajectories with headings deviating significantly from the road's primary direction.
    Args:
    - trajectories: Dictionary of LineStrings {traj_id: LineString}
    - road_heading: Heading of the road (degrees)
    - threshold: Maximum allowable deviation (degrees)

    Returns:
    - Dictionary of deviating trajectories {traj_id: LineString}
    """
    deviating_trajectories = {}
    for traj_id, trajectory in trajectories.items():
        traj_heading = get_orientation(trajectory)
        heading_diff = abs(traj_heading - road_heading) % 360
        if heading_diff > 180: 
            heading_diff = 360 - heading_diff
        
        if heading_diff > threshold:
            deviating_trajectories[traj_id] = trajectory
    
    return deviating_trajectories


Data Processing

In [None]:
def compare_trajectories_linestrings(trajectories, closed_road, proximity_threshold=0.02):
    """
    Compare LineStrings (trajectories) to the closed road LineString.
    Args:
    - trajectories: Dictionary of LineStrings {traj_id: LineString}
    - closed_road: Shapely LineString representing the closed road
    - proximity_threshold: Maximum distance (km) for proximity

    Returns:
    - Dictionary of results with {traj_id: {'distance': float, 'nearby': bool}}
    """
    results = {}
    for traj_id, linestring in trajectories.items():
        distance = linestring.distance(closed_road) * 111  
        nearby = distance <= proximity_threshold
        results[traj_id] = {
            'distance': distance,
            'nearby': nearby
        }
    return results

def is_road_open(results, min_trajectories=3):
    """
    Determine if a closed road is open based on the proximity of trajectories.
    Args:
    - results: Dictionary containing trajectory data with {traj_id: {'distance': float, 'nearby': bool}}
    - min_trajectories: Minimum number of unique trajectories required to mark the road as open

    Returns:
    - Boolean: True if the road is considered open, False otherwise
    """
    unique_trajectories = results['traj_id'].nunique()
    return unique_trajectories >= min_trajectories



Data Visualization

In [None]:
def plot_trajectories_and_road(trajectories, closed_road):
    """
    Plot all trajectories as LineStrings and the closed road.
    """
    plt.figure(figsize=(10, 8))
    xs, ys = closed_road.xy
    plt.plot(xs, ys, label="Closed Road", color="red", linewidth=2)

    for traj_id, linestring in trajectories.items():
        xs, ys = linestring.xy
        plt.plot(xs, ys, label=f"Trajectory {traj_id}", alpha=0.6)

    plt.xlabel("Longitude")
    plt.ylabel("Latitude")
    plt.title("Trajectories and Closed Road")
    plt.legend()
    plt.show()