References:
- https://github.com/commaai/openpilot/tree/3766830f6588df6a1fee3e0295c689f9de364476
- https://github.com/MTammvee/openpilot-supercombo-model/tree/main
- https://github.com/noobmasterbala/Adversarial-Attack-and-Defence-On-Openpilot

1. Activate virtual environment (.\venv\Scripts\activate)
2. `pip3 install -r requirements.txt`

In [None]:
# Install pip packages
%pip install -r requirements.txt

In [None]:
import sys
import os
import re
import json

import onnx
import onnxruntime
from onnx2torch import convert
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import numpy as np
import pandas as pd

import cv2
from matplotlib import pyplot as plt

# Load the Supercombo ONNX model from disk
model_name = "supercombo_0.8.3.onnx"
onnx_model = onnx.load(model_name)

# Convert to a PyTorch model so gradients can be taken through it
torch_model = convert(onnx_model)
if torch.cuda.is_available():
    # NOTE(review): this moves the model to the GPU; confirm that every tensor later fed to
    # torch_model ends up on the same device.
    torch_model.cuda()
#torch_model.half() # Make the model use float16 dtype
torch_model.eval()
#print(torch_model)

# ONNX Runtime session over the same model file, restricted to the CPU execution provider
session = onnxruntime.InferenceSession(model_name, providers=['CPUExecutionProvider'])

def rgb_to_yuv(rgb_tensor):
    """Convert a batch of RGB images to YUV, one linear combination per output plane.

    Parameters:
    - rgb_tensor: 4-D tensor laid out as (N, 3, H, W) with channels ordered R, G, B.

    Returns:
    - Tensor of shape (N, 3, H, W) with the Y, U and V planes stacked along dim 1.

    Raises:
    - AssertionError: if the input is not 4-D with exactly 3 channels.
    """
    has_valid_layout = rgb_tensor.dim() == 4 and rgb_tensor.size(1) == 3
    assert has_valid_layout, "Input tensor must be in (N, C, H, W) format with 3 channels"

    # Split the channel axis into three (N, H, W) views
    red, green, blue = rgb_tensor.unbind(dim=1)

    luma = 0.299 * red + 0.587 * green + 0.114 * blue
    chroma_u = -0.14713 * red - 0.28886 * green + 0.436 * blue
    chroma_v = 0.614 * red - 0.51498 * green - 0.10001 * blue

    return torch.stack([luma, chroma_u, chroma_v], dim=1)

def parse_image(frame):
    """Pack one planar YUV image into a 6-channel, half-resolution tensor.

    The four Y channels hold the four pixels of every 2x2 block (even/odd row
    crossed with even/odd column); U and V are subsampled by 2 in both directions.
    The result is built with slicing and torch.stack only, so it keeps the dtype
    and device of ``frame`` and stays connected to the autograd graph.

    Parameters:
    - frame: Tensor of shape (1, 3, H, W) with channels ordered Y, U, V and all
      three planes at full resolution. H and W must be even.

    Returns:
    - Tensor of shape (1, 6, H//2, W//2).

    Raises:
    - AssertionError: if ``frame`` is not 4-D with 3 channels.
    - ValueError: if H or W is odd.
    """
    # Ensure frame is a tensor of shape (1, 3, H, W)
    assert frame.dim() == 4 and frame.size(1) == 3, "Input tensor must be of shape (1, 3, H, W)"

    H = frame.size(2)
    W = frame.size(3)
    if H % 2 or W % 2:
        raise ValueError(f"Frame height and width must be even, got {H}x{W}")

    Y = frame[0, 0]
    U = frame[0, 1]
    V = frame[0, 2]

    parsed = torch.stack([
        Y[0::2, 0::2],
        Y[1::2, 0::2],
        Y[0::2, 1::2],
        Y[1::2, 1::2],
        # U and V arrive at full resolution, so take every second row AND column
        U[0::2, 0::2],
        V[0::2, 0::2],
    ], dim=0)

    return parsed.unsqueeze(0)

