# Loading the metadata and getting information for graph

Loading of the Digraph and understanding the graph structure and it seems there are 50 nodes and adjacency it's known each node connection and wieght of each node

In [24]:
import networkx as nx
import json
def load_graph(filename="graph_with_metadata.json"):
    with open(filename, "r") as f:
        data = json.load(f)

    adjacency_raw = data["adjacency"]
    adjacency = {int(k): [(int(n), w) for n, w in v] for k, v in
adjacency_raw.items()}

    positions = {int(k): tuple(v) for k, v in
data["positions"].items()}
    metadata = data.get("metadata", {})
    end_nodes_loaded = metadata.get("exit_nodes", [])
    exit_nodes = end_nodes_loaded[1:]

    G_loaded = nx.DiGraph()
    for node, neighbors in adjacency.items():
        for nbr, weight in neighbors:
            G_loaded.add_edge(node, nbr, weight=weight)

    print(f"Graph successfully reloaded from '{filename}'")
    print(f"Metadata: {metadata}")

    return G_loaded, adjacency, positions, metadata, exit_nodes

G_loaded, adjacency, positions, metadata, exit_nodes=load_graph(filename="graph_with_metadata.json")

Graph successfully reloaded from 'graph_with_metadata.json'
Metadata: {'n_nodes': 50, 'p': 0.2, 'traffic_min': 0.8, 'traffic_max': 1.2, 'directed_ratio': 0.2, 'description': 'Synthetic city map with directed and undirected roads', 'exit_nodes': [0, 48]}


# Dijkstra Implementation

implementation of dijkstra algorith which will be used by both thieves and the police to the optimal path that they desire

In [25]:
from queue import PriorityQueue
import copy

def dijkstra(graph, source, exitNode):
    dist = [float('inf')] * len(graph)
    dist[source] = 0
    parent = [-1] * len(graph)
    pq = PriorityQueue()
    pq.put((0, source))

    while not pq.empty():
        dist1, node = pq.get()
        if dist1 > dist[node]:
            continue

        edges = graph[node]
        for neighbour_node, dist_edge in edges:
            if dist[neighbour_node] > dist1 + dist_edge:
                dist[neighbour_node] = dist1 + dist_edge
                parent[neighbour_node] = node
                pq.put((dist[neighbour_node], neighbour_node))

    path = []
    current = exitNode
    while current != -1:
        path.append(current)
        current = parent[current]

    path.reverse()
    return path


# Randomness of the Graph

this section of code helps in managing the randomness of the graph by changing the graph weights depending on the situation or type of event and manages this events by making them to stop and change back to original after their duration is done.

In [26]:
import copy
import random

original=copy.deepcopy(adjacency)
def get_random_edge(graph):
    all_edges = []
    for u in graph:
        for v, w in graph[u]:
            all_edges.append((u, v))

    if not all_edges:
        return None, None, None

    u, v = random.choice(all_edges)
    weight = next(w for neighbor, w in graph[u] if neighbor == v)

    return u, v, weight

def manage_active_events(current_graph_state, step):
    global active_events, original

    graph = current_graph_state
    graph_was_changed = False
    events_to_keep = []

    def revert_edge(n1, n2, weight):
        if n1 in graph:
            graph[n1] = [(i, weight) if i == n2 else (i, w) for i, w in graph[n1]]

    for event in active_events:
        u, v = event['edge']
        event_type = event['type']
        if step >= event['step_end']:
            revert_edge(u, v, event['original_weight'])
            graph_was_changed = True
        else:
            if event_type == 'traffic_jam':
                new_w = event['original_weight'] * event['multiplier']
                revert_edge(u, v, new_w)

            elif event_type in ['blockage', 'one_way_block']:
                revert_edge(u, v, float('inf'))

            events_to_keep.append(event)

    active_events = events_to_keep
    return graph, graph_was_changed


def dynamic_nature_simulation(current_graph_state, step):
    global active_events, original

    graph = current_graph_state
    graph_changed = False

    events = ['traffic_jam', 'blockage', 'one_way']
    PROBABILITY = 0.3
    MIN_DURATION = 5
    MAX_DURATION = 15

    if random.random() < PROBABILITY:
        u, v, original_weight = get_random_edge(graph)

        if u is not None and u != v and original_weight < float('inf'):
            if any(e['edge'] == (u, v) for e in active_events if e['type'] != 'one_way'):
                return graph, graph_changed

            event_type = random.choice(events)

            def apply_edge_change(n1, n2, new_w):
                graph[n1] = [(i, new_w) if i == n2 else (i, w) for i, w in graph[n1]]

            new_event_data = {
                'edge': (u, v),
                'type': event_type,
                'step_start': step,
                'step_end': step,
                'original_weight': original_weight
            }

            if event_type == 'traffic_jam':
                multiplier = random.uniform(2.0, 5.0)
                new_weight = original_weight * multiplier
                apply_edge_change(u, v, new_weight)
                new_event_data['step_end']+=3
                new_event_data['multiplier'] = multiplier

            elif event_type == 'blockage':
                apply_edge_change(u, v, float('inf'))
                new_event_data['step_end']+=4

            elif event_type == 'one_way':
                if v in graph:
                    original_reverse_weight = next((w for i, w in original[v] if i == u), float('inf'))
                    apply_edge_change(v, u, float('inf'))

                    active_events.append({
                        'edge': (v, u),
                        'type': 'one_way_block',
                        'step_start': step,
                        'step_end': step + 6,
                        'original_weight': original_reverse_weight
                    })
                    graph_changed = True
                    return graph, graph_changed
            active_events.append(new_event_data)
            graph_changed = True

    return graph, graph_changed

