# Distributed PyTorch Operations with nbdistributed

This notebook demonstrates distributed `all_gather` operations across multiple devices using the nbdistributed package.

## Setup Overview
- **Coordinator (Mac)**: Runs this notebook and coordinates operations
- **Worker (Windows RTX 2080)**: Provides GPU computation power

## Requirements
- Both machines must be on the same network
- Windows machine should have CUDA-enabled PyTorch installed
- nbdistributed package installed on both machines


In [1]:
# Install required packages
%pip install nbdistributed torch torchvision


Note: you may need to restart the kernel to use updated packages.


In [None]:
# Controller-only: start rendezvous store (TCPStore) on the Mac
# This does NOT join the training process group.
from torch.distributed import TCPStore
import socket, time

# Detect this Mac's LAN IP for copy/paste on Windows
def get_local_ip():
    """Return an IPv4 address for this machine as a string.

    Opens a UDP socket, connects it to 8.8.8.8:80 and reads back the local
    address of that socket. If any step of that probe raises, the address
    that this machine's own hostname resolves to is returned instead.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            address = probe.getsockname()[0]
    except Exception:
        # Probe failed: fall back to resolving our own hostname.
        return socket.gethostbyname(socket.gethostname())
    return address

LOCAL_IP = get_local_ip()
print(f"📋 Copy this Mac IP for Windows: {LOCAL_IP}")

# Start TCPStore listening on all interfaces
MAC_BIND = "0.0.0.0"  # only used in the status message below
PORT = 12355
# NOTE(review): the TCPStore docs describe world_size as the total number of
# store users (clients + 1 for the server) — confirm whether 2 is meant to be
# "server + one Windows worker" or "two Windows workers".
WORLD_SIZE = 2       # number of Windows worker processes that will join

# wait_for_workers=False: by default the constructor blocks until every
# expected user has connected, so the instructions printed below would only
# appear after the workers had already joined (or the store timed out).
store = TCPStore(
    host_name=LOCAL_IP,
    port=PORT,
    world_size=WORLD_SIZE,
    is_master=True,
    wait_for_workers=False,
    use_libuv=False,
)
print(f"✅ TCPStore up on {MAC_BIND}:{PORT}, expecting {WORLD_SIZE} workers")
print("Keep this cell running while workers start… (Ctrl+C to stop)")

# Keep the cell (and therefore the `store` object) alive until interrupted.
try:
    while True:
        time.sleep(3600)
except KeyboardInterrupt:
    print("🛑 TCPStore stopped")


📋 Copy this Mac IP for Windows: 192.168.29.234
✅ TCPStore up on 0.0.0.0:12355, expecting 2 workers
Keep this cell running while workers start… (Ctrl+C to stop)


In [None]:
# Fixed imports and setup (removing problematic nbdistributed decorator)
import torch
import torch.distributed as dist
import os
import socket
import time

# Report whether CUDA is visible to this torch build on the local machine
print(f"CUDA available on Mac: {torch.cuda.is_available()}")
print("Mac will run as CPU coordinator (Rank 0)")

# Pick "mps" when torch exposes the MPS backend and reports it as available,
# otherwise stay on "cpu"
_mps_backend = getattr(torch.backends, 'mps', None)
mac_device = "mps" if (_mps_backend is not None and _mps_backend.is_available()) else "cpu"

if mac_device == "mps":
    print("Mac Metal Performance Shaders (MPS) available for local operations")
else:
    print("Using CPU for Mac operations")

print(f"Mac device: {mac_device}")

# Get local IP address automatically from network interface
def _first_private_ipv4(ifconfig_output):
    """Return the first RFC 1918 IPv4 address found in ``ifconfig`` text, or None."""
    import ipaddress

    private_nets = tuple(
        ipaddress.ip_network(cidr)
        for cidr in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
    )
    for line in ifconfig_output.splitlines():
        _, marker, rest = line.partition("inet ")
        if not marker:
            continue
        token = rest.split(" ", 1)[0]
        try:
            address = ipaddress.ip_address(token)
        except ValueError:
            # e.g. the "addr:192.168.1.5" form or an empty token
            continue
        # Loopback (127/8) and link-local (169.254/16) fall outside these
        # networks, so they are skipped without separate checks.
        if any(address in net for net in private_nets):
            return token
    return None


def get_local_ip():
    """Get the actual local IP address (not localhost).

    Strategy, in order:

    1. Connect a UDP socket to 8.8.8.8:80 and read back the local address of
       that socket.
    2. If the socket probe fails, run ``ifconfig`` and return the first
       private (10/8, 172.16/12, 192.168/16) IPv4 address it lists.
    3. If that also fails, return the placeholder ``"192.168.1.1"``. This is
       a last resort and is unlikely to be this machine's real address.

    Returns:
        str: a dotted-quad IPv4 address. The function does not raise for
        socket, subprocess or parsing failures.
    """
    last_resort = "192.168.1.1"

    try:
        # Connect to a remote address to determine which interface to use
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            # Connect to Google DNS (doesn't actually send data)
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except OSError:
        pass

    # Fallback method: parse ifconfig (macOS/Linux)
    import subprocess
    try:
        # The timeout keeps a stuck ifconfig from hanging the notebook cell.
        result = subprocess.run(
            ['ifconfig'], capture_output=True, text=True, timeout=5
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # ifconfig missing, timed out, or produced undecodable output
        return last_resort

    return _first_private_ipv4(result.stdout) or last_resort

# Resolve the names used by the configuration cells that follow
hostname = socket.gethostname()
local_ip = get_local_ip()

# One print call, one line per message
print(
    f"🔍 Auto-detected Mac IP (Master): {local_ip}",
    f"📱 Hostname: {hostname}",
    "✅ Using auto-detected IP address (not hostname)",
    "\nSetup: Mac (CPU/MPS coordinator) + Windows (CUDA GPU worker)",
    sep="\n",
)


In [None]:
# Distributed configuration for the coordinator side
MASTER_ADDR = local_ip  # this Mac acts as the master
MASTER_PORT = "12355"   # kept as a string because it is written to os.environ
WORLD_SIZE = 2          # Mac + Windows
RANK = 0                # the master is rank 0

print(
    f"Master Address (this Mac): {MASTER_ADDR}",
    f"Master Port: {MASTER_PORT}",
    f"World Size: {WORLD_SIZE}",
    f"This machine's rank: {RANK}",
    sep="\n",
)

# Export the settings through the environment variables that are read back
# below (numeric values are stringified for os.environ)
os.environ.update(
    MASTER_ADDR=MASTER_ADDR,
    MASTER_PORT=MASTER_PORT,
    WORLD_SIZE=str(WORLD_SIZE),
    RANK=str(RANK),
)

print(
    f"✅ Mac configured as master node at {MASTER_ADDR}:{MASTER_PORT}",
    "🔍 Environment check:",
    f"   MASTER_ADDR = {os.environ.get('MASTER_ADDR')}",
    f"   MASTER_PORT = {os.environ.get('MASTER_PORT')}",
    "⚠️  MUST be IP address, NOT hostname!",
    "ℹ️  Waiting for Windows worker to connect...",
    sep="\n",
)


In [None]:
# VERIFY IP-only setup (run this BEFORE starting Windows worker)
def verify_ip_setup(ip=None, port=12355):
    """Verify that we're using a valid IPv4 address and that the port is free.

    Args:
        ip: Dotted-quad IPv4 string to check. Defaults to the notebook-level
            ``local_ip`` variable when omitted.
        port: TCP port to test-bind on ``ip``. Defaults to 12355, the port
            used by the rest of this notebook.

    Returns:
        bool: True when ``ip`` parses as an IPv4 address and a TCP socket can
        be bound to ``(ip, port)``; False otherwise. A message is printed for
        each check.
    """
    import ipaddress
    import socket

    if ip is None:
        ip = local_ip  # notebook global set by the IP auto-detection cell

    print("🔍 IP Setup Verification:")
    print(f"✅ Auto-detected IP: {ip}")
    print(f"✅ MASTER_ADDR will be: {ip}")

    # Check that it's a valid IP. ipaddress also rejects octets above 255,
    # which a plain digit-count pattern would accept.
    try:
        if not isinstance(ip, str):
            raise ValueError("IP must be a string")
        ipaddress.IPv4Address(ip)
    except ValueError:
        print(f"❌ Invalid IP format: {ip}")
        return False
    print(f"✅ Valid IP format: {ip}")

    # Test if port is available: the socket is bound and immediately closed.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind((ip, port))
    except (OSError, OverflowError) as e:
        # OverflowError is raised for a port outside 0-65535
        print(f"❌ Port {port} not available: {e}")
        return False

    print(f"✅ Port {port} available on {ip}")
    return True

# Run verification. The boolean result is not stored; as the last expression
# of the cell it is presumably echoed by the notebook as the cell's output.
verify_ip_setup()


In [None]:
# Initialize distributed process group (run this after Windows worker is ready)
def initialize_distributed():
    """Join the process group as rank 0 using the gloo backend.

    Removes hostname- and gloo/NCCL-related environment variables, writes
    MASTER_ADDR (from the notebook-level ``local_ip``), MASTER_PORT,
    WORLD_SIZE and RANK into ``os.environ``, then calls
    ``dist.init_process_group`` with the default process-group timeout.

    Returns:
        The backend name (``'gloo'``) on success, or ``None`` if any step
        raised; in that case the error and troubleshooting hints are printed.
    """
    backend = 'gloo'
    try:
        # Drop hostname-related variables
        for name in ('HOSTNAME', 'HOST', 'COMPUTERNAME'):
            os.environ.pop(name, None)

        # Rendezvous settings read by init_process_group below
        os.environ.update(
            MASTER_ADDR=local_ip,
            MASTER_PORT='12355',
            WORLD_SIZE='2',
            RANK='0',
        )

        # Drop interface/transport overrides
        for name in (
            'GLOO_SOCKET_IFNAME',
            'NCCL_SOCKET_IFNAME',
            'GLOO_DEVICE_TRANSPORT',
            'GLOO_SOCKET_FAMILY',
        ):
            os.environ.pop(name, None)

        print("🔧 FORCING IP-only connection:")
        print(f"   MASTER_ADDR = {os.environ['MASTER_ADDR']}")
        print("   NO hostname resolution allowed!")

        print(f"🔄 Initializing distributed training with backend: {backend}")

        # Configuration comes from the environment variables set above
        print("🔄 Using environment variable method...")
        dist.init_process_group(
            backend=backend,
            timeout=torch.distributed.default_pg_timeout,
        )
        print("✅ Environment method successful!")

        rank, world_size = dist.get_rank(), dist.get_world_size()

        print("✅ Distributed initialized!")
        print(f"Rank: {rank}, World size: {world_size}")
        print("🖥️  This is the Mac coordinator (Rank 0)")
    except Exception as e:
        print(f"❌ Failed to initialize distributed: {e}")
        print("🔍 Error details:")
        print(f"   Exception type: {type(e).__name__}")
        print(f"   Exception message: {e!s}")
        print(
            "\n💡 Troubleshooting:",
            "1. Ensure Windows worker is running first",
            "2. Check that Windows worker connected to the correct IP",
            "3. Verify firewall settings on both machines",
            sep="\n",
        )
        return None

    return backend

# Defining the function does not start anything; this cell only announces it.
print("⚠️  Ready to run coordinator")


In [None]:
# START MAC COORDINATOR - run initialize_distributed() and report the outcome
print("🚀 Starting Mac coordinator...")
backend = initialize_distributed()

# initialize_distributed() returns None on failure
if not backend:
    print("❌ Failed to start coordinator.")
else:
    print(
        "🎉 SUCCESS! Mac coordinator is running!",
        "✅ Now start the Windows worker - it will connect automatically!",
        "🧪 Ready for distributed operations!",
        sep="\n",
    )

In [None]:
import torch
import torch.distributed as dist
from nbdistributed import make_distributed
import os
import socket

# Report local CUDA visibility
print(f"CUDA available on Mac: {torch.cuda.is_available()}")
print("Mac will run as CPU coordinator (Rank 0)")

# Default to CPU; switch to MPS only when torch exposes the backend and
# reports it as available
mac_device = "cpu"
if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
    mac_device = "mps"

print({
    "mps": "Mac Metal Performance Shaders (MPS) available for local operations",
    "cpu": "Using CPU for Mac operations",
}[mac_device])

print(f"Mac device: {mac_device}")

# Get local IP address for network setup.
# Resolving our own hostname can yield a loopback address (127.0.0.1), which
# the Windows worker cannot reach, and this assignment replaces any
# previously detected `local_ip`. So first ask the OS which local address it
# would use for outbound traffic (the same UDP probe as get_local_ip() in
# this notebook) and only fall back to hostname resolution if that fails.
hostname = socket.gethostname()
try:
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as _probe:
        _probe.connect(("8.8.8.8", 80))
        local_ip = _probe.getsockname()[0]
except OSError:
    local_ip = socket.gethostbyname(hostname)
print(f"Mac IP (Master): {local_ip}")
print(f"Hostname: {hostname}")
print("\nSetup: Mac (CPU/MPS coordinator) + Windows (CUDA GPU worker)")


In [None]:
# START MAC COORDINATOR - run initialize_distributed() and print the result
print("🚀 Starting Mac coordinator...")
backend = initialize_distributed()

# A falsy backend (None) means initialization failed
print("\n".join(
    [
        "🎉 SUCCESS! Mac coordinator is running!",
        "✅ Now start the Windows worker - it will connect automatically!",
        "🧪 Ready for distributed operations!",
    ]
    if backend
    else ["❌ Failed to start coordinator."]
))


In [None]:
# Test distributed all_gather operations
def test_all_gather_basic():
    """Test basic all_gather operation across distributed nodes.

    Each rank contributes the int64 tensor
    ``[rank*100, rank*100+10, rank*100+20]`` and receives one such tensor
    from every rank.

    The collective itself runs on CPU tensors: this notebook initializes the
    process group with the gloo backend, which does not handle MPS tensors.
    On rank 0 the gathered tensors are moved to ``mac_device`` afterwards, so
    the returned list lives on the same device as before. Worker ranks keep
    their tensors on CPU, as in the previous version of this function.

    Returns:
        list[torch.Tensor] | None: the gathered tensors, one per rank, or
        None (after printing a hint) if the process group is not initialized.
    """
    if not dist.is_initialized():
        print("❌ Distributed not initialized! Call initialize_distributed() first.")
        return None

    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # Device allocation based on rank and machine type
    if rank == 0:
        # Mac coordinator - use CPU or MPS if available
        device = torch.device(mac_device)  # "cpu" or "mps"
        print(f"🖥️  Rank {rank} (Mac): Running on device {device}")
    else:
        # Windows worker - reported as CUDA; tensors below stay on CPU
        device = torch.device("cuda:0")  # Windows RTX 2080
        print(f"💻 Rank {rank} (Windows): Running on device {device}")

    # CPU receive buffers, one per rank, plus this rank's contribution
    tensor_list = [torch.zeros(3, dtype=torch.int64) for _ in range(world_size)]
    base = rank * 100
    input_tensor = torch.tensor([base, base + 10, base + 20], dtype=torch.int64)

    print(f"Rank {rank}: Before all_gather - input: {input_tensor}")
    print(f"Rank {rank}: Before all_gather - tensor_list: {tensor_list}")

    # Perform all_gather on CPU tensors
    dist.all_gather(tensor_list, input_tensor)

    if rank == 0:
        # Move the results to the Mac's local device only after the collective
        tensor_list = [t.to(device) for t in tensor_list]
        if device.type == "mps":
            torch.mps.synchronize()
    elif torch.cuda.is_available():
        # Guarded so a worker without CUDA does not fail here
        torch.cuda.synchronize()

    print(f"Rank {rank}: After all_gather - tensor_list: {tensor_list}")

    return tensor_list

print("🧪 test_all_gather_basic() function ready")


In [None]:
# COMPLETE INITIALIZATION - Run this cell after Windows worker is ready

# Initialize distributed process group
def initialize_distributed():
    """Initialize the process group with the gloo backend.

    Calls ``dist.init_process_group`` with the default process-group timeout,
    then prints this process's rank and the world size.

    Returns:
        The backend name (``'gloo'``) on success, or ``None`` if anything
        raised; the error is printed in that case.
    """
    # 'gloo' is used for the mixed Mac (CPU/MPS) + Windows (GPU) setup
    backend = 'gloo'
    try:
        print(f"🔄 Initializing distributed training with backend: {backend}")
        print("Backend 'gloo' chosen for Mac (CPU/MPS) + Windows (GPU) setup")

        dist.init_process_group(
            backend=backend,
            timeout=torch.distributed.default_pg_timeout,
        )

        rank, world_size = dist.get_rank(), dist.get_world_size()

        print("✅ Distributed initialized!")
        print(f"Rank: {rank}, World size: {world_size}")

        role_line = (
            "🖥️  This is the Mac coordinator (Rank 0)"
            if rank == 0
            else f"💻 This is worker rank {rank}"
        )
        print(role_line)
    except Exception as e:
        print(f"❌ Failed to initialize distributed: {e}")
        print("Make sure Windows worker is running first!")
        return None

    return backend

# NOW: run initialize_distributed() and report whether it succeeded
print("🚀 Attempting to connect to Windows worker...")
backend = initialize_distributed()

# initialize_distributed() returns None on failure
if not backend:
    print("❌ Connection failed. Check Windows worker status.")
else:
    print("🎉 SUCCESS! Connected to Windows worker!")
    print("🧪 Ready to run distributed operations!")


## Configure Distributed Setup

**Step 1**: Note down your Mac's IP address from above. You'll need this for the Windows machine.

**Step 2**: Configure the distributed environment. Update the `WINDOWS_IP` with your Windows laptop's IP address.


In [None]:
# Distributed configuration
MASTER_ADDR = local_ip  # this Mac is the master
MASTER_PORT = "12355"   # communication port (string, for os.environ)
WORLD_SIZE = 2          # Mac + Windows
RANK = 0                # master rank

# TODO: Update this with your Windows laptop's IP
# (in this cell the value is only printed)
WINDOWS_IP = "192.168.1.XXX"  # Replace with actual Windows IP

print(
    f"Master Address (this Mac): {MASTER_ADDR}",
    f"Master Port: {MASTER_PORT}",
    f"World Size: {WORLD_SIZE}",
    f"This machine's rank: {RANK}",
    f"Windows machine IP: {WINDOWS_IP}",
    sep="\n",
)

# Set environment variables for distributed training
os.environ.update({
    'MASTER_ADDR': MASTER_ADDR,
    'MASTER_PORT': MASTER_PORT,
    'WORLD_SIZE': str(WORLD_SIZE),
    'RANK': str(RANK),
})


In [None]:
# Initialize distributed process group
@make_distributed
def initialize_distributed():
    """Report the backend choice and this process's rank.

    Unlike the other ``initialize_distributed`` variants in this notebook,
    this body does not call ``init_process_group`` itself; it only queries
    ``dist`` for rank and world size.
    NOTE(review): presumably the ``make_distributed`` decorator sets up the
    process group before the body runs — confirm against nbdistributed.

    Returns:
        str: the backend name, ``'gloo'``.
    """
    # 'gloo' is used for the mixed Mac (CPU) + Windows (GPU) setup;
    # it supports heterogeneous setups better than 'nccl'
    backend = 'gloo'

    print(f"Initializing distributed training with backend: {backend}")
    print("Backend 'gloo' chosen for Mac (CPU) + Windows (GPU) setup")
    print(f"Rank: {dist.get_rank()}, World size: {dist.get_world_size()}")

    rank = dist.get_rank()
    print(
        "This is the Mac coordinator (Rank 0)"
        if rank == 0
        else f"This is worker rank {rank}"
    )

    return backend

# Call the decorated function and keep the returned backend name
backend = initialize_distributed()


## All-Gather Operations Test

Now let's implement the distributed all_gather operations similar to your original code, but adapted for distributed execution.


In [None]:
@make_distributed
def test_all_gather_basic():
    """Run one all_gather of a 3-element int64 tensor per rank.

    Rank 0 places its tensors on ``mac_device``; every other rank uses
    ``cuda:0``. Each rank contributes
    ``[rank*100, rank*100+10, rank*100+20]``.
    NOTE(review): with the gloo backend, tensors on an MPS device may be
    rejected by the collective — confirm before relying on the "mps" path.

    Returns:
        list[torch.Tensor]: the gathered tensors, one per rank.
    """
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # Device allocation based on rank and machine type
    on_mac = rank == 0
    if on_mac:
        # Mac coordinator - "cpu" or "mps"
        device = torch.device(mac_device)
        print(f"Rank {rank} (Mac): Running on device {device}")
    else:
        # Windows worker - CUDA (RTX 2080)
        device = torch.device("cuda:0")
        print(f"Rank {rank} (Windows): Running on device {device}")

    # Receive buffers, one per rank, created directly on the target device
    tensor_list = [
        torch.zeros(3, dtype=torch.int64, device=device)
        for _ in range(world_size)
    ]

    # This rank's contribution
    base = rank * 100
    input_tensor = torch.tensor(
        [base, base + 10, base + 20], dtype=torch.int64, device=device
    )

    print(f"Rank {rank}: Before all_gather - input: {input_tensor}")
    print(f"Rank {rank}: Before all_gather - tensor_list: {tensor_list}")

    # Perform all_gather
    dist.all_gather(tensor_list, input_tensor)

    print(f"Rank {rank}: After all_gather - tensor_list: {tensor_list}")

    # Synchronize based on device type
    if not on_mac:
        torch.cuda.synchronize()  # Windows with CUDA
    elif mac_device == "mps":
        torch.mps.synchronize()

    return tensor_list

# Run the basic all_gather test
result = test_all_gather_basic()


In [None]:
@make_distributed
def test_all_gather_advanced():
    """Run a timed all_gather of a 2x3 int64 tensor per rank.

    Rank 0 uses ``mac_device``; other ranks use ``cuda:0``. Each rank
    contributes the values ``rank*10 .. rank*10+5`` laid out as two rows of
    three. The wall-clock time around the collective and the device
    synchronization is measured with ``time.time()``.
    NOTE(review): with the gloo backend, tensors on an MPS device may be
    rejected by the collective — confirm before relying on the "mps" path.

    Returns:
        tuple: ``(tensor_list, elapsed_seconds)`` where ``tensor_list`` holds
        one gathered tensor per rank.
    """
    import time

    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # Device allocation based on rank and machine type
    on_mac = rank == 0
    if on_mac:
        device, machine_type = torch.device(mac_device), "Mac"
    else:
        # Windows worker with RTX 2080
        device, machine_type = torch.device("cuda:0"), "Windows RTX 2080"

    print("\n--- Advanced All-Gather Test ---")
    print(f"Rank {rank} ({machine_type}): Running on device {device}")

    # Receive buffers: one 2x3 tensor per rank
    tensor_size = (2, 3)
    tensor_list = [
        torch.zeros(tensor_size, dtype=torch.int64, device=device)
        for _ in range(world_size)
    ]

    # Rows are [r, r+1, r+2] and [r+3, r+4, r+5] with r = rank*10
    offset = rank * 10
    input_tensor = torch.tensor(
        [[offset + col for col in range(3)],
         [offset + col for col in range(3, 6)]],
        dtype=torch.int64,
        device=device,
    )

    print(f"Rank {rank}: Input tensor shape: {input_tensor.shape}")
    print(f"Rank {rank}: Input tensor:\n{input_tensor}")

    # Time the collective plus the device synchronization
    start_time = time.time()

    dist.all_gather(tensor_list, input_tensor)

    if not on_mac:
        torch.cuda.synchronize()  # Windows with CUDA
    elif mac_device == "mps":
        torch.mps.synchronize()

    elapsed = time.time() - start_time

    print(f"Rank {rank}: All-gather completed in {elapsed:.4f} seconds")
    print(f"Rank {rank}: Gathered tensors from all ranks:")
    for source_rank, gathered in enumerate(tensor_list):
        machine = "Mac (CPU/MPS)" if source_rank == 0 else "Windows RTX 2080"
        print(f"  From rank {source_rank} ({machine}):\n{gathered}")

    return tensor_list, elapsed

# Run the advanced all_gather test
result_advanced, timing = test_all_gather_advanced()


## Windows RTX 2080 Setup Instructions

**On your Windows laptop with RTX 2080, create this Python script (`worker.py`):**

```python
import torch
import torch.distributed as dist
import os

# Configuration - UPDATE THESE VALUES
MASTER_ADDR = "YOUR_MAC_IP_HERE"  # Replace with your Mac's IP from above
MASTER_PORT = "12355"
WORLD_SIZE = 2
RANK = 1  # Windows machine is rank 1

# Set environment variables read by init_process_group's default env:// setup
os.environ['MASTER_ADDR'] = MASTER_ADDR
os.environ['MASTER_PORT'] = MASTER_PORT
os.environ['WORLD_SIZE'] = str(WORLD_SIZE)
os.environ['RANK'] = str(RANK)

# Initialize distributed process group
dist.init_process_group(backend='gloo')

# Everything after a successful init runs inside try/finally so the process
# group is always torn down, even if a status query below raises.
try:
    cuda_available = torch.cuda.is_available()
    print(f"Windows worker initialized with CUDA: {cuda_available}")
    if cuda_available:
        print(f"CUDA device: {torch.cuda.get_device_name(0)}")
    else:
        # get_device_name(0) would raise without a usable CUDA device
        print("CUDA device: not available (check the CUDA-enabled PyTorch install)")
    print(f"Rank: {dist.get_rank()}, World size: {dist.get_world_size()}")

    print("Windows worker ready for distributed operations...")
    # NOTE(review): the notebook's test functions call dist.all_gather, while
    # this worker only calls dist.barrier. Collectives normally have to be
    # issued in the same order on every rank — confirm the worker runs the
    # matching all_gather calls for the tests you intend to execute.
    dist.barrier()  # Wait for coordination
    print("Distributed operations completed!")
finally:
    dist.destroy_process_group()
```

**Steps to run:**
1. Install PyTorch with CUDA support on Windows: `pip install torch torchvision --index-url https://download.pytorch.org/whl/cu118`
2. Update `MASTER_ADDR` in the script with your Mac's IP
3. Run: `python worker.py`
4. Then run the cells in this notebook on your Mac

**Network requirements:**
- Both machines on same network
- Port 12355 open for communication
- Firewall may need configuration
