# Downsampling interface

In [4]:
from abc import ABC, abstractmethod
import numpy as np

class DownsamplingInterface(ABC):
    """
    Interface-Klasse für Downsampling Methoden.
    Alle abgeleiteten Klassen müssen diese Methoden implementieren.
    """
    
    @abstractmethod
    def __init__(self):
        super().__init__()
    
    @abstractmethod
    def __call__(self, ranges: list) -> list: # TODO: list or ndarray?
        return ranges



## Box Downsampling

In [5]:
import numpy as np 

class BoxDownsampling(DownsamplingInterface):
    
    def __init__(self, box_aspect_ratio, num_boxed_beams, num_beams, angle_min, angle_max):
        self.get_boxed_indices(box_aspect_ratio, num_boxed_beams, num_beams, angle_min, angle_max)
        super().__init__()
        
    def __call__(self, ranges):
        return ranges[self.lidar_sample_idxs_]
    
    def get_boxed_indices(self, box_aspect_ratio, num_boxed_beams, num_beams, angle_min, angle_max):
        '''
        Finds an evenly spaced "boxed" pattern of beams based on the TUM paper
        "ROS-based localization of a race vehicle at high-speed using LIDAR".
        '''
        beam_angles = np.linspace(
            angle_min, angle_max, num_beams)

        mid_idx = num_beams//2
        sparse_idxs = [mid_idx]

        # Structures
        a = box_aspect_ratio
        beam_proj = 2*a*np.array([np.cos(beam_angles), np.sin(beam_angles)])
        # Allows us to do intersection math later
        beam_intersections = np.zeros((2, num_beams))

        # Compute the points of intersection along a uniform corridor of given aspect ratio
        box_corners = [(a, 1), (a, -1), (-a, -1), (-a, 1)]
        for idx in range(len(box_corners)):
            x1, y1 = box_corners[idx]
            x2, y2 = box_corners[0] if idx == 3 else box_corners[idx+1]
            for i in range(num_beams):
                x4 = beam_proj[0, i]
                y4 = beam_proj[1, i]

                den = (x1-x2)*(-y4)-(y1-y2)*(-x4)
                if den == 0:
                    continue    # parallel lines

                t = ((x1)*(-y4)-(y1)*(-x4))/den
                u = ((x1)*(y1-y2)-(y1)*(x1-x2))/den

                px = u*x4
                py = u*y4
                if 0 <= t <= 1.0 and 0 <= u <= 1.0:
                    beam_intersections[0, i] = px
                    beam_intersections[1, i] = py

        # Compute the distances for uniform spacing
        dx = np.diff(beam_intersections[0, :])
        dy = np.diff(beam_intersections[1, :])
        dist = np.sqrt(dx**2 + dy**2)
        total_dist = np.sum(dist)
        dist_amt = total_dist/(num_boxed_beams-1)

        # Calc half of the evenly-spaced interval first, then the other half
        idx = mid_idx + 1
        num_boxed_beams2 = num_boxed_beams//2 + 1
        acc = 0
        while len(sparse_idxs) <= num_boxed_beams2:
            acc += dist[idx]
            if acc >= dist_amt:
                acc = 0
                sparse_idxs.append(idx-1)
            idx += 1

            if idx == num_beams-1:
                sparse_idxs.append(num_beams-1)
                break

        mirrored_half = []
        for idx in sparse_idxs[1:]:
            new_idx = 2*sparse_idxs[0]-idx
            mirrored_half.insert(0, new_idx)
        sparse_idxs = mirrored_half + sparse_idxs

        # results 
        self.lidar_sample_idxs_ = np.array(sparse_idxs)
        # lidar_theta_lut = beam_angles[lidar_sample_idxs]
        # lidar_theta_lut = lidar_theta_lut.astype(np.single)


## Uniform downsampling

In [6]:
import numpy as np 

class UniformDownsampling(DownsamplingInterface):
    
    def __init__(self, range_min, range_max, num_uniform_beams):
        """
        :param range_min: Minimale Reichweite, unterhalb derer Strahlen ausgeschlossen werden.
        :param range_max: Maximale Reichweite, oberhalb derer Strahlen ausgeschlossen werden.
        :param num_desired_beams: Anzahl der gewünschten Strahlen nach Downsampling.
        """
        self.range_min_ = range_min
        self.range_max_ = range_max
        self.num_uniform_beams_ = num_uniform_beams
        super().__init__()       

    def __call__(self, ranges):
        """
        Verarbeitet den LaserScan: Filtert ungültige Strahlen und führt gleichmäßiges Downsampling durch.

        :param ranges: Liste oder Array der eingehenden LaserScan-Daten (Reichweiten).
        :return: Gefilterte und gleichmäßig reduzierte Reichweiten.
        """
        
        # Filter: Ausschließen von Strahlen außerhalb des gültigen Bereichs
        valid_ranges = ranges[(ranges >= self.range_min_) & (ranges <= self.range_max_)]

        # Prüfen, ob gültige Strahlen vorhanden sind
        if valid_ranges.size == 0:
            return np.array([])  # Keine gültigen Daten vorhanden

        # Gleichmäßiges Downsampling durchführen
        indices = np.linspace(0, valid_ranges.size - 1, self.num_uniform_beams_, dtype=int)
        downsampled_ranges = valid_ranges[indices]

        return downsampled_ranges
    
        
        
    



In [36]:
import numpy as np 

class UniformDownsampling():
    # TODO: description of uniform downsampling and explain approach

    identifier='UNI'
    
    def __init__(self, range_min: float, range_max: float, num_uniform_beams: int):
        """
        :param range_min: Minimale Reichweite, unterhalb derer Strahlen ausgeschlossen werden.
        :param range_max: Maximale Reichweite, oberhalb derer Strahlen ausgeschlossen werden.
        :param num_desired_beams: Anzahl der gewünschten Strahlen nach Downsampling.
        """
        self.range_min_ = range_min
        self.range_max_ = range_max
        self.num_uniform_beams_ = num_uniform_beams
    
    def __call__(self, ranges: np.ndarray) -> np.ndarray:
        """
        Verarbeitet den LaserScan: Filtert ungültige Strahlen und führt gleichmäßiges Downsampling durch.

        :param ranges: Liste oder Array der eingehenden LaserScan-Daten (Reichweiten).
        :return: Gefilterte und gleichmäßig reduzierte Reichweiten.
        """
        
        # Filter out all ranges that fall outside the specified range_min and range_max parameters
        filtered_indices = np.where((ranges >= self.range_min_) & (ranges <= self.range_max_))[0]
        
        print(type(filtered_indices))
        # Return empty array 
        num_valid_beams = len(filtered_indices)
        if num_valid_beams == 0:
            return []  # Keine gültigen Daten vorhanden

        # Uniform downsampling
        sampled_indices = np.linspace(0, num_valid_beams - 1, self.num_uniform_beams_, dtype=int)
        
        # Export downsampled range measurement
        return ranges[filtered_indices[sampled_indices]]
        
        
    
        
        
    



In [39]:
ud = UniformDownsampling(0, 10, 10)

ranges = np.random.randint(0, 21, 100)

print(type(ranges))

x = ud(ranges)





<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
[ 3  9  8  0  6  3  7  4  5  5  6  3  7  9 10  1  4  1 10  2  4  9 10  5
  5  3  2  0  6  4  3  2  5 10  1  9  1  4  8  9  5  2  8  8  5  2 10  4
  5  1  0  8  5  4  9  7  0 10  6  7]
[3 7 9 2 2 5 9 2 5 7]