def seperate_points_and_std_values(df):
    """Split a frame into its even-labelled and odd-labelled rows.

    Parameters:
    - df: pandas object whose index labels are numeric; rows are routed by the
      parity of their index label.

    Returns:
    - (points, std): rows with an even index label and rows with an odd index
      label, each re-indexed from 0.
    """
    is_even = df.index % 2 == 0
    points = df.iloc[is_even].reset_index(drop=True)
    std = df.iloc[~is_even].reset_index(drop=True)
    return points, std

def display_image(image):
	"""Show `image` with matplotlib, then clear the current figure."""
	plt.imshow(image)
	plt.show()
	# Clear the figure so the next plot starts from an empty canvas
	plt.clf()

def parse_input(session, imgs_array):
    """Build the input feed dictionary for a six-input ONNX session.

    Input names are read from the first six entries of ``session.get_inputs()``.
    The two image inputs come from ``imgs_array``; the remaining inputs are
    constants. Every array is float16.

    Parameters:
    - session: Object exposing ``get_inputs()``, whose items have a ``name`` attribute.
    - imgs_array: Indexable with two entries: data for the first (ROI) image
      input and data for the second (wide frame) image input.

    Returns:
    - Dict mapping each input name to its array, with shapes (1, 12, 128, 256),
      (1, 12, 128, 256), (1, 100, 8), (1, 2), (1, 256) and (1, 99, 128).
    """
    def _resized_f16(values, shape):
        # In-place ndarray.resize keeps the flat data and zero-fills any extra elements
        buffer = np.array(values).astype('float16')
        buffer.resize(shape)
        return buffer

    model_inputs = session.get_inputs()
    # Order: ROI frame, wide frame, desire, traffic convention, nav features, features buffer
    names = [model_inputs[position].name for position in range(6)]

    # Input layout reference:
    # https://github.com/commaai/openpilot/tree/fa310d9e2542cf497d92f007baec8fd751ffa99c/selfdrive/modeld/models
    arrays = [
        _resized_f16(imgs_array[0], (1, 12, 128, 256)),
        _resized_f16(imgs_array[1], (1, 12, 128, 256)),
        _resized_f16([0], (1, 100, 8)),
        _resized_f16([1, 0], (1, 2)),
        _resized_f16([0], (1, 256)),
        _resized_f16([0], (1, 99, 128)),
    ]

    return dict(zip(names, arrays))

def parse_output(output):
    """Slice a flat model output vector into its named, consecutive fields.

    Field layout reference:
    https://github.com/commaai/openpilot/blob/fa310d9e2542cf497d92f007baec8fd751ffa99c/selfdrive/modeld/models/driving.h#L239

    Parameters:
    - output: 1-D array-like supporting slicing (and ``.flatten()`` on slices);
      the field lengths below add up to 6120 elements.

    Returns:
    - Dict mapping field name to its slice of ``output``, in layout order.
    """
    # (field name, number of elements), in the order the fields are stored
    layout = (
        ("plans", 4955),
        ("lanes", 528),
        ("lanes_probs", 8),
        ("road_edges", 264),
        ("leads", 105),
        ("meta", 88),
        ("pose", 12),
        ("wide_euler", 6),
        ("temporal_pose", 12),
        ("road_transform", 12),
        ("feature_buffer", 128),
        ("padding", 2),
    )

    fields = {}
    cursor = 0
    for name, length in layout:
        fields[name] = output[cursor:cursor + length]
        cursor += length

    # "lanes" is the only field returned flattened
    fields["lanes"] = fields["lanes"].flatten()
    return fields

