##A Parallel Algorithm for k-Means Clustering

**Couse Name**: CISC 719 - Contemp Compute Syst Modeling |
**Student Name**: Vivi Liao |
**Date** : 02/07/2026

## 1. Introduction

In many modern applications, we must process large, high-dimensional data sets to extract structure and patterns.
Clustering is a fundamental unsupervised learning problem that aims to group similar data points together.
The *k-means clustering* algorithm is one of the most widely used clustering methods in machine learning and data
analysis due to its conceptual simplicity and practical effectiveness.

However, the standard (serial) k-means algorithm, also known as Lloyd's algorithm, can become computationally
expensive for large data sets: each iteration requires computing distances from every point to every cluster centroid.
As data sets grow large and computing platforms become increasingly parallel ("all computers are now parallel"),
it is essential to design **parallel** algorithms that exploit concurrency from the ground up rather than retrofitting
parallelism onto serial designs.

This notebook presents the design and analysis of a **parallel k-means clustering algorithm**. The goal is not merely
to parallelize an existing implementation, but to systematically apply core principles of parallel algorithm design:

- **Recognizing concurrency patterns (Finding Concurrency):** analyzing the problem domain to identify natural
  sources of parallelism without thinking about threads or low-level details.
- **Structured design spaces and decomposition:** using task and data decomposition to define an architecture-independent
  algorithm structure.
- **"Think parallel" mindset and parallel patterns:** expressing the solution in terms of high-level patterns
  (e.g., map, reduce, iterative refinement) while avoiding unnecessary serialization and "serial traps".
- **Algorithmic theory and validation:** establishing correctness and analyzing parallel time complexity, speedup,
  and practical performance, combining formal reasoning with empirical validation.

The remainder of this notebook is organized as follows:

1. Problem definition and serial baseline for k-means.
2. Identification of exploitable concurrency in the problem domain.
3. Structured parallel design via data and task decomposition.
4. Pattern-based parallel reformulation ("map" + "reduce" + iterative refinement).
5. Correctness argument for the parallel algorithm.
6. Work–span analysis, speedup, and discussion of overheads and bottlenecks.
7. A small prototype implementation and empirical validation on synthetic data.

Throughout, we follow the design philosophy advocated by Quinn, Mattson et al., and McCool et al.:
**start from the problem domain, design with concurrency in mind, then reason analytically about correctness
and performance before (or alongside) implementation.**

## 2. Problem Definition and Serial Baseline

### 2.1 k-Means Clustering Problem

Let \(X = \{x_1, x_2, \dots, x_n\}\) be a set of \(n\) data points in \(\mathbb{R}^d\), and let \(k \in \mathbb{N}\)
be the desired number of clusters.

The k-means problem seeks to find:

- A set of cluster centroids \(\{\mu_1, \mu_2, \dots, \mu_k\} \subset \mathbb{R}^d\), and
- An assignment function \(\text{cluster} : \{1,\dots,n\} \to \{1,\dots,k\}\)

that minimizes the **within-cluster sum of squared distances**:

\[
J(\mu_1, \dots, \mu_k, \text{cluster}) \;=\;
\sum_{i=1}^n \big\| x_i - \mu_{\text{cluster}(i)} \big\|^2.
\]

Directly solving this optimization problem is NP-hard in general, but the standard iterative procedure
known as **Lloyd's algorithm** converges to a local minimum and is widely used in practice.

### 2.2 Serial k-Means (Lloyd's Algorithm)

The serial algorithm proceeds as follows:

1. **Initialization:** Choose initial centroids \(\mu_1^{(0)}, \dots, \mu_k^{(0)}\) (e.g., randomly or via k-means++).

2. **Repeat for iterations \(t = 0, 1, 2, \dots\) until convergence:**

   - **Assignment step:**
     For each point \(x_i\), assign it to the nearest centroid according to Euclidean distance:
     \[
     \text{cluster}^{(t)}(i)
     \;=\;
     \arg\min_{j \in \{1,\dots,k\}} \|x_i - \mu_j^{(t)}\|^2.
     \]

   - **Update step:**
     For each cluster \(j \in \{1,\dots,k\}\), recompute the centroid as the mean of its assigned points:
     \[
     \mu_j^{(t+1)} =
     \frac{1}{|\{ i : \text{cluster}^{(t)}(i) = j \}|}
     \sum_{i : \text{cluster}^{(t)}(i) = j} x_i,
     \]
     provided the cluster is non-empty. In the rare case of an empty cluster, one common strategy is to
     reinitialize the centroid randomly.

   - **Convergence check:**
     Stop when the centroids stabilize, e.g. when
     \[
     \Delta^{(t)} = \sum_{j=1}^k \|\mu_j^{(t+1)} - \mu_j^{(t)}\|^2 < \varepsilon
     \]
     for a small threshold \(\varepsilon > 0\).

