# **WAN Fusion X IMAGE TO VIDEO WITH FAST Q3 GGUF MODEL**
- QuantStack/Wan2.1_I2V_14B_FusionX-GGUF
- The videos are generated at 16 fps. To increase the frame rate, use the `Frame Interpolation` notebook in this GitHub repository: https://github.com/Isi-dev/Google-Colab_Notebooks

In [None]:
# @title Prepare Environment
!pip install --upgrade --quiet torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
%cd /content

!pip install -q torchsde einops diffusers accelerate xformers
!pip install av

%cd /content

# 1. Clone the OFFICIAL ComfyUI repository for maximum stability
!git clone https://github.com/comfyanonymous/ComfyUI.git
%cd /content/ComfyUI
!pip install -r requirements.txt

# 2. Install the community-recommended GGUF custom node from city96
%cd /content/ComfyUI/custom_nodes
!git clone https://github.com/city96/ComfyUI-GGUF.git
# The cloned directory has a hyphen, but Python needs an underscore for imports.
# We rename the directory to make it a valid Python package.
!mv /content/ComfyUI/custom_nodes/ComfyUI-GGUF /content/ComfyUI/custom_nodes/ComfyUI_GGUF
# Now, continue with the custom node's requirements installation
%cd /content/ComfyUI/custom_nodes/ComfyUI_GGUF
!pip install -r requirements.txt
%cd /content/ComfyUI

!apt -y install -qq aria2 ffmpeg

!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/QuantStack/Wan2.1_I2V_14B_FusionX-GGUF/resolve/main/Wan2.1_I2V_14B_FusionX-Q3_K_M.gguf -d /content/ComfyUI/models/unet -o Wan2.1_I2V_14B_FusionX-Q3_K_M.gguf

encoder_filename = "umt5-xxl-encoder-Q8_0.gguf"
#!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Comfy-Org/Wan_2.1_ComfyUI_repackaged/resolve/main/split_files/text_encoders/umt5_xxl_fp8_e4m3fn_scaled.safetensors -d /content/ComfyUI/models/text_encoders -o umt5_xxl_fp8_e4m3fn_scaled.safetensors
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/city96/umt5-xxl-encoder-gguf/resolve/main/umt5-xxl-encoder-Q8_0.gguf -d /content/ComfyUI/models/text_encoders -o umt5-xxl-encoder-Q8_0.gguf

!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Comfy-Org/Wan_2.1_ComfyUI_repackaged/resolve/main/split_files/vae/wan_2.1_vae.safetensors -d /content/ComfyUI/models/vae -o wan_2.1_vae.safetensors
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Comfy-Org/Wan_2.1_ComfyUI_repackaged/resolve/main/split_files/clip_vision/clip_vision_h.safetensors -d /content/ComfyUI/models/clip_vision -o clip_vision_h.safetensors

import torch
import numpy as np
from PIL import Image
import gc
import sys
import random
import os
import imageio
import subprocess
from google.colab import files
from IPython.display import display, HTML, Image as IPImage
from google.colab import drive

sys.path.insert(0, '/content/ComfyUI')

from comfy import model_management

from nodes import (
    CheckpointLoaderSimple,
    CLIPLoader,
    CLIPTextEncode,
    VAEDecode,
    VAELoader,
    KSampler,
    UNETLoader,
    LoadImage,
    CLIPVisionLoader,
    CLIPVisionEncode,
    LoraLoader
)

from custom_nodes.ComfyUI_GGUF.nodes import UnetLoaderGGUF, CLIPLoaderGGUF

from comfy_extras.nodes_model_advanced import ModelSamplingSD3
from comfy_extras.nodes_wan import WanImageToVideo

import os
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

drive.mount('/content/drive')
!mkdir -p /content/ComfyUI/models/loras

lora_filename = "sample_lora.safetensors"
lora_strength_value = 1.0
!cp "/content/drive/MyDrive/AI/LoRAs/Wan2.1/{lora_filename}" "/content/ComfyUI/models/loras/"

