# Define new phases in a roadnet file
#### Suitable for different phase definitions for intersections

In [1]:
import json

In [2]:
def write_phases(all_phases, intersection_type):
        """
        create list to match format of roadnet file
        this implementation enables to set different phases for each intersection
        """
        
        phases_list = []
        default_time = 10 # default time is set to 10 sec
        
        for phase_lanes in all_phases[intersection_type].values():
            
            phases_list.append({"time": default_time,
                                "availableRoadLinks": phase_lanes
                               }
                              )
        
        
        # This phase corresponds to the idle time (all tl are red)
        phases_list.append({"time": default_time,
                                "availableRoadLinks": []
                               }
                              )

        return phases_list

In [3]:
def read_and_write_roadnet(all_phases, dir_roadnet_file, combined_intersections):
        """ Read the roadnet file and retrieve information"""
        
        all_intersections = []

        with open(dir_roadnet_file, "r") as jsonFile:
            data = json.load(jsonFile)
            
        for intersection in data["intersections"]:
            if (not intersection["virtual"]):
                
                all_intersections.append(intersection["id"])
                
                num_phases = len(intersection["roadLinks"])
                
                # change road link indices
                intersection["trafficLight"]["roadLinkIndices"] = list(range(num_phases))
                
                # determine intersection type
                if intersection["id"] in combined_intersections["top_intersections"]:
                    intersection_type = "top_intersection"
                
                elif intersection["id"] in combined_intersections["bottom_intersections"]:
                    intersection_type = "bottom_intersection"
                
                elif intersection["id"] in combined_intersections["left_intersections"]:
                    intersection_type = "left_intersection"
                
                elif intersection["id"] in combined_intersections["special_intersections"]:
                    intersection_type = intersection["id"]
                    
                else:
                    intersection_type = "normal_intersection"

                
                # change phase of roadnet file for "intersection"
                intersection["trafficLight"]["lightphases"] = write_phases(all_phases, intersection_type)
                
                
              
        # Write the modified data back to the file
        with open(dir_roadnet_file, "w") as jsonFile:
            json.dump(data, jsonFile, indent=4)
        
        print("Done")
        return all_intersections

In [4]:
# Predefined phases
normal_intersection = {0: [0, 7, 2, 3, 6, 10],
                       1: [1, 8, 2, 3, 6, 10],
                       2: [4, 11, 2, 3, 6, 10],
                       3: [5, 9, 2, 3, 6, 10],
                       4: [0, 1, 2, 3, 6, 10],
                       5: [7, 8, 2, 3, 6, 10],
                       6: [9, 11, 2, 3, 6, 10],
                       7: [4, 5, 2, 3, 6, 10]
                      }


top_intersection = {0: [0, 3, 2, 5],
                    1: [0, 1, 2, 5],
                    2: [5, 2, 4]
                   }

bottom_intersection = {0: [0, 4, 1, 2],
                       1: [4, 5, 1, 2],
                       2: [3, 1, 2]
                      }


left_intersection = {0: [2, 5, 1, 4],
                     1: [2, 3, 1, 4],
                     2: [0, 1, 4]
                    }

intersection_16_9 = {0: [0, 4, 2, 3, 7], # l - r 
                     1: [1, 5, 2, 3, 7], # l_u, r_d
                     2: [6, 8, 2, 3, 7], # top ->
                     3: [0, 1, 2, 3, 7], # left ->
                     4: [4, 5, 2, 3, 7]  # right ->
                    }

intersection_17_9 = {0: [0, 6, 2, 5, 8], # l - r
                     1: [4, 7, 2, 5, 8], # t_r, b_l
                     2: [0, 1, 2, 5, 8], # left ->
                     3: [3, 4, 2, 5, 8]  # bottom ->
                    }
                         
                         
intersection_16_6 = {0: [0, 4, 1, 2, 7], # l - r
                     1: [3, 6, 1, 2, 7], # t_r, b_l
                     2: [4, 5, 1, 2, 7], # right ->
                     3: [6, 8, 1, 2, 7], # top ->
                    }

intersection_17_6 = {0: [0, 7, 2, 3, 6], # l - r
                     1: [1, 8, 2, 3, 6], # l_u, r_d
                     2: [4, 5, 2, 3, 6], # bottom
                     3: [0, 1, 2, 3, 6], # left
                     4: [7, 8, 2, 3 ,6], # right
                    }


all_phases = {
    "normal_intersection": normal_intersection,
    "top_intersection": top_intersection,
    "bottom_intersection": bottom_intersection,
    "left_intersection": left_intersection,
    "intersection_16_9": intersection_16_9,
    "intersection_17_9": intersection_17_9,
    "intersection_16_6": intersection_16_6,
    "intersection_17_6": intersection_17_6
}


top_intersections = {f"intersection_{i}_9" for i in range(11, 30) if i not in {16, 17}}
bottom_intersections = {f"intersection_{i}_6" for i in range(11, 30) if i not in {16, 17}}
left_intersections = {"intersection_10_7", "intersection_10_8"}
special_intersections = {"intersection_16_9", "intersection_17_9", "intersection_16_6", "intersection_17_6"}

combined_intersections = {
    "top_intersections": top_intersections,
    "bottom_intersections": bottom_intersections,
    "left_intersections": left_intersections,
    "special_intersections": special_intersections
}

all_intersections = read_and_write_roadnet(all_phases, "../Simulation_Results/Manhattan/roadnet.json", combined_intersections)

Done


# Test out all phases in a round-robin way

In [5]:
import cityflow
import numpy as np

In [6]:
# start sim engine
config_path = '../Simulation_Results/Manhattan/config.json'
eng = cityflow.Engine(config_path, thread_num=1)
eng.reset()

In [7]:
# sim for 100 time steps
n_step = 4000

counter = 0
    
for i in range(n_step):
        
    if i % 10 == 0:
        for intersection in all_intersections:
            if intersection in top_intersections:
                active_phase = counter % (len(all_phases["top_intersection"]) + 1)
            
            elif intersection in bottom_intersections:
                active_phase = counter % (len(all_phases["bottom_intersection"]) + 1)
                
            elif intersection in left_intersections:
                active_phase = counter % (len(all_phases["left_intersection"]) + 1)
            
            elif intersection in special_intersections:
                active_phase = counter % (len(all_phases[intersection]) + 1)
                
            else:
                active_phase = counter % (len(all_phases["normal_intersection"]) + 1)
            
            
            eng.set_tl_phase(intersection, active_phase)
        
        counter += 1
        
    eng.next_step()
    