def place_patch(frames, patch, patch_size=(50, 50), eot_locations=(), eot_rotations=(), eot_scales=()):
    """
    Places a patch on every frame under each Expectation over Transform (EoT) variant.

    One output frame is produced per (location, rotation, scale) combination,
    iterating locations outermost and scales innermost. The input frames are
    cloned, never modified. The patch is only resized, resampled and copied, so
    gradients flow from the returned frames back to ``patch``.

    Parameters:
    - frames: Iterable of tensors of shape (N, C, H, W), the frames to patch.
    - patch: Tensor of shape (N, C, H_patch, W_patch), the patch to place.
    - patch_size: Unused; the placed size is taken from the scaled patch itself.
      Kept so existing callers keep working.
    - eot_locations: Iterable of (x, y) top-left pixel positions for the patch.
    - eot_rotations: Iterable of rotation angles in degrees.
    - eot_scales: Iterable of scale factors used to resize the patch.

    Returns:
    - frames_patches: One list per input frame, each holding the patched copies
      of that frame in (location, rotation, scale) order.
    """
    # The transformed patch depends only on (rotation, scale): build each variant once
    # instead of once per frame and location.
    transformed_patches = []
    for rotation in eot_rotations:
        # Angles are documented in degrees; cos/sin need radians
        angle = torch.deg2rad(torch.tensor(float(rotation)))
        cos_a = torch.cos(angle).item()
        sin_a = torch.sin(angle).item()
        # 2x3 affine matrix for a pure rotation, on the same dtype/device as the patch
        theta = torch.tensor(
            [[cos_a, -sin_a, 0.0],
             [sin_a, cos_a, 0.0]],
            dtype=patch.dtype, device=patch.device,
        ).unsqueeze(0)

        for scale in eot_scales:
            # Resize (scale) the patch
            scaled_patch = F.interpolate(patch, scale_factor=scale, mode='bilinear', align_corners=False)
            # Sampling grid for the rotation, then resample; uncovered corners become zeros
            grid = F.affine_grid(theta, scaled_patch.size(), align_corners=False)
            rotated_patch = F.grid_sample(scaled_patch, grid, mode='bilinear', padding_mode='zeros', align_corners=False)
            transformed_patches.append(rotated_patch)

    frames_patches = []
    for frame in frames:
        frame_transforms = []
        for (x, y) in eot_locations:
            for transformed in transformed_patches:
                new_H_patch, new_W_patch = transformed.shape[2], transformed.shape[3]
                # Work on a copy so the caller's frame stays untouched
                frame_with_patch = frame.clone()
                frame_with_patch[:, :, y:y + new_H_patch, x:x + new_W_patch] = transformed
                frame_transforms.append(frame_with_patch)
        frames_patches.append(frame_transforms)

    return frames_patches

def display_img(image):
	"""Show `image` with matplotlib without axes, then clear the current figure."""
	plt.imshow(image)
	plt.axis('off')  # Turn off axis
	plt.show()
	# Clear the figure so the next plot starts from an empty canvas
	plt.clf()

def numpy_to_tensor(array):
    """Turn a BGR image array into a float16 RGB tensor with a leading batch axis.

    Parameters:
    - array: Image array in BGR channel order, laid out (H, W, 3).

    Returns:
    - float16 tensor of shape (1, 3, H, W) in RGB channel order.
    """
    # Swap BGR to RGB channel order
    rgb_image = cv2.cvtColor(array, cv2.COLOR_BGR2RGB)
    # Build the tensor as float32 first, reorder (H, W, C) -> (C, H, W), add the batch axis
    batched = torch.tensor(rgb_image, dtype=torch.float32).permute(2, 0, 1).unsqueeze(0)
    half_precision = batched.to(torch.float16)
    return half_precision

def tensor_to_numpy(tensor):
    """Convert an RGB image tensor back into a uint8 BGR image array.

    Parameters:
    - tensor: Tensor of shape (1, 3, H, W) in RGB channel order, on any device;
      it may require gradients.

    Returns:
    - uint8 array of shape (H, W, 3) in BGR channel order. Values outside
      [0, 255] are saturated to that range instead of wrapping around.
    """
    # Detach from autograd, go to float32 and to host memory so .numpy() is always valid
    hwc = tensor.detach().squeeze(0).permute(1, 2, 0).to(torch.float32).cpu()
    # Clamp before the uint8 cast: an optimised patch can leave [0, 255], and casting
    # out-of-range floats to uint8 wraps (or is undefined) rather than saturating
    image_back = np.ascontiguousarray(hwc.clamp(0, 255).numpy().astype(np.uint8))
    # Convert RGB back to BGR
    return cv2.cvtColor(image_back, cv2.COLOR_RGB2BGR)

