# Cruise Control GPU (Live Mirror) and Intruder allocation


In [10]:
import subprocess

def has_nvidia_smi():
  try:
    subprocess.check_output(["bash", "-lc", "command -v nvidia-smi"])
    return True
  except subprocess.CalledProcessError:
    return False

GPU_AVAILABLE = has_nvidia_smi()
print("GPU available:", GPU_AVAILABLE)

if GPU_AVAILABLE:
  !nvidia-smi
  !pip -q install cupy-cuda12x
else:
  print("No GPU detected. (Colab: Runtime -> Change runtime type -> GPU)")
  print("Will run in CPU mode (NumPy fallback).")

GPU available: True
Mon Feb  2 23:35:17 2026       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   67C    P0             31W /   70W |     140MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                            

In [11]:
import torch, time, requests

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Device:', device)
if device.type == 'cuda':
    print('GPU:', torch.cuda.get_device_name(0))

# Match C constants
SIM_DT = 0.1
TARGET_GAP = 10.0
Kp = 0.35
Kd = 0.70


Device: cuda
GPU: Tesla T4


In [12]:
def compute_block(sample, device):
    # Required fields from latest.json
    row = [[
        float(sample['my_x']), float(sample['my_y']),
        float(sample['leader_x']), float(sample['leader_y']),
        float(sample['current_speed']), float(sample['leader_speed']), float(sample['leader_speed'])
    ]]

    d = torch.tensor(row, dtype=torch.float32, device=device)
    my_x, my_y, front_x, front_y, cur_spd, front_spd, leader_spd = d.T

    gap = torch.sqrt((my_x - front_x)**2 + (my_y - front_y)**2)
    projected_error = (gap - TARGET_GAP) - (front_spd * SIM_DT)

    base = leader_spd
    damping = (front_spd - cur_spd) * Kd
    correction = projected_error * Kp

    new_speed = base + damping + correction
    new_speed = torch.maximum(new_speed, torch.zeros_like(base))
    new_speed = torch.minimum(new_speed, base + 25.0)

    return float(gap.item()), float(new_speed.item())


In [13]:
# Rajdeep
# Intruder GPU model (1000x1000 grid)
GRID_SIZE = 1000
GRID_OFFSET = GRID_SIZE // 2
INTRUDER_RADIUS = 4
INTRUDER_PROB = 0.05
INTRUDER_SPEED = 40
INTRUDER_LENGTH = 10
INTRUDER_DURATION_MS = 5000

intruder_grid = torch.zeros((GRID_SIZE, GRID_SIZE), device=device)

def _grid_idx(v):
    return max(0, min(GRID_SIZE - 1, int(round(v)) + GRID_OFFSET))

def gpu_intruder_scan(sample):
    intruder_grid.zero_()
    fx = sample['my_x']
    fy = sample['my_y']
    fx_i = _grid_idx(fx)
    fy_i = _grid_idx(fy)

    # Spawn near truck with small probability
    if torch.rand((), device=device).item() > INTRUDER_PROB:
        return False

    ox = int(torch.randint(-INTRUDER_RADIUS, INTRUDER_RADIUS + 1, (1,), device=device).item())
    oy = int(torch.randint(-INTRUDER_RADIUS, INTRUDER_RADIUS + 1, (1,), device=device).item())
    ix = max(0, min(GRID_SIZE - 1, fx_i + ox))
    iy = max(0, min(GRID_SIZE - 1, fy_i + oy))

    intruder_grid[ix, iy] = 1.0

    x0 = max(0, fx_i - INTRUDER_RADIUS)
    x1 = min(GRID_SIZE, fx_i + INTRUDER_RADIUS + 1)
    y0 = max(0, fy_i - INTRUDER_RADIUS)
    y1 = min(GRID_SIZE, fy_i + INTRUDER_RADIUS + 1)

    hit = intruder_grid[x0:x1, y0:y1].sum() > 0
    return bool(hit)


## Live URL
Paste your tunnel URL here (cloudflared).
Example: `https://abc.trycloudflare.com/latest.json`


In [17]:
LIVE_URL = "https://overhead-coding-cameron-nova.trycloudflare.com/latest.json"
#need to change during demonstration


In [15]:
INTRUDER_POST_URL = LIVE_URL.rsplit('/', 1)[0] + '/intruder'


In [16]:
# Rajdeep
seq = 0
null_shown = {}
for _ in range(2000):
    try:
        r = requests.get(LIVE_URL, timeout=5)
        if r.status_code != 200 or not r.text.strip():
            print('Waiting for live JSON...')
            time.sleep(0.5)
            continue
        s = r.json()
    except Exception as e:
        print('Waiting for live JSON...', e)
        time.sleep(0.5)
        continue

    samples = []
    if isinstance(s, dict) and 'followers' in s:
        samples = list(s.get('followers', {}).values())
    else:
        samples = [s]

    if samples and all(sample.get('disconnected') for sample in samples):
        print('All followers disconnected. Stopping GPU loop.')
        for sample in samples:
            fid = int(sample.get('follower_id', 0))
            seq += 1
            try:
                requests.post(INTRUDER_POST_URL, json={
                    'seq': seq,
                    'target_id': fid,
                    'active': False,
                    'speed': 0,
                    'length': 0,
                    'duration_ms': 0,
                }, timeout=3)
            except Exception:
                pass
        break

    for sample in samples:
        fid = int(sample.get('follower_id', 0))

        if sample.get('disconnected'):
            if not null_shown.get(fid):
                print(f'NULL (leader disconnected) fid={fid}')
                null_shown[fid] = True
            seq += 1
            try:
                requests.post(INTRUDER_POST_URL, json={
                    'seq': seq,
                    'target_id': fid,
                    'active': False,
                    'speed': 0,
                    'length': 0,
                    'duration_ms': 0,
                }, timeout=3)
            except Exception:
                pass
            continue

        null_shown[fid] = False
        out = compute_block(sample, device=device)
        if out is None:
            print('Skipping sample: compute_block returned None', sample)
            continue
        gap, spd = out

        active = gpu_intruder_scan(sample)
        seq += 1
        try:
            requests.post(INTRUDER_POST_URL, json={
                'seq': seq,
                'target_id': fid,
                'active': bool(active),
                'speed': INTRUDER_SPEED if active else 0,
                'length': INTRUDER_LENGTH if active else 0,
                'duration_ms': INTRUDER_DURATION_MS if active else 0,
            }, timeout=3)
        except Exception:
            pass

        print(f"fid={fid} | ts={sample['ts']:.2f} | pos=({sample['my_x']:.1f},{sample['my_y']:.1f}) | speed={sample['current_speed']:.3f} | gap={sample['gap']:.3f} | new_speed={spd:.3f}")

    time.sleep(0.5)


Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...
Waiting for live JSON...


KeyboardInterrupt: 