# Simulation Part

In the below code snippet the simulation part of ploice and theieves are done and their events have been added into the history array with each step's associated moments have be appended to give an understanding whats happening

This history is used to later animate and save the log as simulation.json

In [27]:
step = 1
carApos = 0
carBpos = 49
carApath = []
carBpath = []
carAedgefrom = 0
carBedgefrom = 49
carAedgeto = 0
carBedgeto = 0
carAprogress = 0
carBprogress = 0
distancecarA = 0
distancecarB = 0
reached = False
caught = False
history = []
active_events = []
graph_changed = False
carApath = dijkstra(adjacency, carApos, 48)
if len(carApath) >= 2:
    carAedgefrom = carApath[0]
    carAedgeto = carApath[1]

while not (caught or reached):
    if(step > 51):
        break
    if carApos == carBpos:
        print('Police have won!')
        caught = True
        break

    if carApos == 48:
        print('Thieves have escaped!')
        reached = True
        break
    new_graph = copy.deepcopy(adjacency)
    new_graph, reversed_now = manage_active_events(new_graph, step)
    new_graph, created_now = dynamic_nature_simulation(new_graph, step)

    if graph_changed:
        if carAprogress == 0:
            carApath = dijkstra(new_graph, carApos, 48)
            if len(carApath) >= 2:
                carAedgefrom = carApath[0]
                carAedgeto = carApath[1]
                carAprogress = 0
                distancecarA = next((f for i, f in new_graph[carAedgefrom] if i == carAedgeto), 0)

        if step > 3:
            carBtarget = carAedgeto if carAprogress > 0 else carApos
            carBpath = dijkstra(new_graph, carBpos, carBtarget)
            if len(carBpath) >= 2:
                carBedgefrom = carBpath[0]
                carBedgeto = carBpath[1]
                carBprogress = 0
                distancecarB = next((k for j, k in new_graph[carBedgefrom] if j == carBedgeto), 0)

        graph_changed = False

    if distancecarA == 0 and carAedgefrom != carAedgeto:
        distancecarA = next((f for i, f in new_graph[carAedgefrom] if i == carAedgeto), 0)

    if carAprogress < distancecarA:
        carAprogress += 1
    else:
        carApos = carAedgeto
        carAprogress = 0
        try:
            current_index = carApath.index(carApos)
            if current_index + 1 < len(carApath):
                carAedgefrom = carApath[current_index]
                carAedgeto = carApath[current_index + 1]
                distancecarA = next((f for i, f in new_graph[carAedgefrom] if i == carAedgeto), 0)
            else:
                if carApos != 48:
                    carAedgefrom = carApos
                    carAedgeto = carApos
                    distancecarA = float('inf')
        except ValueError:
            pass

    if step > 3:
        if carBprogress == 0 or not carBpath or len(carBpath) < 2:
            carBtarget = carAedgeto if carAprogress > 0 else carApos
            carBpath = dijkstra(new_graph, carBpos, carBtarget)
            if len(carBpath) >= 2:
                carBedgefrom = carBpath[0]
                carBedgeto = carBpath[1]
                distancecarB = next((k for j, k in new_graph[carBedgefrom] if j == carBedgeto), 0)
            else:
                carBedgefrom = carBpos
                carBedgeto = carBpos
                distancecarB = float('inf')

        if distancecarB == 0 and carBedgefrom != carBedgeto:
            distancecarB = next((k for j, k in new_graph[carBedgefrom] if j == carBedgeto), 0)

        if carBprogress < distancecarB:
            carBprogress += 1.5
        else:
            carBpos = carBedgeto
            carBprogress = 0

    if carApos == carBpos:
        print("Police have won!")
        caught = True

    elif carAedgefrom == carBedgefrom and carAedgeto == carBedgeto and abs(carAprogress - carBprogress) < 0.5:
        print("Police caught the thieves mid-edge!")
        caught = True

    if not caught and not reached:
        history.append({
            'step': step,
            'carAstate': {
                'carApos': carApos, 'edgefrom': carAedgefrom, 'edgeto': carAedgeto,
                'progress': carAprogress, 'path': carApath
            },
            'carBstate': {
                'carBpos': carBpos, 'edgefrom': carBedgefrom, 'edgeto': carBedgeto,
                'progress': carBprogress, 'path': carBpath if step > 3 else []
            },
            'reached': reached, 'caught': caught, 'events': active_events
        })
        step += 1
    elif caught or reached:
        history.append({
            'step': step,
            'carAstate': {
                'carApos': carApos, 'edgefrom': carAedgefrom, 'edgeto': carAedgeto,
                'progress': carAprogress, 'path': carApath
            },
            'carBstate': {
                'carBpos': carBpos, 'edgefrom': carBedgefrom, 'edgeto': carBedgeto,
                'progress': carBprogress, 'path': carBpath if step > 3 else []
            },
            'reached': reached, 'caught': caught, 'events': active_events
        })
        break