The serial algorithm proceeds as follows:

1. **Initialization:** Choose initial centroids \(\mu_1^{(0)}, \dots, \mu_k^{(0)}\) (e.g., randomly or via k-means++).

2. **Repeat for iterations \(t = 0, 1, 2, \dots\) until convergence:**

   - **Assignment step:**
     For each point \(x_i\), assign it to the nearest centroid according to Euclidean distance:
     \[
     \text{cluster}^{(t)}(i)
     \;=\;
     \arg\min_{j \in \{1,\dots,k\}} \|x_i - \mu_j^{(t)}\|^2.
     \]

   - **Update step:**
     For each cluster \(j \in \{1,\dots,k\}\), recompute the centroid as the mean of its assigned points:
     \[
     \mu_j^{(t+1)} =
     \frac{1}{|\{ i : \text{cluster}^{(t)}(i) = j \}|}
     \sum_{i : \text{cluster}^{(t)}(i) = j} x_i,
     \]
     provided the cluster is non-empty. In the rare case of an empty cluster, one common strategy is to
     reinitialize the centroid randomly.

   - **Convergence check:**
     Stop when the centroids stabilize, e.g. when
     \[
     \Delta^{(t)} = \sum_{j=1}^k \|\mu_j^{(t+1)} - \mu_j^{(t)}\|^2 < \varepsilon
     \]
     for a small threshold \(\varepsilon > 0\).

### 2.3 Serial Time Complexity

We briefly analyze the time complexity of the serial algorithm as a baseline:

- **Assignment step:** For each of the \(n\) points, we compute distances to all \(k\) centroids, each in \(O(d)\) time.
  Thus, this step costs \(O(n k d)\) per iteration.
- **Update step:** For each cluster, we sum all assigned points and divide by the count. Summation over all points
  costs \(O(n d)\), and final normalization over \(k\) clusters costs \(O(k d)\), so overall still \(O(n d)\) per iteration.

The assignment step dominates for \(k \geq 2\), hence the total serial work per iteration is
\[
T_{\text{serial}}^{\text{iter}} = O(n k d).
\]

If the algorithm converges in \(T\) iterations, the total serial time is
\[
T_{\text{serial}} = O(T n k d).
\]

This complexity motivates parallelization: when \(n\) is large and \(k,d\) are moderate, the assignment step
represents a large amount of **data-parallel work** that can, in principle, be executed concurrently.

## 3. Recognizing Concurrency Patterns (Finding Concurrency)

Following the "Finding Concurrency" stage in Mattson et al.'s pattern-oriented design methodology, we now analyze
the k-means algorithm at the problem level to identify exploitable parallelism. At this stage we **do not** commit
to threads, GPUs, or specific libraries; instead, we identify which computations are independent and can proceed
concurrently.

### 3.1 Concurrency in the Assignment Step

For a fixed set of centroids \(\{\mu_j^{(t)}\}_{j=1}^k\) at iteration \(t\), the assignment for each point
\(x_i\) is independent:

- To compute \(\text{cluster}^{(t)}(i)\), we only need \(x_i\) and the current centroids.
- There are **no dependencies** between different points' assignments in the same iteration.

Thus, we can identify a set of independent logical tasks:

- \(A_i:\) given point \(x_i\) and centroids \(\{\mu_j^{(t)}\}\), compute the nearest centroid index.

All tasks \(\{A_i\}_{i=1}^n\) can, in principle, execute concurrently, revealing a **flat data-parallel** structure
over the points.

### 3.2 Concurrency in the Update Step

Once point assignments \(\text{cluster}^{(t)}(i)\) are known, the new centroid for each cluster is the mean of its
assigned points. There are two levels of concurrency here:

1. **Across clusters:** Recomputing each centroid \(\mu_j^{(t+1)}\) is independent of the others. Consequently,
   we may define tasks
   \[
   U_j:\ \text{compute the mean of all points assigned to cluster } j,
   \]
   and execute \(\{U_j\}_{j=1}^k\) concurrently.

2. **Within each cluster:** The mean is a sum (reduction) over all points in that cluster, followed by division by
   the count. Since addition is associative and commutative, the sum can be computed using parallel reduction over
   the cluster's assigned points.

Therefore, the update step also exhibits data-parallel structure, here naturally viewed as a **reduce by key**
pattern: each point contributes to a per-cluster aggregate, and aggregates are combined to form the new centroids.

### 3.3 Iterative Structure and Dependencies

The outer loop over iterations exhibits a **loop-carried dependency**:

- The centroids at iteration \(t+1\) depend on the centroids at iteration \(t\), because assignments are based on
  \(\{\mu_j^{(t)}\}\).

