# 🚀 Day 1: Parallel Reduction

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/sdodlapati3/cuda-lab/blob/main/learning-path/week-04/day-1-parallel-reduction.ipynb)

## Learning Objectives
- Understand the parallel reduction problem
- Implement tree reduction with shared memory
- Handle multi-pass reduction for large arrays
- Apply reduction to sum, max, min

> **Primary Focus:** CUDA C++ code examples first, Python/Numba backup for interactive testing

---

In [None]:
# ⚙️ Colab/Local Setup - Run this first!
import subprocess, sys
try:
    import google.colab
    print("🔧 Running on Google Colab - Installing dependencies...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "numba"])
    print("✅ Setup complete!")
except ImportError:
    print("💻 Running locally - make sure you have: pip install numba numpy")

import numpy as np
from numba import cuda
import math
import time

print(f"\nCUDA available: {cuda.is_available()}")
if cuda.is_available():
    device = cuda.get_current_device()
    print(f"Device: {device.name}")
    print(f"Warp size: {device.WARP_SIZE}")

---

## Part 1: The Reduction Problem

### What is Reduction?

Reduction combines all elements of an array into a single value using an associative operator.

```
Examples:
• Sum:    [1, 2, 3, 4] → 10
• Max:    [1, 5, 3, 2] → 5
• Min:    [4, 1, 7, 2] → 1
• Product: [2, 3, 4]   → 24
```
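
As a minimal CPU-side sketch of the definition above, the same examples can be reproduced with Python's `functools.reduce`. The helper name `reduce_with` is purely illustrative; the `identity` arguments are the usual neutral elements for each operator.

```python
from functools import reduce
import operator

def reduce_with(op, identity, values):
    """Fold `values` into a single result with the binary operator `op`."""
    return reduce(op, values, identity)

print(reduce_with(operator.add, 0, [1, 2, 3, 4]))      # sum
print(reduce_with(max, float("-inf"), [1, 5, 3, 2]))   # max
print(reduce_with(min, float("inf"), [4, 1, 7, 2]))    # min
print(reduce_with(operator.mul, 1, [2, 3, 4]))         # product

# Associativity lets us regroup the work: reducing two halves separately
# and then combining them matches a single left-to-right pass.
left = reduce_with(operator.add, 0, [1, 2])
right = reduce_with(operator.add, 0, [3, 4])
assert operator.add(left, right) == reduce_with(operator.add, 0, [1, 2, 3, 4])
```

That regrouping property is what a parallel reduction relies on: independent threads can reduce separate pieces, and the pieces can be merged afterwards.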

### Why is Parallel Reduction Hard?

```
Sequential (CPU):        Parallel (GPU):
sum = 0                  ??? How to combine?
for x in arr:            Each thread has a value
    sum += x             Need to merge them!
```

### 🔷 CUDA C++ Implementation (Primary)

The following CUDA C++ implementation demonstrates a two-pass parallel reduction algorithm using shared memory for efficient block-level reduction.

In [None]:
%%writefile reduction.cu
// reduction.cu - Parallel sum reduction with shared memory
#include <stdio.h>
#include <stdlib.h>   // malloc, free
#include <cuda_runtime.h>

#define BLOCK_SIZE 256

// Block-level reduction: Each block produces one partial sum
__global__ void blockReduceSum(const float* input, float* blockSums, int n) {
    __shared__ float sdata[BLOCK_SIZE];
    
    int tid = threadIdx.x;
    int gid = blockIdx.x * blockDim.x + threadIdx.x;
    int stride = blockDim.x * gridDim.x;
    
    // Phase 1: Grid-stride load and accumulate
    float sum = 0.0f;
    for (int i = gid; i < n; i += stride) {
        sum += input[i];
    }
    sdata[tid] = sum;
    __syncthreads();
    
    // Phase 2: Tree reduction within block
    for (int s = blockDim.x / 2; s > 0; s >>= 1) {
        if (tid < s) {
            sdata[tid] += sdata[tid + s];
        }
        __syncthreads();
    }
    
    // Thread 0 writes block result
    if (tid == 0) {
        blockSums[blockIdx.x] = sdata[0];
    }
}

// Final reduction of block sums
__global__ void finalReduceSum(const float* blockSums, float* result, int n) {
    __shared__ float sdata[BLOCK_SIZE];
    
    int tid = threadIdx.x;
    
    // Load block sums
    sdata[tid] = (tid < n) ? blockSums[tid] : 0.0f;
    __syncthreads();
    
    // Tree reduction
    for (int s = blockDim.x / 2; s > 0; s >>= 1) {
        if (tid < s) {
            sdata[tid] += sdata[tid + s];
        }
        __syncthreads();
    }
    
    if (tid == 0) {
        result[0] = sdata[0];
    }
}

int main() {
    int n = 10000000;  // 10M elements
    size_t size = n * sizeof(float);
    
    // Allocate and initialize host array
    float *h_input = (float*)malloc(size);
    for (int i = 0; i < n; i++) {
        h_input[i] = 1.0f;  // Sum should be n
    }
    
    // Allocate device memory
    float *d_input, *d_blockSums, *d_result;
    cudaMalloc(&d_input, size);
    
    int numBlocks = 256;
    cudaMalloc(&d_blockSums, numBlocks * sizeof(float));
    cudaMalloc(&d_result, sizeof(float));
    
    cudaMemcpy(d_input, h_input, size, cudaMemcpyHostToDevice);
    
    // First pass: reduce to block sums
    blockReduceSum<<<numBlocks, BLOCK_SIZE>>>(d_input, d_blockSums, n);
    
    // Second pass: reduce block sums to final result
    finalReduceSum<<<1, BLOCK_SIZE>>>(d_blockSums, d_result, numBlocks);
    
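    // Check that both launches were accepted (e.g. catches an -arch mismatch)
    cudaError_t err = cudaGetLastError();
    if (err != cudaSuccess) {
        fprintf(stderr, "Kernel launch failed: %s\n", cudaGetErrorString(err));
        return 1;
    }

    // Get result (this blocking cudaMemcpy also waits for the kernels to finish)
    float result;
    cudaMemcpy(&result, d_result, sizeof(float), cudaMemcpyDeviceToHost);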
    
    printf("Sum of %d elements: %f (expected %d)\n", n, result, n);
    
    // Cleanup
    cudaFree(d_input); cudaFree(d_blockSums); cudaFree(d_result);
    free(h_input);
    
    return 0;
}

In [None]:
!nvcc -arch=sm_75 -o reduction reduction.cu
!./reduction

### 🔶 Python/Numba (Optional - Quick Testing)

CPU baseline for comparison with GPU reduction.

In [None]:
# CPU baseline
def cpu_sum(arr):
    return np.sum(arr)

# Test
n = 10_000_000
arr = np.random.rand(n).astype(np.float32)

start = time.perf_counter()
cpu_result = cpu_sum(arr)
cpu_time = (time.perf_counter() - start) * 1000

print(f"CPU sum of {n:,} elements: {cpu_result:.2f}")
print(f"CPU time: {cpu_time:.2f} ms")

---

## Part 2: Naive Approach (Don't Do This!)

### Using Atomic Add

In [None]:
@cuda.jit
def naive_atomic_sum(arr, result, n):
    """Naive: every thread does atomicAdd to global memory.
    
    WARNING: This is SLOW due to atomic contention!
    """
    tid = cuda.grid(1)
    stride = cuda.gridsize(1)
    
    for i in range(tid, n, stride):
        cuda.atomic.add(result, 0, arr[i])  # All threads fight for same location!

In [None]:
# Test naive approach
n = 1_000_000  # Smaller N because it's so slow!
arr = np.random.rand(n).astype(np.float32)

d_arr = cuda.to_device(arr)
d_result = cuda.to_device(np.zeros(1, dtype=np.float32))

blocks, threads = 256, 256

# Warmup
naive_atomic_sum[blocks, threads](d_arr, d_result, n)
cuda.synchronize()

# Reset and time
d_result = cuda.to_device(np.zeros(1, dtype=np.float32))
start = time.perf_counter()
naive_atomic_sum[blocks, threads](d_arr, d_result, n)
cuda.synchronize()
naive_time = (time.perf_counter() - start) * 1000

result = d_result.copy_to_host()[0]
expected = np.sum(arr)

print(f"Naive atomic sum: {result:.2f} (expected: {expected:.2f})")
print(f"Time: {naive_time:.2f} ms")
print(f"\n⚠️  This is SLOW due to atomic contention!")

---

## Part 3: Tree Reduction Pattern

### The Key Insight

```
Instead of N atomic operations, use log₂(N) parallel steps:

Input:  [1] [2] [3] [4] [5] [6] [7] [8]
         ↘   ↙   ↘   ↙   ↘   ↙   ↘   ↙
Step 1:  [3]     [7]     [11]    [15]
          ↘       ↙       ↘       ↙
Step 2:    [10]             [26]
             ↘               ↙
Step 3:           [36] ← Final sum!

N = 8 elements → log₂(8) = 3 steps
```
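
The step count can be checked with a small CPU simulation. This is only a sketch: the function name `tree_reduce_in_place` is hypothetical, and it assumes a power-of-two length. It uses the half-stride pairing (element `tid` with element `tid + s`) found in this notebook's kernels rather than the adjacent pairs drawn in the diagram; the final sum and the number of steps are the same.

```python
def tree_reduce_in_place(values):
    """Simulate the shared-memory tree reduction of one block.

    Assumes len(values) is a power of two.
    Returns (result, number_of_steps).
    """
    shared = list(values)
    n = len(shared)
    assert n > 0 and n & (n - 1) == 0, "length must be a power of two"

    steps = 0
    s = n // 2
    while s > 0:
        # On the GPU, every thread with tid < s executes this line at the
        # same time, then all threads meet at a barrier before s is halved.
        for tid in range(s):
            shared[tid] += shared[tid + s]
        steps += 1
        s //= 2
    return shared[0], steps

total, steps = tree_reduce_in_place([1, 2, 3, 4, 5, 6, 7, 8])
print(f"sum={total}, steps={steps}")
```

Writes go to indices below `s` and reads come from indices at or above `s`, so the threads within one step never touch each other's inputs. That is why a single barrier per step is enough.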

In [None]:
@cuda.jit
def block_reduce_sum(arr, block_results, n):
    """
    Tree reduction within each block using shared memory.
    Each block produces one partial sum.
    """
    # Shared memory for this block
    shared = cuda.shared.array(256, dtype=np.float32)  # Assume 256 threads
    
    tid = cuda.threadIdx.x
    bid = cuda.blockIdx.x
    gid = cuda.grid(1)
    stride = cuda.gridsize(1)
    
    # Phase 1: Each thread sums its portion (grid-stride)
    local_sum = 0.0
    for i in range(gid, n, stride):
        local_sum += arr[i]
    
    # Store in shared memory
    shared[tid] = local_sum
    cuda.syncthreads()
    
    # Phase 2: Tree reduction within block
    # Stride starts at half the block size
    s = cuda.blockDim.x // 2
    while s > 0:
        if tid < s:
            shared[tid] += shared[tid + s]
        cuda.syncthreads()
        s //= 2
    
    # Thread 0 writes block result
    if tid == 0:
        block_results[bid] = shared[0]

In [None]:
# Visualize tree reduction
def visualize_tree_reduction(values):
    """Show step-by-step tree reduction."""
    arr = list(values)
    step = 0
    
    print(f"Input:  {arr}")
    print()
    
    while len(arr) > 1:
        new_arr = []
        for i in range(0, len(arr), 2):
            if i + 1 < len(arr):
                new_arr.append(arr[i] + arr[i+1])
            else:
                new_arr.append(arr[i])
        step += 1
        print(f"Step {step}: {new_arr}")
        arr = new_arr
    
    print(f"\nFinal sum: {arr[0]}")
    return arr[0]

visualize_tree_reduction([1, 2, 3, 4, 5, 6, 7, 8])

In [None]:
# Test tree reduction
n = 10_000_000
arr = np.random.rand(n).astype(np.float32)

d_arr = cuda.to_device(arr)

blocks, threads = 256, 256
d_block_results = cuda.device_array(blocks, dtype=np.float32)

# First kernel: reduce to block sums
block_reduce_sum[blocks, threads](d_arr, d_block_results, n)
cuda.synchronize()

# Second pass: sum block results on CPU (or another kernel)
block_results = d_block_results.copy_to_host()
gpu_sum = np.sum(block_results)

expected = np.sum(arr)
print(f"Tree reduction result: {gpu_sum:.2f}")
print(f"Expected:              {expected:.2f}")
print(f"Match: {'✓' if np.isclose(gpu_sum, expected, rtol=1e-4) else '✗'}")
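
The comparison above uses a relative tolerance rather than exact equality because floating-point addition is only approximately associative, so a tree-ordered sum can differ slightly from a left-to-right sum. A minimal pure-Python sketch of that effect (in float64; the helper names are illustrative):

```python
import math

# Regrouping the same three numbers changes the last bit of the result.
a = (0.1 + 0.2) + 0.3
b = 0.1 + (0.2 + 0.3)
print(a == b)                              # False
print(math.isclose(a, b, rel_tol=1e-9))    # True

def sequential_sum(values):
    """Left-to-right accumulation, like a CPU loop."""
    total = 0.0
    for v in values:
        total += v
    return total

def pairwise_sum(values):
    """Tree-style sum: add neighbours until one value remains."""
    vals = list(values)
    while len(vals) > 1:
        nxt = [vals[i] + vals[i + 1] for i in range(0, len(vals) - 1, 2)]
        if len(vals) % 2:          # odd element out is carried to the next level
            nxt.append(vals[-1])
        vals = nxt
    return vals[0]

data = [0.1] * 1000
seq, tree = sequential_sum(data), pairwise_sum(data)
print(seq, tree, math.fsum(data))
assert math.isclose(seq, tree, rel_tol=1e-9)
```

With float32 data and millions of elements the gap between orderings is larger, which is why a looser tolerance such as `rtol=1e-4` is a reasonable choice for the GPU check.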

---

## Part 4: Complete Two-Pass Reduction

### Full GPU Sum Implementation

In [None]:
def gpu_reduce_sum(arr):
    """
    Complete GPU reduction with two kernel passes.
    
    Pass 1: Each block reduces its portion → block_results
    Pass 2: Reduce block_results → final sum
    """
    n = len(arr)
    
    blocks = 256
    threads = 256
    
    d_arr = cuda.to_device(arr)
    d_block_results = cuda.device_array(blocks, dtype=np.float32)
    
    # Pass 1: Reduce to block sums
    block_reduce_sum[blocks, threads](d_arr, d_block_results, n)
    
    # Pass 2: Reduce block results (use 1 block)
    d_final = cuda.device_array(1, dtype=np.float32)
    block_reduce_sum[1, threads](d_block_results, d_final, blocks)
    
    return d_final.copy_to_host()[0]

In [None]:
# Benchmark
n = 10_000_000
arr = np.random.rand(n).astype(np.float32)
iterations = 100

# CPU
start = time.perf_counter()
for _ in range(iterations):
    cpu_result = np.sum(arr)
cpu_time = (time.perf_counter() - start) / iterations * 1000

# GPU - warmup
gpu_result = gpu_reduce_sum(arr)

# GPU - timed
# Note: gpu_reduce_sum copies arr to the device on every call, so this
# measures host-to-device transfer plus both kernels, not kernel time alone.
start = time.perf_counter()
for _ in range(iterations):
    gpu_result = gpu_reduce_sum(arr)
cuda.synchronize()
gpu_time = (time.perf_counter() - start) / iterations * 1000

print(f"Sum of {n:,} float32 elements")
print(f"{'='*40}")
print(f"CPU (numpy):  {cpu_time:.3f} ms → {cpu_result:.2f}")
print(f"GPU (2-pass): {gpu_time:.3f} ms → {gpu_result:.2f}")
print(f"Speedup:      {cpu_time/gpu_time:.1f}x")

---

## Part 5: Reduction Variants (Max, Min)

### Generalizing the Pattern

In [None]:
@cuda.jit
def block_reduce_max(arr, block_results, n):
    """Tree reduction for maximum value."""
    shared = cuda.shared.array(256, dtype=np.float32)
    
    tid = cuda.threadIdx.x
    bid = cuda.blockIdx.x
    gid = cuda.grid(1)
    stride = cuda.gridsize(1)
    
    # Phase 1: Each thread finds max in its portion
    local_max = -np.inf  # Initialize to negative infinity
    for i in range(gid, n, stride):
        if arr[i] > local_max:
            local_max = arr[i]
    
    shared[tid] = local_max
    cuda.syncthreads()
    
    # Phase 2: Tree reduction
    s = cuda.blockDim.x // 2
    while s > 0:
        if tid < s:
            if shared[tid + s] > shared[tid]:
                shared[tid] = shared[tid + s]
        cuda.syncthreads()
        s //= 2
    
    if tid == 0:
        block_results[bid] = shared[0]

@cuda.jit
def block_reduce_min(arr, block_results, n):
    """Tree reduction for minimum value."""
    shared = cuda.shared.array(256, dtype=np.float32)
    
    tid = cuda.threadIdx.x
    bid = cuda.blockIdx.x
    gid = cuda.grid(1)
    stride = cuda.gridsize(1)
    
    # Phase 1: Each thread finds min in its portion
    local_min = np.inf  # Initialize to positive infinity
    for i in range(gid, n, stride):
        if arr[i] < local_min:
            local_min = arr[i]
    
    shared[tid] = local_min
    cuda.syncthreads()
    
    # Phase 2: Tree reduction
    s = cuda.blockDim.x // 2
    while s > 0:
        if tid < s:
            if shared[tid + s] < shared[tid]:
                shared[tid] = shared[tid + s]
        cuda.syncthreads()
        s //= 2
    
    if tid == 0:
        block_results[bid] = shared[0]

In [None]:
def gpu_reduce_max(arr):
    """GPU max reduction."""
    n = len(arr)
    blocks, threads = 256, 256
    
    d_arr = cuda.to_device(arr)
    d_block_results = cuda.device_array(blocks, dtype=np.float32)
    
    block_reduce_max[blocks, threads](d_arr, d_block_results, n)
    
    d_final = cuda.device_array(1, dtype=np.float32)
    block_reduce_max[1, threads](d_block_results, d_final, blocks)
    
    return d_final.copy_to_host()[0]

def gpu_reduce_min(arr):
    """GPU min reduction."""
    n = len(arr)
    blocks, threads = 256, 256
    
    d_arr = cuda.to_device(arr)
    d_block_results = cuda.device_array(blocks, dtype=np.float32)
    
    block_reduce_min[blocks, threads](d_arr, d_block_results, n)
    
    d_final = cuda.device_array(1, dtype=np.float32)
    block_reduce_min[1, threads](d_block_results, d_final, blocks)
    
    return d_final.copy_to_host()[0]

In [None]:
# Test max and min
n = 10_000_000
arr = np.random.rand(n).astype(np.float32)

gpu_max = gpu_reduce_max(arr)
gpu_min = gpu_reduce_min(arr)

cpu_max = np.max(arr)
cpu_min = np.min(arr)

print(f"Max: GPU={gpu_max:.6f}, CPU={cpu_max:.6f} {'✓' if np.isclose(gpu_max, cpu_max) else '✗'}")
print(f"Min: GPU={gpu_min:.6f}, CPU={cpu_min:.6f} {'✓' if np.isclose(gpu_min, cpu_min) else '✗'}")
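
The max and min kernels differ from the sum kernel only in the operator and in the starting value of each thread's accumulator. The following CPU sketch (a hypothetical `simulate_block_reduce`, simulating a single block with a power-of-two thread count) shows why that starting value has to be the operator's identity: threads that receive no elements still contribute their initial value to the tree.

```python
def simulate_block_reduce(values, op, identity, threads=8):
    """CPU sketch of one block: strided per-thread pass, then tree reduction.

    `threads` must be a power of two. Threads that receive no elements
    keep `identity`, which therefore must be neutral for `op`.
    """
    n = len(values)
    shared = []
    for tid in range(threads):
        local = identity
        for i in range(tid, n, threads):   # grid-stride loop with one block
            local = op(local, values[i])
        shared.append(local)

    s = threads // 2
    while s > 0:
        for tid in range(s):
            shared[tid] = op(shared[tid], shared[tid + s])
        s //= 2
    return shared[0]

data = [-3.0, -1.5, -7.0]   # fewer elements than threads, all negative
print(simulate_block_reduce(data, max, float("-inf")))   # -1.5, the true maximum
print(simulate_block_reduce(data, max, 0.0))             # 0.0, wrong: 0 is not neutral for max
print(simulate_block_reduce(data, min, float("inf")))    # -7.0
```

Starting from `-inf` for max and `+inf` for min, as the kernels above do, keeps idle threads from affecting the answer.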

---

## Part 6: Optimized Reduction

### Sequential Addressing (Avoids Bank Conflicts)

In [None]:
@cuda.jit
def block_reduce_sum_optimized(arr, block_results, n):
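    """
    Reduction with sequential addressing.
    Active threads stay contiguous and read consecutive shared-memory
    addresses, which avoids bank conflicts. block_reduce_sum above already
    uses the same loop; it is repeated here to contrast it with
    interleaved addressing.
    """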
    shared = cuda.shared.array(256, dtype=np.float32)
    
    tid = cuda.threadIdx.x
    bid = cuda.blockIdx.x
    gid = cuda.grid(1)
    stride = cuda.gridsize(1)
    block_size = cuda.blockDim.x
    
    # Phase 1: Grid-stride accumulation
    local_sum = 0.0
    for i in range(gid, n, stride):
        local_sum += arr[i]
    
    shared[tid] = local_sum
    cuda.syncthreads()
    
    # Phase 2: Sequential addressing reduction
    # Active threads are contiguous, so whole warps go idle together;
    # divergence only appears once s < 32 (inside the last active warp)
    s = block_size // 2
    while s > 0:
        if tid < s:
            shared[tid] += shared[tid + s]
        cuda.syncthreads()
        s //= 2
    
    if tid == 0:
        block_results[bid] = shared[0]

In [None]:
# Compare interleaved vs sequential addressing
print("Reduction Addressing Patterns")
print("="*50)
print()
print("INTERLEAVED (causes warp divergence; the strided-index variant causes bank conflicts):")
print("Step 1: Thread 0 adds [0]+[1], Thread 2 adds [2]+[3], ...")
print("Step 2: Thread 0 adds [0]+[2], Thread 4 adds [4]+[6], ...")
print("  → Gaps between active threads grow!")
print()
print("SEQUENTIAL (preferred):")
print("Step 1: Thread 0 adds [0]+[128], Thread 1 adds [1]+[129], ...")
print("Step 2: Thread 0 adds [0]+[64], Thread 1 adds [1]+[65], ...")
print("  → Active threads always contiguous!")
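
To make the difference between the two patterns concrete, the sketch below counts how many warps still contain at least one active thread at each step, for a 256-thread block. It is a CPU-only illustration with hypothetical helper names, and it assumes a warp size of 32 (the value reported by `device.WARP_SIZE` in the setup cell on NVIDIA GPUs).

```python
WARP_SIZE = 32  # assumption: warp size of the target GPU

def active_threads_interleaved(block_size, step):
    """Thread IDs doing work at `step` (1-based) with interleaved addressing."""
    gap = 2 ** step
    return [tid for tid in range(block_size) if tid % gap == 0]

def active_threads_sequential(block_size, step):
    """Thread IDs doing work at `step` (1-based) with sequential addressing."""
    s = block_size >> step
    return list(range(s))

def warps_touched(thread_ids):
    """Number of distinct warps that contain at least one active thread."""
    return len({tid // WARP_SIZE for tid in thread_ids})

block_size = 256
for step in range(1, 9):
    inter = active_threads_interleaved(block_size, step)
    seq = active_threads_sequential(block_size, step)
    print(f"step {step}: active={len(seq):3d}  "
          f"warps (interleaved)={warps_touched(inter)}  "
          f"warps (sequential)={warps_touched(seq)}")
```

Both patterns have the same number of active threads per step, but interleaved addressing keeps all 8 warps partially busy for the first five steps, while sequential addressing retires half of the remaining warps at each step until one is left.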

---

## Part 7: Comprehensive Benchmark

In [None]:
def benchmark_reductions(sizes, iterations=50):
    """Benchmark reduction across different sizes."""
    results = []
    
    for n in sizes:
        arr = np.random.rand(n).astype(np.float32)
        
        # CPU
        start = time.perf_counter()
        for _ in range(iterations):
            _ = np.sum(arr)
        cpu_time = (time.perf_counter() - start) / iterations * 1000
        
        # GPU warmup
        _ = gpu_reduce_sum(arr)
        
        # GPU timed
        start = time.perf_counter()
        for _ in range(iterations):
            _ = gpu_reduce_sum(arr)
        cuda.synchronize()
        gpu_time = (time.perf_counter() - start) / iterations * 1000
        
        speedup = cpu_time / gpu_time
        results.append((n, cpu_time, gpu_time, speedup))
    
    return results

sizes = [100_000, 500_000, 1_000_000, 5_000_000, 10_000_000, 50_000_000]
results = benchmark_reductions(sizes)

print(f"\n{'Size':>12} | {'CPU (ms)':>10} | {'GPU (ms)':>10} | {'Speedup':>8}")
print("-" * 50)
for n, cpu_t, gpu_t, speedup in results:
    print(f"{n:>12,} | {cpu_t:>10.3f} | {gpu_t:>10.3f} | {speedup:>7.1f}x")

---

## Exercises

### Exercise 1: Product Reduction

In [None]:
# TODO: Implement product reduction (multiply all elements)
# Hint: Use local_product = 1.0, then multiply

@cuda.jit
def block_reduce_product(arr, block_results, n):
    """Tree reduction for product of elements."""
    shared = cuda.shared.array(256, dtype=np.float32)
    
    tid = cuda.threadIdx.x
    bid = cuda.blockIdx.x
    gid = cuda.grid(1)
    stride = cuda.gridsize(1)
    
    # TODO: Implement product reduction
    pass

# Test with small numbers to avoid overflow
# arr = [1.01, 1.02, 1.03, ...] → product should be reasonable

### Exercise 2: Mean Calculation

In [None]:
# TODO: Calculate mean using sum reduction
# mean = sum / n

def gpu_mean(arr):
    """Calculate mean using GPU reduction."""
    # Hint: Reuse gpu_reduce_sum and divide by len(arr)
    pass

# Test
# arr = np.random.rand(1_000_000).astype(np.float32)
# print(f"GPU mean: {gpu_mean(arr)}, NumPy mean: {np.mean(arr)}")

### Exercise 3: ArgMax (Index of Maximum)

In [None]:
# TODO: Find index of maximum element
# This is trickier - need to track both value AND index

@cuda.jit
def block_reduce_argmax(arr, block_vals, block_idxs, n):
    """Find index of maximum element."""
    # Hint: Store both value and index in shared memory
    # Compare values, but propagate indices
    pass

---

## Summary

### Reduction Pattern Overview

```
1. GRID-STRIDE ACCUMULATION
   Each thread reduces its portion of the array
   
2. SHARED MEMORY STORAGE
   Threads store local results in shared memory
   
3. TREE REDUCTION
   log₂(blockDim) steps to reduce to single value
   
4. BLOCK RESULT OUTPUT
   Thread 0 writes block's result
   
5. SECOND PASS
   Reduce block results to final answer
```
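
The five steps above can be traced end to end with a small CPU simulation. This is a sketch only: `simulate_two_pass_sum` is a hypothetical name, the block and thread counts are tiny for readability, and it assumes a power-of-two thread count with no more blocks than threads per block.

```python
def simulate_two_pass_sum(values, blocks=4, threads=8):
    """CPU sketch of the five-step pattern; `threads` must be a power of two."""
    assert blocks <= threads, "second pass assumes one block can hold all block results"
    n = len(values)
    stride = blocks * threads            # total number of simulated threads

    def block_tree_reduce(shared):
        # Step 3: halve the active range until one value remains
        s = len(shared) // 2
        while s > 0:
            for tid in range(s):
                shared[tid] += shared[tid + s]
            s //= 2
        return shared[0]

    block_results = []
    for bid in range(blocks):
        shared = [0.0] * threads
        for tid in range(threads):
            gid = bid * threads + tid
            # Step 1: grid-stride accumulation
            # Step 2: store the thread's partial sum in "shared memory"
            shared[tid] = sum(values[i] for i in range(gid, n, stride))
        # Step 4: "thread 0" writes the block's result
        block_results.append(block_tree_reduce(shared))

    # Step 5: second pass over the per-block results using a single block,
    # padded with the identity (0.0) up to the thread count
    padded = block_results + [0.0] * (threads - blocks)
    return block_tree_reduce(padded)

data = list(range(1, 101))               # 1 + 2 + ... + 100
print(simulate_two_pass_sum(data))
```

Because each simulated thread strides over the whole input, the same code handles inputs shorter or longer than the number of threads without changing the launch shape.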

### Key Takeaways

1. **Naive atomic reduction is SLOW** - O(N) atomic operations
2. **Tree reduction is fast** - O(log N) parallel steps
3. **Use shared memory** for intra-block communication
4. **Two-pass approach** handles any array size
5. **Sequential addressing** avoids bank conflicts

### Complexity

| Approach | Time Complexity | Atomic Ops |
|----------|-----------------|------------|
| Naive | O(N) serial | N |
| Tree | O(log N) parallel | 0 with two passes (~blocks if block results are combined with atomics) |
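
As a rough illustration of the table, the sketch below counts loop iterations per thread for the two-pass scheme with this notebook's launch shape (256 blocks of 256 threads). It is not a timing model, and `reduction_step_counts` is a hypothetical helper. With a fixed number of threads, each thread first does a serial grid-stride pass, so the per-thread work is roughly N divided by the thread count plus the logarithmic tree steps.

```python
import math

def reduction_step_counts(n, blocks=256, threads=256):
    """Per-thread loop iteration counts for the two-pass scheme.

    Assumes `threads` is a power of two. Counts iterations only.
    """
    total_threads = blocks * threads
    return {
        "naive_atomics": n,                                   # one atomic per element, same address
        "pass1_serial_iters": math.ceil(n / total_threads),   # grid-stride loop, pass 1
        "pass1_tree_steps": int(math.log2(threads)),          # shared-memory tree, pass 1
        "pass2_serial_iters": math.ceil(blocks / threads),    # grid-stride loop, pass 2
        "pass2_tree_steps": int(math.log2(threads)),          # shared-memory tree, pass 2
    }

counts = reduction_step_counts(10_000_000)
for name, value in counts.items():
    print(f"{name:>20}: {value:,}")
```

For 10 million elements this gives 153 serial iterations and 8 tree steps per thread in the first pass, compared with 10 million contended atomic operations in the naive kernel.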

### Next: Day 2 - Warp Primitives
Learn about warp-level shuffle operations for even faster reductions!