In [42]:
import cv2
import numpy as np

def display(source, driving, generated=None):
    """Show the source image and the driving video using OpenCV."""
    # Convert source to BGR (for OpenCV)
    source = cv2.cvtColor(source, cv2.COLOR_RGB2BGR)

    # Iterate over the frames of the driving video
    for i in range(len(driving)):
        # Convert driving frame to BGR (for OpenCV)
        driving_frame = cv2.cvtColor(driving[i], cv2.COLOR_RGB2BGR)
        
        # Create the frame to be shown
        frame = np.concatenate([source, driving_frame], axis=1)

        # If generated frames are provided, concatenate them as well
        if generated is not None:
            generated_frame = cv2.cvtColor(generated[i], cv2.COLOR_RGB2BGR)
            frame = np.concatenate([source, driving_frame, generated_frame], axis=1)
        
        # Display the concatenated frame
        cv2.imshow('Source + Driving + Generated', frame)
        
        # Wait for a key press to move to the next frame
        key = cv2.waitKey(50)  # 50ms delay between frames
        if key == 27:  # Press 'Esc' to exit the video display
            break

    # Release the OpenCV window
    cv2.destroyAllWindows()

In [62]:
import cv2
import numpy as np

def display(source, driving, generated):
    """Show the source image and the driving video using OpenCV."""
    # Convert source to BGR (for OpenCV)
    if source.dtype != np.uint8:  # Check if the image is not 8-bit unsigned integer
        source = cv2.normalize(source, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
    source = cv2.cvtColor(source, cv2.COLOR_RGB2BGR)

    # Iterate over the frames of the driving video
    for i in range(len(driving)):
        # Convert driving frame to BGR (for OpenCV)
        if driving[i].dtype != np.uint8:  # Check if the frame is not 8-bit unsigned integer
            driving[i] = cv2.normalize(driving[i], None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
        driving_frame = cv2.cvtColor(driving[i], cv2.COLOR_RGB2BGR)
        
        # Create the frame to be shown
        frame = np.concatenate([source, driving_frame], axis=1)

        # If generated frames are provided, concatenate them as well
        if generated is not None:
            if generated[i].dtype != np.uint8:  # Check if the frame is not 8-bit unsigned integer
                generated[i] = cv2.normalize(generated[i], None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
            generated_frame = cv2.cvtColor(generated[i], cv2.COLOR_RGB2BGR)
            frame = np.concatenate([source, driving_frame, generated_frame], axis=1)
        
        # Display the concatenated frame
        cv2.imshow('Source + Driving + Generated', frame)
        
        # Wait for a key press to move to the next frame
        key = cv2.waitKey(50)  # 50ms delay between frames
        if key == 27:  # Press 'Esc' to exit the video display
            break

    # Release the OpenCV window
    cv2.destroyAllWindows()

In [16]:
def load_checkpoints(config_path, checkpoint_path, cpu=False):

    with open(config_path) as f:
        config = yaml.load(f, Loader=yaml.FullLoader)

    generator = OcclusionAwareGenerator(**config['model_params']['generator_params'],
                                        **config['model_params']['common_params'])
    if not cpu:
        generator.cpu()

    kp_detector = KPDetector(**config['model_params']['kp_detector_params'],
                             **config['model_params']['common_params'])
    if not cpu:
        kp_detector.cpu()
    
    if cpu:
        checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'))
    else:
        checkpoint = torch.load(checkpoint_path)
 
    generator.load_state_dict(checkpoint['generator'])
    kp_detector.load_state_dict(checkpoint['kp_detector'])
    
    if not cpu:
        generator = DataParallelWithCallback(generator)
        kp_detector = DataParallelWithCallback(kp_detector)

    generator.eval()
    kp_detector.eval()
    
    return generator, kp_detector

In [41]:
import torch
import yaml

def load_checkpoints(config_path, checkpoint_path, cpu=False):
    # Load configuration
    with open(config_path) as f:
        config = yaml.load(f, Loader=yaml.FullLoader)

    # Initialize models
    generator = OcclusionAwareGenerator(**config['model_params']['generator_params'],
                                        **config['model_params']['common_params'])
    kp_detector = KPDetector(**config['model_params']['kp_detector_params'],
                             **config['model_params']['common_params'])

    # Move models to CPU if specified or if CUDA is not available
    if cpu or not torch.cuda.is_available():
        device = torch.device('cpu')
        generator.to(device)
        kp_detector.to(device)
    else:
        device = torch.device('cuda')
        generator.to(device)
        kp_detector.to(device)

    # Load checkpoint
    if cpu or not torch.cuda.is_available():
        checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'))
    else:
        checkpoint = torch.load(checkpoint_path)

    # Load state dicts into models
    generator.load_state_dict(checkpoint['generator'])
    kp_detector.load_state_dict(checkpoint['kp_detector'])

    # Wrap models with DataParallel if CUDA is available and not running on CPU
    if not cpu and torch.cuda.is_available():
        generator = DataParallelWithCallback(generator)
        kp_detector = DataParallelWithCallback(kp_detector)

    # Set models to evaluation mode
    generator.eval()
    kp_detector.eval()

    return generator, kp_detector

In [8]:
import matplotlib

matplotlib.use('Agg')

import os, sys
import yaml
from argparse import ArgumentParser
from time import gmtime, strftime
from shutil import copy

from frames_dataset import FramesDataset

from modules.generator import OcclusionAwareGenerator
from modules.discriminator import MultiScaleDiscriminator
from modules.keypoint_detector import KPDetector

import torch

from train import train
from reconstruction import reconstruction
from animate import animate

  """


In [19]:
generator, kp_detector = load_checkpoints(config_path='config/vox-256.yaml', 
                            checkpoint_path="D:\Siddhant\chicken.pth.tar")

  checkpoint_path="D:\Siddhant\chicken.pth.tar")
  checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'))


In [15]:
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: https://download.pytorch.org/whl/cu118
Collecting torchaudio
  Using cached https://download.pytorch.org/whl/cu118/torchaudio-2.5.1%2Bcu118-cp312-cp312-win_amd64.whl (4.0 MB)
Collecting torch
  Downloading https://download.pytorch.org/whl/cu118/torch-2.5.1%2Bcu118-cp312-cp312-win_amd64.whl (2700.1 MB)
     ---------------------------------------- 0.0/2.7 GB ? eta -:--:--
     ---------------------------------------- 0.0/2.7 GB 5.7 MB/s eta 0:07:54
     ---------------------------------------- 0.0/2.7 GB 5.7 MB/s eta 0:07:54
     ---------------------------------------- 0.0/2.7 GB 1.6 MB/s eta 0:27:42
     ---------------------------------------- 0.0/2.7 GB 3.1 MB/s eta 0:14:44
     ---------------------------------------- 0.0/2.7 GB 2.6 MB/s eta 0:17:34
     ---------------------------------------- 0.0/2.7 GB 3.2 MB/s eta 0:14:13
     ---------------------------------------- 0.0/2.7 GB 3.2

ERROR: Exception:
Traceback (most recent call last):
  File "C:\Users\Acer\AppData\Roaming\Python\Python312\site-packages\pip\_vendor\urllib3\response.py", line 438, in _error_catcher
    yield
  File "C:\Users\Acer\AppData\Roaming\Python\Python312\site-packages\pip\_vendor\urllib3\response.py", line 561, in read
    data = self._fp_read(amt) if not fp_closed else b""
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Acer\AppData\Roaming\Python\Python312\site-packages\pip\_vendor\urllib3\response.py", line 527, in _fp_read
    return self._fp.read(amt) if amt is not None else self._fp.read()
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Acer\AppData\Roaming\Python\Python312\site-packages\pip\_vendor\cachecontrol\filewrapper.py", line 98, in read
    data: bytes = self.__fp.read(amt)
                  ^^^^^^^^^^^^^^^^^^^
  File "c:\Program Files\Python312\Lib\http\client.py", line 479, in read
    s = self.fp.read(amt)
        ^^^^^^^^^^^^^^^^^
  File "c:\Program Files\Python312\Lib\socke

In [43]:
def make_animation(source_image, driving_video, generator, kp_detector, relative=True, adapt_movement_scale=True, cpu=False):
    with torch.no_grad():
        predictions = []
        source = torch.tensor(source_image[np.newaxis].astype(np.float32)).permute(0, 3, 1, 2)
        if not cpu:
            source = source.cpu()
        driving = torch.tensor(np.array(driving_video)[np.newaxis].astype(np.float32)).permute(0, 4, 1, 2, 3)
        kp_source = kp_detector(source)
        kp_driving_initial = kp_detector(driving[:, :, 0])

        for frame_idx in tqdm(range(driving.shape[2])):
            driving_frame = driving[:, :, frame_idx]
            if not cpu:
                driving_frame = driving_frame.cpu()
            kp_driving = kp_detector(driving_frame)
            kp_norm = normalize_kp(kp_source=kp_source, kp_driving=kp_driving,
                                   kp_driving_initial=kp_driving_initial, use_relative_movement=relative,
                                   use_relative_jacobian=relative, adapt_movement_scale=adapt_movement_scale)
            out = generator(source, kp_source=kp_source, kp_driving=kp_norm)

            predictions.append(np.transpose(out['prediction'].data.cpu().numpy(), [0, 2, 3, 1])[0])
    return predictions

In [67]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation

def display(source, driving, generated):
    """Show the source image and the driving video using OpenCV"""
    
    ims = []
    for i in range(len(driving)):
        cols = [source]
        cols.append(driving[i])
        if generated is not None:
            cols.append(generated[i])
        
        # Concatenate the images horizontally
        combined_image = np.concatenate(cols, axis=1)
        
        # Display image using OpenCV
        cv2.imshow('Frame', combined_image)
        
        # Wait for a key press, 50ms per frame (similar to animation speed)
        if cv2.waitKey(50) & 0xFF == ord('q'):
            break

    cv2.destroyAllWindows()

# Example Usage:
# Assuming source_image, driving_video, and predictions are already loaded
display(source_image, driving_video, predictions)

In [86]:
def display(source, driving, generated):
    """Show the source image and the driving video using OpenCV"""
    fig, ax = plt.subplots(figsize=(8 + 4 * (generated is not None), 6))
    ims = []
    for i in range(len(driving)):
        cols = [source]
        cols.append(driving[i])
        if generated is not None:
            cols.append(generated[i])
        concatenated = np.concatenate(cols, axis=1)
        im = ax.imshow(concatenated, animated=True)
        ax.axis('off')
        ims.append([im])
    ani = animation.ArtistAnimation(fig, ims, interval=50, repeat_delay=1000)
    plt.close(fig)
    return ani

SyntaxError: invalid non-printable character U+00A0 (3685874850.py, line 17)

In [87]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML

def display(source, driving, generated=None):
    """Show the source image and the driving video using matplotlib."""
    # Create a figure and axis
    fig, ax = plt.subplots(figsize=(8 + 4 * (generated is not None), 6))
    ims = []

    # Iterate over the frames of the driving video
    for i in range(len(driving)):
        # Create a list of frames to concatenate
        cols = [source]
        cols.append(driving[i])
        if generated is not None:
            cols.append(generated[i])
        
        # Concatenate frames horizontally
        concatenated = np.concatenate(cols, axis=1)
        
        # Display the concatenated frame
        im = ax.imshow(concatenated, animated=True)
        ax.axis('off')  # Hide axes
        ims.append([im])  # Add the frame to the animation

    # Create the animation
    ani = animation.ArtistAnimation(fig, ims, interval=50, repeat_delay=1000)
    plt.close(fig)  # Close the figure to avoid displaying it twice
    return ani

In [88]:
ani = display(source_image, driving_video, predictions)
HTML(ani.to_html5_video())

RuntimeError: Requested MovieWriter (ffmpeg) not available

In [78]:
display(source_image, driving_video, predictions)

In [57]:
import imageio
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from skimage.transform import resize
from IPython.display import HTML
import warnings
warnings.filterwarnings("ignore")

%matplotlib inline

'%matplotlib' is not recognized as an internal or external command,
operable program or batch file.


In [79]:
# driving_video = "WhatsApp Video 2024-09-26 at 18.52.33_8bfe52b5.mp4"
# source_image = "D:\Siddhant\FOMM\dp.jpg"
source_image = imageio.imread("D:\Siddhant\FOMM\WhatsApp Image 2025-01-20 at 22.53.42_1e2eb54d.jpg")
driving_video = imageio.mimread("WhatsApp Video 2024-09-26 at 18.52.33_8bfe52b5.mp4")


The frame size for reading (480, 704) is different from the source frame size (704, 480).


In [80]:
#Resize inputs to 256x256
source_image = resize(source_image, (256, 256))[..., :3]
driving_video = [resize(frame, (256, 256))[..., :3] for frame in driving_video]

In [81]:
# Generate new animation
predictions = make_animation(source_image, driving_video, generator, kp_detector, relative=True,
                             adapt_movement_scale=True)

# Display the video
# Assuming source_image, driving_video, and predictions are already defined
# display(source_image, driving_video, predictions)

100%|██████████| 91/91 [00:23<00:00,  3.87it/s]


In [82]:
print("Source image shape:", source_image.shape)

Source image shape: (256, 256, 3)


In [69]:
print("Driving video shape:", len(driving_video), "frames with shape:", driving_video[0].shape)

Driving video shape: 91 frames with shape: (256, 256, 3)


In [70]:
print("Predictions shape:", len(predictions), "frames with shape:", predictions[0].shape)

Predictions shape: 91 frames with shape: (256, 256, 3)


In [83]:
print("Source image dtype:", source_image.dtype)
print("Driving video frame dtype:", driving_video[0].dtype)
print("Predictions frame dtype:", predictions[0].dtype)

Source image dtype: float64
Driving video frame dtype: float64
Predictions frame dtype: float32


In [84]:
import numpy as np

# Normalize and convert source image
if source_image.dtype != np.uint8:
    source_image = cv2.normalize(source_image, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)

# Normalize and convert driving video frames
driving_video = [cv2.normalize(frame, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U) for frame in driving_video]

# Normalize and convert prediction frames
predictions = [cv2.normalize(frame, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U) for frame in predictions]

In [85]:
print("Source image shape:", source_image.shape, "dtype:", source_image.dtype)
print("Driving video shape:", len(driving_video), "frames with shape:", driving_video[0].shape, "dtype:", driving_video[0].dtype)
print("Predictions shape:", len(predictions), "frames with shape:", predictions[0].shape, "dtype:", predictions[0].dtype)

Source image shape: (256, 256, 3) dtype: uint8
Driving video shape: 91 frames with shape: (256, 256, 3) dtype: uint8
Predictions shape: 91 frames with shape: (256, 256, 3) dtype: uint8


In [60]:
print("Is source image empty?", np.all(source_image == 0))
print("Is driving video frame empty?", np.all(driving_video[0] == 0))
print("Is predictions frame empty?", np.all(predictions[0] == 0))

Is source image empty? False
Is driving video frame empty? False
Is predictions frame empty? False


In [55]:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML

In [48]:
def normalize_kp(kp_source, kp_driving, kp_driving_initial, adapt_movement_scale=False,
                 use_relative_movement=False, use_relative_jacobian=False):
    if adapt_movement_scale:
        source_area = ConvexHull(kp_source['value'][0].data.cpu().numpy()).volume
        driving_area = ConvexHull(kp_driving_initial['value'][0].data.cpu().numpy()).volume
        adapt_movement_scale = np.sqrt(source_area) / np.sqrt(driving_area)
    else:
        adapt_movement_scale = 1

    kp_new = {k: v for k, v in kp_driving.items()}

    if use_relative_movement:
        kp_value_diff = (kp_driving['value'] - kp_driving_initial['value'])
        kp_value_diff *= adapt_movement_scale
        kp_new['value'] = kp_value_diff + kp_source['value']

        if use_relative_jacobian:
            jacobian_diff = torch.matmul(kp_driving['jacobian'], torch.inverse(kp_driving_initial['jacobian']))
            kp_new['jacobian'] = torch.matmul(jacobian_diff, kp_source['jacobian'])

    return kp_new

In [47]:
import os
from tqdm import tqdm

import torch
from torch.utils.data import DataLoader

from frames_dataset import PairedDataset
from logger import Logger, Visualizer
import imageio
from scipy.spatial import ConvexHull
import numpy as np

from sync_batchnorm import DataParallelWithCallback