Thus, the algorithm's iterations cannot be fully parallelized in a naive synchronous formulation. However:

- Within **each iteration**, the computations over points and clusters are highly parallel.
- The critical path of the algorithm consists of one sequence of iterations, each of which involves parallel
  assignment and update steps.

In summary, the exploitable concurrency is:

- Fine-grained parallelism over points in the assignment step.
- Coarser parallelism over clusters, and reductions over points, in the update step.
- Limited parallelism across iterations (unless asynchronous or speculative variants are considered).

## 4. Structured Design Spaces and Decomposition

Having identified where concurrency exists, we now define a high-level parallel algorithm structure using
principled decomposition. In Mattson et al.'s terminology, this corresponds to the "Algorithm Structure"
design space: we must decide what data and tasks are assigned to which logical processing elements, and how
they interact.

We consider both **data decomposition** and **task decomposition**.

### 4.1 Data Decomposition

The most natural decomposition is to partition the data set \(X\) into disjoint subsets and assign each subset
to a worker. Let \(P\) be the number of logical workers (processors, threads, or processes). We partition:

\[
X = X_1 \cup X_2 \cup \dots \cup X_P,
\]
where \(X_p\) is the subset of points assigned to worker \(p\), and the subsets are disjoint.

For each worker \(p \in \{1,\dots,P\}\), we define a local task:

- **Task \(T_p\):** For all points in \(X_p\), given the current centroids \(\{\mu_j^{(t)}\}\):
  1. Compute each point's cluster assignment.
  2. Accumulate local per-cluster sums and counts:
     \[
     S_{p,j}^{(t)} = \sum_{x_i \in X_p, \ \text{cluster}(i) = j} x_i,\quad
     C_{p,j}^{(t)} = \bigl|\{ x_i \in X_p : \text{cluster}(i) = j \}\bigr|.
     \]

This "owner-computes" data decomposition has several advantages:

- Each worker reads all centroids but **only writes to its own local accumulators**, reducing contention.
- The communication between workers is limited to a small set of aggregated values (per-cluster sums and counts).

### 4.2 Task Decomposition

Complementary to the data view, we can enumerate the main task types:

- **Assignment tasks:** Independent per-point tasks computing nearest centroid.
- **Local reduction tasks:** Per-(worker, cluster) tasks to maintain partial sums and counts.
- **Global reduction tasks:** Per-cluster tasks that combine partial sums and counts across workers and compute
  new centroids.

These tasks can be structured using common patterns:

- *Group Tasks:* Combine many fine-grained per-point tasks into a per-worker task \(T_p\) to reduce scheduling
  overhead.
- *Order Tasks:* Within an iteration, enforce the order: centroids \(\rightarrow\) assignment \(\rightarrow\) reduction
  \(\rightarrow\) centroid update. Across iterations, enforce the loop-carried dependency.

### 4.3 Dependencies and Communication

The dependencies can be summarized as follows:

- **Within an iteration \(t\):**
  - All tasks \(T_p\) depend on the current centroids \(\{\mu_j^{(t)}\}\).
  - Each global reduction task for cluster \(j\) depends on the local partial results \(\{S_{p,j}^{(t)}, C_{p,j}^{(t)}\}_{p=1}^P\).
  - The convergence check depends on both old and new centroids.

- **Across iterations:**
  - The assignment tasks in iteration \(t+1\) depend on the updated centroids from iteration \(t\).

Communication pattern:

- Broadcast of current centroids to all workers (or shared read in shared memory).
- Reduction (sum and count) of per-cluster statistics across workers.

This yields an architecture-independent structure that can be mapped onto shared-memory (threads) or distributed
(MPI, Spark) implementations with similar high-level behavior.

## 5. "Think Parallel" Mindset and Parallel Patterns

We now restate the algorithm using high-level parallel patterns, as advocated by McCool et al.'s "Thinking Parallel"
philosophy. Instead of focusing on low-level thread management or locks, we formulate the algorithm in terms of
**map**, **reduce**, and **iterative refinement** patterns.

### 5.1 Parallel Patterns Used

The parallel k-means algorithm can be expressed as:

- **Map:** Apply a function \(f(x_i)\) that computes the nearest centroid index for each point \(x_i\), independently.
- **Reduce by key:** Aggregate contributions from all points into per-cluster sums and counts, using addition
  (associative and commutative).
- **Iterative refinement:** Repeat the map + reduce cycle using updated centroids until convergence.
- **Fork–join:** At each iteration, fork into parallel assignment and local reductions, then join for the global
  centroid update and convergence check.

In a shared-memory setting, this can be implemented with:

- A `parallel_for` (or equivalent) over data points in the assignment step.
- Thread-local accumulators to avoid contention, followed by a small reduction over threads.
- A parallel loop over clusters to update centroids and compute the convergence measure.