print("Simulation completed.")
print(f"Steps executed: {step}")

Police have won!
Simulation completed.
Steps executed: 17


Converting the history into json log as simulation.json

In [28]:
import json

filename = "simulation.json"

with open(filename, "w") as f:
    json.dump(history, f, indent=4)

print(f"Data successfully saved to {filename}")


Data successfully saved to simulation.json


# **Visualization of Graph**

The below is code snippet of animation of police and thieves and we can clearly see police have caught the thieves at node 48.

This animation shows all about the simulation everythings whats happening hows it happening and everything

In [29]:
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
from IPython.display import HTML
import networkx as nx

def animate_pursuit(G, positions, total_log, exit_nodes):
    fig, ax = plt.subplots(figsize=(14, 10))

    def update(frame_idx):
        ax.clear()
        curr_log = total_log[frame_idx]

        carA = curr_log['carAstate']
        carB = curr_log['carBstate']

        carA_pos = carA['carApos']
        carB_pos = carB['carBpos']
        carA_edge = (carA['edgefrom'], carA['edgeto'])
        carB_edge = (carB['edgefrom'], carB['edgeto'])
        carA_path = carA.get('path', [])
        events = curr_log.get('events', [])

        nx.draw_networkx_edges(G, positions, edge_color='gray', arrows=True, arrowsize=15,
                               width=1.5, alpha=0.6, ax=ax)

        for event in events:
            u, v = event.get('edge', (None, None))
            if u not in G.nodes or v not in G.nodes:
                continue
            etype = event.get('type', '')
            color = {'blockage': 'red', 'traffic': 'orange', 'one way': 'purple'}.get(etype, None)
            if color:
                nx.draw_networkx_edges(G, positions, edgelist=[(u, v)],
                                       edge_color=color, width=4,
                                       style='dashed' if etype == 'blockage' else 'solid',
                                       arrows=True, ax=ax)

        if len(carA_path) > 1:
            path_edges = [(carA_path[i], carA_path[i+1]) for i in range(len(carA_path)-1)
                          if carA_path[i] in G.nodes and carA_path[i+1] in G.nodes]
            nx.draw_networkx_edges(G, positions, edgelist=path_edges,
                                   edge_color='blue', width=2, alpha=0.3, arrows=True, ax=ax)

        if all(n in G.nodes for n in carA_edge):
            nx.draw_networkx_edges(G, positions, edgelist=[carA_edge],
                                   edge_color='blue', width=3, arrows=True, ax=ax)

        if all(n in G.nodes for n in carB_edge):
            nx.draw_networkx_edges(G, positions, edgelist=[carB_edge],
                                   edge_color='darkred', width=3, arrows=True, ax=ax)

        regular_nodes = [n for n in G.nodes() if n not in exit_nodes and n not in (carA_pos, carB_pos)]
        nx.draw_networkx_nodes(G, positions, nodelist=regular_nodes,
                               node_color='lightgray', node_size=300, ax=ax)
        nx.draw_networkx_nodes(G, positions, nodelist=exit_nodes,
                               node_color='lightgreen', node_size=400, ax=ax)

        if carA_pos in G.nodes:
            nx.draw_networkx_nodes(G, positions, nodelist=[carA_pos],
                                   node_color='blue', node_size=600, node_shape='s', ax=ax)
        if carB_pos in G.nodes:
            nx.draw_networkx_nodes(G, positions, nodelist=[carB_pos],
                                   node_color='red', node_size=600, node_shape='^', ax=ax)
        nx.draw_networkx_labels(G, positions, font_size=8, ax=ax)
        status = "CAUGHT!" if curr_log['caught'] else ("ESCAPED!" if curr_log['reached'] else "IN PURSUIT")
        ax.set_title(f"Step {curr_log['step']}: {status}", fontsize=14, fontweight='bold')
        ax.axis('off')

    anim = FuncAnimation(fig, update, frames=len(total_log), interval=500, repeat=True)
    plt.close()
    return anim

anim = animate_pursuit(G_loaded, positions, history, exit_nodes)
HTML(anim.to_html5_video())
