### Dataset Preprocessing with Dask

In [None]:
import os
import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import logging

import torch
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import DataLoader
from torchvision.models import resnet18
from torch.optim import Adam
import torch.nn.functional as F

This cell defines the `LaneDatasetWithDask` class, a custom PyTorch dataset that utilizes **Dask** for delayed execution and parallelized preprocessing. It improves the data loading pipeline by optimizing image and segmentation preprocessing.

**Key Steps in the Code:**  
1. **Initialization:**  
   - Specifies paths for images and segmentation files based on training or testing mode.  
   - Loads and processes file lists into a structured format.

2. **Preprocessing:**  
   - **Images**: Loaded using OpenCV, resized, and converted to RGB format.  
   - **Segmentation Maps**: Loaded in grayscale, resized, and converted to binary segmentation.  

3. **Dask Integration:**  
   - Utilizes `delayed` and `compute` from Dask to parallelize image and segmentation preprocessing.  
   - Dask ensures that these preprocessing steps are efficient and optimized for multi-core execution.

4. **Error Handling:**  
   - Checks for missing or invalid image and segmentation files.  
   - Provides meaningful error messages for debugging.

**Why Dask?**  
Dask enhances the data loading process by splitting computations across multiple threads or processes, resulting in faster and more efficient dataset preparation. This is particularly useful for large-scale datasets like TuSimple.

**Output of the Code:**  
Returns individual samples in the dataset containing:  
- **Image tensor**  
- **Segmentation tensor**  
- **Instance segmentation tensor**  
- **Existence labels tensor**

This setup prepares the dataset for training or evaluation with Dask-powered preprocessing.

In [None]:
import os
import cv2
import torch
import numpy as np
from torch.utils.data import Dataset
from dask import delayed, compute