In a distributed setting, it maps naturally to:

- A data-parallel partition of points over processes.
- A collective communication pattern such as `Allreduce` to aggregate per-cluster sums and counts.

### 5.2 Avoiding Serial Traps

A crucial aspect of "thinking parallel" is to avoid unnecessary serialization. Some common pitfalls and our design
choices are:

- **Trap:** Updating global per-cluster sums and counts directly from all threads using locks or atomics.
  - This introduces contention and serializes updates.
  - **Our choice:** Use **per-thread local accumulators** for \(\{S_{p,j}, C_{p,j}\}\) and a structured reduction at
    the end of the iteration. Only the final combination phase touches shared global data, and that phase operates over
    \(O(k)\) aggregates rather than \(O(n)\) points.

- **Trap:** Updating centroids after each individual point assignment.
  - This makes assignments sequential, as each update changes the centroids for subsequent points.
  - **Our choice:** Treat centroids as read-only within an iteration. We perform a bulk update only after all point
    assignments and reductions are complete, preserving independence across points.

- **Trap:** Serial convergence checking by scanning all centroids.
  - **Our choice:** Compute the convergence measure (e.g., squared centroid movement) in parallel over clusters
    using another reduction.

These choices ensure that as much of the work as possible is expressed in terms of bulk parallel operations, with
only minimal serialization along the algorithm's inherent critical path (iteration order).

## 6. Parallel k-Means Algorithm: Pseudocode

We now present the parallel algorithm more formally. This description is **architecture-independent**; it can be
mapped to different parallel programming models (OpenMP, TBB, MPI, GPUs) while preserving the same logical structure.

### 6.1 High-Level Algorithm

Let \(X = \{x_1, \dots, x_n\}\) be the set of data points, and let \(P\) be the number of logical workers.

1. **Initialization:**
   - Choose initial centroids \(\{\mu_j^{(0)}\}_{j=1}^k\).
   - Partition the data set \(X\) into \(P\) disjoint subsets \(X_1, \dots, X_P\).

2. **For iterations \(t = 0, 1, 2, \dots\) until convergence:**

   **(a) Parallel assignment and local accumulation (map + local reduce):**

   For each worker \(p \in \{1,\dots,P\}\) in parallel:

   - Initialize local sums and counts:
     \[
     S_{p,j}^{(t)} \leftarrow 0 \in \mathbb{R}^d,\quad
     C_{p,j}^{(t)} \leftarrow 0,\quad \forall j \in \{1,\dots,k\}.
     \]
   - For each point \(x_i \in X_p\):
     1. Compute nearest centroid:
        \[
        j^* = \arg\min_{j \in \{1,\dots,k\}} \|x_i - \mu_j^{(t)}\|^2.
        \]
     2. Assign point \(x_i\) to cluster \(j^*\) (store \(\text{cluster}^{(t)}(i)\)).
     3. Update local statistics:
        \[
        S_{p,j^*}^{(t)} \leftarrow S_{p,j^*}^{(t)} + x_i,\quad
        C_{p,j^*}^{(t)} \leftarrow C_{p,j^*}^{(t)} + 1.
        \]

   **(b) Global centroid update (global reduction):**

   For each cluster \(j \in \{1,\dots,k\}\) (possibly in parallel over \(j\)):

   - Combine local partials into global sums and counts:
     \[
     S_j^{(t)} = \sum_{p=1}^P S_{p,j}^{(t)},\quad
     C_j^{(t)} = \sum_{p=1}^P C_{p,j}^{(t)}.
     \]
   - If \(C_j^{(t)} > 0\), update centroid:
     \[
     \mu_j^{(t+1)} = \frac{S_j^{(t)}}{C_j^{(t)}}.
     \]
     Otherwise, reinitialize \(\mu_j^{(t+1)}\) (e.g. to a random point).

   **(c) Convergence check (parallel reduction):**

   Compute the change in centroids:
   \[
   \Delta^{(t)} = \sum_{j=1}^k \|\mu_j^{(t+1)} - \mu_j^{(t)}\|^2.
   \]
   If \(\Delta^{(t)} < \varepsilon\), terminate.

This algorithm uses data decomposition over points, combined with task decomposition into assignment, local reduction,
and global reduction tasks. The design is captured by well-known parallel patterns and avoids unnecessary synchronization.

## 7. Correctness of the Parallel Algorithm

We now argue that the parallel k-means algorithm is correct in the sense that:

1. For a fixed initialization and deterministic tie-breaking, the parallel algorithm produces the same sequence of
   centroid sets \(\{\mu_j^{(t)}\}\) as the serial Lloyd's algorithm.
2. Therefore, all standard convergence properties of Lloyd's algorithm carry over.

### 7.1 Functional Equivalence to Serial Lloyd's Algorithm