def check_images(frame):
    """Round-trip an image through the tensor helpers, verify it is unchanged, and show both.

    Parameters:
    - frame: Image array as accepted by numpy_to_tensor.

    Raises:
    - AssertionError: if the round-tripped image differs from the input.
    """
    # numpy -> tensor -> numpy
    round_tripped = tensor_to_numpy(numpy_to_tensor(frame))

    # The conversion pair must be lossless for the input image
    assert np.array_equal(frame, round_tripped), "The images are not the same!"

    for image in (frame, round_tripped):
        display_img(image)

def disappearance_loss(patch, conf, patchDist, realDist, l1=0.01, l2=0.001):
    """Loss for one patched prediction: confidence term plus a weighted distance term.

    Computes ``-log(1 - conf) + l1 * (-|patchDist / realDist|)``.

    Parameters:
    - patch: The adversarial patch tensor. Currently unused: the total-variation
      regulariser it fed is disabled and not part of the returned loss.
    - conf: Tensor, confidence predicted with the patch applied.
    - patchDist: Tensor, distance predicted with the patch applied.
    - realDist: Tensor, distance predicted on the clean frames.
    - l1: Weight of the distance term.
    - l2: Weight reserved for the (disabled) total-variation term; unused.

    Returns:
    - Tensor holding the loss.
    """
    # Uses 1 - conf because the goal is to minimise conf. The argument is clamped away
    # from zero so a saturated confidence of exactly 1.0 gives a large finite loss
    # instead of inf (which would turn the gradients into NaN).
    Lconf = -torch.log(torch.clamp(1 - conf, min=1e-7))
    Ldist = -torch.abs(patchDist / realDist)
    return Lconf + l1 * Ldist

# Training configuration.
# dim is (height, width): the size each cropped frame is resized to before YUV packing
width = 512
height = 256
dim = (height, width)
# Directory holding the input frames; every entry in it is treated as a frame file
data_dir = 'frames/'
frames_names = os.listdir(data_dir)
# Sliding window of the most recent frame tensors
parsed_images = []
epochs = 10
learning_rate = 0.1

# Sort frame filenames by the first number in the name; by default 100.png is before 20.png
frames_names = sorted(frames_names, key=lambda x: int(re.search(r'\d+', x).group()))

# Initialize randomly an adversarial patch and scale from [0,1] to [0,255]
patch_size = (50, 50)
adversarial_patch = nn.Parameter(torch.rand(1, 3, patch_size[0], patch_size[1], requires_grad=True)*255) # NOTE: 255 is too large for optimization
# Show the initial random patch as an (H, W, 3) uint8 image
display_img(adversarial_patch.detach().squeeze(0).permute(1,2,0).numpy().astype(np.uint8))

# Define optimizer: the patch is the only tensor being optimised
optimizer = optim.Adam([adversarial_patch], lr=learning_rate)
# Multiplies the learning rate by gamma every step_size calls to scheduler.step()
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# EoT (Expectation over Transform) settings. Locations are (x, y) pixel positions in the
# full frame: the centre point plus four points offset diagonally by one patch size.
# NOTE(review): the centre assumes 1928x1208 frames -- confirm against the frames on disk.
center_x = 964
center_y = 604
eot_locations = [(center_x-patch_size[0],center_y-patch_size[1]),
                 (center_x-patch_size[0],center_y+patch_size[1]),
                 (center_x,center_y),
                 (center_x+patch_size[0],center_y-patch_size[1]),
                 (center_x+patch_size[0],center_y+patch_size[1]),
                ]
eot_scales = [1, 2, 3]
eot_rotations = [0]
# One transform per (location, rotation, scale) combination, which is what place_patch generates
num_eot_transforms = len(eot_locations)*len(eot_rotations)*len(eot_scales)

