# Distributed Matrix-Vector Product in PyTorch

This notebook demonstrates how to perform a distributed matrix-vector product across multiple GPUs using PyTorch.

### Prerequisites
- A machine with multiple GPUs.
- PyTorch installed with CUDA support.

In [1]:
import torch
import time

from torch.autograd import profiler

In [2]:
torch.set_default_dtype(torch.float64)

In [3]:
# Ensure CUDA is available and get the device count
if not torch.cuda.is_available():
    raise RuntimeError("CUDA is not available. This notebook requires a machine with CUDA-enabled GPUs.")
device_count = torch.cuda.device_count()
print(f"CUDA Device Count: {device_count}")

CUDA Device Count: 5


### Example with matrix in memory

In [4]:
# Initialize a large matrix A and vector x
n = 8192
d = 100000
matrix_size = (n, d)
vector_size = (d,)

A = torch.randn(matrix_size).cuda(0)
x = torch.randn(vector_size).cuda(0)

# Split matrix A to distribute across the GPUs (by rows)
A_chunks = [chunk.to(f"cuda:{i}", non_blocking=True) for i, chunk in enumerate(torch.chunk(A, device_count, dim=0))]
x_chunks = [x.to(f"cuda:{i}", non_blocking=True) for i in range(device_count)]  # Pre-distribute x across GPUs

In [5]:
# # Create CUDA streams for parallel computations
# streams = [torch.cuda.Stream(device=i) for i in range(device_count)]

# # You can now preallocate result tensors outside of the function
# preallocated_results = [torch.zeros(A_chunk.shape[0], device=f"cuda:{i}", dtype=A_chunk.dtype) for i, A_chunk in enumerate(A_chunks)]

# # Define the distributed matvec function
# def distributed_matvec(A_chunks, x_chunks, streams, results):    
#     # Launch asynchronous matvec operations
#     ts = time.time()
#     for i, (A_chunk, x_chunk, stream) in enumerate(zip(A_chunks, x_chunks, streams)):
#         with torch.cuda.stream(stream):
#             results[i].copy_(A_chunk @ x_chunk)
#     print(f"Elapsed time for launching matvec operations = {time.time() - ts}")

#     # Synchronize all streams
#     ts = time.time()
#     for stream in streams:
#         stream.synchronize()
#     print(f"Elapsed time for synchronizing streams = {time.time() - ts}")

#     # Gather results from all devices
#     ts = time.time()
#     results = [result.cpu() for result in results]
#     print(f"Elapsed time for gathering results = {time.time() - ts}")

#     # Concatenate the results
#     ts = time.time()
#     result = torch.cat(results, dim=0).cuda(0)
#     print(f"Elapsed time for concatenating results = {time.time() - ts}")
#     return result

# # Perform the distributed matrix-vector multiplication
# ts = time.time()
# result = distributed_matvec(A_chunks, x_chunks, streams, preallocated_results)
# print(f"Elapsed time for distributed matvec = {time.time() - ts}")

# # Verify the result with the non-distributed version
# ts = time.time()
# expected_result = A @ x
# print(f"Elapsed time for non-distributed matvec = {time.time() - ts}")
# print(f"Results are close: {torch.allclose(result, expected_result)}")
# print(f"Max difference: {torch.abs(result - expected_result).max()}")

In [6]:
# Create CUDA streams for parallel computations
streams = [torch.cuda.Stream(device=i) for i in range(device_count)]

# Preallocate result tensors outside of the function
preallocated_results = [torch.zeros(A_chunk.shape[0], device=f"cuda:{i}", dtype=A_chunk.dtype) for i, A_chunk in enumerate(A_chunks)]

# Define the distributed matvec function with CUDA timing
def distributed_matvec(A_chunks, x_chunks, streams, results):
    start_event = [torch.cuda.Event(enable_timing=True) for _ in range(device_count)]
    end_event = [torch.cuda.Event(enable_timing=True) for _ in range(device_count)]
    
    # Launch asynchronous matvec operations
    for i, (A_chunk, x_chunk, stream, start, end, result) in enumerate(zip(A_chunks, x_chunks, streams, start_event, end_event, results)):
        with torch.cuda.stream(stream):
            start.record(stream)
            result.copy_(torch.matmul(A_chunk, x_chunk))
            end.record(stream)
    
    # Synchronize all streams
    # for stream in streams:
    #     stream.synchronize()
    torch.cuda.synchronize()
    
    # Gather results from all devices
    results_cpu = [result.cpu() for result in results]

    # Concatenate the results
    final_result = torch.cat(results_cpu, dim=0).cuda(0)

    # Calculate elapsed time for each GPU's operations
    elapsed_times = [start.elapsed_time(end) for start, end in zip(start_event, end_event)]
    total_time = sum(elapsed_times)

    return final_result, total_time

# Perform the distributed matrix-vector multiplication
ts = time.time()
result, gpu_time = distributed_matvec(A_chunks, x_chunks, streams, preallocated_results)
print(f"Elapsed time for distributed matvec = {time.time() - ts}")
print(f"GPU total time for matvec = {gpu_time} ms")

# Verify the result with the non-distributed version
ts = time.time()
expected_result = A @ x
print(f"Elapsed time for non-distributed matvec = {time.time() - ts}")
print(f"Results are close: {torch.allclose(result, expected_result)}")
print(f"Max difference: {torch.abs(result - expected_result).max()}")

Elapsed time for distributed matvec = 0.11754894256591797
GPU total time for matvec = 116.28220844268799 ms
Elapsed time for non-distributed matvec = 0.0003750324249267578
Results are close: True
Max difference: 0.0