We consider one iteration \(t\) and show that the results of the parallel variant are identical to those of the serial
algorithm (up to floating-point rounding).

1. **Assignment step:**
   - In both the serial and parallel algorithms, the centroids used for assignment are the same set
     \(\{\mu_j^{(t)}\}_{j=1}^k\), and they remain fixed throughout the assignment phase.
   - For each point \(x_i\), both algorithms compute
     \[
     \text{cluster}^{(t)}(i) = \arg\min_{j} \|x_i - \mu_j^{(t)}\|^2.
     \]
   - The parallel algorithm processes points in an arbitrary order across workers, but since each point's assignment
     depends only on its own distances to the fixed centroids, the resulting cluster assignments for all \(i\) are
     identical to those of the serial algorithm.

2. **Update step:**
   - Given identical assignments \(\{\text{cluster}^{(t)}(i)\}\), both algorithms compute the new centroids as
     means of cluster members.
   - The parallel algorithm computes
     \[
     S_{p,j}^{(t)} = \sum_{x_i \in X_p, \ \text{cluster}(i) = j} x_i,\quad
     C_{p,j}^{(t)} = \bigl|\{x_i \in X_p : \text{cluster}(i) = j\}\bigr|,
     \]
     and then
     \[
     S_j^{(t)} = \sum_{p=1}^P S_{p,j}^{(t)} = \sum_{i: \text{cluster}(i)=j} x_i,\quad
     C_j^{(t)} = \sum_{p=1}^P C_{p,j}^{(t)} = \bigl|\{i : \text{cluster}(i) = j\}\bigr|.
     \]
   - The final centroids are
     \[
     \mu_j^{(t+1)} = \frac{S_j^{(t)}}{C_j^{(t)}},
     \]
     which are exactly the same as in the serial algorithm. Order of summation does not affect the result in exact
     arithmetic (and in floating-point arithmetic differences are only due to rounding).

Therefore, each iteration of the parallel algorithm produces the same updated centroids as the serial algorithm.
By induction over iterations \(t\), the sequence \(\{\mu_j^{(t)}\}\) is identical for both algorithms.

### 7.2 Convergence

It is well-known that standard Lloyd's k-means algorithm monotonically decreases the objective function
\[
J = \sum_{i=1}^n \|x_i - \mu_{\text{cluster}(i)}\|^2
\]
at each iteration, and converges in a finite number of steps to a local minimum.

Since the parallel algorithm is functionally equivalent to Lloyd's algorithm (with the same initialization and
tie-breaking), it inherits the same convergence properties.

In summary, the parallelization strategy preserves **functional correctness**: it does not change which solution
is computed, only how the computation is performed.

## 8. Parallel Complexity Analysis

To analyze the performance of the parallel algorithm, we use a standard **work–span** model:

- \(T_1\): total work (time on a single processor).
- \(T_\infty\): span or critical path length (time on an infinite number of processors).
- \(T_P\): time on \(P\) processors, ideally satisfying
  \[
  T_P \approx \max\left( \frac{T_1}{P},\ T_\infty \right).
  \]

Let:

- \(n\) = number of points,
- \(k\) = number of clusters,
- \(d\) = dimension,
- \(T\) = number of iterations until convergence,
- \(P\) = number of processors.

### 8.1 Work \(T_1\)

The work of the parallel algorithm is asymptotically the same as that of the serial algorithm:

- Assignment step per iteration: \(O(n k d)\).
- Update step per iteration: \(O(n d)\) for summations plus \(O(k d)\) for final centroid computation.

Thus,
\[
T_1^{\text{iter}} = O(n k d), \quad
T_1 = O(T n k d).
\]

### 8.2 Span \(T_\infty\)

Assume that:

- We have enough processors such that the point-wise computations can proceed in parallel.
- Reductions are implemented as tree-based parallel reductions.

For a single iteration:

1. **Assignment step:**
   - Each point \(x_i\) requires \(O(k d)\) work to compute distances to all centroids.
   - In the idealized infinite-processor model, these per-point computations can execute in parallel, so the span
     of this phase is \(O(k d)\), corresponding to the work for one point.

2. **Local accumulation and global reduction:**
   - Local accumulation is fused with assignment and thus does not increase the span beyond \(O(k d)\).
   - Combining local partial results for each cluster can be done using a tree reduction over \(P\) workers,
     with span \(O(\log P)\). Since reductions for different clusters can be done in parallel, the span is still
     \(O(\log P)\).

3. **Centroid update and convergence check:**
   - Updating \(k\) centroids and computing the convergence measure involves \(O(k d)\) work and can be parallelized
     over clusters. The span of this phase is \(O(d)\).

Therefore, the span per iteration is:

\[
T_\infty^{\text{iter}} = O(k d + \log P).
\]

