In [1]:
import traci
import sys
import os
import csv

In [2]:
def initialize_simulation(sumocfg_file):
    try:
        traci.start(['sumo', '-c', sumocfg_file]) #sumo-gui
        print("TraCI started successfully")
    except Exception as e:
        print("TraCI failed to start: ", e)
        sys.exit()

In [3]:
def get_veh_ids():
    try:
        vehicle_ids = traci.vehicle.getIDList()
        return vehicle_ids
    except traci.exceptions.TraCIException as e:
        print(f"Error retrieving vehicle IDs: {e}")
        return []
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return []

In [4]:
def get_distance_to_vehicle_in_front(av_id):
    leader_info = traci.vehicle.getLeader(av_id)
    if leader_info is not None:
        _, distance = leader_info
        return distance
    else:
        return float('inf')

In [5]:
def check_blockers_and_non_blockers(veh_id):
    try:
        right_blockers_mode = 7
        left_blockers_mode = 6
        right_non_blockers_mode = 3  
        left_non_blockers_mode = 2   

        right_blockers = traci.vehicle.getNeighbors(veh_id, right_blockers_mode)
        left_blockers = traci.vehicle.getNeighbors(veh_id, left_blockers_mode)
        right_non_blockers = traci.vehicle.getNeighbors(veh_id, right_non_blockers_mode)
        left_non_blockers = traci.vehicle.getNeighbors(veh_id, left_non_blockers_mode)

        return {
            "left_blockers": left_blockers,
            "right_blockers": right_blockers,
            "left_non_blockers": left_non_blockers,
            "right_non_blockers": right_non_blockers
        }

    except traci.exceptions.TraCIException as e:
        print(f"Error checking neighbors for vehicle {veh_id}: {e}")
        return {"left_blockers": [], "right_blockers": [], "left_non_blockers": [], "right_non_blockers": []}
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return {"left_blockers": [], "right_blockers": [], "left_non_blockers": [], "right_non_blockers": []}

In [6]:
def calculate_longitudinal_distances(av_id, lane_offset):
    try:
        followers = []
        leaders = []

        if lane_offset == -1:
            followers = traci.vehicle.getLeftFollowers(av_id)
            leaders = traci.vehicle.getLeftLeaders(av_id)
        elif lane_offset == 1:
            followers = traci.vehicle.getRightFollowers(av_id)
            leaders = traci.vehicle.getRightLeaders(av_id)

        if followers:
            follower_id, follower_distance = followers[0]
            follower_velocity = traci.vehicle.getSpeed(follower_id)
        else:
            follower_id, follower_distance, follower_velocity = None, float('inf'), 0

        if leaders:
            leader_id, leader_distance = leaders[0]
        else:
            leader_id, leader_distance = None, float('inf')

        return {
            'leader_id': leader_id, 'leader_distance': leader_distance,
            'follower_id': follower_id, 'follower_distance': follower_distance, 'follower_velocity': follower_velocity
        }

    except traci.exceptions.TraCIException as e:
        print(f"Error calculating longitudinal distances for vehicle {av_id}: {e}")
        return {
            'leader_id': None, 'leader_distance': float('inf'),
            'follower_id': None, 'follower_distance': float('inf'), 'follower_velocity': 0
        }
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return {
            'leader_id': None, 'leader_distance': float('inf'),
            'follower_id': None, 'follower_distance': float('inf'), 'follower_velocity': 0
        }

In [7]:
def calculate_safety_distances(av_id):
    try:
        reaction_time = 1.5
        comfortable_braking = traci.vehicle.getDecel(av_id)
        max_braking = traci.vehicle.getEmergencyDecel(av_id)
        current_speed = traci.vehicle.getSpeed(av_id)

        dCrash = current_speed * reaction_time + (1/2 * comfortable_braking * reaction_time**2) \
                 + (current_speed + reaction_time * comfortable_braking)**2 / (2 * max_braking)
    
        dResponse = current_speed * reaction_time + (1/2 * comfortable_braking * reaction_time**2) \
                    + (current_speed + reaction_time * comfortable_braking)**2 / (2 * comfortable_braking)

        return dCrash, dResponse

    except traci.exceptions.TraCIException as e:
        print(f"Error calculating safety distances for vehicle {av_id}: {e}")
        return float('inf'), float('inf')
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return float('inf'), float('inf')


