<a href="https://colab.research.google.com/github/yuvarbiv/DQN/blob/main/recreating_Baroch's_article.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ALL FUNCTIONS

In [None]:
import random, math
import numpy as np
import matplotlib.pyplot as plt
import types
from matplotlib import animation
from IPython.display import HTML
from types import SimpleNamespace

# ----- model hyper-parameters -----
seed = 42
random.seed(seed)
np.random.seed(seed)

# threshold = 0.9
# alpha = 0.5
# lamda = 10
# P_ta = 1; NP_ta = 1 - P_ta
# nof_iterations = 80
# GSize = 80
# nof_agents = 2
# nof_targets = 3
# nof_false_targets = 3

const_HP = SimpleNamespace()
const_HP.P_ta = 1
const_HP.NP_ta = 1-const_HP.P_ta
const_HP.nof_iterations = 80
const_HP.nof_agents = 2
const_HP.nof_targets = 3
const_HP.nof_false_targets = 3
const_HP.GSize = 80

var_HP = SimpleNamespace()
var_HP.threshold = 0.9
var_HP.alpha = 0.5
var_HP.lamda = 10

# ----- initialize datasets -----
datasets = SimpleNamespace()
datasets.grid = np.full((const_HP.GSize, const_HP.GSize), const_HP.nof_targets/(const_HP.GSize**2), dtype=float)
datasets.agents_loc = random.sample([(i,j) for i in range(const_HP.GSize) for j in range(const_HP.GSize)], const_HP.nof_agents)
datasets.targets_loc = random.sample([(i,j) for i in range(const_HP.GSize) for j in range(const_HP.GSize)], const_HP.nof_targets)
datasets.available_locs = [(i,j) for i in range(const_HP.GSize) for j in range(const_HP.GSize) if (i,j) not in datasets.targets_loc]
datasets.false_targets_loc = random.sample(datasets.available_locs, const_HP.nof_false_targets)
print(datasets.grid)
print(datasets.agents_loc)
print(datasets.targets_loc)
print(datasets.false_targets_loc)

# ----- functions: -----
def cell_send_pulse(const_HP, var_HP, datasets, i, j):
  # return: 1==pulse, 0==no_pulse
  # later I'd like this function to return continuous value [0,1]
  if (i,j) in datasets.targets_loc:        return random.random() < const_HP.P_ta
  if (i,j) in datasets.false_targets_loc:  return random.random() < var_HP.alpha*const_HP.P_ta
  return False

# --------------------------------------------
def agent_recieved_signal(const_HP, var_HP, datasets, d):
  # return: 1==pulse, 0==no_pulse
  # TODO: later I'd like this function to return continuous value [0,1]
  x = math.e**(-d/var_HP.lamda)
  return random.random() < x

# --------------------------------------------
def cell_update_probabillity(const_HP, var_HP, datasets, frame, prev_grid_val, d, recieved):
  P_recieved__t0 = prev_grid_val
  NP_recieved__t0 = 1-P_recieved__t0
  # recieved
  numerator =             P_recieved__t0 * const_HP.P_ta
  denumerator = numerator + NP_recieved__t0 * const_HP.P_ta * var_HP.alpha
  P_signal_recieved__t1 = numerator / denumerator if denumerator != 0 else 0 # Avoid division by zero
  # UNrecieved
  signal_strength = 1-math.e**(-d/var_HP.lamda)
  numerator =           P_recieved__t0 * (const_HP.NP_ta + const_HP.P_ta*signal_strength)
  denumerator = numerator + NP_recieved__t0 * ((1-var_HP.alpha*const_HP.P_ta)+var_HP.alpha*const_HP.P_ta*signal_strength)
  P_signal_UNrecieved__t1 = numerator / denumerator if denumerator != 0 else 0 # Avoid division by zero

  if recieved:
    return P_signal_recieved__t1 # Using the calculated received probability
  else:
    return P_signal_UNrecieved__t1 # Using the calculated unreceived probability

