In [1]:
import traci
import traci.constants as tc
import os
import sys
import time

In [2]:
# Stop right away when SUMO_HOME is not declared; otherwise make the
# 'tools' directory underneath it importable.
if 'SUMO_HOME' not in os.environ:
    sys.exit("please declare environment variable 'SUMO_HOME'")
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)

In [3]:
# Locate the sumo-gui executable. Prefer the installation SUMO_HOME points at
# so the notebook is not tied to one machine; when the variable is not set,
# fall back to the path this notebook originally hard-coded.
_sumo_home = os.environ.get('SUMO_HOME')
if _sumo_home:
    sumo_binary = os.path.join(_sumo_home, 'bin', 'sumo-gui')
else:
    sumo_binary = r'C:\Program Files (x86)\Eclipse\Sumo\bin\sumo-gui'
# Command line handed to traci.start(): the binary plus the scenario config.
sumo_cmd = [sumo_binary, '-c', 'traffic.sumocfg']

In [53]:
# Close a connection left over from a previous run. On a fresh kernel there is
# nothing to close and TraCI reports that with FatalTraCIError, so that case is
# ignored to keep the notebook runnable from top to bottom.
try:
    traci.close()
except traci.exceptions.FatalTraCIError:
    pass

In [54]:
# Start the simulation with the command line built in sumo_cmd.
traci.start(sumo_cmd)

(20, 'SUMO 1.15.0')

In [6]:
def group_lanes_flow(lanes):
    """Group lane IDs by the edge (flow) they belong to.

    SUMO names lanes ``<edgeID>_<laneIndex>``, so the flow key is everything
    before the last underscore. This keeps edges such as ``E1`` and ``E10``
    (or ``-E1`` and ``-E10``) apart, which a fixed-length prefix cannot do.
    A lane ID that contains no underscore is used as its own key.

    Args:
        lanes: Iterable of lane ID strings.

    Returns:
        dict mapping each flow key to ``{'lanes': [lane_id, ...]}``; the lanes
        of a flow are kept in input order.
    """
    flow_lanes = {}
    for lane in lanes:
        edge_id, separator, _lane_index = lane.rpartition('_')
        flow_key = edge_id if separator else lane
        flow_lanes.setdefault(flow_key, {'lanes': []})['lanes'].append(lane)

    return flow_lanes

In [7]:
def get_vehicle_list(flow):
    """Collect the IDs of the vehicles seen on the flow's lanes in the last step.

    ``flow`` is a dict whose ``'lanes'`` entry lists lane IDs. The result is a
    flat list built lane by lane, in the order the lanes are listed.
    """
    return [
        vehicle_id
        for lane_id in flow['lanes']
        for vehicle_id in traci.lane.getLastStepVehicleIDs(lane_id)
    ]

In [8]:
def count_vehicle_per_flow(flow):
    """Return how many vehicles were on the lanes of ``flow`` in the last step.

    ``flow`` is a dict whose ``'lanes'`` entry lists lane IDs; the per-lane
    counts reported by TraCI are added up.
    """
    return sum(
        traci.lane.getLastStepVehicleNumber(lane_id)
        for lane_id in flow['lanes']
    )

In [9]:
def calc_vehicle_waiting_time(vehicle_list_dict, vehicle_on_simulation):
    """Refresh the highest accumulated waiting time seen for each tracked vehicle.

    Args:
        vehicle_list_dict: ``{vehicle_id: waiting_time}``; updated in place.
        vehicle_on_simulation: IDs of the vehicles currently in the
            simulation. Tracked vehicles that are not listed here are left
            untouched and are not queried.

    Returns:
        The same ``vehicle_list_dict`` object, for convenience.
    """
    # Set lookup: testing membership on the raw ID sequence would scan it once
    # for every tracked vehicle.
    active_vehicles = set(vehicle_on_simulation)
    for vehicle, recorded_wait in vehicle_list_dict.items():
        if vehicle in active_vehicles:
            waiting_time = traci.vehicle.getAccumulatedWaitingTime(vehicle)
            # Keep the larger of the stored and the freshly read value.
            vehicle_list_dict[vehicle] = max(recorded_wait, waiting_time)

    return vehicle_list_dict