def preprocess_frame(frame_tensor, roi_area=None, resize_dim=(128,256)):
    """Crop, resize and convert a frame into the packed 6-channel YUV layout.

    Parameters:
    - frame_tensor: RGB tensor of shape (1, 3, H, W).
    - roi_area: (x, y, w, h) region of interest in pixels, or None to use the
      whole frame.
    - resize_dim: (height, width) the cropped region is resized to before packing.

    Returns:
    - The tensor returned by parse_image for the resized YUV region.
    """
    if roi_area is None:
        # No crop requested: process the full frame instead of failing on unpacking None
        roi_tensor = frame_tensor
    else:
        x, y, w, h = roi_area
        # Extract ROI (Region Of Interest) area of an image
        roi_tensor = frame_tensor[:, :, y:y+h, x:x+w]
    # Resize the images to the required dimensions
    roi_tensor_resized = F.interpolate(roi_tensor, size=resize_dim, mode='bilinear', align_corners=False)
    # Convert to YUV
    roi_tensor_resized_yuv = rgb_to_yuv(roi_tensor_resized)
    # Parse YUV with 6 channels: YUV_4:2:0
    parsed_frame = parse_image(roi_tensor_resized_yuv)
    return parsed_frame

def euclidean_distance(pos1, pos2):
    """Return the Euclidean (L2) distance between two points as a Python float.

    Parameters:
    - pos1, pos2: Coordinates as sequences of numbers or tensors of matching
      shape. Integer coordinates are accepted and promoted to floating point.

    Returns:
    - The L2 norm of ``pos1 - pos2`` as a float.
    """
    def _as_float_tensor(position):
        # torch.norm rejects integer tensors, so promote them; floats keep their dtype
        as_tensor = torch.as_tensor(position)
        if as_tensor.is_floating_point():
            return as_tensor
        return as_tensor.to(torch.get_default_dtype())

    return torch.norm(_as_float_tensor(pos1) - _as_float_tensor(pos2), p=2).item()
      
def convert_to_meters(normalized_y_position, scaling_factor=10):
    """
    Scales the magnitude of a normalized position into a distance.

    Parameters:
    normalized_y_position: Tensor holding the normalized position; its sign is discarded.
    scaling_factor: Multiplier applied to the absolute value.

    Returns:
    distance_meters: ``abs(normalized_y_position) * scaling_factor`` as a tensor (never negative).
    """
    # Drop the sign first so the result is never negative
    magnitude = torch.abs(normalized_y_position)
    return magnitude * scaling_factor

def select_best_y(lead_predictions, alpha=1.0):
    """
    Select the y_mean of the prediction with the lowest ``y_mean + alpha * y_std`` score.

    Parameters:
    - lead_predictions: Iterable of predictions, each starting with (y_mean, y_std);
      any further entries (such as a probability) are ignored. Values may be
      tensors or plain numbers.
    - alpha: A weighting factor to penalize high uncertainty (large y_std). Default is 1.0.

    Returns:
    - best_y_mean: The y_mean of the best-scoring prediction as a tensor, or None
      when there are no predictions or no score is below infinity. Tensor inputs
      are returned as-is, so they stay attached to the autograd graph.
    """
    best_score = torch.tensor(float('inf'))  # Initialize best score as infinity
    best_y_mean = None

    for prediction in lead_predictions:
        y_mean, y_std, *_ = prediction
        # as_tensor leaves existing tensors untouched (no copy, no detach) and wraps plain numbers
        y_mean = torch.as_tensor(y_mean)
        y_std = torch.as_tensor(y_std)

        # Calculate a score that penalizes predictions with large y_std
        score = y_mean + alpha * y_std

        # Select the prediction with the lowest score
        if score < best_score:
            best_score = score
            best_y_mean = y_mean

    return best_y_mean