# --------------------------------------------
def print_targets_found(datasets):
  pairs = [f"({i}),({j})" for i,j, _ in datasets.target_set]
  steps = [steps for _, _, steps in datasets.target_set]
  return pairs,steps

# --------------------------------------------
def update_cell(const_HP, var_HP, datasets, i, j, frame):
  new_val = np.mean(datasets.cell_by_agent)
  datasets.grid[i][j] = new_val

# --------------------------------------------
def update_target_set(const_HP, var_HP, datasets, i, j, frame, new_grid_val):
  if new_grid_val >= var_HP.threshold and (i,j) not in [(t[0], t[1]) for t in datasets.target_set]:
    datasets.target_set.add((i,j,frame))
    datasets.detect_img[i, j] = frame  # reflect into detection image
    pairs,frame = print_targets_found(datasets)


#1 RUN + SIMULATION

In [None]:

# ----- your model bits (edit as needed) -----
seed = 41
random.seed(seed)
np.random.seed(seed)

alpha = 0.5
P_ta = 1; NP_ta = 1 - P_ta
nof_iterations = 40
GSize = 50
lamda = GSize*2
nof_agents = 3
nof_targets = 3
nof_false_targets = 3
threshold = 0.9
size_of_markers = 20

# ----- initialize datasets -----
initial_grid_value = nof_targets/(GSize**2)
grid = np.full((GSize, GSize), initial_grid_value, dtype=float)
agents_loc = random.sample([(i,j) for i in range(GSize) for j in range(GSize)], nof_agents)
targets_loc = random.sample([(i,j) for i in range(GSize) for j in range(GSize)], nof_targets)
available_locs = [(i,j) for i in range(GSize) for j in range(GSize) if (i,j) not in targets_loc]
false_targets_loc = random.sample(available_locs, nof_false_targets)
print(grid)
print(agents_loc)
print(targets_loc)
print(false_targets_loc)

# ----- Simulation Initialization -----
# A 2D image to visualize detections (instead of trying imshow on a set)
detect_img = np.zeros_like(grid, dtype=float)
target_set = set()

# ---- single figure, two axes ----
fig, (Gax, Tax, Bax) = plt.subplots(1, 3, figsize=(20, 8))
Gim = Gax.imshow(grid, cmap='viridis', vmin=0, vmax=1)
Gax.set_title('Probabilities'); fig.colorbar(Gim, ax=Gax)
Tim = Tax.imshow(detect_img, cmap='viridis', vmin=0, vmax=nof_iterations)
Tax.set_title('Locations of Participants'); fig.colorbar(Tim, ax=Tax)
# bar chart axis setup
Bax.set_title('Detections by cell')
Bax.set_ylabel('Frame of first detection')
Bax.set_ylim(0, nof_iterations)
plt.tight_layout()

# draw static markers once
ax = [c[1] for c in agents_loc]; ay = [c[0] for c in agents_loc]
# Gax.scatter(ax, ay, marker='o', s=size_of_markers, color='black', label='Agent')
Tax.scatter(ax, ay, marker='o', s=size_of_markers, color='black', label='Agent')
tx = [c[1] for c in targets_loc]; ty = [c[0] for c in targets_loc]
# Gax.scatter(tx, ty, marker='X', s=size_of_markers, color='white', label='Targets')
Tax.scatter(tx, ty, marker='X', s=size_of_markers, color='white', label='Targets')
fx = [c[1] for c in false_targets_loc]; fy = [c[0] for c in false_targets_loc]
# Gax.scatter(fx, fy, marker='X', s=size_of_markers, color='red', label='False Targets')
Tax.scatter(fx, fy, marker='X', s=size_of_markers, color='red', label='False Targets')
Gax.legend(loc="upper center", bbox_to_anchor=(0.5, -0.1), ncol=3)
# List to hold text objects for detections
texts = []