def clear_memory():
    """Unload ComfyUI-managed models and release cached Python/CUDA memory.

    Steps, in order:
      1. Ask ComfyUI's ``model_management`` to unload every model it tracks.
      2. Remove module-level names that are bound to tensors (or to objects
         whose ``.data`` attribute is a tensor).
      3. Run the garbage collector, then empty the CUDA cache if CUDA is
         available.

    Returns:
        None
    """
    # Imported locally so the function is self-contained when run from a
    # later cell.
    from comfy import model_management

    # This is the command that tells the ComfyUI backend to release its grip
    # on the models.
    model_management.unload_all_models()

    # `del obj` on a loop variable only unbinds the local name and frees
    # nothing, so delete the global *names* instead.
    module_globals = globals()
    obj = None
    for name, obj in list(module_globals.items()):
        if torch.is_tensor(obj) or (hasattr(obj, "data") and torch.is_tensor(obj.data)):
            del module_globals[name]
    # Drop the loop variable's own reference to the last inspected object.
    obj = None

    # Collect first so freed tensors hand their blocks back to the caching
    # allocator; only then can empty_cache() return them to the driver.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
    gc.collect()

def save_as_mp4(images, filename_prefix, fps, output_dir="/content/ComfyUI/output"):
    """Write a sequence of image tensors to ``<output_dir>/<filename_prefix>.mp4``.

    Args:
        images: Iterable of tensors supporting ``.cpu().numpy()``. Values are
            scaled by 255 and stored as 8-bit, so they are presumably floats
            in [0, 1] - confirm against the caller.
        filename_prefix: File name without extension.
        fps: Frame rate passed to the imageio writer.
        output_dir: Destination directory; created if missing.

    Returns:
        The path of the written file.
    """
    os.makedirs(output_dir, exist_ok=True)
    output_path = f"{output_dir}/{filename_prefix}.mp4"

    with imageio.get_writer(output_path, fps=fps) as writer:
        # Convert and write one frame at a time so a full uint8 copy of the
        # clip is never held in memory alongside the source tensors.
        for img in images:
            # Clamp before the cast: astype(uint8) wraps out-of-range values
            # (e.g. 256 -> 0) instead of saturating.
            frame = np.clip(img.cpu().numpy() * 255, 0, 255).astype(np.uint8)
            writer.append_data(frame)

    return output_path

def save_as_webp(images, filename_prefix, fps, quality=90, lossless=False, method=4, output_dir="/content/ComfyUI/output"):
    """Save images as an animated WEBP at ``<output_dir>/<filename_prefix>.webp``.

    Args:
        images: Iterable of tensors supporting ``.cpu().numpy()``. Values are
            scaled by 255 and stored as 8-bit, so they are presumably floats
            in [0, 1] - confirm against the caller.
        filename_prefix: File name without extension.
        fps: Frame rate; coerced to ``int``.
        quality: Encoder quality; coerced to ``int``.
        lossless: Whether to request lossless encoding; coerced to ``bool``.
        method: Encoder method setting; coerced to ``int``.
        output_dir: Destination directory; created if missing.

    Returns:
        The path of the written file.
    """
    os.makedirs(output_dir, exist_ok=True)
    output_path = f"{output_dir}/{filename_prefix}.webp"

    with imageio.get_writer(
        output_path,
        format='WEBP',
        mode='I',
        fps=int(fps),
        quality=int(quality),
        lossless=bool(lossless),
        method=int(method),
    ) as writer:
        # Convert and write frame by frame instead of building a full list.
        for img in images:
            # Clamp before the cast: astype(uint8) wraps out-of-range values
            # instead of saturating.
            frame = np.clip(img.cpu().numpy() * 255, 0, 255).astype(np.uint8)
            writer.append_data(frame)

    return output_path

def save_as_webm(images, filename_prefix, fps, codec="vp9", quality=32, output_dir="/content/ComfyUI/output"):
    """Save images as a WEBM video at ``<output_dir>/<filename_prefix>.webm``.

    Args:
        images: Iterable of tensors supporting ``.cpu().numpy()``. Values are
            scaled by 255 and stored as 8-bit, so they are presumably floats
            in [0, 1] - confirm against the caller.
        filename_prefix: File name without extension.
        fps: Frame rate; coerced to ``int``.
        codec: Codec name handed to the imageio FFMPEG writer.
        quality: Forwarded twice: as the writer's ``quality`` argument and as
            the ffmpeg ``-crf`` value.
            NOTE(review): confirm imageio's FFMPEG plugin accepts this value
            range for ``quality``.
        output_dir: Destination directory; created if missing.

    Returns:
        The path of the written file.
    """
    os.makedirs(output_dir, exist_ok=True)
    output_path = f"{output_dir}/{filename_prefix}.webm"

    with imageio.get_writer(
        output_path,
        format='FFMPEG',
        mode='I',
        fps=int(fps),
        quality=int(quality),
        codec=str(codec),
        output_params=['-crf', str(int(quality))],
    ) as writer:
        # Convert and write frame by frame instead of building a full list.
        for img in images:
            # Clamp before the cast: astype(uint8) wraps out-of-range values
            # instead of saturating.
            frame = np.clip(img.cpu().numpy() * 255, 0, 255).astype(np.uint8)
            writer.append_data(frame)

    return output_path