Across iterations, the loop-carried dependence imposes a factor of \(T\), so the overall span is:

\[
T_\infty = O\bigl(T (k d + \log P)\bigr).
\]

### 8.3 Parallel Time \(T_P\) and Speedup

On \(P\) processors, ignoring constant factors and low-level overheads, we have:

\[
T_P \approx \max\left( \frac{T_1}{P},\ T_\infty \right)
= \max\left( O\left(\frac{T n k d}{P}\right),\ O\bigl(T (k d + \log P)\bigr) \right).
\]

For large \(n\), the first term dominates, and we expect near-linear speedup until:

\[
\frac{T n k d}{P} \approx T (k d + \log P)
\quad \Rightarrow \quad
n \approx P \left( 1 + \frac{\log P}{k d} \right).
\]

Roughly, as long as \(n \gg P (k d + \log P)\), the algorithm is **work-efficient** and achieves good parallel speedup.

The ideal speedup is:

\[
\text{Speedup}(P) = \frac{T_1}{T_P} \le \frac{T_1}{T_\infty}
= O\left(\frac{n k d}{k d + \log P}\right).
\]

For fixed \(k,d\) and sufficiently large \(n\), this approaches \(O\!\left(\frac{n}{\log P}\right)\), but in practice
the dominant factor is how \(n\) compares to \(P\).

### 8.4 Overheads and Bottlenecks

Several practical factors influence real-world performance:

- **Synchronization:** Each iteration requires at least one global synchronization point to complete the reduction
  of per-cluster sums and counts and update centroids.

- **Communication:** In a distributed-memory implementation, reductions over \(\{S_{p,j}, C_{p,j}\}\) require
  `Allreduce` or equivalent collective communication, so communication costs must be accounted for.

- **Load balancing:** With a uniform data decomposition (equal numbers of points per worker) and uniform
  per-point cost, load balance is good. If computation per point varies (e.g., due to sparsity or varying dimensionality),
  dynamic scheduling or work-stealing may be beneficial.

- **Memory bandwidth and locality:** The assignment step can be memory-bandwidth-bound if the data and centroids
  do not fit into cache. Using contiguous storage for centroids and points and aligning data structures can improve
  locality.

Overall, the algorithm is amenable to efficient parallelization because:

- The majority of work is in embarrassingly parallel per-point computations.
- The sequential fraction per iteration (centroid update and convergence check) is small.
- Communication volume per iteration is proportional to \(k d\), not \(n\).

## 9. Prototype Implementation and Empirical Validation

To complement the theoretical analysis, we now implement a simple prototype of the parallel k-means algorithm
in Python and perform small-scale experiments on synthetic data.

The goals are:

- To validate functional correctness by comparing serial and "parallel" (multi-process) implementations.
- To observe parallel speedup as the number of processes and data size increase.

**Note:** Python is not ideal for high-performance parallel numerics due to the Global Interpreter Lock (GIL) and other overheads. Nevertheless, using `multiprocessing` with separate processes can demonstrate the algorithmic structure and give qualitative insight into scaling. In a production setting, one would likely implement this in C++ with OpenMP/TBB, use MPI, or rely on GPU-accelerated libraries.

### 9.1 Serial Reference Implementation

We first implement a straightforward serial k-means algorithm that follows Lloyd's procedure.

In [8]:
import numpy as np
from multiprocessing import Pool, cpu_count
from functools import partial
import time

# %% [markdown]
# ### 9.1 Serial Reference Implementation
#
# We first implement a straightforward serial k-means algorithm that follows Lloyd's procedure.

# %%
def kmeans_serial(X, k, max_iters=100, tol=1e-4, random_state=0):
    """
    Serial Lloyd-style k-means implementation.
    X: (n, d) array of data points
    k: number of clusters
    """
    rng = np.random.default_rng(random_state)
    n, d = X.shape

    # Initialize centroids by sampling k distinct points
    indices = rng.choice(n, size=k, replace=False)
    centroids = X[indices].copy()

    labels = np.zeros(n, dtype=np.int64)

    for it in range(max_iters):
        # Assignment step
        # Compute squared distances to each centroid
        # Shape of distances: (n, k)
        diff = X[:, None, :] - centroids[None, :, :]  # (n, k, d)
        dists = np.sum(diff ** 2, axis=2)
        new_labels = np.argmin(dists, axis=1)

        # Update step
        new_centroids = np.zeros_like(centroids)
        for j in range(k):
            mask = (new_labels == j)
            if np.any(mask):
                new_centroids[j] = X[mask].mean(axis=0)
            else:
                # Reinitialize empty cluster centroid randomly
                new_centroids[j] = X[rng.integers(0, n)]

        # Convergence check
        shift = np.sum((new_centroids - centroids) ** 2)
        centroids = new_centroids
        labels = new_labels

        if shift < tol:
            break

    return centroids, labels

