In [1]:
import numpy as np
import gym
import matplotlib.pyplot as plt
from f110_gym.envs.base_classes import Integrator
from collections import Counter,defaultdict
# np.set_printoptions(s/uppress=True,precision=4)
import random
import os
import pickle
import pandas as pd
from collections import defaultdict

In [2]:
def render_callback(env_renderer):
    # custom extra drawing function

    e = env_renderer

    # update camera to follow car
    x = e.cars[0].vertices[::2]
    y = e.cars[0].vertices[1::2]
    top, bottom, left, right = max(y), min(y), min(x), max(x)
    e.score_label.x = left
    e.score_label.y = top - 700
    e.left = left - 800
    e.right = right + 800
    e.top = top + 800
    e.bottom = bottom - 800

In [3]:
global map_path,map_ext, num_agents, network_size
num_agents = 1
network_size = 51
# map_path = '/home/praneeth/f1tenth_gym_ros/f1tenth_racetracks/Austin/Austin_map'
# map_path = '/home/praneeth/f1tenth_gym_ros/f1tenth_racetracks/Catalunya/Catalunya_map'
# map_path = '/home/praneeth/f1tenth_gym_ros/f1tenth_racetracks/Budapest/Budapest_map'
map_path = '/home/praneeth/f1tenth_gym_ros/f1tenth_racetracks/Melbourne/Melbourne_map'
map_path = '/home/praneeth/f1tenth_gym_ros/f1tenth_racetracks/Zandvoort/Zandvoort_map'
map_ext = '.png'

In [None]:
env = gym.make('f110_gym:f110-v0', map=map_path, map_ext=map_ext, num_agents=1, timestep=0.01, integrator=Integrator.RK4)
env.add_render_callback(render_callback)
sx,sy,stheta=0.,0.,1.3
obs, step_reward, done, info = env.reset(np.array([[sx,sy,stheta]]))
# env.step(np.array([[0.5,0.5]]))
env.render(mode='human')

In [None]:
def downsample_input(vector):
        segment_length = len(vector) // network_size
        reshaped_vector = vector[:segment_length * network_size].reshape(network_size, segment_length)
        reduced_vector = np.mean(reshaped_vector, axis=1)
        return reduced_vector

In [None]:
obs['scans'][0]

In [None]:
fig,ax = plt.subplots(1,2)
ax[0].plot(obs['scans'][0])
ax[1].plot(downsample_input(obs['scans'][0]))
ax[0].set(title='Original Scan')
ax[1].set(title='Downsampled Scan')
plt.show()

In [None]:
def lidar_to_binary_features(lidar_data, n_features=10, n_sectors=36, threshold_percentile=60):
    """
    Convert high-dimensional LiDAR data to binary features while preserving similarity.
    Similar inputs will produce the same sparse representation.
    
    Args:
        lidar_data (np.array): Input LiDAR readings of size 1080
        n_features (int): Size of output binary vector (default: 10)
        n_sectors (int): Number of sectors to divide the scan into (default: 36)
        threshold_percentile (float): Percentile for binary threshold (default: 70)
    
    Returns:
        np.array: Binary feature vector of size n_features
    
    Example:
        lidar_readings = np.random.rand(1080)  # Your LiDAR data
        binary_features = lidar_to_binary_features(lidar_readings)
    """
    # Ensure input is numpy array
    lidar_data = np.asarray(lidar_data, dtype=np.float32)
    
    # 1. Downsample to n_sectors using average pooling
    sector_size = len(lidar_data) // n_sectors
    sectors = lidar_data[:sector_size * n_sectors].reshape(-1, sector_size)
    sector_values = np.mean(sectors, axis=1)
    
    # 2. Calculate sector differences (gradient-like features)
    sector_diffs = np.diff(sector_values, append=sector_values[0])
    
    # 3. Combine absolute values and differences
    combined_features = np.concatenate([
        sector_values / np.max(sector_values),  # Normalized values
        (sector_diffs - np.min(sector_diffs)) / (np.max(sector_diffs) - np.min(sector_diffs))  # Normalized diffs
    ])
    
    # 4. Project to lower dimension using random projections
    # Use fixed random seed for consistency
    rng = np.random.RandomState(42)
    projection_matrix = rng.normal(0, 1, (len(combined_features), n_features))
    projected = np.dot(combined_features, projection_matrix)
    
    # 5. Apply threshold to create binary features
    threshold = np.percentile(projected, threshold_percentile)
    binary_features = (projected > threshold).astype(np.int8)
    
    return binary_features

In [None]:
bin = lidar_to_binary_features(obs['scans'][0])
bin

In [None]:
base_scan = np.random.rand(1080)
similar_scan = base_scan + np.random.normal(0.1, 0.2, size=1080) 

In [None]:
base_scan,similar_scan

In [None]:
lidar_to_binary_features(base_scan).shape,lidar_to_binary_features(similar_scan)

