# STAT 548/CSE 547 Tutorial: Parallel Processing

### **Instructor**:</b> Marina Meila
### **TA**: Medha Agarwal
### **Date**: May 09, 2025

---

## ⏰ Previously...

We looked at vectorization - both as a process of converting scalar operations (loop-based) into array operations and as a method for converting input data from its raw format (i.e. text ) into vectors of real numbers.

---

## 📘 Tutorial Overview

In this tutorial, we will look at methods for paralell processing.

## 1 Compute Environments


### 1.1. Basic Hardware

*Source:* [NVIDIA](https://blogs.nvidia.com/blog/2009/12/16/whats-the-difference-between-a-cpu-and-a-gpu/) and [TDS](https://towardsdatascience.com/what-is-a-gpu-and-do-you-need-one-in-deep-learning-718b9597aa0d)

- Processors are electronic circuitry that execute computer instructions, whose minimal unit is a **transistor**. 
- The two standard processors on most systems are called the **Central Processing Unit (CPU)** and the **Graphic Processing Unit (GPU)**.
- These processors differ in their allotment of transistors to computing processes. 
    - CPUs dedicate these transistors into a small number of **logical cores**, with a stronger emphasis on circuitry used for **caches** (data storage that speeds up access) and **control flow** (`if-else` statements, `for` and `while` loops, etc). 
    - GPUs, on the other hand, can have thousands of cores, each capable only of simple arithmetic operations.
- Thus, for highly parallelizable operations (those that can be split into independent **threads**), such as matrix multiplications common in ML/DL, GPUs have become an essential part of any computing infrastructure.
- That being said, parallelizing more complex operations, such as hyperparameter search, is still done on machines with a large number of CPU cores. Examples include **CPU Clusters** such as the Stat cluster, which uses a manager called Slurm to schedule computation.
- **Tensor Processing Units (TPUs)** are processors that are built specifically for the tensor operations in neural networks training. Specifically, it was made to speed up the operations of TensorFlow, which used to be the dominant DL framework.

Use `htop` in a terminal in order to "see" the physical CPU cores at work. When running operations that make use of all of the cores (such as a large matrix multiplications), you should see the bars light up. 

In [60]:
np.random.seed(123)

n_sim = 10000
n = 10000

try:
    for i in range(10000):
        A = np.random.normal(size=(n, n))
        B = np.random.normal(size=(n, n))

        C = A @ B
except KeyboardInterrupt:
    print("Graceful Exit")

When working on a server, use `tmux` to let processes run in the background while you can break the connection. This is not only a convenient method, but very safe as well. A few relevant commands:
- `tmux`: Create a session.
- `ctrl + B`, then `%`: Create a panel within a session.
- `ctrl + B`, then `D`: Detach from a session.
- `tmux ls`: List the current sessions.
- `tmux attach -t <session_name>`: Attach to a running session.

## 2. Parallelism over CPU and GPU

Parallel programming is a method of speeding up computations by dividing tasks across multiple processing units that execute simultaneously. In machine learning, this is essential for handling large datasets and complex models efficiently. 

- CPU design philosophy: optimized for sequential processing

  - Fewer, more powerful cores with complex control units
  - Large caches and branch prediction
  - Optimized for irregular memory access and control flow
  - Parallelism achieved through multithreading or multiprocessing - where a few powerful cores execute different parts of a task concurrently—useful for data loading or feature processing.

- GPU design philosophy: optimized for parallel processing
  - Thousands of simpler cores
  - Specialized for floating-point operations
  - SIMT (Single Instruction, Multiple Thread) execution model
  - Memory architecture optimized for throughput
  - well-suited for operations like matrix multiplication and deep learning training


## 3. Application 1: Vectorized Implementation

**Implicit Parallelism**

- Because matrix operations are the prototypical use case of parallelism, many computing processes can benefit from parallelization implicitly by being represented in linear algebraic form.
- Such implementations are said to be **vectorized**. The main idea is to avoid all `for` loops where possible, and use libraries like `numpy` and `pytorch` for matrix and tensor algebra.
- To develop software/algorithms in any kind of large-scale or production setting, it is very important to make vectorization a constant habit. A core part of why GPUs (and even CPUs) make processes faster is by distributing the load across all of the logical cores. Using `for` loops ruins this benefit and wastes valuable compute time.
- Another separate but related issue is list concatenation; if you are trying to apply a function to a sequence of outputs, it is sometimes tempting to have a "running" list for which you concatenate the new value through the iteration of a `for` loop. In these sitatuations, it is always better to **pre-allocate** arrays, or be confident that you are **appending** (in $O(1)$) instead of copying the entire contents of the array every time.

#### Example: Nadaraya-Watson with Gaussian Kernel

In this example, we implement multiple versions of the Nadaraya-Watson regression estimator. Specifically, consider a supervised learning problem in which we are given an $n$-sized training set $(x_1, y_1), ..., (x_n, y_n)$ of feature vectors $x_i \in \mathbb{R}^d$ and label $y_i \in \mathbb{R}$. Given a test point $x \in \mathbb{R}^d$, the predicted value of the label $\hat{y}$ is given by
\begin{equation}
    \hat{y} = \sum_{i=1}^n \frac{y_i k\left(\frac{x - x_i}{h}\right)}{\sum_{j=1}^n k\left(\frac{x - x_j}{h}\right)},
\end{equation}
where
\begin{align*}
    k(z) = e^{-\frac{1}{2}||z||^2}
\end{align*}
is the Gaussian kernel and and $h > 0$ is a bandwidth parameter. Kernel methods offer plenty of opportunities to replace wasteful `for` loops with efficient vectorized implementations, especially when $n$ and $d$ are very large.

In [5]:
import time

from sklearn.utils.validation import check_X_y, check_array
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
from sklearn.metrics import pairwise_distances
import numpy as np

In [6]:
def naive_gaussian_kernel(x1, x2, h):
    sq_norm = 0
    for j in range(len(x1)):
        sq_norm += (x1[j] - x2[j]) ** 2
    return np.exp(-0.5 * sq_norm / h ** 2)

class NaiveNadarayaWatsonRegressor:
    def __init__(self, bandwidth, kernel=naive_gaussian_kernel):
        self.h = bandwidth
        self.kernel = kernel
    
    def fit(self, X, y):
        X, y = check_X_y(X, y)
        
        self.X_train = X
        self.y_train = y
        
        return self
    
    def predict(self, X):
        X = check_array(X)
        
        y_pred = []
        for x in X:
            num = 0
            denom = 0
            for i, x_i in enumerate(self.X_train):
                num += self.y_train[i] * self.kernel(x, x_i, self.h)
                denom += self.kernel(x, x_i, self.h)
            y_pred.append(num / denom)
            
        return np.array(y_pred).reshape(-1)

**Exercise 1:** Why does the `fit` method above simply store the training data? 

**Exercise 2:** What are other algorithms that might be implemented in this way? (These are referred to as **lazy learners**.

**Exercise 3:** Based on our discussion of nearest neighbors in high dimensions, what kinds of operations could occur in the `fit` method for approximate lazy learners?

In [7]:
def generate_data(n, d):
    X = np.random.normal(size=(n, d))
    coef = np.random.normal(size=(d,))
    intercept = np.random.normal()
    noise = np.random.normal(size=(n,))
    y = (X @ coef + intercept + noise).reshape(-1)
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    return X_train, y_train, X_test, y_test

Using a test set $(x_1, y_1), ..., (x_n, y_n)$, we measure performance using the $R^2$ score (also known as explained variance).
\begin{equation}
    R^2 = \frac{\sum_{i=1}^n (\hat{y}_i - y_i)^2}{\sum_{i=1}^n (y_i - \bar{y})^2},
\end{equation}
where
\begin{equation}
    \bar{y} = \frac{1}{n} \sum_{i=1}^n y_i
\end{equation}
is the sample mean.

In [8]:
# DEMO: 
np.random.seed(123)

n = 10000
d = 10

X_train, y_train, X_test, y_test = generate_data(n, d)

print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)

(8000, 10)
(8000,)
(2000, 10)
(2000,)


In [9]:
try:
    start = time.time()
    naive_model = NaiveNadarayaWatsonRegressor(bandwidth=0.5).fit(X_train, y_train)
    y_pred = naive_model.predict(X_test)
    print("R^2: ", r2_score(y_test, y_pred))
    end = time.time()
    print("Time: %0.4f seconds." % (end - start))
except KeyboardInterrupt:
    print("Too slow! :'(")

Too slow! :'(


Give evalutation set $z_1, ..., z_m$, let $K \in \mathbb{R}^{m \times n}$ be the matrix such that 
\begin{align*}
    K_{ij} = k\left(\frac{z_i - x_j}{h}\right).
\end{align*}
Then, let $y = (y_1, ..., y_n)^\top \in \mathbb{R}^n$ and $\hat{y} = (\hat{y}_1, ..., \hat{y}_m)^\top \in \mathbb{R}^m$. It holds that 
\begin{align*}
\hat{y} = \frac{Ky}{K 1_n},
\end{align*}
where the division is element-wise. This will lead to a vectorized implementation. For now, we use the `sklearn` function for `pairwise_distances`, but we will soon see how to make a vectorized implementation of this as well.

In [10]:
class VectorizedNadarayaWatsonRegressor:
    def __init__(self, bandwidth):
        self.h = bandwidth
    
    def fit(self, X, y):
        X, y = check_X_y(X, y)
        
        self.X_train = X
        self.y_train = y
        
        return self
    
    def predict(self, X):
        X = check_array(X)
        
        # n_eval by n_train kernel matrix.
        kernels = np.exp(-0.5 * pairwise_distances(X, self.X_train) ** 2 / self.h ** 2)
        
        # Element-wise division
        return (kernels @ self.y_train) / kernels.sum(axis=1)

**Exercise 4:** How can we use matrix operations to implement a vectorized version of pairwise Euclidean distances? That is, given the matrix $X \in \mathbb{R}^{n_1 \times d}$, and the matrix $Z \in \mathbb{R}^{n_1 \times d}$, how can we produce the matrix $D \in \mathbb{R}^{n_1 \times n_2}$, where
\begin{align*}
    D_{ij} = ||x_i - z_j||_2 ?
\end{align*}

**Solution:** Represent the Euclidean distance as a scalar product.
\begin{align*}
    D_{ij} = ||x_i - z_j||_2 = \sqrt{(x_i - z_j)^\top (x_i - z_j)} = \sqrt{x_i^\top x_i + z_j^\top z_j - 2 x_i^\top z_j}.
\end{align*}
The $x_i^\top x_i$ and $z_j^\top z_j$ terms can be computed by taking the norms of the rows of $X$ and $Z$. The cross term $x_i^\top z_j$ can be computed via the matrix multiplication $X^\top Z$. Do we need to compute $X^\top X$ or $Z^\top Z$?

In [11]:
try:
    start = time.time()
    vectorized_model = VectorizedNadarayaWatsonRegressor(bandwidth=0.5).fit(X_train, y_train)
    y_pred = vectorized_model.predict(X_test)
    print("R^2: ", r2_score(y_test, y_pred))
    end = time.time()
    print("Time: %0.4f seconds." % (end - start))
except KeyboardInterrupt:
    print("Too slow! :'(")

R^2:  0.7880181382483237
Time: 0.2995 seconds.


#### Extending to the GPU

- These kinds of operations can be made even faster by using a GPU. In order to interact with it, we use the `torch` package (PyTorch).
- Using PyTorch is largely the same as `numpy`. The main difference is that the quintessential array object becomes a `torch.tensor` instead of a `numpy.ndarray`. Second, tensors exist on a "device" which can either be `"cpu"` for the CPU, or `"cuda"` for the GPU.

In [12]:
import torch

def set_device():
    # If there's a GPU available...
    if torch.cuda.is_available():

        # Tell PyTorch to use the GPU.
        device = torch.device("cuda")
        print("There are %d GPU(s) available." % torch.cuda.device_count())
        print("We will use the GPU:", torch.cuda.get_device_name(0))

    # If not...
    else:
        print("No GPU available, using the CPU instead.")
        device = torch.device("cpu")

    return device

def pdist(sample_1, sample_2, eps=1e-5):
    n_1, n_2 = sample_1.size(0), sample_2.size(0)
    norms_1 = torch.sum(sample_1**2, dim=1, keepdim=True)
    norms_2 = torch.sum(sample_2**2, dim=1, keepdim=True)
    norms = (norms_1.expand(n_1, n_2) +
             norms_2.transpose(0, 1).expand(n_1, n_2))
    distances_squared = norms - 2 * sample_1.mm(sample_2.t())
    return torch.sqrt(eps + torch.abs(distances_squared))

class GPUNadarayaWatsonRegressor:
    def __init__(self, bandwidth):
        self.h = bandwidth
        self.device = set_device()
    
    def fit(self, X, y):
        X, y = check_X_y(X, y)
        
        self.X_train = torch.from_numpy(X).to(self.device)
        self.y_train = torch.from_numpy(y).to(self.device)
        
        return self
    
    def predict(self, X):
        X = check_array(X)
        X = torch.from_numpy(X).to(self.device)
        
        # n_eval by n_train kernel matrix.
        kernels = torch.exp(-0.5 * pdist(X, self.X_train) ** 2 / self.h ** 2)
        
        # Element-wise division
        return (torch.matmul(kernels, self.y_train) / kernels.sum(axis=1)).to("cpu")

In [13]:
# DEMO: Run in Colab environment.
try:
    start = time.time()
    gpu_model = GPUNadarayaWatsonRegressor(bandwidth=0.5).fit(X_train, y_train)
    y_pred = gpu_model.predict(X_test)
    print("R^2: ", r2_score(y_test, y_pred))
    end = time.time()
    print("Time: %0.4f seconds." % (end - start))
except KeyboardInterrupt:
    print("Too slow! :'(")

No GPU available, using the CPU instead.
R^2:  0.7880181382483237
Time: 0.3924 seconds.


## 4. Application 2: Embarassingly Parallel Loops

**Definition**

- In the previous section we were using a combination of linear algebra representations and built-in functions to benefit from parallelization. This method only works for inherently mathematical objects, which is also the reason we got a speed up from the GPU.
- When we would like to parallelize more complicated programming logic, we have to use explicit parallelization. This means we describe explicitly a "batch" operation which should be split among various threads.
- A special (but very common) case of this situation is **embarrassingly parallel loops**. This occurs when there is a `for` loop for which the result of computation in each iteration has no relationship to the result of computation in other iterations. 
- In these cases, the code essentially "looks" like a `for` loop. We just have to instruct the machine to split this load over a certain number of cores.
- One thing to note that explicit parallelism often has an overhead cost, so sometimes for small scale problems it can even be slower than just iterating sequentially. Doing this correctly requires some knowledge of the hardware system its running on.

**Exercise 4.1:** Please name some iterated computations in machine learning settings that are and are not embarrassingly parallelizable. 

**Exercise 4.2:** Based on their description, are embarrassingly parallel workloads better distributed over CPU cores or GPU cores? Why?

### **Exercise 4.1**

*Embarrassingly parallelizable computations:*

* **Inference on independent samples**: Making predictions on multiple test inputs where each input can be evaluated separately.
* **Hyperparameter tuning** (e.g., grid search or random search): Each model configuration can be trained independently.
* **Cross-validation folds**: Training models on different folds does not depend on the others.
* **Data augmentation (offline)**: Applying random transformations to different images or sequences independently.

*Not embarrassingly parallelizable computations:*

* **Gradient descent with shared parameters**: Each iteration depends on the model state updated in previous steps.
* **Sequential RNN training**: Later steps depend on hidden states from earlier ones.
* **Backpropagation through time (BPTT)** in recurrent models: Requires passing gradients backward through dependent steps.
* **Batch normalization during training**: Requires computing statistics (mean/variance) over the entire batch, introducing dependencies.

**Exercise 4.2**

*Embarrassingly parallel workloads are often better distributed over GPU cores**—**but it depends on the nature of the task*.

* **GPUs** are ideal when the computation per task is numeric-heavy, uniform, and can be written in a vectorized or SIMD fashion. They offer massive throughput for simple operations on large datasets (e.g., image processing, vector math, matrix ops).
* **CPUs** are often better when each task involves complex logic, branching, or irregular memory access, which GPUs handle less efficiently.


When it comes to implementation, typical options in Python are `joblib` and `multiprocessing` for parallel CPU processing and PyTorch with CUDA for GPU-based inference and augmentation. In R, the `apply` family is a common option. 

In [14]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from joblib import Parallel, delayed
import time

### 4.1 Inference on independent samples

For CPU parallelism (e.g., with scikit-learn):

In [15]:
# Load MNIST (~70,000 samples, 784 features)
mnist = fetch_openml('mnist_784', version=1, as_frame=False)
X, y = mnist.data, mnist.target.astype(int)

# Reduce size for faster training, but keep test large for benchmarking
X_train, X_test, y_train, y_test = train_test_split(X[:20000], y[:20000], test_size=0.5, random_state=42)
print(X_train.shape, X_test.shape)

# Train a logistic regression model
model = LogisticRegression(max_iter=1000, solver='lbfgs', n_jobs=-1)
model.fit(X_train, y_train)

(10000, 784) (10000, 784)


In [16]:
start_serial = time.time()

y_pred_serial = [model.predict(x.reshape(1, -1))[0] for x in X_test]

end_serial = time.time()
print(f"Serial inference time: {end_serial - start_serial:.4f} seconds")

def predict_one(x):
    return model.predict(x.reshape(1, -1))[0]

start_parallel = time.time()

y_pred_parallel = Parallel(n_jobs=-1)(delayed(predict_one)(x) for x in X_test)

end_parallel = time.time()
print(f"Parallel inference time: {end_parallel - start_parallel:.4f} seconds")


Serial inference time: 0.3704 seconds
Parallel inference time: 1.2726 seconds


If using a GPU-compatible model (e.g., PyTorch), batch inference uses GPU cores naturally:

In [39]:
import torch
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import time

In [50]:
# Load MNIST and move to GPU
transform = transforms.ToTensor()
test_dataset = datasets.MNIST(root="./data", train=False, transform=transform, download=True)
print(test_dataset.data.shape)
test_loader = DataLoader(test_dataset, batch_size=1024, shuffle=False)

# Simple model
class SimpleNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.fc = nn.Linear(28*28, 10)

    def forward(self, x):
        x = self.flatten(x)
        return self.fc(x)

# Initialize and move model to GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleNN().to(device)

# Dummy training step (skipped here) – just use random weights
model.eval()

# Inference on GPU
start_gpu = time.time()
all_preds = []

with torch.no_grad():
    for images, _ in test_loader:
        images = images.to(device)
        outputs = model(images)
        preds = torch.argmax(outputs, dim=1)
        all_preds.append(preds.cpu())

end_gpu = time.time()
print(f"GPU inference time: {end_gpu - start_gpu:.4f} seconds")


torch.Size([10000, 28, 28])
GPU inference time: 0.6741 seconds


### 4.2 Hyperparameter tuning

We use `joblib` to search for the best bandwidth for the Nadaraya-Watson estimator. As before, use `htop` to ensure that your code is working properly. 

In [18]:
is_parallel = True

bandwidths = np.logspace(-1, 3, 30)

# Define function that performs each iteration.
def worker(bandwidth):
    model = VectorizedNadarayaWatsonRegressor(bandwidth=bandwidth).fit(X_train, y_train)
    return r2_score(y_test, model.predict(X_test))

try:
    if is_parallel:
        # Use n_jobs to determine how many cores will be used. Negative values imply "all but".
        r2 = Parallel(n_jobs=-2)(delayed(worker)(bandwidth) for bandwidth in bandwidths)
    else:
        r2 = []
        for bandwidth in bandwidths:
            r2.append(worker(bandwidth))
except KeyboardInterrupt:
    print("Too slow! :'('")



ValueError: Input contains NaN.

In [None]:
plt.plot(bandwidths, r2, linewidth=3, color="red")
plt.xscale('log')
plt.xlabel("Bandwidth")
plt.ylabel(r"$R^2$ Score")
plt.title("Nadaraya-Watson Performance vs. Bandwidth")

### 4.3 Cross-validation folds

### 4.4 Data augmentation (offline)