# ----- main loop -----
def update(frame):
    global texts
    for text in texts:
        text.remove()
    texts = []

    # step model once (your existing loop) ...
    for i in range(GSize):
        for j in range(GSize):
            cell_by_agent = []
            for a in agents_loc:
                d = np.hypot(a[0]-i, a[1]-j)
                s = cell_send_pulse(targets_loc, false_targets_loc, alpha, i, j)
                r = agent_recieved_signal(d, lamda) if s else 0
                cell_by_agent.append(cell_update_probabillity(grid[i][j], frame, d, r))
            update_cell(cell_by_agent, frame, grid, i, j)
            update_target_set(target_set, detect_img, i, j, threshold, grid[i][j], frame)

    # update heatmaps
    Gim.set_data(grid)
    Tim.set_data(detect_img)

    # annotate detections on Tax
    ii, jj = np.where(detect_img > 0)
    for y, x in zip(ii, jj):
        t = Tax.text(x, y, int(detect_img[y, x]), ha="center", va="center", color="black", fontsize=7)
        texts.append(t)

    # --- update bar "table" ---
    Bax.cla()
    Bax.set_title('Detections by cell')
    Bax.set_ylabel('Frame of first detection')
    Bax.set_ylim(0, nof_iterations)

    if ii.size:
        frames = detect_img[ii, jj]
        labels = [f"({i},{j})" for i, j in zip(ii, jj)]
        order = np.argsort(frames)  # sort by frame (earliest first)
        frames = frames[order]
        labels = [labels[k] for k in order]

        x = np.arange(len(frames))
        Bax.bar(x, frames)
        Bax.set_xticks(x)
        Bax.set_xticklabels(labels, rotation=90, fontsize=7)

    return [Gim, Tim, *texts]  # blit=False, so returning artists is optional

# ----- run -----
anim = animation.FuncAnimation(fig, update, frames=nof_iterations, interval=200, blit=False)
HTML(anim.to_jshtml())

# Multiple Runs, no simulation
>
this version gets the constant hyper-parameters(HP):
1. nof_runs (number of runs)
2. max_frames
3. gridsize
4. nof_agents
5. nof_targets
6. nof_false_targets

and the varying-HP:
1. alpha
2. lambda
3. threshold
>

It then iterates over the varying-HP and tries to find the {max-alpha, min-lambda, max-threshold} combination that would still find all targets withough finding false-targets.

In [None]:
import pandas as pd

# List to store results from each run
datasets.results = []

const_HP.nof_runs = 20
const_HP.max_frames = 500
const_HP.GSize = 80
# Randomize hyperparameters (example ranges, adjust as needed)
var_HP.alpha = 0.50 #0.53 #random.uniform(0.1, 0.95)
var_HP.lamda = 14.52 #random.uniform(5, 20)
var_HP.threshold = 0.78 #random.uniform(0.7, 0.95)
all_targets_detected = False
reached_undetection = 3