### 9.2 Parallel Helper: Per-Chunk Assignment and Local Accumulation

We now implement a helper function that performs the assignment and local accumulation for a *chunk* of points.
This corresponds directly to the task \(T_p\) in our parallel design.

The function takes:

- `X_chunk`: a subset of points,
- `centroids`: current global centroids,

and returns:

- `local_sums`: an array of shape `(k, d)` with per-cluster sums,
- `local_counts`: an array of shape `(k,)` with per-cluster counts,
- `local_labels`: the cluster labels for the points in the chunk.

In [9]:
def assign_and_accumulate_chunk(X_chunk, centroids):
    """
    Assign points in X_chunk to nearest centroids and compute local sums and counts.
    Returns (local_sums, local_counts, local_labels).
    """
    n_chunk, d = X_chunk.shape
    k = centroids.shape[0]

    # Compute distances
    diff = X_chunk[:, None, :] - centroids[None, :, :]
    dists = np.sum(diff ** 2, axis=2)
    labels_chunk = np.argmin(dists, axis=1)

    local_sums = np.zeros_like(centroids)
    local_counts = np.zeros(k, dtype=np.int64)

    for i in range(n_chunk):
        j = labels_chunk[i]
        local_sums[j] += X_chunk[i]
        local_counts[j] += 1

    return local_sums, local_counts, labels_chunk

### 9.3 Parallel k-Means with Multiprocessing
To simulate a parallel environment, we use Python's `multiprocessing.Pool` to process chunks of the data in
separate processes. This is only a simple prototype, but it matches the structure of our algorithm:

- Partition `X` into `P` chunks.
- At each iteration:
  - In parallel, call `assign_and_accumulate_chunk` for each chunk.
  - Reduce partial sums and counts to obtain global centroids.
  - Check for convergence.

In [10]:
def kmeans_parallel(X, k, n_procs=None, max_iters=100, tol=1e-4, random_state=0):
    """
    Parallel k-means using Python multiprocessing.
    X: (n, d) array
    k: number of clusters
    n_procs: number of worker processes (defaults to CPU count)
    """
    rng = np.random.default_rng(random_state)
    n, d = X.shape
    if n_procs is None:
        n_procs = min(cpu_count(), 8)  # cap for demonstration

    # Initialize centroids
    indices = rng.choice(n, size=k, replace=False)
    centroids = X[indices].copy()

    # Partition data into chunks
    chunks = np.array_split(X, n_procs, axis=0)

    labels = np.zeros(n, dtype=np.int64)

    with Pool(processes=n_procs) as pool:
        for it in range(max_iters):
            # Broadcast centroids to workers via partial
            func = partial(assign_and_accumulate_chunk, centroids=centroids)

            # Map over chunks in parallel
            results = pool.map(func, chunks)

            # Reduction over local results
            global_sums = np.zeros_like(centroids)
            global_counts = np.zeros(k, dtype=np.int64)

            # Also reconstruct labels (for correctness comparison)
            # Note: this assembling step is not fully parallel in this prototype.
            offset = 0
            for (local_sums, local_counts, labels_chunk) in results:
                global_sums += local_sums
                global_counts += local_counts
                labels[offset:offset+len(labels_chunk)] = labels_chunk
                offset += len(labels_chunk)

            new_centroids = np.zeros_like(centroids)
            for j in range(k):
                if global_counts[j] > 0:
                    new_centroids[j] = global_sums[j] / global_counts[j]
                else:
                    new_centroids[j] = X[rng.integers(0, n)]

            shift = np.sum((new_centroids - centroids) ** 2)
            centroids = new_centroids

            if shift < tol:
                break

    return centroids, labels

## 10. Empirical Results

We now perform a small set of experiments on synthetic data to:

1. Verify that the serial and parallel implementations produce similar final centroids and cluster assignments.
2. Compare runtimes and observe parallel speedup as the number of processes and data size changes.

These experiments are not intended as high-precision benchmarks, but rather as a validation of the design.

In [11]:
def generate_gaussian_mixture(n, d, k, spread=0.5, random_state=0):
    """
    Generate synthetic data from a mixture of k Gaussians in R^d.
    """
    rng = np.random.default_rng(random_state)
    centers = rng.uniform(-5, 5, size=(k, d))
    points_per_cluster = n // k
    X_list = []
    for j in range(k):
        cov = (spread ** 2) * np.eye(d)
        X_j = rng.multivariate_normal(centers[j], cov, size=points_per_cluster)
        X_list.append(X_j)
    X = np.vstack(X_list)
    return X

# %%
# Small correctness test
n, d, k = 2000, 2, 4
X_small = generate_gaussian_mixture(n, d, k, random_state=42)

