# CPU 1D Median Filter - Understanding Neighbourhood Operations
This introduces kernels whose output at each position depends on a window of neighbouring elements rather than on a single element.

The median calculation is written out explicitly (without `np.median`) to show exactly which operations would have to be implemented in CUDA C.

In [None]:
import matplotlib.pyplot as plt
import numpy as np

## Functions for Median Filter

In [None]:
def calculate_median_explicit(window):
    """
    Return the median of ``window`` using only loops, comparisons and swaps.

    The values are bubble-sorted on a private copy (the caller's data is left
    untouched) and the element at index ``len(window) // 2`` is returned.
    For an even number of values this is the upper of the two middle
    elements; no averaging is performed.
    """
    values = window.copy()
    count = len(values)

    # bubble sort: after each pass the largest remaining value has moved to
    # position `limit`, so the next pass can stop one element earlier
    for limit in range(count - 1, 0, -1):
        for k in range(limit):
            if values[k] > values[k + 1]:
                # exchange the out-of-order neighbours
                held = values[k + 1]
                values[k + 1] = values[k]
                values[k] = held

    # middle of the sorted values
    return values[count // 2]

In [None]:
def median_filter_1d(data, window_size=3, quiet=False):
    """
    Apply median filter to 1D data.

    Each output element is the median of the input values inside a window
    centred on that position. Near the two ends the window is truncated to
    the samples that exist, so edge windows are shorter than ``window_size``.

    Parameters
    ----------
    data : array-like
        1D input signal; converted with ``np.asarray``.
    window_size : int
        Window length. Should be odd (3, 5, 7, etc.); an even value is
        increased by one, and a value larger than the data is reduced to the
        largest odd size that fits. Both adjustments are reported with a
        printed message.
    quiet : bool
        When False, print the window and median for the first and last three
        positions.

    Returns
    -------
    numpy.ndarray
        Filtered signal with the same shape and dtype as ``data``.

    Raises
    ------
    ValueError
        If ``window_size`` is still smaller than 1 after the odd adjustment.
    """
    # accept lists/tuples as well as arrays
    data = np.asarray(data)

    # nothing to filter: return an empty array that keeps the input dtype
    if data.size == 0:
        return np.zeros_like(data)

    # ensure window size is odd
    if window_size % 2 == 0:
        window_size += 1
        print(f"Window size must be odd, using {window_size}")

    # a negative size would produce empty windows and fail deep inside the
    # median calculation, so reject it here with a clear message
    if window_size < 1:
        raise ValueError(f"window_size must be at least 1, got {window_size}")

    # ensure window size is not larger than data
    if window_size > data.size:
        window_size = data.size if data.size % 2 == 1 else data.size - 1
        print(f"Window size too large, using {window_size}")

    half_window = window_size // 2
    result = np.zeros_like(data)

    if not quiet:
        print(f"Applying median filter with window size {window_size}")
        print(f"Each element looks at {half_window} neighbours on each side\n")

    # process each element
    for i in range(data.size):
        # determine window boundaries (clamped to the array at the edges)
        start = max(0, i - half_window)
        end = min(data.size, i + half_window + 1)

        # extract window
        window = data[start:end]

        # calculate median using explicit operations
        result[i] = calculate_median_explicit(window)

        # show details for first and last few elements
        if not quiet and (i < 3 or i >= data.size - 3):
            window_str = f"[{start}:{end}]"
            median_val = f"{result[i]:.2f}"
            print(f"Position {i}: window {window_str} = {window}")
            print(f"         -> median = {median_val}")

    if not quiet and data.size > 6:
        print("... (middle elements processed similarly) ...")

    return result

## Function to Visualise Filtered Signal

In [None]:
def visualise_median_filter(original, filtered, noisy=None):
    """
    Plot the signals involved in median filtering as stacked panels.

    The top panel shows ``original``. If ``noisy`` is given, a middle panel
    shows it with ``original`` overlaid. The bottom panel shows ``filtered``
    with ``original`` overlaid. The figure is displayed with ``plt.show()``.
    """
    has_noisy = noisy is not None
    _figure, panels = plt.subplots(3 if has_noisy else 2, 1, figsize=(12, 8))

    def finish(panel, title):
        # decorations shared by every panel
        panel.set_title(title)
        panel.set_ylabel("Value")
        panel.grid(True, alpha=0.3)
        panel.legend()

    # top: original signal
    top = panels[0]
    top.plot(original, "b-", label="Original", linewidth=2)
    finish(top, "Original Signal")

    # middle (optional): noisy signal with the original as reference
    if has_noisy:
        middle = panels[1]
        middle.plot(noisy, "r-", alpha=0.7, label="Noisy")
        middle.plot(original, "b--", alpha=0.5, label="Original")
        finish(middle, "Signal with Noise")

    # bottom: filtered signal with the original as reference
    bottom = panels[-1]
    bottom.plot(filtered, "g-", label="Median Filtered", linewidth=2)
    bottom.plot(original, "b--", alpha=0.5, label="Original")
    bottom.set_xlabel("Index")
    finish(bottom, "After Median Filter")

    plt.tight_layout()
    plt.show()

## Removing Outliers

In [None]:
# constant baseline of 5 with three single-sample outliers
# (index 2 -> 15, index 10 -> 0, index 17 -> 7)
signal = np.full(20, 5)
signal[[2, 10, 17]] = [15, 0, 7]

print(f"Signal with outliers: {signal}")

In [None]:
# apply median filter to the outlier signal with a 5-sample window
# (quiet keeps its default of False, so the per-position details are printed too)
filtered = median_filter_1d(signal, window_size=5)
print(f"\nFiltered signal: {filtered}")

In [None]:
# plot the outlier signal and its filtered version
# (no noisy trace is passed, so only two panels are drawn)
visualise_median_filter(signal, filtered)

## Smoothing Noisy Signal

In [None]:
# smooth test signal sampled at 100 points over [0, 4*pi]
x = np.linspace(0, 4*np.pi, 100)
clean_signal = 5 * (np.sin(x)**2 - np.cos(x - np.pi/2))

# add zero-mean, unit-variance Gaussian noise (unseeded, so it differs per run)
noise = np.random.normal(loc=0, scale=1, size=x.size)
noisy_signal = clean_signal + noise

In [None]:
# filter the signal with different window sizes
# quiet=True suppresses the per-position printout; every window size gets its
# own three-panel figure (clean, noisy, filtered).
# NOTE: `filtered` is rebound on each pass, so afterwards it holds the
# result for the last window size.
for ws in (3, 5, 7):
    filtered = median_filter_1d(noisy_signal, window_size=ws, quiet=True)
    visualise_median_filter(clean_signal, filtered, noisy=noisy_signal)

## Bubble Sort Breakdown

In [None]:
demo_window = np.array([5.0, 2.0, 8.0, 1.0, 6.0])
print(f"Example window: {demo_window}")
print("\nBubble sort steps:")

# sort a copy so the example window itself stays unchanged
temp_window = demo_window.copy()
n = len(temp_window)
for i in range(n):
    # remember whether this pass moved anything; quiet passes are not printed
    swapped = False
    for j in range(n - i - 1):
        if temp_window[j] > temp_window[j + 1]:
            # exchange the out-of-order neighbours
            temp_window[j], temp_window[j + 1] = temp_window[j + 1], temp_window[j]
            swapped = True
    if swapped:
        print(f"After pass {i+1}: {temp_window}")

# the median is the element in the middle of the sorted window
print(f"\nSorted window: {temp_window}")
print(f"Middle index: {n//2}")
print(f"Median value: {temp_window[n//2]}")

## Edge Handling

In [None]:
def median_filter_with_padding(data, window_size=3, padding="edge"):
    """
    Median filter with different padding strategies.

    The signal is extended by ``window_size // 2`` samples on each side, so
    every output position is computed from a full, centred window.

    Parameters
    ----------
    data : array-like
        1D input signal; converted with ``np.asarray``.
    window_size : int
        Window length. Should be odd; an even value is increased by one (with
        a printed message) so that the window stays centred on each sample.
    padding : str
        How the signal is extended past its ends:
        ``"edge"`` repeats the first/last value, ``"reflect"`` mirrors the
        values at the edges, ``"zero"`` pads with zeros.

    Returns
    -------
    numpy.ndarray
        Filtered signal with the same shape and dtype as ``data``. Empty
        input gives an empty result.

    Raises
    ------
    ValueError
        If ``padding`` is not one of the supported strategies, or if
        ``window_size`` is still smaller than 1 after the odd adjustment.
    """
    # choose the np.pad arguments first so a bad strategy name is always reported
    if padding == "edge":
        # repeat edge values
        pad_kwargs = {"mode": "edge"}
    elif padding == "reflect":
        # mirror values at edges
        pad_kwargs = {"mode": "reflect"}
    elif padding == "zero":
        # pad with zeros
        pad_kwargs = {"mode": "constant", "constant_values": 0}
    else:
        raise ValueError(f"Unsupported padding strategy '{padding}'.")

    # accept lists/tuples as well as arrays
    data = np.asarray(data)

    # an even window cannot be centred on a sample; round up to the next odd size
    if window_size % 2 == 0:
        window_size += 1
        print(f"Window size must be odd, using {window_size}")

    if window_size < 1:
        raise ValueError(f"window_size must be at least 1, got {window_size}")

    # np.pad cannot extend an empty array in "edge"/"reflect" mode, and there
    # is nothing to filter anyway
    if data.size == 0:
        return np.zeros_like(data)

    half_window = window_size // 2
    padded = np.pad(data, half_window, **pad_kwargs)

    result = np.zeros_like(data)

    for i in range(data.size):
        # padded[i] lies half_window samples before data[i], so this slice is
        # the full window centred on data[i]
        window = padded[i:i + window_size]
        result[i] = calculate_median_explicit(window)

    return result

In [None]:
# small signal for comparing the padding strategies at the boundaries
edge_test = np.array([2, 0, 10, 5, 5, 5, 10, 10, 0])
print(f"Test signal: {edge_test}")

# same 3-sample window for every strategy; only the values assumed beyond the
# two ends of the signal differ between the runs
for padding in ("edge", "reflect", "zero"):
    filtered = median_filter_with_padding(edge_test, window_size=3, padding=padding)
    # pad the strategy name to 11 characters so the printed arrays line up
    print(f"{padding:11}: {filtered}")