In [8]:
def assess_lane_change_safety(av_id, target_lane_distances, dResponse, target_lane_offset):
    try:
        leader_distance = target_lane_distances['leader_distance']
        follower_distance = target_lane_distances['follower_distance']

        follower_vehicles = traci.vehicle.getRightFollowers(av_id) if target_lane_offset > 0 else traci.vehicle.getLeftFollowers(av_id)
        follower_vehicle_id = follower_vehicles[0][0] if follower_vehicles else None

        follower_speed = traci.vehicle.getSpeed(follower_vehicle_id) if follower_vehicle_id else 0

        current_speed = traci.vehicle.getSpeed(av_id)

        is_safe_to_leader = leader_distance >= dResponse - (3.5 * current_speed)
        is_safe_to_follower = follower_distance >= dResponse + (3.5 * follower_speed)

        return is_safe_to_leader and is_safe_to_follower

    except traci.exceptions.TraCIException as e:
        print(f"Error assessing lane change safety for vehicle {av_id}: {e}")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False

In [9]:
def select_target_lane(av_id, left_lane_distances, right_lane_distances, left_lane_safety, right_lane_safety):
    try:
        current_lane_index = traci.vehicle.getLaneIndex(av_id)
        edge_id = traci.vehicle.getRoadID(av_id)
        num_lanes = traci.edge.getLaneNumber(edge_id)

        if current_lane_index > 0 and left_lane_safety:
            if left_lane_distances['follower_distance'] > right_lane_distances['follower_distance']:
                return -1

        if current_lane_index < num_lanes - 1 and right_lane_safety:
            return 1 

        return 0 

    except Exception as e:
        print(f"Error in selecting target lane for vehicle {av_id}: {e}")
        return 0  