In [10]:
def calc_flow_stats(flow, num_minute):
    """Summarise vehicle counts and waiting times for one flow.

    Args:
        flow: dict with a ``'vehicle_list'`` entry mapping each vehicle ID to
            its recorded waiting time.
        num_minute: Length of the observation window in minutes; must be
            non-zero.

    Returns:
        dict with the keys
        ``total_vehicle`` (number of vehicles recorded),
        ``average_vehicle_per_minute`` (``total_vehicle / num_minute``),
        ``total_waiting_time`` (sum of all recorded waiting times),
        ``average_vehicle_waiting_time_all`` (mean over every vehicle) and
        ``average_vehicle_waiting_time`` (mean over vehicles whose waiting
        time is greater than zero).
        Both averages are ``0.0`` when there is nothing to average over.
    """
    waiting_times = list(flow['vehicle_list'].values())
    total_vehicle = len(waiting_times)
    total_waiting_time = sum(waiting_times)
    count_wait_vehicle = sum(1 for waiting_time in waiting_times if waiting_time > 0)

    # An empty flow, or one where nobody had to wait, would otherwise divide by zero.
    if total_vehicle:
        avg_vehicle_waiting_time_all = total_waiting_time / total_vehicle
    else:
        avg_vehicle_waiting_time_all = 0.0
    if count_wait_vehicle:
        avg_vehicle_waiting_time = total_waiting_time / count_wait_vehicle
    else:
        avg_vehicle_waiting_time = 0.0

    return {
        'total_vehicle': total_vehicle,
        'average_vehicle_per_minute': total_vehicle / num_minute,

        'total_waiting_time': total_waiting_time,
        'average_vehicle_waiting_time_all': avg_vehicle_waiting_time_all,
        'average_vehicle_waiting_time': avg_vehicle_waiting_time
    }

In [11]:
def get_queue_lenght(flow):
    """Return the largest per-lane number of halting vehicles in ``flow``.

    ``flow`` is a dict whose ``'lanes'`` entry lists lane IDs. Every lane is
    asked for its halting-vehicle count of the last step; the result is never
    below 0, which is also what a flow without lanes yields.
    """
    halting_counts = [
        traci.lane.getLastStepHaltingNumber(lane_id)
        for lane_id in flow['lanes']
    ]
    return max([0] + halting_counts)

## Read the current traffic condition

In [13]:
# Work with the first traffic light of the scenario.
tf_id = traci.trafficlight.getIDList()[0]
# Controlled lanes, with duplicates removed while keeping their order.
lanes = list(dict.fromkeys(traci.trafficlight.getControlledLanes(tf_id)))
# group lanes into same flow
flow_lanes = group_lanes_flow(lanes)
# No simulation step happens inside the loop below, so the list of vehicles in
# the simulation is fetched once instead of once per flow.
vehicle_on_simulation = traci.vehicle.getIDList()
for flow in flow_lanes:
    # create vehicle_list key if not exist yet
    vehicle_list = flow_lanes[flow].setdefault('vehicle_list', {})

    # register vehicles currently on the lanes that are not tracked yet,
    # starting them at a waiting time of 0
    for vehicle in get_vehicle_list(flow_lanes[flow]):
        vehicle_list.setdefault(vehicle, 0.0)

    # calculate waiting time for all vehicle in list
    flow_lanes[flow]['vehicle_list'] = calc_vehicle_waiting_time(vehicle_list, vehicle_on_simulation)
    
    

In [55]:
# Fetch the first program of the traffic light once and print each of its
# phases, every one followed by a blank line.
program_logic = traci.trafficlight.getAllProgramLogics(tf_id)[0]
for phase in program_logic.phases:
    print(phase)
    print()

Phase(duration=42.0, state='GGGGgrrrrrGGGGgrrrrr', minDur=42.0, maxDur=42.0, next=())

Phase(duration=3.0, state='yyyyyrrrrryyyyyrrrrr', minDur=3.0, maxDur=3.0, next=())

Phase(duration=42.0, state='rrrrrGGGGgrrrrrGGGGg', minDur=42.0, maxDur=42.0, next=())

Phase(duration=3.0, state='rrrrryyyyyrrrrryyyyy', minDur=3.0, maxDur=3.0, next=())



## Run the simulation

In [None]:
# run without value retrieval
# Only advances the simulation by 100 steps; nothing is queried from TraCI.
step = 0
while step < 100:
    traci.simulationStep()
    step += 1
    # Zero delay between steps; raise it to slow the run down.
    time.sleep(0.0)

In [56]:
# run this with value retrieval!
# Work with the first traffic light of the scenario.
tf_id = traci.trafficlight.getIDList()[0]
# Controlled lanes, with duplicates removed while keeping their order.
lanes = list(dict.fromkeys(traci.trafficlight.getControlledLanes(tf_id)))
# group lanes into same flow
flow_lanes = group_lanes_flow(lanes)
# every flow tracks its vehicles as {vehicle_id: waiting_time}
for flow in flow_lanes:
    flow_lanes[flow].setdefault('vehicle_list', {})