for run in range(const_HP.nof_runs):
    print(f"Starting run {run + 1}/{const_HP.nof_runs} with lamda {var_HP.lamda}")

    # Reset variables for each run
    datasets.grid = np.full((const_HP.GSize, const_HP.GSize), const_HP.nof_targets/(const_HP.GSize**2), dtype=float)
    # Re-randomize agent, target, and false target locations for each run
    datasets.agents_loc = random.sample([(i,j) for i in range(const_HP.GSize) for j in range(const_HP.GSize)], const_HP.nof_agents)
    datasets.targets_loc = random.sample([(i,j) for i in range(const_HP.GSize) for j in range(const_HP.GSize)], const_HP.nof_targets)
    datasets.available_locs = [(i,j) for i in range(const_HP.GSize) for j in range(const_HP.GSize) if (i,j) not in datasets.targets_loc]
    datasets.false_targets_loc = random.sample(datasets.available_locs, const_HP.nof_false_targets)

    datasets.detect_img = np.zeros_like(datasets.grid, dtype=float)
    datasets.target_set = set()
    frames_taken = 0
    false_target_detected = False # Flag to indicate if a false target was detected

    # --- Simulation loop for a single run ---
    for frame in range(const_HP.max_frames):
        # Step model once (copying logic from the previous update function)
        for i in range(const_HP.GSize):
            for j in range(const_HP.GSize):
                datasets.cell_by_agent = []
                for a in datasets.agents_loc:
                    d = np.hypot(a[0]-i, a[1]-j)
                    s = cell_send_pulse(const_HP, var_HP, datasets, i, j) # Use current_alpha
                    r = agent_recieved_signal(const_HP, var_HP, datasets, d) if s else 0 # Use current_lamda
                    datasets.cell_by_agent.append(cell_update_probabillity(const_HP, var_HP, datasets, frame, datasets.grid[i][j], d, r))
                update_cell(const_HP, var_HP, datasets, i, j, frame)
                update_target_set(const_HP, var_HP, datasets, i, j, frame, datasets.grid[i][j]) # Use current_threshold

        frames_taken = frame + 1

        # Check stopping condition
        datasets.detected_target_locations = [(t[0], t[1]) for t in datasets.target_set]
        if all(target in datasets.detected_target_locations for target in datasets.targets_loc):
            print(f"All targets detected in run {run + 1} at frame {frames_taken}")
            break

        # Check if any detected location is a false target
        if any(datasets.detected_loc in datasets.false_targets_loc for datasets.detected_loc in datasets.detected_target_locations):
            false_target_detected = True
            print(f"False Target detected in run {run + 1} at frame {frames_taken}")
            break

    # --- Collect results for the run ---
    last_detected_target_frame = -1
    last_detected_target_loc = None
    if datasets.target_set:
        # Find the target that was discovered last
        # Filter out false targets before finding the last detected true target
        datasets.true_targets_detected = [t for t in datasets.target_set if (t[0], t[1]) in datasets.targets_loc]
        if datasets.true_targets_detected:
          last_detected_target_info = max(datasets.true_targets_detected, key=lambda item: item[2])
          last_detected_target_loc = (last_detected_target_info[0], last_detected_target_info[1])
          last_detected_target_frame = last_detected_target_info[2]


    avg_agent_distance_to_last_target = -1
    if last_detected_target_loc:
        distances = [np.hypot(a[0] - last_detected_target_loc[0], a[1] - last_detected_target_loc[1]) for a in datasets.agents_loc]
        avg_agent_distance_to_last_target = np.mean(distances)

    all_targets_detected = all(target in datasets.detected_target_locations for target in datasets.targets_loc)
    if not all_targets_detected : reached_undetection = 1
    print(f"reached_undetection: {reached_undetection}")
    print(f"all_targets_detected: {all_targets_detected}")

    datasets.results.append({
        'run': run + 1,
        'nof_agents': const_HP.nof_agents,
        'nof_targets': const_HP.nof_targets,
        'alpha': var_HP.alpha,
        'lamda': var_HP.lamda,
        'threshold': var_HP.threshold,
        'frames_to_detect_all': frames_taken if all_targets_detected else -1, # -1 if not all detected
        'avg_agent_distance_to_last_target': avg_agent_distance_to_last_target,
        'false_target_detected': false_target_detected # Add the false target detection flag
    })

    for i in range(reached_undetection) :
      choose_hyper = 1#random.choice([0,1,2])
      match choose_hyper:
        case 0:   var_HP.alpha += 0.01 if all_targets_detected else -0.01
        case 1:   var_HP.lamda += -0.1 if all_targets_detected else 0.1
        case 2:   var_HP.threshold += 0.01 if all_targets_detected else -0.01
        case _:   print('ERROR')

# Convert results to a pandas DataFrame
results_df = pd.DataFrame(datasets.results)

# Display the results
display(results_df)