In [10]:
def adjust_vehicle_behavior(av_id, dResponse, target_lane_offset):
    try:
        current_lane_index = traci.vehicle.getLaneIndex(av_id)
        edge_id = traci.vehicle.getRoadID(av_id)
        num_lanes = traci.edge.getLaneNumber(edge_id)

        target_lane_index = current_lane_index + target_lane_offset
        
        if 0 <= target_lane_index < num_lanes:
            if target_lane_offset != 0:

                traci.vehicle.changeLane(av_id, target_lane_index, int(traci.simulation.getDeltaT()))
        else:
            leader_info = traci.vehicle.getLeader(av_id)
            if leader_info is not None:
                _, leader_distance = leader_info
                if leader_distance < dResponse:
                    current_speed = traci.vehicle.getSpeed(av_id)
                    new_speed = max(0, current_speed - traci.vehicle.getDecel(av_id) * traci.simulation.getDeltaT())
                    traci.vehicle.setSpeed(av_id, new_speed)
            else:
                traci.vehicle.setSpeed(av_id, traci.vehicle.getMaxSpeed(av_id))

    except traci.exceptions.TraCIException as e:
        print(f"Error adjusting behavior for vehicle {av_id}: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

In [11]:
def check_lane_availability(av_id):

    try:
        current_lane_index = traci.vehicle.getLaneIndex(av_id)
        edge_id = traci.vehicle.getRoadID(av_id)
        num_lanes = traci.edge.getLaneNumber(edge_id)
        
        left_lane_available = current_lane_index > 0
        right_lane_available = current_lane_index < num_lanes - 1

        return {
            'left': left_lane_available,
            'right': right_lane_available
        }

    except traci.exceptions.TraCIException as e:
        print(f"Error checking lane availability for vehicle {av_id}: {e}")
        return {'left': False, 'right': False}
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return {'left': False, 'right': False}

In [12]:
def apply_lane_management_algorithm(av_id, dFront, dCrash, dResponse, vCurrent, vLimit, laneAvailability):
    try:
        newVelocity = vCurrent
        laneChange = 0

        if dFront < dResponse:
            newVelocity = max(0, vCurrent - traci.vehicle.getDecel(av_id) * traci.simulation.getDeltaT())

        if dFront < dCrash and (laneAvailability['left'] or laneAvailability['right']):
            left_lane_distances = calculate_longitudinal_distances(av_id, -1) if laneAvailability['left'] else {'leader_distance': float('inf'), 'follower_distance': float('inf')}
            right_lane_distances = calculate_longitudinal_distances(av_id, 1) if laneAvailability['right'] else {'leader_distance': float('inf'), 'follower_distance': float('inf')}

            left_lane_safety = assess_lane_change_safety(av_id, left_lane_distances, dResponse, -1)
            right_lane_safety = assess_lane_change_safety(av_id, right_lane_distances, dResponse, 1)

            laneChange = select_target_lane(av_id, left_lane_distances, right_lane_distances, left_lane_safety, right_lane_safety)

        if laneChange == 0 and newVelocity == vCurrent:
            newVelocity = min(vLimit, vCurrent + traci.vehicle.getAccel(av_id) * traci.simulation.getDeltaT())

        return newVelocity, laneChange

    except Exception as e:
        print(f"Error applying lane management algorithm for vehicle {av_id}: {e}")
        return vCurrent, 0


In [13]:
def execute_vehicle_commands(av_id, newVelocity, laneChange):
    try:
        previous_lane_index = traci.vehicle.getLaneIndex(av_id)
        traci.vehicle.setSpeed(av_id, newVelocity)

        new_lane_index = previous_lane_index
        if laneChange != 0:
            new_lane_index = previous_lane_index + laneChange
            traci.vehicle.changeLane(av_id, new_lane_index, traci.simulation.getDeltaT())

        return previous_lane_index, new_lane_index

    except traci.exceptions.TraCIException as e:
        print(f"Error executing commands for vehicle {av_id}: {e}")
        return None  # Return None to indicate an error
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None  # Return None to indicate an error



In [14]:
def manage_av(av_id):
    try:
        dFront = get_distance_to_vehicle_in_front(av_id)
        dCrash, dResponse = calculate_safety_distances(av_id)
        vCurrent = traci.vehicle.getSpeed(av_id)
        vLimit = traci.vehicle.getMaxSpeed(av_id)

        lane_blockers = check_blockers_and_non_blockers(av_id)
        lane_availability = check_lane_availability(av_id)
        left_lane_available = lane_availability['left'] == True
        right_lane_available = lane_availability['right'] == True
        

        longitudinal_distances_left = calculate_longitudinal_distances(av_id, -1)
        longitudinal_distances_right = calculate_longitudinal_distances(av_id, 1)

        left_lane_safety = assess_lane_change_safety(av_id, longitudinal_distances_left, dResponse, -1)
        right_lane_safety = assess_lane_change_safety(av_id, longitudinal_distances_right, dResponse, 1)

        newVelocity, laneChange = apply_lane_management_algorithm(
            av_id, dFront, dCrash, dResponse, vCurrent, vLimit, 
            {'left': left_lane_available, 'right': right_lane_available}
        )

        command_result = execute_vehicle_commands(av_id, newVelocity, laneChange)
        if command_result is None:
            previous_lane, new_lane = -1, -1  # Set default values in case of error
        else:
            previous_lane, new_lane = command_result

        left_follower_id = longitudinal_distances_left['follower_id'] if longitudinal_distances_left.get('follower_id') else None
        right_follower_id = longitudinal_distances_right['follower_id'] if longitudinal_distances_right.get('follower_id') else None

        if left_follower_id:
            dCrashFollowerLeft, dResponseFollowerLeft = calculate_safety_distances(left_follower_id)
        else:
            dCrashFollowerLeft, dResponseFollowerLeft = float('inf'), float('inf')

        if right_follower_id:
            dCrashFollowerRight, dResponseFollowerRight = calculate_safety_distances(right_follower_id)
        else:
            dCrashFollowerRight, dResponseFollowerRight = float('inf'), float('inf')

        return {
            "dFront": dFront, "dCrash": dCrash, "dResponse": dResponse,
            "left_lane_available": left_lane_available, "right_lane_available": right_lane_available,
            "lane_blockers": lane_blockers,
            "longitudinal_distances_left": longitudinal_distances_left,
            "longitudinal_distances_right": longitudinal_distances_right,
            "left_lane_safety": left_lane_safety, "right_lane_safety": right_lane_safety,
            "dCrashFollowerLeft": dCrashFollowerLeft, "dResponseFollowerLeft": dResponseFollowerLeft,
            "dCrashFollowerRight": dCrashFollowerRight, "dResponseFollowerRight": dResponseFollowerRight,
            "previous_lane": previous_lane,
            "new_lane": new_lane
        }

    except traci.exceptions.TraCIException as e:
        print(f"Error managing vehicle {av_id}: {e}")
        return {}
    except Exception as e:
        print(f"An unexpected error toccurred: {e}")
        return {}



In [15]:
def close_simulation():
    print("Closing TraCI connection")
    traci.close()
    print("TraCI connection closed")

In [16]:
def run_simulation(sumocfg_file, av_ids):
    initialize_simulation(sumocfg_file)

    av_logs_dir = "avLogs"
    os.makedirs(av_logs_dir, exist_ok=True)

    csv_file_path = os.path.join(av_logs_dir, "all_vehicles_log.csv")
    with open(csv_file_path, mode='w', newline='') as file:
        writer = csv.writer(file)
        headers = ["Step", "Vehicle ID", "Distance to Front", "dCrashAV", "dResponseAV", 
                   "Left Lane Available", "Right Lane Available", "Left Blockers", "Right Blockers", 
                   "Left Non-Blockers", "Right Non-Blockers", "Left Leader Vehicle ID", "Left Leader Distance", 
                   "Left Follower Vehicle ID", "Left Follower Distance", "Left Follower Velocity", 
                   "Right Leader Vehicle ID", "Right Leader Distance", "Right Follower Vehicle ID", 
                   "Right Follower Distance", "Right Follower Velocity", "dCrashFollowerLeft", 
                   "dResponseFollowerLeft", "dCrashFollowerRight", "dResponseFollowerRight", 
                   "Lane Change Safety Left", "Lane Change Safety Right", "Previous Lane", "New Lane"]
        writer.writerow(headers)

        try:
            step = 0
            while traci.simulation.getMinExpectedNumber() > 0:
                traci.simulationStep()

                vehicle_ids = get_veh_ids()
                for av_id in vehicle_ids:
                    if av_id in av_ids:
                        data = manage_av(av_id)

                        left_leader_id, left_leader_distance = data["longitudinal_distances_left"]["leader_id"], data["longitudinal_distances_left"]["leader_distance"]
                        left_follower_id, left_follower_distance, left_follower_velocity = data["longitudinal_distances_left"]["follower_id"], data["longitudinal_distances_left"]["follower_distance"], data["longitudinal_distances_left"]["follower_velocity"]

                        right_leader_id, right_leader_distance = data["longitudinal_distances_right"]["leader_id"], data["longitudinal_distances_right"]["leader_distance"]
                        right_follower_id, right_follower_distance, right_follower_velocity = data["longitudinal_distances_right"]["follower_id"], data["longitudinal_distances_right"]["follower_distance"], data["longitudinal_distances_right"]["follower_velocity"]

                        if data:
                            writer.writerow([
                                step, av_id, data["dFront"], data["dCrash"], data["dResponse"],
                                data["left_lane_available"], data["right_lane_available"],
                                data["lane_blockers"]["left_blockers"], data["lane_blockers"]["right_blockers"],
                                data["lane_blockers"]["left_non_blockers"], data["lane_blockers"]["right_non_blockers"],
                                left_leader_id, left_leader_distance, left_follower_id, left_follower_distance, left_follower_velocity,
                                right_leader_id, right_leader_distance, right_follower_id, right_follower_distance, right_follower_velocity,
                                data["dCrashFollowerLeft"], data["dResponseFollowerLeft"], data["dCrashFollowerRight"], data["dResponseFollowerRight"],
                                data["left_lane_safety"], data["right_lane_safety"], data["previous_lane"], data["new_lane"]
                            ])

                step += 1

        except Exception as e:
            print(f"An unexpected error occurred during the simulation: {e}")
        finally:
            close_simulation()


In [17]:
av_vehicle_ids = [
    "veh11", "veh12", "veh13", "veh15", "veh17", "veh18", "veh19", "veh2", 
    "veh21", "veh22", "veh23", "veh24", "veh27", "veh28", "veh29", "veh3", 
    "veh30", "veh33", "veh34", "veh35", "veh36", "veh37", "veh39", "veh4", 
    "veh40", "veh41", "veh42", "veh43", "veh44", "veh45", "veh46", "veh48", 
    "veh49", "veh51", "veh53", "veh55", "veh56", "veh58", "veh60", "veh61", "veh8"
]
sumocfg_file_path = 'map4\osm.sumocfg'
run_simulation(sumocfg_file_path, av_vehicle_ids)

TraCI started successfully
