# GENERAL EXPLORATION OF MEMORY PINNING, NON BLOCKING SCHEDULING, and PARALLELISM ON ACCELERATION

In [1]:
%pip install torch 



import libraries:

In [2]:
import time 

import torch 
from torch.utils.data import DataLoader, Dataset

check whether the GPU is available: 

In [3]:
print("GPU is available" if torch.cuda.is_available() else "GPU not available")

GPU is available


##### we only use synthetic data to isolate and measure the impact of host/device transfer (pinned memory and non blocking concepts)

In [None]:
class SyntheticData(Dataset):
    def __init__(self, size=1000000, feature_dim=1024):
        self.data = torch.randn(size, feature_dim)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


now we define a function to mesure the HOST/DEVICE transfer time: 

In [5]:
def measure_iteration(device, dataloader, non_blocking=False):
    transfer_times = []
    total_times = []

    for batch in dataloader:
        start_total = time.perf_counter()

        # Measure transfer
        torch.cuda.synchronize()
        start_transfer = time.perf_counter()
        batch = batch.to(device, non_blocking=non_blocking)
        torch.cuda.synchronize()
        end_transfer = time.perf_counter()

        #measure total time (to see that Non Blocking and number of workers has no effect here)
        end_total = time.perf_counter()

        transfer_times.append(end_transfer - start_transfer)
        total_times.append(end_total - start_total)

    return {
        "avg_transfer_time": sum(transfer_times) / len(transfer_times),
        "avg_total_time": sum(total_times) / len(total_times),
    }


#### we test several combinations of number of workers, whether or not to pin memory, and wether or not enable non blocking behavior:

In [6]:
dataset = SyntheticData()
batch_size = 256

configs = [
    #no parallelism
    {"num_workers": 0, "pin_memory": False, "non_blocking": False},
    {"num_workers": 0, "pin_memory": False, "non_blocking": True},
    {"num_workers": 0 , "pin_memory": True,  "non_blocking": False},
    {"num_workers": 0, "pin_memory": True,  "non_blocking": True},

    #num of workers suggested by torch in this device
    {"num_workers": 2, "pin_memory": False, "non_blocking": False},
    {"num_workers": 2, "pin_memory": False, "non_blocking": True},
    {"num_workers": 2 , "pin_memory": True,  "non_blocking": False},
    {"num_workers": 2, "pin_memory": True,  "non_blocking": True},
]


In [7]:
results = []

for cfg in configs:
        loader = DataLoader(
            dataset,
            batch_size=batch_size,
            num_workers=cfg["num_workers"],
            pin_memory=cfg["pin_memory"],
        )
        metrics = measure_iteration(torch.device("cuda"), loader, non_blocking=cfg["non_blocking"])
        results.append((cfg, metrics))


### We obtain the following results: 

In [8]:
for cfg, metrics in results:
    print("Configuration:", cfg)
    for k, v in metrics.items():
        print(f"  {k}: {round(v*1e3, 2)} micro seconds")
    print("-" * 40)


Configuration: {'num_workers': 0, 'pin_memory': False, 'non_blocking': False}
  avg_transfer_time: 0.36 micro seconds
  avg_total_time: 0.45 micro seconds
----------------------------------------
Configuration: {'num_workers': 0, 'pin_memory': False, 'non_blocking': True}
  avg_transfer_time: 0.33 micro seconds
  avg_total_time: 0.36 micro seconds
----------------------------------------
Configuration: {'num_workers': 0, 'pin_memory': True, 'non_blocking': False}
  avg_transfer_time: 0.14 micro seconds
  avg_total_time: 0.16 micro seconds
----------------------------------------
Configuration: {'num_workers': 0, 'pin_memory': True, 'non_blocking': True}
  avg_transfer_time: 0.12 micro seconds
  avg_total_time: 0.15 micro seconds
----------------------------------------
Configuration: {'num_workers': 2, 'pin_memory': False, 'non_blocking': False}
  avg_transfer_time: 0.74 micro seconds
  avg_total_time: 0.79 micro seconds
