ComfyUI Cluster

Distributed Computing Extension for ComfyUI

Overview

ComfyUI Cluster is a sophisticated distributed computing system that enables horizontal scaling of ComfyUI workflows across multiple GPU instances. It transforms ComfyUI from a single-machine image generation tool into a powerful cluster-based parallel processing platform.

What Problem Does It Solve?

Single Machine Limitations:

  • Limited batch processing capacity
  • Sequential execution bottlenecks
  • Single GPU memory constraints
  • Long processing times for large image sets

ComfyUI Cluster Solution:

  • Parallel Execution: Distribute workloads across 2-100+ instances
  • Linear Scaling: 4 GPUs = ~4x throughput, 10 GPUs = ~10x throughput
  • Flexible Deployment: Local, LAN, cloud, or hybrid configurations
  • Transparent Integration: Use familiar ComfyUI workflows with cluster-aware nodes

Key Benefits

  • 4-100x Performance Improvement: Process hundreds of images in parallel
  • Cost Optimization: Leverage spot instances and auto-scaling
  • Geographic Distribution: Deploy instances across regions for redundancy
  • Zero Workflow Changes: Existing workflows run unchanged on a single instance (instance 0)
  • Production Ready: Battle-tested with strict type checking and comprehensive error handling

Features

Distributed Execution

  • Leader-Follower Architecture: Automatic coordination across instances
  • Work Distribution: Fan-out image batches to multiple GPUs
  • Result Collection: Fan-in processed results back to leader
  • Workflow Execution: Run complete workflows across cluster

Tensor Synchronization Patterns

  • Broadcast: Leader sends tensor to all followers
  • Fan-out: Leader distributes sliced tensors for parallel processing
  • Fan-in: Followers send results to leader for aggregation
  • Gather: All-to-all tensor exchange for distributed computation

Multiple Deployment Modes

  • STUN Mode: Cloud/WAN deployments with NAT traversal
  • Broadcast Mode: Local network auto-discovery
  • Static Mode: Fixed IP configuration for known topologies

High-Performance Networking

  • Custom UDP Protocol: Low-latency tensor transfer optimized for throughput
  • Automatic Compression: PNG compression for images (10-100x bandwidth savings)
  • Reliable Delivery: ACK/retry mechanism with automatic chunking
  • Batch Processing: 174 packets per batch for efficiency

Cloud Platform Integration

  • RunPod Support: Auto-detects pod environments with internal DNS resolution
  • Generic Cloud: Works with AWS, GCP, Azure, etc.
  • NAT Traversal: STUN server enables communication through firewalls

Custom ComfyUI Nodes

19 specialized nodes for cluster operations:

  • Cluster management and info display
  • Image/latent/mask distribution
  • Workflow and subgraph execution
  • Batch processing utilities
  • Memory management

Architecture

High-Level Design

┌─────────────────────────────────────────────────────────┐
│                     ComfyUI Cluster                     │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │             Cluster Management Layer              │  │
│  │  • Instance discovery & registration              │  │
│  │  • Leader/Follower coordination                   │  │
│  │  • State synchronization                          │  │
│  └───────────────────────────────────────────────────┘  │
│                            ↕                            │
│  ┌───────────────────────────────────────────────────┐  │
│  │                Communication Layer                │  │
│  │  • Custom reliable UDP protocol                   │  │
│  │  • Protobuf message serialization                 │  │
│  │  • Tensor compression & transfer                  │  │
│  └───────────────────────────────────────────────────┘  │
│                            ↕                            │
│  ┌───────────────────────────────────────────────────┐  │
│  │               Node Extension Layer                │  │
│  │  • 19 cluster-aware ComfyUI nodes                 │  │
│  │  • Fan-out/fan-in operations                      │  │
│  │  • Workflow execution                             │  │
│  └───────────────────────────────────────────────────┘  │
│                                                         │
└─────────────────────────────────────────────────────────┘

Leader-Follower Pattern

Leader Instance (Instance 0):

  • Accepts user workflows via ComfyUI interface
  • Distributes work to follower instances
  • Collects and aggregates results
  • Provides final output to user
  • Runs full ComfyUI web interface

Follower Instances (Instances 1+):

  • Wait for work assignments from leader
  • Process assigned tensor slices/batches
  • Send results back to leader
  • Minimal web interface (API only)

Component Breakdown

1. Instance Management (src/)

Instance Loop (instance_loop.py)

  • Singleton managing instance lifecycle
  • Runs two async threads:
    • State Thread: Instance state handling (1ms interval)
    • Packet Thread: Message/buffer queue processing

Instance Types (instance.py)

ThisInstance (current instance)
├── ThisLeaderInstance (coordinator)
└── ThisFollowerInstance (worker)

OtherInstance (remote instances)
├── OtherLeaderInstance
└── OtherFollowerInstance

Cluster Manager (cluster.py)

  • Registry of all instances
  • Connection status tracking
  • Message/buffer handler coordination

2. Communication Layer (src/udp/)

UDP Base System (udp_base.py)

  • Custom reliable UDP implementation
  • Components:
    • ThreadManager: Incoming/outgoing packet threads
    • AddressRegistry: Instance ID → network address mapping
    • PacketProcessor: Packet processing with batching

Listener (listener.py)

  • Dual socket system:
    • Broadcast listener (discovery)
    • Direct listener (point-to-point)
  • Non-blocking polling (1ms timeout)

Sender (sender.py)

  • UDPEmitter for packet transmission
  • Broadcast and direct messaging
  • Automatic chunking for large payloads

Message/Buffer Handlers

  • UDPMessageHandler: Control messages
  • UDPBufferHandler: Large data transfers (tensors)
  • ACK/retry with timeout tracking

3. Registration Strategies (src/registration_strategy.py)

STUN Strategy

  • Centralized registration server
  • REST API for instance discovery
  • NAT traversal support
  • Key-based authentication

Broadcast Strategy

  • UDP broadcast announcements
  • Automatic peer detection
  • LAN-only deployment

Static Strategy

  • Pre-configured addresses
  • No discovery overhead
  • Fixed topology
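
The three strategies sit behind one discovery interface. A minimal sketch of that shape, assuming illustrative class and method names rather than the module's actual API:

from abc import ABC, abstractmethod

class RegistrationStrategy(ABC):
    @abstractmethod
    def discover_peers(self) -> list[tuple[str, int]]:
        """Return (address, port) pairs for every other instance."""

class StaticRegistration(RegistrationStrategy):
    def __init__(self, hostnames: str) -> None:
        # Parses e.g. COMFY_CLUSTER_UDP_HOSTNAMES="10.0.0.1:9998,10.0.0.2:9998"
        self._peers = [
            (host, int(port))
            for host, port in (entry.split(":") for entry in hostnames.split(","))
        ]

    def discover_peers(self) -> list[tuple[str, int]]:
        return self._peers  # fixed topology: no discovery traffic at all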

4. State Synchronization (src/states/)

SyncStateHandler (sync_state.py)

Broadcast Pattern:

Leader: [Tensor] ──────┐
                       ├──→ Follower 0: [Tensor]
                       ├──→ Follower 1: [Tensor]
                       └──→ Follower 2: [Tensor]

Fan-out Pattern:

Leader: [Batch=8]
    ├──→ Instance 0: [Slice 0-1]  (2 items)
    ├──→ Instance 1: [Slice 2-3]  (2 items)
    ├──→ Instance 2: [Slice 4-5]  (2 items)
    └──→ Instance 3: [Slice 6-7]  (2 items)
         ↓ Parallel Processing ↓

Fan-in Pattern:

Instance 0: [Result 0-1] ┐
Instance 1: [Result 2-3] ├──→ Leader: [Combined 0-7]
Instance 2: [Result 4-5] │
Instance 3: [Result 6-7] ┘

Gather Pattern:

All instances exchange tensors
Each instance receives all others' tensors
Used for distributed computations
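
To make the slicing arithmetic above concrete, here is a minimal PyTorch sketch of fan-out and fan-in on the batch dimension; the helper names are illustrative, not the extension's actual API:

import torch

def fan_out_slice(batch: torch.Tensor, index: int, count: int) -> torch.Tensor:
    # Split the batch dimension into one contiguous slice per instance.
    # With batch=8 and count=4, instance 1 receives items 2-3.
    return torch.chunk(batch, count, dim=0)[index]

def fan_in_concat(results: list[torch.Tensor]) -> torch.Tensor:
    # Reassemble per-instance results (in instance order) into one batch.
    return torch.cat(results, dim=0)

batch = torch.randn(8, 4, 64, 64)  # e.g. a latent batch
assert fan_out_slice(batch, index=1, count=4).shape[0] == 2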

5. STUN Server (stun/)

FastAPI Server (stun/src/server.py)

  • Port 8089 (HTTP) or 443 (HTTPS with Caddy)
  • Multi-cluster instance registry
  • Heartbeat-based liveness detection
  • Automatic stale instance cleanup (60s timeout)

Cluster Registry (stun/src/cluster.py)

  • Per-cluster instance tracking
  • State management
  • Cleanup routines

API Endpoints:

  • POST /register-instance: Register new instance
  • GET /instances/{cluster_id}: List cluster instances
  • POST /heartbeat/{cluster_id}/{instance_id}: Update heartbeat
  • DELETE /instance/{cluster_id}/{instance_id}: Remove instance
  • GET /health: Health check
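
As a usage illustration, this hedged Python sketch drives the endpoints above with the requests library; the field names mirror the examples in the API Reference section below and may not match the server's exact schema:

import requests

STUN = "https://stun.example.com"  # illustrative server URL
HEADERS = {"X-Cluster-Key": "your-secret-cluster-key"}

# Register this instance with the STUN server.
requests.post(f"{STUN}/register-instance", headers=HEADERS, json={
    "cluster_id": "prod-cluster-1",
    "instance_id": 0,
    "address": "203.0.113.1",
    "port": 9998,
    "role": "LEADER",
}).raise_for_status()

# Refresh liveness periodically (stale instances are dropped after 60s).
requests.post(f"{STUN}/heartbeat/prod-cluster-1/0", headers=HEADERS)

# Discover peers.
peers = requests.get(f"{STUN}/instances/prod-cluster-1", headers=HEADERS).json()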

6. Thread Architecture

State Thread (asyncio)

while running:
    await instance.handle_state()  # 1ms interval
    # Manages initialization, announcements, sync operations

Packet Thread (asyncio)

while running:
    await process_message_queue()
    await process_buffer_queue()

Incoming Thread (native)

while running:
    poll_udp_sockets(timeout=1ms)
    queue_received_packets()

Outgoing Thread (native)

while running:
    batch = dequeue_packets(max=174)
    send_batch()
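
Put together, the outgoing side amounts to something like this sketch; the queue layout and socket setup are illustrative, not the actual sender code:

import queue
import socket

MAX_BATCH = 174  # packets per batch, matching the protocol's batch size

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
outgoing: "queue.Queue[tuple[bytes, tuple[str, int]]]" = queue.Queue()

def send_batch() -> None:
    # Drain up to MAX_BATCH queued packets and send them in one burst,
    # amortizing queue wakeups and syscall overhead across the batch.
    for _ in range(MAX_BATCH):
        try:
            payload, addr = outgoing.get_nowait()
        except queue.Empty:
            break
        sock.sendto(payload, addr)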

Quick Start

Prerequisites

  • ComfyUI: Installed and working
  • Python: 3.10+ (3.13 recommended)
  • Network: UDP ports accessible between instances
  • GPUs: CUDA-enabled for followers (leader can be CPU-only)

Installation

  1. Clone into ComfyUI custom_nodes:
cd ComfyUI/custom_nodes
git clone https://github.com/your-org/ComfyUI_Cluster.git
cd ComfyUI_Cluster
  2. Install dependencies:
pip install -r requirements.txt
  3. Compile protobuf messages:
./setup.sh  # Downloads protoc and compiles messages.proto

Basic Local Setup (4 Instances)

Terminal 1 - Leader (Instance 0):

export COMFY_CLUSTER_INSTANCE_COUNT=4
export COMFY_CLUSTER_INSTANCE_INDEX=0
export COMFY_CLUSTER_ROLE=LEADER
export COMFY_CLUSTER_REGISTRATION_MODE=broadcast
export COMFY_CLUSTER_SINGLE_HOST=true

cd ComfyUI
python main.py --listen 0.0.0.0 --port 8189

Terminal 2 - Follower 1 (Instance 1):

export COMFY_CLUSTER_INSTANCE_COUNT=4
export COMFY_CLUSTER_INSTANCE_INDEX=1
export COMFY_CLUSTER_ROLE=FOLLOWER
export COMFY_CLUSTER_REGISTRATION_MODE=broadcast
export COMFY_CLUSTER_SINGLE_HOST=true

cd ComfyUI
python main.py --listen 0.0.0.0 --port 8190

Repeat for Instances 2 and 3 (ports 8191, 8192)

Verify Cluster

  1. Open ComfyUI: http://localhost:8189
  2. Add a ClusterInfo node to workflow
  3. Queue workflow - should show "Connected: 4/4 instances"

Configuration Reference

Required Environment Variables

# Cluster Topology
COMFY_CLUSTER_INSTANCE_COUNT=4          # Total instances in cluster
COMFY_CLUSTER_INSTANCE_INDEX=0          # This instance's ID (0-based)
COMFY_CLUSTER_ROLE=LEADER               # LEADER or FOLLOWER

# Registration Mode
COMFY_CLUSTER_REGISTRATION_MODE=stun    # stun, broadcast, or static

STUN Mode Configuration

For cloud/WAN deployments with NAT traversal:

# Registration
COMFY_CLUSTER_REGISTRATION_MODE=stun
COMFY_CLUSTER_STUN_SERVER=https://stun.example.com:443
COMFY_CLUSTER_CLUSTER_ID=my-production-cluster
COMFY_CLUSTER_KEY=your-secret-cluster-key

# Networking
COMFY_CLUSTER_LISTEN_ADDRESS=0.0.0.0
COMFY_CLUSTER_INSTANCE_ADDRESS=           # Auto-detected public IP
COMFY_CLUSTER_DIRECT_LISTEN_PORT=9998
COMFY_CLUSTER_COMFY_PORT=8188

Broadcast Mode Configuration

For LAN deployments with auto-discovery:

# Registration
COMFY_CLUSTER_REGISTRATION_MODE=broadcast
COMFY_CLUSTER_UDP_BROADCAST=true
COMFY_CLUSTER_BROADCAST_PORT=9997

# Networking
COMFY_CLUSTER_LISTEN_ADDRESS=0.0.0.0
COMFY_CLUSTER_DIRECT_LISTEN_PORT=9998

Static Mode Configuration

For fixed IP deployments:

# Registration
COMFY_CLUSTER_REGISTRATION_MODE=static
COMFY_CLUSTER_UDP_HOSTNAMES=192.168.1.10:9998,192.168.1.11:9998,192.168.1.12:9998,192.168.1.13:9998

# Networking
COMFY_CLUSTER_LISTEN_ADDRESS=0.0.0.0
COMFY_CLUSTER_DIRECT_LISTEN_PORT=9998

Optional Configuration

# Development
COMFY_CLUSTER_SINGLE_HOST=true          # All instances on same machine
COMFY_CLUSTER_HOT_RELOAD=true           # Enable hot reload during dev

# Performance
COMFY_CLUSTER_BATCH_SIZE=174            # Packets per batch (default: 174)

# Debugging
COMFY_CLUSTER_LOG_LEVEL=DEBUG           # Logging verbosity
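
For reference, a minimal sketch of reading these variables into a typed config object; the dataclass, defaults, and startup check here are illustrative, not the extension's actual loader:

import os
from dataclasses import dataclass

@dataclass
class ClusterConfig:
    instance_count: int
    instance_index: int
    role: str               # "LEADER" or "FOLLOWER"
    registration_mode: str  # "stun", "broadcast", or "static"
    direct_listen_port: int

def load_config() -> ClusterConfig:
    return ClusterConfig(
        instance_count=int(os.environ["COMFY_CLUSTER_INSTANCE_COUNT"]),
        instance_index=int(os.environ["COMFY_CLUSTER_INSTANCE_INDEX"]),
        role=os.environ["COMFY_CLUSTER_ROLE"],
        registration_mode=os.environ["COMFY_CLUSTER_REGISTRATION_MODE"],
        direct_listen_port=int(os.environ.get("COMFY_CLUSTER_DIRECT_LISTEN_PORT", "9998")),
    )

cfg = load_config()
assert 0 <= cfg.instance_index < cfg.instance_count  # mirrors the startup check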

RunPod-Specific Configuration

Automatic when RUNPOD_POD_ID is present:

# RunPod sets this automatically
RUNPOD_POD_ID=your-pod-id

# System auto-resolves internal addresses
# Internal DNS: {pod_id}.runpod.internal
# Enables pod-to-pod communication

Deployment Modes

Mode 1: STUN (Cloud/WAN)

Best for: Cloud deployments, instances behind NAT, geographic distribution

Setup:

  1. Deploy STUN Server:
cd stun
python run.py --host 0.0.0.0 --port 8089

# Optional: HTTPS with Caddy
caddy run --config caddy/Caddyfile
  2. Configure Instances:
export COMFY_CLUSTER_REGISTRATION_MODE=stun
export COMFY_CLUSTER_STUN_SERVER=https://your-stun-server.com:443
export COMFY_CLUSTER_CLUSTER_ID=prod-cluster-1
export COMFY_CLUSTER_KEY=your-secret-key
  3. Start Instances:
  • Instances auto-register with STUN server
  • STUN server provides peer discovery
  • Instances communicate peer-to-peer

Advantages:

  • NAT traversal support
  • Works across regions
  • Centralized instance discovery
  • Multi-cluster support

Considerations:

  • Requires STUN server deployment
  • Adds discovery latency (~100-500ms)
  • STUN server is single point of failure (can be replicated)

Mode 2: Broadcast (LAN)

Best for: Local deployments, same subnet, development

Setup:

export COMFY_CLUSTER_REGISTRATION_MODE=broadcast
export COMFY_CLUSTER_UDP_BROADCAST=true
export COMFY_CLUSTER_BROADCAST_PORT=9997

Advantages:

  • Zero configuration
  • Automatic peer discovery
  • No external dependencies
  • Fastest discovery (~100ms)

Considerations:

  • LAN-only (same broadcast domain)
  • Not suitable for cloud
  • Broadcast traffic overhead

Mode 3: Static (Fixed IPs)

Best for: Fixed infrastructure, predictable topology

Setup:

export COMFY_CLUSTER_REGISTRATION_MODE=static
export COMFY_CLUSTER_UDP_HOSTNAMES=10.0.0.1:9998,10.0.0.2:9998,10.0.0.3:9998

Advantages:

  • No discovery overhead
  • Predictable behavior
  • No broadcast/STUN dependencies

Considerations:

  • Manual configuration required
  • IP changes require reconfiguration
  • No dynamic scaling

RunPod Deployment

Automatic Detection:

# RunPod sets RUNPOD_POD_ID automatically
# System detects and uses internal DNS

export COMFY_CLUSTER_REGISTRATION_MODE=stun
export COMFY_CLUSTER_STUN_SERVER=https://your-stun.com
# Instance auto-resolves: {pod_id}.runpod.internal

Benefits:

  • Pod-to-pod internal networking
  • No internet bandwidth charges
  • Automatic address resolution

Node Reference

Cluster Management Nodes

ClusterInfo

Purpose: Display cluster configuration and status
Inputs: None
Outputs: info (string)
Use Case: Debugging, verifying cluster connectivity

ClusterFreeNow

Purpose: Force immediate memory cleanup
Inputs: trigger (any)
Outputs: trigger (passthrough)
Use Case: Free GPU memory between operations

ClusterFinallyFree

Purpose: Clean up memory after the workflow completes
Inputs: trigger (any)
Outputs: trigger (passthrough)
Use Case: Final cleanup step

Workflow Execution Nodes

ClusterExecuteWorkflow

Purpose: Execute a workflow file across the cluster
Inputs:

  • workflow_path (string): Path to workflow JSON
  • inputs (dict): Input values

Outputs: results (dict)
Use Case: Batch processing multiple workflows

ClusterExecuteCurrentWorkflow

Purpose: Re-execute current workflow
Inputs: trigger (any)
Outputs: results (dict)
Use Case: Iterative generation

ClusterStartSubgraph / ClusterEndSubgraph / ClusterUseSubgraph

Purpose: Define and execute workflow subgraphs
Use Case: Reusable workflow components, conditional execution

Tensor Operations

ClusterBroadcastTensor

Purpose: Leader broadcasts tensor to all followers
Inputs: tensor (tensor)
Outputs: tensor (passthrough on leader)
Use Case: Distribute model weights, shared parameters
Instance: Leader only

ClusterListenTensorBroadcast

Purpose: Receive broadcasted tensor
Inputs: None
Outputs: tensor (received tensor)
Use Case: Receive shared parameters
Instance: Followers only

Image Operations

ClusterBroadcastLoadedImage

Purpose: Load image on leader and broadcast
Inputs: image_path (string)
Outputs: image (tensor)
Use Case: Shared reference images

ClusterFanOutImage

Purpose: Distribute image slices to instances
Inputs: images (tensor [batch, h, w, c])
Outputs: image_slice (tensor)
Use Case: Parallel batch processing
Behavior: Splits batch dimension across instances

Example:

Input: [8, 512, 512, 3] on 4 instances
Output: [2, 512, 512, 3] per instance

ClusterFanInImages

Purpose: Collect processed images from instances
Inputs: images (tensor)
Outputs: combined_images (tensor)
Use Case: Reassemble processed batches
Instance: Leader only

ClusterGatherImages

Purpose: All-to-all image exchange
Inputs: images (tensor)
Outputs: all_images (list of tensors)
Use Case: Distributed operations requiring all results

Latent Operations

ClusterFanOutLatent

Purpose: Distribute latent slices
Inputs: latent (dict with 'samples' key)
Outputs: latent_slice (dict)
Use Case: Parallel latent processing

ClusterGatherLatents

Purpose: Collect latents from all instances
Inputs: latent (dict)
Outputs: all_latents (list of dicts)
Use Case: Distributed latent operations

Mask Operations

ClusterFanOutMask

Purpose: Distribute mask slices
Inputs: mask (tensor)
Outputs: mask_slice (tensor)
Use Case: Parallel mask processing

ClusterGatherMasks

Purpose: Collect masks from all instances
Inputs: mask (tensor)
Outputs: all_masks (list of tensors)
Use Case: Distributed mask operations

Batch Processing Utilities

ClusterGetInstanceWorkItemFromBatch

Purpose: Extract work item for this instance
Inputs: batch (list), index_offset (int, optional)
Outputs: item (any)
Use Case: Manual work distribution

Example:

Instance 0: Gets batch[0]
Instance 1: Gets batch[1]
Instance 2: Gets batch[2]
Instance 3: Gets batch[3]

ClusterSplitBatchToList

Purpose: Split batch tensor into list
Inputs: batch (tensor [batch, ...])
Outputs: list (list of tensors)
Use Case: Convert batch to list for processing

ClusterFlattenBatchedImageList

Purpose: Flatten nested batch lists
Inputs: batched_list (list of lists)
Outputs: flat_list (list)
Use Case: Post-gather processing

ClusterInsertAtIndex

Purpose: Insert item at specific index
Inputs: list, item, index
Outputs: list (modified)
Use Case: Result reordering

ClusterStridedReorder

Purpose: Reorder with stride pattern
Inputs: list, stride
Outputs: reordered_list
Use Case: Interleave results
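
To illustrate the batch utilities, a minimal sketch of a stride-based reorder; this is one plausible reading of the node's behavior, not its actual source:

def strided_reorder(items: list, stride: int) -> list:
    # Interleave: take every stride-th item starting at offsets 0..stride-1.
    # strided_reorder(["a0", "a1", "b0", "b1"], stride=2) -> ["a0", "b0", "a1", "b1"]
    return [items[i] for offset in range(stride)
            for i in range(offset, len(items), stride)]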


Usage Examples

Example 1: Batch Image Generation (4 GPUs)

Scenario: Generate 100 images with Stable Diffusion

Workflow:

LoadCheckpoint → KSampler → VAEDecode → SaveImage

Cluster Workflow:

1. ClusterFanOutLatent (empty latent [100, 4, 64, 64])
   ├─→ Instance 0: [25, 4, 64, 64]
   ├─→ Instance 1: [25, 4, 64, 64]
   ├─→ Instance 2: [25, 4, 64, 64]
   └─→ Instance 3: [25, 4, 64, 64]

2. KSampler (parallel processing on each instance)

3. VAEDecode (parallel processing)

4. ClusterFanInImages
   └─→ Leader: [100, 512, 512, 3]

5. SaveImage (on leader)

Performance:

  • Single GPU: 100 images × 2s = 200s total
  • 4 GPUs: 25 images × 2s = 50s total
  • Speedup: 4x

Example 2: Parameter Sweep

Scenario: Test 20 different CFG values

Workflow:

cfg_values = [3.0, 3.5, 4.0, ..., 12.0]  # 20 values

# Each instance gets 5 values
instance_cfgs = ClusterGetInstanceWorkItemFromBatch(cfg_values)

# Process in parallel
for cfg in instance_cfgs:
    generate_with_cfg(cfg)

# Gather results
all_results = ClusterGatherImages(local_results)

Example 3: Style Transfer Grid

Scenario: Apply 10 styles to 10 images (100 combinations)

Workflow:

1. Leader loads 10 styles → ClusterBroadcastTensor
2. ClusterFanOutImage (10 images across instances)
   Instance 0: images 0-2 (3 images × 10 styles = 30)
   Instance 1: images 3-4 (2 images × 10 styles = 20)
   Instance 2: images 5-7 (3 images × 10 styles = 30)
   Instance 3: images 8-9 (2 images × 10 styles = 20)

3. Each instance processes its images with all styles
4. ClusterGatherImages → Leader gets all 100 results

Example 4: Subgraph Execution

Scenario: Reusable upscaling subgraph

Workflow:

ClusterStartSubgraph "upscale_4x"
├─→ Upscaler (RealESRGAN)
└─→ SaveImage
ClusterEndSubgraph

# Use in main workflow
ClusterFanOutImage → ProcessImages → ClusterUseSubgraph("upscale_4x")

Example 5: Iterative Refinement

Scenario: Progressive enhancement over 5 iterations

Workflow:

for i in range(5):
    # Distribute work
    slices = ClusterFanOutImage(current_images)

    # Parallel refinement
    refined = RefineNode(slices, strength=0.3)

    # Collect results
    current_images = ClusterFanInImages(refined)

# Final output
SaveImage(current_images)

STUN Server Setup

Installation

cd stun

# Install dependencies
pip install -r requirements.txt

# Run server
python run.py --host 0.0.0.0 --port 8089

Configuration

Environment Variables:

STUN_SERVER_HOST=0.0.0.0          # Bind address
STUN_SERVER_PORT=8089              # HTTP port
STUN_CLEANUP_INTERVAL=30           # Cleanup check interval (seconds)
STUN_INSTANCE_TIMEOUT=60           # Instance timeout (seconds)

HTTPS with Caddy

Caddyfile:

stun.example.com {
    reverse_proxy localhost:8089

    tls your-email@example.com

    encode gzip

    log {
        output file /var/log/caddy/stun.log
    }
}

Start Caddy:

cd stun/caddy
caddy run --config Caddyfile

API Reference

Register Instance

POST /register-instance
Content-Type: application/json
X-Cluster-Key: your-secret-key

{
  "cluster_id": "prod-cluster-1",
  "instance_id": 0,
  "address": "203.0.113.1",
  "port": 9998,
  "role": "LEADER"
}

Response:
{
  "status": "registered",
  "instance_id": 0,
  "instances": [...]
}

List Instances

GET /instances/{cluster_id}
X-Cluster-Key: your-secret-key

Response:
{
  "cluster_id": "prod-cluster-1",
  "instances": [
    {
      "instance_id": 0,
      "address": "203.0.113.1",
      "port": 9998,
      "role": "LEADER",
      "last_ping": "2025-01-15T10:30:45.123Z"
    },
    ...
  ]
}

Heartbeat

POST /heartbeat/{cluster_id}/{instance_id}
X-Cluster-Key: your-secret-key

Response:
{
  "status": "updated",
  "instance_id": 0
}

Security

Key-Based Authentication:

  • Each cluster has unique key
  • Key required for all operations
  • Prevents unauthorized access

HTTPS:

  • Caddy provides automatic TLS
  • Let's Encrypt integration
  • Secure cluster key transmission

Recommendations:

  • Use long random keys (32+ characters)
  • Rotate keys periodically
  • Deploy STUN server in trusted network
  • Enable firewall rules (port 443 only)

Performance & Tuning

Network Optimization

Increase UDP Buffer Sizes:

# Linux/Mac
sudo ./set_kernal_buffer_sizes.sh

# Manual
sudo sysctl -w net.core.rmem_max=268435456
sudo sysctl -w net.core.wmem_max=268435456

# Make permanent
echo "net.core.rmem_max=268435456" | sudo tee -a /etc/sysctl.conf
echo "net.core.wmem_max=268435456" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p

Benefits:

  • Prevents packet drops during bursts
  • Reduces retransmissions
  • Improves throughput by 2-5x

Tensor Compression

Automatic PNG Compression:

  • Applied to 4D image tensors [batch, h, w, channels]
  • 10-100x bandwidth reduction
  • Lossless for uint8 images
  • No configuration needed

Compression Stats:

Raw tensor: [1, 512, 512, 3] × 4 bytes = 3.1 MB
Compressed: ~31-310 KB (10-100x reduction)

When Compression Activates:

  • 4D tensors with shape [batch, height, width, channels]
  • uint8 or float32 dtype
  • Channels = 1, 3, or 4
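
A quick way to see the kind of savings PNG encoding gives on a single 512×512 uint8 image; this is illustrative measurement code, not the extension's serializer:

import io
import numpy as np
from PIL import Image

img = (np.random.rand(512, 512, 3) * 255).astype(np.uint8)  # worst case: pure noise
buf = io.BytesIO()
Image.fromarray(img).save(buf, format="PNG")  # lossless for uint8
print(f"raw: {img.nbytes} bytes, png: {buf.getbuffer().nbytes} bytes")
# Typical rendered images compress far better than random noise does.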

Instance Placement

Same Region:

  • <1ms latency
  • Full bandwidth utilization
  • Optimal for fan-out/fan-in

Cross-Region:

  • 20-100ms latency
  • Bandwidth limited by internet
  • Suitable for broadcast/gather only

Recommendations:

  • Keep cluster in same region
  • Use fastest network tier
  • Co-locate with storage

Monitoring

Key Metrics:

  • Instance connection status (ClusterInfo)
  • Message retry rates (logs)
  • Buffer transfer times (logs)
  • GPU utilization per instance

Logging:

export COMFY_CLUSTER_LOG_LEVEL=INFO

# Watch for:
# - "All instances registered" (good)
# - "Retrying message" (network issues)
# - "Instance timeout" (connectivity problem)

Scaling Guidelines

2-4 Instances:

  • Simple broadcast mode
  • LAN deployment
  • Development/testing

5-10 Instances:

  • STUN or static mode
  • Cloud deployment
  • Production workloads

10+ Instances:

  • STUN mode required
  • Consider multiple clusters
  • Monitor STUN server load

Network Bandwidth Requirements:

Per instance, per image (512×512 RGB):
- Raw: 3.1 MB
- Compressed: ~100 KB (typical)
- 4 instances: ~300 KB/s during fan-out

Development

Building from Source

# Clone repository
git clone https://github.com/your-org/ComfyUI_Cluster.git
cd ComfyUI_Cluster

# Create virtual environment
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

# Compile protobuf
./setup.sh

Protobuf Compilation

Automatic:

./setup.sh  # Downloads protoc, compiles messages.proto

Manual:

# Install protoc
# Download from: https://github.com/protocolbuffers/protobuf/releases

# Compile
cd protobuf/src
protoc --python_out=../../src/protobuf messages.proto

Type Checking

Run mypy:

mypy src/ --strict

Configuration: pyproject.toml

[tool.mypy]
strict = true
python_version = "3.13"

Linting

Pylint:

pylint src/

Ruff:

ruff check src/
ruff format src/

VS Code Debug Configuration

Pre-configured launches (.vscode/launch.json):

  • Cluster LEADER: Port 8189, instance 0
  • Cluster FOLLOWER 1: Port 8190, instance 1
  • Cluster FOLLOWER 2: Port 8191, instance 2
  • Cluster FOLLOWER 3: Port 8192, instance 3

Usage:

  1. Open folder in VS Code
  2. Set breakpoints
  3. Press F5, select "Cluster LEADER"
  4. Start followers in separate terminals or debug sessions

Testing

Unit Tests:

cd stun
pytest tests/ -v

Integration Testing:

# Start 4 instances
# Run test workflow with ClusterInfo node
# Verify "Connected: 4/4"

Project Structure

ComfyUI_Cluster/
├── src/                      # Main implementation (5,022 LOC)
│   ├── nodes/               # ComfyUI custom nodes
│   ├── states/              # State synchronization
│   ├── udp/                 # UDP communication
│   ├── protobuf/            # Generated protobuf code
│   ├── cluster.py           # Cluster management
│   ├── instance.py          # Instance types
│   ├── instance_loop.py     # Event loop
│   └── registration_strategy.py
│
├── stun/                    # STUN server
│   ├── src/                # Server implementation
│   ├── tests/              # Unit tests
│   └── caddy/              # HTTPS proxy config
│
├── protobuf/               # Protobuf definitions
│   └── src/messages.proto
│
├── js/                     # Frontend
│   ├── cluster.js          # "Queue Cluster" button
│   └── container*.js       # Visual nodes
│
├── __init__.py             # ComfyUI plugin entry
├── requirements.txt        # Dependencies
├── pyproject.toml          # Project config
└── README.md               # This file

Troubleshooting

Instances Not Connecting

Symptom: ClusterInfo shows "Connected: 1/4"

Solutions:

  1. Check network connectivity:
# On each instance
ping <other-instance-ip>

# Check UDP port
nc -u -v <instance-ip> 9998
  2. Verify environment variables:
echo $COMFY_CLUSTER_INSTANCE_COUNT
echo $COMFY_CLUSTER_INSTANCE_INDEX
echo $COMFY_CLUSTER_REGISTRATION_MODE
  3. Check firewall rules:
# Linux
sudo ufw allow 9998/udp
sudo ufw allow 9997/udp  # Broadcast mode

# Check if blocked
sudo iptables -L -n | grep 9998
  4. Verify registration mode:
# STUN: Check server reachable
curl https://your-stun-server.com/health

# Broadcast: Ensure same subnet
ip addr show

# Static: Verify IP list
echo $COMFY_CLUSTER_UDP_HOSTNAMES

STUN Registration Fails

Symptom: "Failed to register with STUN server"

Solutions:

  1. Check STUN server:
curl https://your-stun-server.com/health
# Should return: {"status": "healthy"}
  2. Verify cluster key:
echo $COMFY_CLUSTER_KEY
# Must match server configuration
  3. Check network from instance:
curl -X POST https://your-stun-server.com/register-instance \
  -H "X-Cluster-Key: $COMFY_CLUSTER_KEY" \
  -H "Content-Type: application/json" \
  -d '{...}'
  4. Check STUN server logs:
cd stun
python run.py  # Check console output

Slow Tensor Transfer

Symptom: Fan-out/fan-in takes >10s for small batches

Solutions:

  1. Increase UDP buffers:
sudo ./set_kernal_buffer_sizes.sh
  2. Check network bandwidth:
# Test between instances
iperf3 -s  # On one instance
iperf3 -c <instance-ip>  # On another
  3. Monitor packet loss:
# Check logs for "Retrying message"
grep "Retrying" comfyui.log
  4. Reduce network hops:
  • Deploy in same region/AZ
  • Use internal networking (RunPod, AWS VPC)

Memory Issues

Symptom: "CUDA out of memory"

Solutions:

  1. Use memory management nodes:
ClusterFreeNow → After each operation
ClusterFinallyFree → End of workflow
  2. Reduce batch size per instance:
# Instead of fan-out [100] to 4 instances
# Fan-out [40] to 4 instances, run 3 iterations
  3. Monitor GPU memory:
watch -n 1 nvidia-smi

Broadcast Mode Not Working

Symptom: Instances don't discover each other

Solutions:

  1. Check broadcast address:
ip addr show
# Ensure all instances on same subnet
  2. Verify broadcast port:
echo $COMFY_CLUSTER_BROADCAST_PORT
# Should be same on all instances
  3. Test broadcast:
# Send test packet
echo "test" | nc -u -b 255.255.255.255 9997
  4. Check router settings:
  • Some routers block broadcast
  • Enable "multicast" in router config

Debug Mode

Enable verbose logging:

export COMFY_CLUSTER_LOG_LEVEL=DEBUG

# Start ComfyUI
python main.py

# Logs show:
# - Instance registration
# - Message send/receive
# - Buffer transfers
# - State transitions

Common Error Messages

"Instance index X exceeds instance count Y"

  • COMFY_CLUSTER_INSTANCE_INDEX >= COMFY_CLUSTER_INSTANCE_COUNT
  • Fix: Ensure index is 0-based and < count

"Missing required environment variable"

  • Not all required vars set
  • Fix: Check required variables section

"Failed to bind UDP socket"

  • Port already in use
  • Fix: Kill process or change port

"ACK timeout for message"

  • Network connectivity issue
  • Fix: Check firewall, network connectivity

Technical Details

UDP Protocol Specification

Message Format:

┌─────────────────────────────────────────┐
│ Protobuf Message (Header)              │
│  - message_id (uint64)                  │
│  - message_type (enum)                  │
│  - sender_instance (uint32)             │
│  - target_instance (uint32)             │
│  - require_ack (bool)                   │
│  - payload (bytes)                      │
└─────────────────────────────────────────┘

Buffer Format (Large Transfers):

┌─────────────────────────────────────────┐
│ Chunk Header                            │
│  - buffer_id (uint64)                   │
│  - chunk_index (uint32)                 │
│  - total_chunks (uint32)                │
│  - chunk_size (uint32)                  │
├─────────────────────────────────────────┤
│ Chunk Data (up to MTU - header)         │
│  - Compressed tensor data               │
│  - Or msgpack serialized data           │
└─────────────────────────────────────────┘

Reliability Mechanism:

  1. Sender transmits message with require_ack=true
  2. Receiver processes and sends ACK message
  3. Sender tracks pending ACKs with timeout (5s)
  4. Retry on timeout (max 3 retries)
  5. Error logged if all retries fail
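
A minimal sketch of the ACK/retry bookkeeping this implies, using the 5s timeout and 3-retry limit from the list above; the data structures are illustrative:

import time

ACK_TIMEOUT_S = 5.0
MAX_RETRIES = 3

pending = {}  # message_id -> (payload, addr, deadline, retries_left)

def track(message_id, payload, addr):
    pending[message_id] = (payload, addr, time.monotonic() + ACK_TIMEOUT_S, MAX_RETRIES)

def on_ack(message_id):
    pending.pop(message_id, None)

def check_timeouts(resend):
    now = time.monotonic()
    for mid, (payload, addr, deadline, retries) in list(pending.items()):
        if now < deadline:
            continue
        if retries == 0:
            del pending[mid]  # all retries exhausted: log an error
        else:
            resend(payload, addr)
            pending[mid] = (payload, addr, now + ACK_TIMEOUT_S, retries - 1)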

Batch Processing:

  • Outgoing queue batches up to 174 packets
  • Sent in single burst for efficiency
  • Reduces syscall overhead

Tensor Serialization

Image Tensors (4D):

# Input: PyTorch tensor [batch, h, w, channels]
# Process:
1. Convert to numpy
2. Reshape to [h, w, channels] per image
3. Compress each image as PNG
4. Msgpack serialize: {
    "shape": original_shape,
    "dtype": dtype_string,
    "images": [png_bytes, ...],
    "compressed": true
}

Other Tensors:

# Input: PyTorch tensor (any shape)
# Process:
1. Convert to numpy
2. Binary serialize with numpy.tobytes()
3. Msgpack serialize: {
    "shape": shape,
    "dtype": dtype_string,
    "data": binary_bytes,
    "compressed": false
}

Deserialization:

# Process:
1. Msgpack deserialize
2. If compressed:
   - Decode each PNG
   - Stack into batch dimension
3. Else:
   - Reshape binary data using shape
4. Convert to PyTorch tensor
5. Move to GPU if needed
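
Putting the two directions together, a hedged round-trip sketch for the uncompressed path; the field names follow the msgpack layout above, while the function names are illustrative:

import msgpack
import numpy as np
import torch

def serialize(t: torch.Tensor) -> bytes:
    arr = t.cpu().numpy()
    return msgpack.packb({
        "shape": list(arr.shape),
        "dtype": str(arr.dtype),
        "data": arr.tobytes(),
        "compressed": False,
    })

def deserialize(raw: bytes) -> torch.Tensor:
    obj = msgpack.unpackb(raw)
    arr = np.frombuffer(obj["data"], dtype=obj["dtype"]).reshape(obj["shape"])
    return torch.from_numpy(arr.copy())  # copy: frombuffer is read-only

t = torch.randn(2, 4, 64, 64)
assert torch.equal(t, deserialize(serialize(t)))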

State Machine

Instance States:

INITIALIZE
    ↓
POPULATING (registering with cluster)
    ↓
IDLE (waiting for work)
    ↓
EXECUTING (processing workflow)
    ↓
IDLE
    ↓
ERROR (if failure occurs)

State Transitions:

  • INITIALIZE → POPULATING: On startup
  • POPULATING → IDLE: All instances registered
  • IDLE → EXECUTING: Workflow queued
  • EXECUTING → IDLE: Workflow complete
  • * → ERROR: On unrecoverable error
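
Expressed as code, the transition table might look like the sketch below; the enum members mirror the states above, but the structure is illustrative:

from enum import Enum, auto

class InstanceState(Enum):
    INITIALIZE = auto()
    POPULATING = auto()
    IDLE = auto()
    EXECUTING = auto()
    ERROR = auto()

ALLOWED = {
    InstanceState.INITIALIZE: {InstanceState.POPULATING},
    InstanceState.POPULATING: {InstanceState.IDLE},
    InstanceState.IDLE: {InstanceState.EXECUTING},
    InstanceState.EXECUTING: {InstanceState.IDLE},
}

def transition(current: InstanceState, nxt: InstanceState) -> InstanceState:
    # ERROR is reachable from any state; everything else follows the table.
    if nxt is InstanceState.ERROR or nxt in ALLOWED.get(current, set()):
        return nxt
    raise ValueError(f"illegal transition {current.name} -> {nxt.name}")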

Message Types

Discovery:

  • ANNOUNCE: Instance announces presence
  • ACK: Acknowledge message receipt

Workflow:

  • DISTRIBUTE_PROMPT: Leader sends workflow to followers
  • EXECUTE_WORKFLOW: Trigger workflow execution

Tensor Sync:

  • DISTRIBUTE_BUFFER_BROADCAST: Broadcast tensor
  • DISTRIBUTE_BUFFER_FANOUT: Fan-out tensor
  • DISTRIBUTE_BUFFER_FANIN: Fan-in tensor
  • DISTRIBUTE_BUFFER_GATHER: Gather tensors

Control:

  • FREE_MEMORY: Trigger memory cleanup
  • HEARTBEAT: Keep-alive message

Security Model

Trust Assumptions:

  • Cluster instances are trusted
  • Network is private/encrypted at lower layer (VPN/cloud private network)
  • STUN server is trusted

Authentication:

  • STUN: Key-based per cluster
  • Cluster: No authentication (trusted network assumption)

Recommendations for Production:

  1. Deploy in private network (VPC, VPN)
  2. Use firewall rules (whitelist instance IPs)
  3. Enable network encryption (WireGuard, AWS PrivateLink)
  4. Rotate STUN keys periodically
  5. Monitor access logs

NOT Secure For:

  • Public internet without VPN
  • Untrusted networks
  • Multi-tenant environments without isolation

License

Apache License 2.0

See LICENSE file for full text.


Support & Contributing

Getting Help

Contributing

Contributions welcome! Please:

  1. Fork repository
  2. Create feature branch
  3. Add tests if applicable
  4. Run type checking: mypy src/ --strict
  5. Run linting: ruff check src/
  6. Submit pull request

Development Setup

See Development section above.


Acknowledgments


Built with ❤️ for the ComfyUI community