def extract_lead_predictions(output, lead_idx_start=5755, lead_idx_end=5860):
    """Read a lead distance and a lead confidence out of a flat model output vector.

    The slice positions are derived from the layout constants in
    https://github.com/commaai/openpilot/blob/fa310d9e2542cf497d92f007baec8fd751ffa99c/selfdrive/modeld/models/driving.h

    Parameters:
    - output: 1-D tensor holding the flat model output.
    - lead_idx_start, lead_idx_end: Unused; the lead block position is computed
      from the layout constants below. Kept so existing callers keep working.

    Returns:
    - (distance, best_prob): ``distance`` is the second value of the lead block
      passed through convert_to_meters with scaling_factor=10.0; ``best_prob`` is
      the largest sigmoid of the three values stored right after the lead block.
    """
    # Plan block: PLAN_MHP_N groups of (2 * values + selection) entries
    PLAN_MHP_N = 5
    PLAN_MHP_VALS = 30 * 33
    PLAN_MHP_SELECTION = 1
    PLAN_MHP_GROUP_SIZE = 2 * PLAN_MHP_VALS + PLAN_MHP_SELECTION

    # Lead block: LEAD_MHP_N groups of (2 * values + selection) entries
    LEAD_MHP_N = 5
    LEAD_MHP_VALS = 4
    LEAD_MHP_SELECTION = 3
    LEAD_MHP_GROUP_SIZE = 2 * LEAD_MHP_VALS + LEAD_MHP_SELECTION

    # Start offsets of the consecutive blocks in the flat vector
    PLAN_IDX = 0
    LL_IDX = PLAN_IDX + PLAN_MHP_N * PLAN_MHP_GROUP_SIZE
    LL_PROB_IDX = LL_IDX + 4 * 2 * 2 * 33
    RE_IDX = LL_PROB_IDX + 4
    LEAD_IDX = RE_IDX + 2 * 2 * 2 * 33
    LEAD_PROB_IDX = LEAD_IDX + LEAD_MHP_N * LEAD_MHP_GROUP_SIZE
    DESIRE_STATE_IDX = LEAD_PROB_IDX + 3

    lead_data = output[LEAD_IDX:LEAD_PROB_IDX]  # elements 10701 .. 10755
    lead_probs = output[LEAD_PROB_IDX:DESIRE_STATE_IDX]  # elements 10756 .. 10758

    # NOTE(review): element 1 of the first lead group is treated as the longitudinal
    # distance mean -- confirm against the value order in the model's output layout.
    y_best = lead_data[1]
    distance = convert_to_meters(y_best, scaling_factor=10.0)
    best_prob = torch.max(torch.sigmoid(lead_probs))
    return distance, best_prob

def compare_onnx_torch_outputs(onnx_output, torch_output):
    """Print statistics on, and plot, the element-wise difference between two model outputs.

    Only the first row of each output is compared.

    Parameters:
    - onnx_output: Array-like ONNX output with a leading batch axis.
    - torch_output: PyTorch output (tensor, or already a numpy array) with the
      same shape as ``onnx_output``.

    Raises:
    - AssertionError: if the two outputs do not have the same shape.
    """
    onnx_output = np.array(onnx_output)
    if isinstance(torch_output, torch.Tensor):
        torch_output = torch_output.detach().cpu().numpy()
    else:
        torch_output = np.asarray(torch_output)
    assert torch_output.shape == onnx_output.shape, (
        "[!] Error: ONNX and Torch output shapes should be the same! "
        f"(torch {torch_output.shape} vs onnx {onnx_output.shape})"
    )

    diff = torch_output[0] - onnx_output[0]
    # Statistics over the differing elements only (not over the boolean mask)
    differing = diff[diff != 0]

    print(f"Num equals: {np.sum(diff == 0)}")
    print(f"Num differents: {differing.size}")
    print(f"Mean Overall ({diff.size}): {np.mean(diff)}")
    print(f"Std Overall ({diff.size}): {np.std(diff)}")
    if differing.size:
        print(f"Mean differents ({differing.size}): {np.mean(differing)}")
        print(f"Std differents ({differing.size}): {np.std(differing)}")
    else:
        # Avoid mean/std of an empty array (NaN plus a runtime warning)
        print("Mean differents (0): n/a")
        print("Std differents (0): n/a")

    # Visualize differences: histogram restricted to the small-magnitude range
    small_range = diff[np.abs(diff) <= 1e-5]
    print(small_range.shape)
    plt.figure(figsize=(10, 6))
    plt.hist(small_range, bins=100, edgecolor='black')
    plt.title('Histogram of Differences between ONNX and PyTorch Outputs')
    plt.xlabel('Difference')
    plt.ylabel('Frequency')
    plt.show()