def save_as_image(image, filename_prefix, output_dir="/content/ComfyUI/output"):
    """Save a single frame as ``<output_dir>/<filename_prefix>.png``.

    Args:
        image: Tensor supporting ``.cpu().numpy()``. Values are scaled by 255
            and stored as 8-bit, so they are presumably floats in [0, 1] -
            confirm against the caller.
        filename_prefix: File name without extension.
        output_dir: Destination directory; created if missing.

    Returns:
        The path of the written file.
    """
    os.makedirs(output_dir, exist_ok=True)
    output_path = f"{output_dir}/{filename_prefix}.png"

    # Clamp before the cast: astype(uint8) wraps out-of-range values instead
    # of saturating.
    frame = np.clip(image.cpu().numpy() * 255, 0, 255).astype(np.uint8)

    Image.fromarray(frame).save(output_path)

    return output_path


def upload_image():
    """Prompt for an upload in Colab and store it in /content/ComfyUI/input/.

    Only the first uploaded file is processed; any further files in the same
    upload are left where Colab saved them.

    Returns:
        The destination path of the stored image, or ``None`` when nothing
        was uploaded.
    """
    from google.colab import files
    import os
    import shutil

    os.makedirs('/content/ComfyUI/input', exist_ok=True)

    # Mapping of uploaded file name -> file content.
    uploaded = files.upload()

    for filename, content in uploaded.items():
        # Colab writes uploads into the *current working directory*; resolve
        # the source from there instead of assuming the notebook is still
        # sitting in /content/ComfyUI.
        src_path = os.path.abspath(filename)
        dest_path = f'/content/ComfyUI/input/{filename}'

        if os.path.exists(src_path):
            shutil.move(src_path, dest_path)
        else:
            # Fall back to the in-memory content if the on-disk copy is not
            # where expected.
            with open(dest_path, 'wb') as fh:
                fh.write(content)

        print(f"Image saved to: {dest_path}")
        return dest_path

    return None