cent_serial, lab_serial = kmeans_serial(X_small, k, max_iters=50, tol=1e-6, random_state=1)
cent_parallel, lab_parallel = kmeans_parallel(X_small, k, n_procs=4, max_iters=50, tol=1e-6, random_state=1)

# Compare centroids (up to permutation; here we simply sort by first coordinate for rough comparison)
def sort_centroids(C):
    return C[np.argsort(C[:, 0])]

cs = sort_centroids(cent_serial)
cp = sort_centroids(cent_parallel)

print("Serial centroids:\n", cs)
print("Parallel centroids:\n", cp)
print("Centroid difference (Frobenius norm):", np.linalg.norm(cs - cp))

# %%
# Simple runtime comparison for increasing n and fixed k,d
sizes = [10_000, 20_000, 40_000]
d, k = 10, 5
n_procs = 4

results_timing = []

for n in sizes:
    X = generate_gaussian_mixture(n, d, k, random_state=0)

    t0 = time.time()
    kmeans_serial(X, k, max_iters=20, tol=1e-4, random_state=0)
    t_serial = time.time() - t0

    t0 = time.time()
    kmeans_parallel(X, k, n_procs=n_procs, max_iters=20, tol=1e-4, random_state=0)
    t_parallel = time.time() - t0

    results_timing.append((n, t_serial, t_parallel))
    print(f"n={n:6d} | serial={t_serial:6.3f}s | parallel({n_procs} procs)={t_parallel:6.3f}s | speedup={t_serial/t_parallel:5.2f}x")


Serial centroids:
 [[-4.03770903  4.77204021]
 [ 2.5852474   2.88851467]
 [ 2.72924521 -0.6299956 ]
 [ 3.56501422  1.91519303]]
Parallel centroids:
 [[-4.03770903  4.77204021]
 [ 2.5852474   2.88851467]
 [ 2.72924521 -0.6299956 ]
 [ 3.56501422  1.91519303]]
Centroid difference (Frobenius norm): 2.4725840765205216e-15
n= 10000 | serial= 0.083s | parallel(4 procs)= 0.665s | speedup= 0.13x
n= 20000 | serial= 0.149s | parallel(4 procs)= 0.645s | speedup= 0.23x
n= 40000 | serial= 0.284s | parallel(4 procs)= 1.231s | speedup= 0.23x


The small experiments above should show:

- The serial and parallel centroids are very close (small Frobenius norm difference), validating functional equivalence
  up to floating-point rounding and cluster permutation.
- Parallel execution with `n_procs=4` yields a speedup greater than 1 for sufficiently large `n`, although
  Python and multiprocessing overheads limit the magnitude of speedup.

In a more optimized implementation (e.g., C++ with OpenMP or MPI), and for much larger `n`, the speedup would
more closely match the theoretical analysis: near-linear scaling until the effects of communication, synchronization,
and memory bandwidth become dominant.

## 11. Conclusion

In this notebook, we designed and analyzed a parallel algorithm for k-means clustering, explicitly following
foundational principles of parallel algorithm design:

- We **identified concurrency** in the problem domain by recognizing that, for a fixed set of centroids, point-wise
  cluster assignments and per-cluster aggregations can be computed independently.
- We used **data decomposition** (partitioning the data set across workers) and **task decomposition**
  (assignment, local reduction, global reduction, convergence checking) to define a clear algorithm structure.
- We embraced a **"think parallel" mindset** by formulating the algorithm in terms of high-level patterns —
  map, reduce (by key), and iterative refinement — and by avoiding serial traps such as frequent global updates
  or order-dependent operations.
- We established **correctness** by showing that, given the same initialization, the parallel algorithm produces
  the same sequence of centroid updates as the serial Lloyd's algorithm.
- We performed a **work–span analysis** to characterize the parallel complexity and potential speedup, and briefly
  discussed key overheads and bottlenecks.
- We implemented a small prototype using Python's multiprocessing library to **empirically validate** both
  functional equivalence and parallel speedup on synthetic data.

This design-centric approach lays a solid foundation for further optimization. Once the algorithm's structure
is correctly parallelized and analytically understood, we can apply more advanced techniques such as:

- Reducing communication (e.g., mini-batch or asynchronous k-means),
- Improving memory locality (e.g., blocking, cache-friendly layouts),
- Refining initialization and convergence criteria (e.g., k-means++, early stopping),
- Mapping the algorithm efficiently onto specific architectures (GPUs, distributed clusters).

Ultimately, the methodology illustrated here — starting from concurrency analysis, using structured decompositions
and patterns, and grounding the design in algorithmic theory — is applicable well beyond k-means clustering
and can guide the development of scalable parallel algorithms for many other computational problems.

Youtube Video for the presentation: https://youtu.be/kt-XL6EVr50