step = 0
max_vehicle_stop_length = 0
# run it for 15 minutes
# NOTE(review): the loop executes 901 steps; if one step is one second that is
# one step more than 15 minutes - confirm which is intended.
while step < 901:
    traci.simulationStep()
    step += 1

    # The vehicles in the simulation are the same for every flow within one
    # step, so ask TraCI once per step.
    vehicle_on_simulation = traci.vehicle.getIDList()

    for flow in flow_lanes:
        flow_data = flow_lanes[flow]

        # largest halting-vehicle count seen on any lane so far
        max_vehicle_stop_length = max(max_vehicle_stop_length, get_queue_lenght(flow_data))

        # register vehicles currently on the lanes that are not tracked yet,
        # starting them at a waiting time of 0
        vehicle_list = flow_data['vehicle_list']
        for vehicle in get_vehicle_list(flow_data):
            vehicle_list.setdefault(vehicle, 0.0)

        # calculate waiting time for all vehicle in list
        flow_data['vehicle_list'] = calc_vehicle_waiting_time(vehicle_list, vehicle_on_simulation)

    # Zero delay between steps; raise it to slow the run down.
    time.sleep(0)

In [34]:
def calculate_overall_waiting_time(flow_lanes):
    """Return the mean recorded waiting time over the vehicles of all flows.

    Args:
        flow_lanes: dict mapping a flow key to a dict whose optional
            ``'vehicle_list'`` entry maps vehicle ID to waiting time. A flow
            without that entry contributes no vehicles.

    Returns:
        Total waiting time divided by the number of vehicle entries, or
        ``0.0`` when no vehicle has been recorded at all.
    """
    total_vehicle = 0
    total_waiting_time = 0
    for flow in flow_lanes.values():
        vehicle_dict = flow.get('vehicle_list', {})
        total_vehicle += len(vehicle_dict)
        # Continue the running total so values are added in the same order as
        # a plain nested loop would add them.
        total_waiting_time = sum(vehicle_dict.values(), total_waiting_time)

    if total_vehicle == 0:
        # Nothing recorded: avoid dividing by zero.
        return 0.0
    return total_waiting_time / total_vehicle

In [57]:
# Summary of the run: mean waiting time over all flows and the largest
# halting-vehicle count recorded, one per line.
print(
    f'Overall traffic average waiting time: {calculate_overall_waiting_time(flow_lanes)}',
    f'Maximum vehicle waiting length: {max_vehicle_stop_length}',
    sep='\n',
)

Overall traffic average waiting time: 11.810596026490066
Maximum vehicle waiting length: 7


In [47]:
# Report the per-flow statistics of the first two flows only, using a
# 15-minute window for the per-minute average.
first_two_flows = list(flow_lanes)[:2]
for index, flow in enumerate(first_two_flows):
    stats = calc_flow_stats(flow_lanes[flow], 15)
    print(flow)
    print(f'average vehicle per minute: {stats["average_vehicle_per_minute"]:.2f}')
    print(f'average vehicle waiting time: {stats["average_vehicle_waiting_time_all"]:.2f}')
    print()

E0
average vehicle per minute: 5.07
average vehicle waiting time: 10.79

E2
average vehicle per minute: 20.07
average vehicle waiting time: 12.11



## List of domains (modules) available in TraCI
- gui
- lane
- poi
- simulation
- trafficlight
- vehicletype
- edge
- inductionloop
- junction
- multientryexit
- polygon
- route
- person
- vehicle

In [104]:
# Display the red/yellow/green state string TraCI reports for traffic light tf_id.
traci.trafficlight.getRedYellowGreenState(tf_id)

'rrrrGGGGgrrrrGGGGg'

In [25]:
# Display the first program logic (its phases and their durations) of traffic light tf_id.
traci.trafficlight.getAllProgramLogics(tf_id)[0]

Logic(programID='0', type=0, currentPhaseIndex=0, phases=(Phase(duration=42.0, state='GGGGgrrrrrGGGGgrrrrr', minDur=42.0, maxDur=42.0, next=()), Phase(duration=3.0, state='yyyyyrrrrryyyyyrrrrr', minDur=3.0, maxDur=3.0, next=()), Phase(duration=42.0, state='rrrrrGGGGgrrrrrGGGGg', minDur=42.0, maxDur=42.0, next=()), Phase(duration=3.0, state='rrrrryyyyyrrrrryyyyy', minDur=3.0, maxDur=3.0, next=())), subParameter={})