def generate_video(
    image_path: str = None,
    positive_prompt: str = "a cute cat playing with a ball of yarn",
    negative_prompt: str = "overexposed, blurred details, subtitles, worst quality, low quality, JPEG compression artifacts, ugly, incomplete, extra fingers, poorly drawn hands, poorly drawn faces, deformed, disfigured, fused fingers, still picture, three legs, walking backwards, watermark, text, signature",
    width: int = 832,
    height: int = 480,
    seed: int = 82628696717253,
    steps: int = 4,
    cfg_scale: float = 1.0,
    sampler_name: str = "uni_pc",
    scheduler: str = "simple",
    frames: int = 33,
    fps: int = 16,
    output_format: str = "mp4"
):
    """Run the Wan image-to-video pipeline and display/save the result.

    Reads the module-level globals ``encoder_filename``, ``lora_filename`` and
    ``lora_strength_value`` set by the setup cell.

    Args:
        image_path: Path of the start image. When ``None`` the user is
            prompted to upload one via ``upload_image()``.
        positive_prompt: Text describing the desired content.
        negative_prompt: Text describing content to avoid.
        width: Width passed to ``WanImageToVideo.encode``.
        height: Height passed to ``WanImageToVideo.encode``.
        seed: Sampler seed.
        steps: Number of sampling steps.
        cfg_scale: CFG value passed to the sampler.
        sampler_name: Sampler name passed to ``KSampler.sample``.
        scheduler: Scheduler name passed to ``KSampler.sample``.
        frames: Number of frames requested; ``1`` saves a PNG instead of a
            video.
        fps: Frame rate of the saved video.
        output_format: ``"mp4"`` (case-insensitive) writes MP4; any other
            value writes WEBM.

    Returns:
        None. The output is written under /content/ComfyUI/output as
        ``ComfyUI.<ext>`` and displayed inline.

    Raises:
        Exception: Any error raised while decoding/saving is printed and
            re-raised after memory has been cleared.
    """
    # Resolve the input image *before* loading any model, so that a cancelled
    # or empty upload aborts immediately instead of after the text encoder,
    # UNet and LoRA have already been loaded.
    if image_path is None:
        image_path = upload_image()
    if image_path is None:
        print("No input image provided; aborting.")
        return

    # Initialize nodes
    unet_loader = UnetLoaderGGUF()
    model_sampling = ModelSamplingSD3()
    clip_loader = CLIPLoaderGGUF()
    clip_encode_positive = CLIPTextEncode()
    clip_encode_negative = CLIPTextEncode()
    vae_loader = VAELoader()
    clip_vision_loader = CLIPVisionLoader()
    clip_vision_encode = CLIPVisionEncode()
    load_image = LoadImage()
    wan_image_to_video = WanImageToVideo()
    ksampler = KSampler()
    vae_decode = VAEDecode()
    load_lora = LoraLoader()

    with torch.inference_mode():
        # --- Stage 1: Load text encoder and UNet, then apply the LoRA ---
        print(f"Loading Text_Encoder: {encoder_filename}")
        clip = clip_loader.load_clip(encoder_filename, "wan")[0]

        print("Loading Unet Model...")
        model = unet_loader.load_unet("Wan2.1_I2V_14B_FusionX-Q3_K_M.gguf")[0]

        print(f"Applying LoRA '{lora_filename}' to CLIP model...")
        # load_lora returns a tuple: (patched_model, patched_clip).
        patched_model, patched_clip = load_lora.load_lora(
            model=model,
            clip=clip,
            lora_name=lora_filename,
            strength_model=lora_strength_value,
            strength_clip=lora_strength_value
        )

        # --- Stage 2: Encode prompts using the now-patched CLIP ---
        print("Encoding prompts with LoRA-aware CLIP...")
        positive = clip_encode_positive.encode(patched_clip, positive_prompt)[0]
        negative = clip_encode_negative.encode(patched_clip, negative_prompt)[0]

        # --- Stage 3: Drop references to the text encoder ---
        # The prompts are encoded, so neither CLIP object is needed any more;
        # the un-patched UNet reference can go too (patched_model is kept).
        print("Releasing CLIP model from VRAM...")
        del clip
        del patched_clip
        del model
        # Aggressive cleanup: The loaders and original models are no longer needed.
        del load_lora, unet_loader, clip_loader
        del clip_encode_positive, clip_encode_negative
        torch.cuda.empty_cache()
        gc.collect()

        # --- Stage 4: Prepare Image, VAE, and Latents ---
        loaded_image = load_image.load_image(image_path)[0]
        clip_vision = clip_vision_loader.load_clip("clip_vision_h.safetensors")[0]
        clip_vision_output = clip_vision_encode.encode(clip_vision, loaded_image, "none")[0]
        del clip_vision
        gc.collect()

        vae = vae_loader.load_vae("wan_2.1_vae.safetensors")[0]

        # NOTE(review): the positional `1` is presumably the batch size -
        # confirm against WanImageToVideo.encode's signature.
        positive_out, negative_out, latent = wan_image_to_video.encode(
            positive, negative, vae, width, height, frames, 1, loaded_image, clip_vision_output
        )

        # We patch the *already LoRA-patched* model again with ModelSampling.
        # NOTE(review): 8 is presumably the sampling shift - confirm.
        final_model = model_sampling.patch(patched_model, 8)[0]

        # Delete everything not needed for the final KSampler step.
        del positive, negative, loaded_image, clip_vision_output
        del load_image, clip_vision_loader, clip_vision_encode, vae_loader, wan_image_to_video
        del patched_model, model_sampling
        gc.collect()

        # --- Stage 5: Generate the video ---
        print("Generating video...")
        sampled = ksampler.sample(
            model=final_model, # Use the final patched UNET
            seed=seed,
            steps=steps,
            cfg=cfg_scale,
            sampler_name=sampler_name,
            scheduler=scheduler,
            positive=positive_out,
            negative=negative_out,
            latent_image=latent
        )[0]

        del final_model
        torch.cuda.empty_cache()
        gc.collect()

        # --- Stage 6: Decode and save ---
        try:
            decoded = vae_decode.decode(vae, sampled)[0]
            del vae
            gc.collect()
            if frames == 1:
                output_path = save_as_image(decoded[0], "ComfyUI")
                display(IPImage(filename=output_path))
            else:
                if output_format.lower() == "mp4":
                    output_path = save_as_mp4(decoded, "ComfyUI", fps)
                else: # Defaulting to webm for any other case
                    output_path = save_as_webm(decoded, "ComfyUI", fps=fps, codec="vp9", quality=10)
                display_video(output_path)

        except Exception as e:
            print(f"Error during decoding/saving: {str(e)}")
            raise
        finally:
            # Explicitly delete all node objects
            del ksampler, vae_decode
            clear_memory()