def subplot(img1, img2):
    """Show two images side by side, titled 'Original Frame' and 'Adversarial Patch'.

    Parameters:
    - img1: Image drawn in the left panel.
    - img2: Image drawn in the right panel.
    """
    # One row, two columns
    fig, axes = plt.subplots(1, 2, figsize=(10, 5))

    panels = ((img1, 'Original Frame'), (img2, 'Adversarial Patch'))
    for axis, (image, title) in zip(axes, panels):
        axis.imshow(image)
        axis.set_title(title)
        axis.axis('off')

    # Show the plot
    plt.tight_layout()
    plt.show()

# Image inputs are filled in by the training loop
original_input_imgs_data = None  # Shape: (1, 12, 128, 256)
patched_input_imgs_data = None  # Shape: (1, 12, 128, 256)

# Constant float16 model inputs: all-zero desire and initial state, fixed traffic convention
desire_data = np.zeros((1, 8), dtype=np.float16)
traffic_convention_data = np.array([[1, 0]], dtype=np.float16)
initial_state_data = np.zeros((1, 512), dtype=np.float16)

# Empty training log; one row is appended per optimisation step
columns = ['Patch', 'Batch_rdist', 'Batch_rconf', 'Batch_pdist', 'Batch_pconf', 'Batch_loss']
train_df = pd.DataFrame(columns=columns)

# Adversarial patch training loop.
# For every pair of consecutive frames: run the model on the clean frames, run it on every
# EoT variant of the patched frames, average the per-variant losses and take one optimiser step.

# The constant model inputs are converted once, on the device the model lives on, so the
# loop also works when the model has been moved to the GPU.
_first_param = next(torch_model.parameters(), None)
model_device = _first_param.device if _first_param is not None else torch.device("cpu")
desire_input = torch.as_tensor(desire_data, dtype=torch.float, device=model_device)
traffic_convention_input = torch.as_tensor(traffic_convention_data, dtype=torch.float, device=model_device)
initial_state_input = torch.as_tensor(initial_state_data, dtype=torch.float, device=model_device)

# Crop area (x, y, w, h): https://github.com/commaai/openpilot/blob/fa310d9e2542cf497d92f007baec8fd751ffa99c/system/camerad/cameras/camera_qcom2.cc#L1252
roi_area = (96, 160, 1734, 986)