class LaneDatasetWithDask(Dataset):
    """
    TuSimple lane dataset whose per-sample preprocessing is wrapped in Dask tasks.

    Each entry of the list file holds a relative image path, a relative
    segmentation-label path and any number of trailing lane-existence flags.
    ``__getitem__`` builds one delayed task for the image and one for the
    label map, evaluates both with ``dask.compute`` and returns tensors.

    Args:
        dataset_path: Root directory of the dataset.
        train: ``True`` reads ``train_val_gt.txt``, ``False`` reads ``test_gt.txt``.
        size: Target ``(width, height)`` passed to ``cv2.resize``.
    """

    def __init__(self, dataset_path='./dataset/TUSimple', train=True, size=(800, 360)):
        self._dataset_path = dataset_path
        self._mode = "train" if train else "test"
        self._image_size = size  # (width, height) -- the order cv2.resize expects
        self._data = []

        file_path = "train_val_gt.txt" if self._mode == "train" else "test_gt.txt"

        # NOTE(review): both list files are looked up under train_set/;
        # confirm that test_gt.txt really lives there for this dataset layout.
        self._process_list(os.path.join(self._dataset_path, "train_set/seg_label/list", file_path))

    def __getitem__(self, idx):
        """
        Load, preprocess and tensorize the sample at position ``idx``.

        Returns:
            dict with keys
            ``"img"``       -- float tensor, RGB, channels first (C, H, W);
            ``"segLabel"``  -- int64 binary mask (1 where the label map is > 0);
            ``"IsegLabel"`` -- the resized grayscale label map as loaded;
            ``"exist"``     -- integer tensor built from the entry's trailing flags.

        Raises:
            FileNotFoundError: If the image or label file does not exist.
            ValueError: If OpenCV cannot decode either file.
        """
        img_rel_path, seg_rel_path, exists = self._data[idx]

        # List entries start with '/', which would make os.path.join discard
        # the base directory, so the leading slash is stripped first.
        img_path = os.path.join(self._dataset_path, "train_set", img_rel_path.lstrip('/'))
        seg_path = os.path.join(self._dataset_path, "train_set", seg_rel_path.lstrip('/'))

        if not os.path.exists(img_path):
            raise FileNotFoundError(f"Image file not found: {img_path}")
        if not os.path.exists(seg_path):
            raise FileNotFoundError(f"Segmentation file not found: {seg_path}")

        # Build both delayed tasks, then evaluate them in a single compute call.
        raw_image = self.preprocess_image(img_path)
        segmentation_results = self.preprocess_segmentation(seg_path)
        raw_image, segmentation_results = compute(raw_image, segmentation_results)

        ins_segmentation_image, segmentation_image = segmentation_results

        # OpenCV yields (H, W, C); permute to channels-first for PyTorch.
        image_tensor = torch.from_numpy(raw_image).float().permute((2, 0, 1))
        segmentation_tensor = torch.from_numpy(segmentation_image).to(torch.int64)
        ins_segmentation_tensor = torch.from_numpy(ins_segmentation_image)

        exists_tensor = torch.as_tensor([int(i) for i in exists])

        return {
            "img": image_tensor,
            "segLabel": segmentation_tensor,
            "IsegLabel": ins_segmentation_tensor,
            "exist": exists_tensor,
        }

    def __len__(self):
        """Return the number of entries parsed from the list file."""
        return len(self._data)

    def preprocess_image(self, img_path):
        """
        Return a Dask delayed task that loads and resizes the image at ``img_path``.
        """
        return delayed(self._load_and_resize_image)(img_path)

    def preprocess_segmentation(self, seg_path):
        """
        Return a Dask delayed task that loads and processes the label map at ``seg_path``.
        """
        return delayed(self._load_and_process_segmentation)(seg_path)

    def _load_and_resize_image(self, img_path):
        """
        Load an image with OpenCV, resize it to the target size and convert BGR to RGB.

        Raises:
            ValueError: If OpenCV cannot read the file.
        """
        image = cv2.imread(img_path)
        if image is None:
            raise ValueError(f"Invalid image at path: {img_path}")
        image = cv2.resize(image, self._image_size, interpolation=cv2.INTER_LINEAR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        return image

    def _load_and_process_segmentation(self, seg_path):
        """
        Load a label map in grayscale and resize it to the target size.

        Returns:
            tuple ``(seg_resized, binary_seg)``: the resized label map and an
            int64 mask that is 1 wherever the resized map is non-zero.

        Raises:
            ValueError: If OpenCV cannot read the file.
        """
        seg = cv2.imread(seg_path, cv2.IMREAD_GRAYSCALE)
        if seg is None:
            raise ValueError(f"Invalid segmentation at path: {seg_path}")
        # Label maps must be resized with nearest-neighbour sampling: linear
        # interpolation averages neighbouring pixels and would invent label
        # values that do not exist in the source map.
        seg_resized = cv2.resize(seg, self._image_size, interpolation=cv2.INTER_NEAREST)
        binary_seg = (seg_resized > 0).astype(np.int64)  # Binary segmentation
        return seg_resized, binary_seg

    def _process_list(self, file_path):
        """
        Parse the list file into ``(image, segmentation, exists)`` tuples.

        Each non-blank line is split on whitespace: the first token is the
        image path, the second the label path, and all remaining tokens are
        kept as a list of strings. Blank lines are skipped.

        Raises:
            ValueError: If a non-blank line has fewer than two tokens.
        """
        with open(file_path) as f:
            for line_no, line in enumerate(f, start=1):
                words = line.split()
                if not words:
                    # Tolerate blank / whitespace-only lines (e.g. a trailing newline).
                    continue
                if len(words) < 2:
                    raise ValueError(
                        f"Malformed entry at {file_path}:{line_no}: {line.rstrip()!r}"
                    )
                image, segmentation, *exists = words
                self._data.append((image, segmentation, exists))




#### **Setting Up Dask Distributed Client**
---

This cell initializes a **Dask Distributed Client**, which manages and coordinates distributed computation. It defines the number of workers, threads per worker, and memory allocation per worker for efficient task execution.

**Key Components in the Code:**  

1. **Client Initialization:**  
   - `n_workers=4`: Specifies the number of independent workers in the Dask cluster.
   - `threads_per_worker=2`: Allocates 2 threads per worker, optimizing parallelism.
   - `memory_limit='4GB'`: Limits memory usage per worker to 4 GB to prevent overloads and ensure efficient memory usage.

2. **Dashboard Link:**  
   - `client.dashboard_link`: Provides a link to the Dask dashboard, offering a real-time view of task execution, memory consumption, and cluster health.

**Why Dask Client?**  
The Dask client allows for:
- Coordinated distributed execution of tasks.  
- Dynamic scaling based on workload requirements.  
- Real-time monitoring and debugging through the dashboard.

**Output of the Code:**  
- A link to the Dask dashboard, where you can monitor worker activity, task graphs, and system resource usage.

**Instructions:**  
Click on the dashboard link to explore the performance metrics and workload distribution of your Dask cluster.


In [None]:
from dask.distributed import Client

# Start a local cluster: 4 workers, 2 threads each, 4 GB memory cap per worker.
client = Client(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='4GB',
)

# URL of the diagnostics dashboard served by this cluster.
print(client.dashboard_link)

http://127.0.0.1:8787/status


In [None]:
# Build the training split using the default dataset_path ('./dataset/TUSimple');
# size is given as (width, height).
train_dataset = LaneDatasetWithDask(train=True, size=(800, 360))

In [None]:
# Dump the scheduler's description of the cluster (address, services, per-worker state).
print(client.scheduler_info())

{'type': 'Scheduler', 'id': 'Scheduler-77ad0f12-8060-4575-bb7d-95218467d59d', 'address': 'tcp://127.0.0.1:42401', 'services': {'dashboard': 8787}, 'started': 1733620137.9999335, 'workers': {'tcp://127.0.0.1:37481': {'type': 'Worker', 'id': 1, 'host': '127.0.0.1', 'resources': {}, 'local_directory': '/tmp/dask-scratch-space/worker-ci1q8yrj', 'name': 1, 'nthreads': 2, 'memory_limit': 4000000000, 'last_seen': 1733620154.12777, 'services': {'dashboard': 39730}, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.49991273880004883, 'latency': 0.002624034881591797}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 0, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 0}, 'event_loop_interval': 0.019990134239196777, 'cpu': 6.0, 'memory': 125218816, 'time': 1733620153.627441, 'host_net_io': {'r

In [None]:
print(client.get_task_stream())  # Print the task-stream records collected so far (a one-off snapshot, not a live monitor).


()


In [None]:
from torch.utils.data import DataLoader

# Training and validation splits share the same root and target (width, height).
train_dataset = LaneDatasetWithDask(
    dataset_path="./dataset/TUSimple",
    train=True,
    size=(800, 360),
)
val_dataset = LaneDatasetWithDask(
    dataset_path="./dataset/TUSimple",
    train=False,
    size=(800, 360),
)

# Training batches are shuffled and loaded by 4 worker processes;
# validation batches keep file order and load in the main process.
train_loader = DataLoader(dataset=train_dataset, batch_size=8, shuffle=True, num_workers=4)
val_loader = DataLoader(dataset=val_dataset, batch_size=8, shuffle=False)


In [None]:
import os
# Sanity check: the dataset paths used in this notebook are relative, so they
# only resolve when the notebook runs from the project root.
print(os.getcwd())  # Ensure the current directory is correct
print(os.listdir('./dataset/TUSimple/train_set'))  # Check if files are visible


/home/pendem.mu/LaneDetect
['clips', 'label_data_0313.json', 'label_data_0531.json', 'label_data_0601.json', 'readme.md', 'seg_label']




#### **Testing Dask Configurations for Training Performance**
---
 
This cell evaluates the impact of various configurations of **Dask workers** and **threads per worker** on training performance. It uses **Dask Distributed** for parallelizing tasks and monitors the time taken to process the dataset across multiple configurations.

---

#### **Key Steps in the Code:**

1. **Define Testing Parameters:**
   - `workers_range`: Numbers of Dask workers to evaluate (8, 4, 2, 1).
   - `threads_range`: Threads per worker for parallelism (1, 2, 4).
   - `chunk_sizes`: Chunk-size labels recorded with each run (256, 512, 1024, 2048).

2. **Test Function:**  
   - `test_dask_config`: Tests a specific configuration by:
     - Initializing a Dask client with the specified workers and threads.
     - Measuring the time taken to load every sample of the training dataset.
     - Saving a **performance report** (`report_{n_workers}w_{threads_per_worker}t_{chunk_size}c.html`) for analysis.

3. **Run All Configurations:**  
   Iterates over combinations of `workers_range` and `threads_range`:
   - Prints the configuration being tested.
   - Appends the elapsed time to `results`.

4. **Visualization:**  
   - **Heatmap:** Represents the performance data:
     - X-axis: Threads per worker.
     - Y-axis: Number of workers.
     - Color: Time taken for processing (lower is better).

5. **Save and Display Heatmap:**  
   - Saves one heatmap per chunk size as `performance_chunk_{chunk_size}.png` in the `dashboard_results_dask` directory for later reference.
   - Displays each heatmap for immediate analysis.

---

#### **Expected Outputs:**
1. A heatmap showing:
   - The impact of increasing workers and threads per worker.
   - Optimal configurations (minimum time taken).
2. Individual performance reports for each configuration:
   - Detailed insights into task scheduling and resource usage.

---

**Instructions for Analysis:**
- Review the heatmap to identify the best-performing configuration.
- Open the saved **performance reports** (`HTML` files) to understand task distribution and bottlenecks.
- Experiment with configurations based on results to optimize performance further.



### Explanation of Warnings

#### **Garbage Collection Warning**
- **Message**: `full garbage collections took XX% CPU time recently (threshold: 10%)`
- **Cause**: Python's garbage collector is spending significant time cleaning up unused memory. This typically happens when large temporary objects are frequently created and destroyed.

#### **RecursionError**
- **Message**: `maximum recursion depth exceeded`
- **Cause**: Deeply nested data structures (e.g., lists of lists) or recursive operations exceed Python's default recursion limit.

In [None]:
import os
import time
import matplotlib.pyplot as plt
from dask.distributed import Client, performance_report
from tqdm import tqdm
import torch

# Parameter grid for the benchmark: every combination of worker count,
# threads per worker and chunk size is tested once.
workers_range = [8, 4, 2, 1]
threads_range = [1, 2, 4]
chunk_sizes = [256, 512, 1024, 2048]

# One tuple per run:
# (n_workers, threads_per_worker, chunk_size, memory_per_worker, elapsed_seconds)
results = []

# Directory that receives the Dask performance reports and the heatmap images.
output_dir = "dashboard_results_dask"
os.makedirs(output_dir, exist_ok=True)

def test_dask_config(n_workers, threads_per_worker, chunk_size, total_memory_limit=128):
    """
    Time one full pass over the training dataset under a given Dask cluster layout.

    Starts a local Dask client, loads every sample of ``LaneDatasetWithDask``
    once while a Dask performance report is recorded, and appends a tuple
    ``(n_workers, threads_per_worker, chunk_size, memory_per_worker, seconds)``
    to the module-level ``results`` list. If anything fails, the error is
    printed and the elapsed time is recorded as ``float('inf')``.

    Args:
        n_workers (int): Number of Dask workers.
        threads_per_worker (int): Threads per Dask worker.
        chunk_size (int): Label stored with the result and used in the report
            file name; it does not influence how the data is loaded here.
        total_memory_limit (int): Total memory budget in GB.
    """
    # The budget is divided by workers * threads and the quotient is passed as
    # the per-worker limit. It is clamped to 1 GB because a limit of 0 tells
    # Dask to apply no limit at all.
    # NOTE(review): Dask's memory_limit is per worker process, so dividing by
    # the thread count leaves part of the budget unused -- confirm intent.
    memory_per_worker = max(1, total_memory_limit // (n_workers * threads_per_worker))
    print(f"Testing: Workers={n_workers}, Threads={threads_per_worker}, Chunk Size={chunk_size}, Memory={memory_per_worker}GB per worker")

    client = None  # Stays None if Client() itself fails, so cleanup can be skipped
    try:
        client = Client(n_workers=n_workers, threads_per_worker=threads_per_worker, memory_limit=f"{memory_per_worker}GB")
        report_path = os.path.join(output_dir, f"report_{n_workers}w_{threads_per_worker}t_{chunk_size}c.html")

        with performance_report(filename=report_path):
            # perf_counter is monotonic, so the measurement is immune to
            # system clock adjustments during these multi-minute runs.
            start_time = time.perf_counter()
            train_dataset = LaneDatasetWithDask(dataset_path="./dataset/TUSimple", train=True, size=(800, 360))
            for idx in tqdm(range(len(train_dataset)), desc=f"Processing with {n_workers}w, {threads_per_worker}t"):
                _ = train_dataset[idx]  # Simulate loading each data point
            total_time = time.perf_counter() - start_time

        print(f"Completed in {total_time:.2f} seconds")
        results.append((n_workers, threads_per_worker, chunk_size, memory_per_worker, total_time))
    except Exception as e:
        # Benchmark boundary: record the failure and carry on with the next configuration.
        print(f"Error: {e}")
        results.append((n_workers, threads_per_worker, chunk_size, memory_per_worker, float('inf')))
    finally:
        if client is not None:
            client.close()


# Run the benchmark once for every (workers, threads, chunk size) combination,
# with workers varying slowest and chunk size fastest.
for n_workers, threads_per_worker, chunk_size in (
    (w, t, c)
    for w in workers_range
    for t in threads_range
    for c in chunk_sizes
):
    test_dask_config(n_workers, threads_per_worker, chunk_size, total_memory_limit=128)


# Regroup the flat result tuples as {(workers, threads): {chunk_size: seconds}}
heatmap_data = {}
for n_workers, threads_per_worker, chunk_size, memory_limit, elapsed_time in results:
    key = (n_workers, threads_per_worker)
    heatmap_data.setdefault(key, {})[chunk_size] = elapsed_time

# One figure per chunk size: rows are worker counts, columns are thread counts
for chunk_size in chunk_sizes:
    plt.figure(figsize=(10, 6))

    # Configurations without a recorded time fall back to infinity
    data_matrix = [
        [
            heatmap_data.get((n_wrk, n_thr), {}).get(chunk_size, float('inf'))
            for n_thr in threads_range
        ]
        for n_wrk in workers_range
    ]

    plt.imshow(data_matrix, cmap="viridis", aspect="auto")
    plt.colorbar(label="Time (seconds)")

    plt.title(f"Performance: Chunk Size {chunk_size}", fontsize=14)
    plt.xlabel("Threads per Worker", fontsize=12)
    plt.ylabel("Number of Workers", fontsize=12)
    plt.xticks(range(len(threads_range)), labels=threads_range, fontsize=10)
    plt.yticks(range(len(workers_range)), labels=workers_range, fontsize=10)

    # Write the image to disk before showing it
    heatmap_path = os.path.join(output_dir, f"performance_chunk_{chunk_size}.png")
    plt.savefig(heatmap_path)
    print(f"Saved heatmap for chunk size {chunk_size}: {heatmap_path}")
    plt.show()

Testing: Workers=8, Threads=1, Chunk Size=256, Memory=16GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 33140 instead
Processing with 8w, 1t: 100%|██████████| 3626/3626 [05:56<00:00, 10.17it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 356.43 seconds
Testing: Workers=8, Threads=1, Chunk Size=512, Memory=16GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 33554 instead
Processing with 8w, 1t: 100%|██████████| 3626/3626 [05:53<00:00, 10.25it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 353.71 seconds
Testing: Workers=8, Threads=1, Chunk Size=1024, Memory=16GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 34687 instead
Processing with 8w, 1t: 100%|██████████| 3626/3626 [06:36<00:00,  9.15it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 396.46 seconds
Testing: Workers=8, Threads=1, Chunk Size=2048, Memory=16GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 38954 instead
Processing with 8w, 1t: 100%|██████████| 3626/3626 [06:37<00:00,  9.13it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 397.07 seconds
Testing: Workers=8, Threads=2, Chunk Size=256, Memory=8GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 42406 instead
Processing with 8w, 2t: 100%|██████████| 3626/3626 [06:01<00:00, 10.02it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 361.79 seconds
Testing: Workers=8, Threads=2, Chunk Size=512, Memory=8GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 40371 instead
Processing with 8w, 2t: 100%|██████████| 3626/3626 [05:51<00:00, 10.31it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 351.55 seconds
Testing: Workers=8, Threads=2, Chunk Size=1024, Memory=8GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 33839 instead
Processing with 8w, 2t: 100%|██████████| 3626/3626 [06:07<00:00,  9.86it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 367.83 seconds
Testing: Workers=8, Threads=2, Chunk Size=2048, Memory=8GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 42079 instead
Processing with 8w, 2t: 100%|██████████| 3626/3626 [06:08<00:00,  9.83it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 368.88 seconds
Testing: Workers=8, Threads=4, Chunk Size=256, Memory=4GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 45506 instead
Processing with 8w, 4t: 100%|██████████| 3626/3626 [05:57<00:00, 10.14it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 357.47 seconds
Testing: Workers=8, Threads=4, Chunk Size=512, Memory=4GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 41876 instead
Processing with 8w, 4t: 100%|██████████| 3626/3626 [06:02<00:00, 10.00it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 362.59 seconds
Testing: Workers=8, Threads=4, Chunk Size=1024, Memory=4GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 42207 instead
Processing with 8w, 4t: 100%|██████████| 3626/3626 [05:59<00:00, 10.09it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 359.47 seconds
Testing: Workers=8, Threads=4, Chunk Size=2048, Memory=4GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 41769 instead
Processing with 8w, 4t: 100%|██████████| 3626/3626 [06:09<00:00,  9.81it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 369.63 seconds
Testing: Workers=4, Threads=1, Chunk Size=256, Memory=32GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 36345 instead
Processing with 4w, 1t: 100%|██████████| 3626/3626 [06:22<00:00,  9.49it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 382.24 seconds
Testing: Workers=4, Threads=1, Chunk Size=512, Memory=32GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 36047 instead
Processing with 4w, 1t: 100%|██████████| 3626/3626 [06:31<00:00,  9.27it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 391.34 seconds
Testing: Workers=4, Threads=1, Chunk Size=1024, Memory=32GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 35147 instead
Processing with 4w, 1t: 100%|██████████| 3626/3626 [06:34<00:00,  9.20it/s]
Traceback (most recent call last):
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/pendem.mu/.conda/envs/pytorch_env_new/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(siz

Completed in 394.11 seconds
Testing: Workers=4, Threads=1, Chunk Size=2048, Memory=32GB per worker


Perhaps you already have a cluster running?
Hosting the HTTP server on port 42473 instead
Processing with 4w, 1t: 100%|██████████| 3626/3626 [06:13<00:00,  9.70it/s]