----------------------------------------
Configuration: {'num_wor

#### as we can see, the most acceleration effect in this case comes from **memory pinning** 
as it's directly related to HOST2DEVICE transfer of data, so total time is the same as transfer time. 
memory pinning means pinning the necessary data physical adresses in memory (no paging), so OS cannot move it nor swap it to disk.

**NOTE:** we can see that Non Blocking has no significant effect in this case as we don't have any GPU computations. the same goes for the Number of Workers (which in fact harmed acceleration) as we're not doing any preprocessing or loading of data from disk (because we're using synthetic data that's already in memory)

#### in the following cell, we will add some GPU Computation to remarque the effect of Non Blocking argument: 

In [9]:
def measure_iteration(device, dataloader, non_blocking=False):
    compute_times = []
    total_times = []

    for batch in dataloader:
        start_total = time.perf_counter()

        batch = batch.to(device, non_blocking=non_blocking)

        #add some GPU Computation 
        torch.cuda.synchronize()
        start_compute = time.perf_counter()
        batch @ (batch.T)**8
        torch.cuda.synchronize()
        end_compute = time.perf_counter()

        end_total = time.perf_counter()

        compute_times.append(end_compute - start_compute)
        total_times.append(end_total - start_total)

    return {
        "avg_GPU_time": sum(compute_times) / len(compute_times),
        "avg_total_time": sum(total_times) / len(total_times),
    }



configs = [
    #no parallelism
    {"num_workers": 0, "pin_memory": False, "non_blocking": False},
    {"num_workers": 0, "pin_memory": False, "non_blocking": True},
    {"num_workers": 0 , "pin_memory": True,  "non_blocking": False},
    {"num_workers": 0, "pin_memory": True,  "non_blocking": True},

    #num of workers suggested by torch in this device
    {"num_workers": 2, "pin_memory": False, "non_blocking": False},
    {"num_workers": 2, "pin_memory": False, "non_blocking": True},
    {"num_workers": 2 , "pin_memory": True,  "non_blocking": False},
    {"num_workers": 2, "pin_memory": True,  "non_blocking": True},
]


esults = []

for cfg in configs:
        loader = DataLoader(
            dataset,
            batch_size=batch_size,
            num_workers=cfg["num_workers"],
            pin_memory=cfg["pin_memory"],
        )
        metrics = measure_iteration(torch.device("cuda"), loader, non_blocking=cfg["non_blocking"])
        results.append((cfg, metrics))

for cfg, metrics in results:
    print("Configuration:", cfg)
    for k, v in metrics.items():
        print(f"  {k}: {round(v*1e3, 2)} micro seconds")
    print("-" * 40)



Configuration: {'num_workers': 0, 'pin_memory': False, 'non_blocking': False}
  avg_transfer_time: 0.36 micro seconds
  avg_total_time: 0.45 micro seconds
----------------------------------------
Configuration: {'num_workers': 0, 'pin_memory': False, 'non_blocking': True}
  avg_transfer_time: 0.33 micro seconds
  avg_total_time: 0.36 micro seconds
----------------------------------------
Configuration: {'num_workers': 0, 'pin_memory': True, 'non_blocking': False}
  avg_transfer_time: 0.14 micro seconds
  avg_total_time: 0.16 micro seconds
----------------------------------------
Configuration: {'num_workers': 0, 'pin_memory': True, 'non_blocking': True}
  avg_transfer_time: 0.12 micro seconds
  avg_total_time: 0.15 micro seconds
----------------------------------------
Configuration: {'num_workers': 2, 'pin_memory': False, 'non_blocking': False}
  avg_transfer_time: 0.74 micro seconds
  avg_total_time: 0.79 micro seconds
----------------------------------------
Configuration: {'num_wor

#### As we can see, Non Blocking only enhances acceleration in case of presence of GPU operations, 
**NOTE:** Non Blocking requires Memory Pinning in order to enhance acceleration in case of present GPU computations

it does so by allowing the CPU not to wait for the host/device transfer to finish before scheduling GPU operations, however, this requires no OS interventions during transfer and also fix data memory adresses, and these requirements are only satisfied when we use Memory Pinning. 

Furthermore, with synthetic data, increasing num_workers does not improve performance, because there's no I/O bound task or preprocessing as data is already in memory.
To showcase this, we will modify the SyntheticData class to introduce artificial CPU preprocessing and I/O latency.

In [15]:
class SyntheticIODataset(Dataset):
    def __init__(
    self,
    size=100_000,
    feature_dim=1024,
    io_delay=0.002, # simulate I/O bound task
    cpu_work=True   #enable CPU preprocessing
    ):
        self.data = torch.randn(size, feature_dim)
        self.io_delay = io_delay
        self.cpu_work = cpu_work


    def __len__(self):
        return self.data.shape[0]



    def __getitem__(self, idx):
        # simulate I/O-bound latency 
        if self.io_delay > 0:
            time.sleep(self.io_delay)

        x = self.data[idx]

        #simulate CPU-bound preprocessing
        if self.cpu_work:
            #reshape: 1D to 2D 
            x = x.view(32, 32)

            #typical CPU preprocessing ops
            x = (x - x.mean()) / (x.std() + 1e-6)

            #force memory access + compute
            x = x * 1.1 + 0.1

            #force a copy for more real simulation
            x = x.contiguous().view(-1)

        return x

we will now define the function that will help us do the measurements

In [18]:

def benchmark_dataloader(
    dataset,
    batch_size=256,
    num_workers=0,
    pin_memory=False,
    device="cpu",
    num_batches=100,
    warmup_batches=10,
):
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        shuffle=False
    )

    it = iter(loader)

    for _ in range(warmup_batches):
        batch = next(it)
        if device == "cuda":
            batch = batch.to(device, non_blocking=pin_memory)

    if device == "cuda":
        torch.cuda.synchronize()

    start = time.perf_counter()

    for _ in range(num_batches):
        batch = next(it)
        if device == "cuda":
            batch = batch.to(device, non_blocking=pin_memory)

    if device == "cuda":
        torch.cuda.synchronize()

    end = time.perf_counter()

    total_time = end - start
    samples = num_batches * batch_size

    return {
        "num_workers": num_workers,
        "pin_memory": pin_memory,
        "device": device,
        "avg_iter_time_ms": (total_time / num_batches) * 1e3,
        "throughput_samples_per_sec": samples / total_time,
    }


dataset = SyntheticIODataset(
    io_delay=0.002,
    cpu_work=True
)



In [19]:
if torch.cuda.is_available():
    dataset = SyntheticIODataset(
        io_delay=0.002,
        cpu_work=True
    )

    gpu_results = []

    for nw in [0, 2, 4, 8]:
        res = benchmark_dataloader(
            dataset,
            batch_size=256,
            num_workers=nw,
            pin_memory=True,
            device="cuda"
        )
        gpu_results.append(res)

    for r in gpu_results:
        print(r)


{'num_workers': 0, 'pin_memory': True, 'device': 'cuda', 'avg_iter_time_ms': 572.8698136900016, 'throughput_samples_per_sec': 446.87290878714356}
{'num_workers': 2, 'pin_memory': True, 'device': 'cuda', 'avg_iter_time_ms': 291.3731124500009, 'throughput_samples_per_sec': 878.598570223013}
{'num_workers': 4, 'pin_memory': True, 'device': 'cuda', 'avg_iter_time_ms': 146.57597732000113, 'throughput_samples_per_sec': 1746.5344913996855}
{'num_workers': 8, 'pin_memory': True, 'device': 'cuda', 'avg_iter_time_ms': 70.49252318000072, 'throughput_samples_per_sec': 3631.590819161219}


### as we can see, Increasing the number of workers improves throughput by parallelizing data preparation and hiding latency.
 this confirms that num_workers optimizes the DATA LOADING AND PREPROCESSING stage rather than memory transfer or GPU computation, in contrast to before having any I/O or CPU preprocessing where adding parallelism increased latency.