for epoch in range(0, epochs):
    for f_idx, frame_name in enumerate(frames_names):
        # Load frame in BGR format (PNGs look blue if they are shown as RGB without conversion)
        frame_path = os.path.join(data_dir, frame_name)
        frame = cv2.imread(frame_path)
        if frame is None:
            # cv2.imread signals an unreadable file by returning None
            raise FileNotFoundError(f"Could not read frame image: {frame_path}")
        # Convert numpy frame to torch tensor
        frame_tensor = numpy_to_tensor(frame)  # Shape: [1, 3, H, W] of float16 in RGB

        # Only keep the 2 consecutive new frames: delete the oldest
        if len(parsed_images) >= 2:
            del parsed_images[0]
        parsed_images.append(frame_tensor)
        if len(parsed_images) < 2:
            # The model needs two consecutive frames
            continue

        # Clean-frame reference prediction; no gradient is needed through it
        original_road_old = preprocess_frame(parsed_images[0], roi_area, dim)
        original_road_new = preprocess_frame(parsed_images[1], roi_area, dim)
        # dim=1 gives shape (1, 12, 128, 256); dim=0 would give (2, 6, 128, 256)
        original_input_imgs_data = torch.cat([original_road_old, original_road_new], dim=1)
        with torch.no_grad():
            original_result = torch_model(original_input_imgs_data.to(device=model_device, dtype=torch.float),
                                          desire_input,
                                          traffic_convention_input,
                                          initial_state_input)
        rDist, rConf = extract_lead_predictions(original_result[0])

        # Apply EoT transformations: one list of patched variants per frame
        batch = place_patch(parsed_images, adversarial_patch, patch_size, eot_locations, eot_rotations, eot_scales)

        # EoT: Expectation Over Transform -- evaluate every variant place_patch produced
        rdist_values, rconf_values, pdist_values, pconf_values, loss_values = [], [], [], [], []
        for patched_old, patched_new in zip(batch[0], batch[1]):
            parsed_patch_road_old = preprocess_frame(patched_old, roi_area, dim)
            parsed_patch_road_new = preprocess_frame(patched_new, roi_area, dim)
            # Join the two parsed frames for the patched "input_imgs" input
            patched_input_imgs_data = torch.cat([parsed_patch_road_old, parsed_patch_road_new], dim=1)

            # Torch model query
            patched_result = torch_model(patched_input_imgs_data.to(device=model_device, dtype=torch.float),
                                         desire_input,
                                         traffic_convention_input,
                                         initial_state_input)
            pDist, pConf = extract_lead_predictions(patched_result[0])

            rdist_values.append(rDist)
            rconf_values.append(rConf)
            pdist_values.append(pDist)
            pconf_values.append(pConf)
            loss_values.append(disappearance_loss(adversarial_patch, pConf, pDist, rDist))

        batch_rdist = torch.stack(rdist_values)
        batch_rconf = torch.stack(rconf_values)
        batch_pdist = torch.stack(pdist_values)
        batch_pconf = torch.stack(pconf_values)
        batch_losses = torch.stack(loss_values)

        # Calculate the expectation of the transforms and update patch
        optimizer.zero_grad()
        loss = torch.mean(batch_losses)

        # Append data to the DataFrame; the patch snapshot is detached so the log
        # does not hold on to the autograd graph
        train_df = pd.concat([train_df, pd.DataFrame([{
            'Patch': adversarial_patch.detach().clone(),
            'Batch_rdist': torch.mean(batch_rdist).item(),
            'Batch_rconf': torch.mean(batch_rconf).item(),
            'Batch_pdist': torch.mean(batch_pdist).item(),
            'Batch_pconf': torch.mean(batch_pconf).item(),
            'Batch_loss': torch.mean(batch_losses).item()
        }])], ignore_index=True)
        print(f"{epoch}:{f_idx} ; {loss.item()} ; {torch.mean(batch_pdist).item()} ; {torch.mean(batch_pconf).item()}")

        loss.backward() # Compute gradients of the mean loss
        # Update patch
        optimizer.step() # Update the adversarial patch based on the computed gradients
        # Advance the learning-rate schedule only after the optimiser step, and only
        # once the patched confidence has dropped below 0.6
        if torch.mean(batch_pconf).item() < 0.6:
            scheduler.step()
    print(f"[*] Epoch {epoch}...")
print(f"[*] DONE: Adversarial Patch Trained")
# Show the trained patch twice: once cast directly to an (H, W, 3) uint8 image ...
display_img(adversarial_patch.detach().squeeze(0).permute(1,2,0).numpy().astype(np.uint8))
# ... and once through tensor_to_numpy, which also swaps the channel order to BGR
display_img(tensor_to_numpy(adversarial_patch))
# Retrieve outputs: https://github.com/MTammvee/openpilot-supercombo-model/blob/main/openpilot_onnx.py

In [None]:
# Show the current patch (on a copy) and print the per-step training log
display_img(tensor_to_numpy(adversarial_patch.clone()))
print(train_df)