def display_video(video_path):
    """Embed a media file inline in the notebook as a base64 data URL.

    ``.mp4`` and ``.webm`` files (and any unrecognised extension, which is
    treated as MP4) are shown in a looping ``<video>`` element. ``.webp``
    files are shown in an ``<img>`` element, because browsers do not play
    WEBP through ``<video>``.

    Args:
        video_path: Path of the file to display.

    Returns:
        None
    """
    from IPython.display import HTML
    from base64 import b64encode

    # Context manager so the file handle is closed deterministically.
    with open(video_path, 'rb') as fh:
        video_data = fh.read()

    # Determine MIME type based on file extension
    lowered = video_path.lower()
    if lowered.endswith('.webm'):
        mime_type = "video/webm"
    elif lowered.endswith('.webp'):
        mime_type = "image/webp"
    else:
        mime_type = "video/mp4"  # .mp4 and default

    data_url = f"data:{mime_type};base64," + b64encode(video_data).decode()

    if mime_type.startswith("image/"):
        markup = f"""
    <img width=512 src="{data_url}">
    """
    else:
        markup = f"""
    <video width=512 controls autoplay loop>
        <source src="{data_url}" type="{mime_type}">
    </video>
    """

    display(HTML(markup))

# Last statement of the setup cell: confirms every step above ran to completion.
print("✅ Environment Setup Complete!")

In [None]:
# @title Generate Video
# Colab form cell: collects the generation settings and runs generate_video().

positive_prompt = "a black kitten playing with a ball of yarn" # @param {"type":"string"}
negative_prompt = "overexposed, blurred details, subtitles, worst quality, low quality, JPEG compression artifacts, ugly, incomplete, extra fingers, poorly drawn hands, poorly drawn faces, deformed, disfigured, fused fingers, still picture, three legs, walking backwards, watermark, text, signature" # @param {"type":"string"}
width = 480 # @param {"type":"number"}
height = 832 # @param {"type":"number"}
seed = 42 # @param {"type":"integer"}
steps = 4 # @param {"type":"integer", "min":1, "max":100}
cfg_scale = 1.0 # @param {"type":"number", "min":1, "max":20}
sampler_name = "uni_pc" # @param ["uni_pc", "euler", "dpmpp_2m", "ddim", "lms"]
scheduler = "simple" # @param ["simple", "normal", "karras", "exponential"]
frames = 61 # @param {"type":"integer", "min":1, "max":121}
fps = 16 # @param {"type":"integer", "min":1, "max":60}
output_format = "mp4" # @param ["mp4", "webm"]

import random

# A seed of 0 means "pick a random seed for this run".
if seed == 0:
    seed = random.randint(0, 2**32 - 1)
print(f"Using seed: {seed}")

try:
    generate_video(
        image_path=None,
        positive_prompt=positive_prompt,
        negative_prompt=negative_prompt,
        width=width,
        height=height,
        seed=seed,
        steps=steps,
        cfg_scale=cfg_scale,
        sampler_name=sampler_name,
        scheduler=scheduler,
        frames=frames,
        fps=fps,
        output_format=output_format
    )
finally:
    # Release models and cached memory even when generation raises or the
    # cell is interrupted, so the next run starts from a clean state.
    clear_memory()

In [None]:
# @title Clear Memory in case of stopping execution
# Run this cell manually after interrupting a generation: it calls
# clear_memory(), defined in the setup cell, to free what the stopped run left behind.
clear_memory()