In [None]:
np.linspace(-270//2, 270/2, 51)[::-1]

In [None]:
import numpy as np
from collections import deque

class LiDARRewardCalculator:
    def __init__(self, history_size=10, min_speed=0.8, max_speed=2):
        self.min_safe_distance = 0.15    # Minimum safe distance from obstacles
        self.optimal_distance = 1.1     # Optimal distance from walls for racing line
        self.history_size = history_size
        self.min_speed = min_speed
        self.max_speed = max_speed
        
        # Initialize history buffers
        self.position_history = deque(maxlen=history_size)
        self.previous_lidar = None
        self.previous_min_distance = None
        
    def calculate_reward(self, lidar_data, current_speed, collision=False):
        if collision:
            return -20.0, {'collision_penalty': -20.0}
        
        # Initialize reward components
        rewards = {}
        
        # 1. Safety reward based on minimum distance to obstacles
        min_distance = np.min(lidar_data)
        safety_reward = self._calculate_safety_reward(lidar_data)
        rewards['safety'] = safety_reward
        
        # 2. Progress reward based on LiDAR changes and speed
        progress_reward = self._calculate_progress_reward(lidar_data, current_speed)
        rewards['progress'] = progress_reward
        
        # 3. Centering reward for keeping optimal distance from walls
        centering_reward = self._calculate_centering_reward(lidar_data)
        rewards['centering'] = centering_reward
        
        # 4. Speed reward with safety consideration
        # speed_reward = self._calculate_speed_reward(current_speed, min_distance)
        # rewards['speed'] = speed_reward
        
        # # 5. Smoothness reward based on consecutive readings
        # smoothness_reward = self._calculate_smoothness_reward(lidar_data)
        # rewards['smoothness'] = smoothness_reward
        
        # Combine rewards with adjusted weights
        total_reward = (
            0.5 * safety_reward +      # Increased weight for safety
            0.25 * progress_reward +   # Progress is still important
            0.25 * centering_reward    # Centering for a good racing line
            # 0.25 * speed_reward +      # Speed with safety consideration
            # 0.1 * smoothness_reward    # Smooth driving
        )
        
        # Update state for next calculation
        self.previous_lidar = lidar_data.copy()
        self.previous_min_distance = min_distance
        
        return total_reward, rewards
    
    def _calculate_safety_reward( self,lidar):
        """Reward for maintaining a safe distance from obstacles."""       
        left_min = np.min(lidar[:,:int(lidar.shape[1]*0.3)])
        center_min = np.min(lidar[:,int(lidar.shape[1]*0.3):-int(lidar.shape[1]*0.3)])
        right_min = np.min(lidar[:,-int(0.3*lidar.shape[1]):])

        if left_min < self.min_safe_distance or right_min <self.min_safe_distance :
            # Penalty for unsafe distances
            return -10.0 * np.exp(-np.mean([left_min,right_min,center_min]))
        
        elif center_min < self.min_safe_distance:
             return -1 * np.exp(-center_min)
        
        elif left_min < self.min_safe_distance and center_min < self.min_safe_distance:
             return -10.0 * np.exp(- np.mean([left_min,center_min]))
        
        elif right_min <self.min_safe_distance and center_min<self.min_safe_distance:
             return -10.0 * np.exp(- np.mean([right_min,center_min]))
        else:
            # Reward for safe distances, maximized near optimal distance
            return 10.0 * np.exp(- np.mean([left_min,right_min,center_min]))
    
    def _calculate_progress_reward(self, lidar_data, current_speed):
        """Reward for making progress along the track."""
        if self.previous_lidar is None:
            return 0.0
        
        # Calculate difference in consecutive LiDAR scans
        lidar_diff = np.mean(np.abs(lidar_data - self.previous_lidar))
        
        # Scale by speed and ensure we're actually moving
        progress = lidar_diff * np.clip(current_speed, self.min_speed, self.max_speed)
        
        # Normalize and cap progress reward
        return 5.0 * np.clip(progress, 0, 1)
    
    def _calculate_centering_reward(self, lidar_data):
        """Reward for maintaining optimal distance from walls."""
        left_distances = lidar_data[:,:lidar_data.shape[1]//2]
        right_distances = lidar_data[:,lidar_data.shape[1]//2:]
        
        left_min = np.min(left_distances)
        right_min = np.min(right_distances)
        
        centering_error = abs(left_min - right_min)
        
        return 5.0 * np.exp(-centering_error / self.optimal_distance)
    
    # def _calculate_speed_reward(self, current_speed, min_distance):
    #     """Reward for maintaining high speed while considering safety."""
    #     if min_distance < self.min_safe_distance:
    #         return -5.0  # Penalty for high speed near obstacles
            
    #     safe_speed_factor = np.clip(min_distance / self.optimal_distance, 0, 1)
    #     normalized_speed = np.clip(current_speed / self.max_speed, 0, 1)
        
    #     return 4.0 * normalized_speed * safe_speed_factor
    
    # def _calculate_smoothness_reward(self, lidar_data):
    #     """Reward for smooth driving."""
    #     if self.previous_lidar is None:
    #         return 0.0
            
    #     movement_diff = np.diff(lidar_data - self.previous_lidar)
    #     smoothness = np.mean(np.abs(movement_diff))
        
    #     return 2.0 * np.exp(-smoothness)
    
    def reset(self):
        """Reset internal state between episodes."""
        self.position_history.clear()
        self.previous_lidar = None
        self.previous_min_distance = None

# Test function to evaluate scenarios
def test_reward_scenarios():
    reward_calc = LiDARRewardCalculator()
    
    def generate_scenario(base_distance, noise=0.2):
        return np.random.normal(base_distance, noise, 1080).reshape(1,-1)
    
    safe_scenario = generate_scenario(0.8)
    unsafe_scenario = generate_scenario(0.5)
    collision_scenario = generate_scenario(0)
    
    scenarios = {
        "Safe": (safe_scenario, 1.0, False),
        "Unsafe": (unsafe_scenario, 0.8, False),
        "Collision": (collision_scenario, 1.0, True)
    }
    
    for name, (scenario, speed, collision) in scenarios.items():
        reward, components = reward_calc.calculate_reward(scenario, speed, collision)
        print(f"\n{name} Scenario:")
        print(f"Total Reward: {reward:.2f}")
        print("Component Rewards:")
        for component, value in components.items():
            print(f"  {component}: {value:.2f}")
            
# Run the test
test_reward_scenarios()

In [None]:
def calculate_safety_reward( lidar,min_safe_distance=0.1):
        """Reward for maintaining a safe distance from obstacles."""
        left = lidar[:,:int(lidar.shape[1]*0.3)]
        center = lidar[:,int(lidar.shape[1]*0.3):-int(lidar.shape[1]*0.3)]
        right = lidar[:,-int(0.3*lidar.shape[1]):]
        
        left_min = np.min(left)
        center_min = np.min(center)
        right_min = np.min(right)

        if left_min < min_safe_distance or right_min <min_safe_distance :
            # Penalty for unsafe distances
            return -10.0 * np.exp(-np.mean([left_min,right_min,center_min]))
        
        elif center_min < min_safe_distance:
             return -1 * np.exp(-center_min)
        
        elif left_min < min_safe_distance and center_min < min_safe_distance:
             return -10.0 * np.exp(- np.mean([left_min,center_min]))
        
        elif right_min <min_safe_distance and center_min<min_safe_distance:
             return -10.0 * np.exp(- np.mean([right_min,center_min]))
        else:
            # Reward for safe distances, maximized near optimal distance
            return 10.0 * np.exp(- np.mean([left_min,right_min,center_min]))

In [None]:
def calculate_progress_reward(lidar_data, current_speed,previous_lidar):
        """Reward for making progress along the track."""
        if previous_lidar is None:
            return 0.0
        
        # Calculate difference in consecutive LiDAR scans
        lidar_diff = np.mean(np.abs(lidar_data - previous_lidar))
        
        # Scale by speed and ensure we're actually moving
        progress = lidar_diff * np.clip(current_speed, 0.8, 2)
        
        # Normalize and cap progress reward
        return 5.0 * np.clip(progress, 0, 1)

In [None]:
def calculate_centering_reward( lidar_data,optimal_distance=1.0):
        """Reward for maintaining optimal distance from walls."""
        left_distances = lidar_data[:,:lidar_data.shape[1]//2]
        right_distances = lidar_data[:,lidar_data.shape[1]//2:]

        print(left_distances,right_distances)
        
        left_min = np.min(left_distances)
        right_min = np.min(right_distances)
        
        centering_error = abs(left_min - right_min)
        
        return 5.0 * np.exp(-centering_error / optimal_distance)

In [None]:
lidar = abs(np.random.uniform(0,2,(1,10)))
lidar

In [None]:
lidar.shape, lidar[:,:int(lidar.shape[1]*0.3)]

In [None]:
mi = np.min(lidar)
calculate_safety_reward(lidar)

In [None]:
new_lidar = abs(np.random.uniform(0,2,(1,10)))
new_lidar

In [None]:
n_l = lidar + np.random.normal(0.,0.05)
n_l

In [None]:
calculate_progress_reward(lidar,1,n_l)

In [None]:
calculate_centering_reward(new_lidar)

In [4]:
import numpy as np
def normalize_weights(weights, new_min=-3, new_max=3):
    # Convert to numpy array if not already
    weights = np.array(weights)
    
    # Find the current min and max
    current_min = np.min(weights)
    current_max = np.max(weights)
    
    # Normalize to new range
    scaled = (weights - current_min) / (current_max - current_min) * (new_max - new_min) + new_min
    
    return scaled

# Example usage
weights = [
    [0.2, -1.2, 0.5],
    [0.9, -0.4, 0.3]
]

normalized_weights = normalize_weights(weights)
print(normalized_weights)

[[ 1.         -3.          1.85714286]
 [ 3.         -0.71428571  1.28571429]]
