Download the model weights from Hugging Face

In [None]:
# Fetch the three checkpoints into ./models. `-nc` (no-clobber) skips files that
# are already present, so re-running this cell does not download them again or
# leave `*.1` duplicates behind.
!wget -nc -P models https://huggingface.co/damo-vilab/modelscope-damo-text-to-video-synthesis/resolve/main/VQGAN_autoencoder.pth \
                https://huggingface.co/damo-vilab/modelscope-damo-text-to-video-synthesis/resolve/main/open_clip_pytorch_model.bin \
                https://huggingface.co/damo-vilab/modelscope-damo-text-to-video-synthesis/resolve/main/text2video_pytorch_model.pth

In [1]:
from os import path as osp
from typing import Any, Dict
import torch
import cv2
import torch
import torch.cuda.amp as amp
from einops import rearrange
from autoencoder import AutoencoderKL
from clip import FrozenOpenCLIPEmbedder
from diffusion import (GaussianDiffusion, beta_schedule)
from unet_sd import UNetSD
import tempfile
import cv2
import torch
from einops import rearrange

# Seed PyTorch's global RNG so the random latents drawn below are repeatable.
SEED = 0
torch.manual_seed(SEED)

class TextToVideoSynthesis():
    """Bundle of the sub-models used to turn text embeddings into video frames.

    Holds a ``UNetSD`` denoiser (kept on ``self.device``), a
    ``GaussianDiffusion`` sampler, an ``AutoencoderKL`` and a
    ``FrozenOpenCLIPEmbedder``. The autoencoder and text encoder are parked on
    the CPU and only moved to ``self.device`` while they are in use.
    """

    def __init__(self, model_dir):
        """Build every sub-model and load its weights from ``model_dir``.

        Args:
            model_dir: directory containing ``text2video_pytorch_model.pth``,
                ``VQGAN_autoencoder.pth`` and ``open_clip_pytorch_model.bin``.
        """

        self.device = torch.device('cuda') if torch.cuda.is_available() \
            else torch.device('cpu')

        # Initialize unet
        self.sd_model = UNetSD(
            in_dim=4,
            dim=320,
            y_dim=768,
            context_dim=1024,
            out_dim=4,
            dim_mult=[1, 2, 4, 4],
            num_heads=8,
            head_dim=64,
            num_res_blocks=2,
            attn_scales=[1, 0.5, 0.25],
            dropout=0.1,
            temporal_attention=True)

        # Deserialize onto the CPU so the checkpoint also loads on machines
        # without CUDA (in case it stores CUDA tensors); the module is moved to
        # self.device right below.
        # NOTE: torch.load unpickles the file - only load trusted checkpoints.
        state_dict = torch.load(
            osp.join(model_dir, "text2video_pytorch_model.pth"),
            map_location='cpu')
        self.sd_model.load_state_dict(state_dict)

        self.sd_model.eval()
        self.sd_model.to(self.device)
        self.sd_model.half()

        # Initialize diffusion
        betas = beta_schedule(
            'linear_sd',
            1000,
            init_beta=0.00085,
            last_beta=0.0120)
        self.diffusion = GaussianDiffusion(
            betas=betas,
            mean_type="eps",
            var_type="fixed_small",
            loss_type="mse",
            rescale_timesteps=False)

        # Initialize autoencoder
        ddconfig = {
            'double_z': True,
            'z_channels': 4,
            'resolution': 256,
            'in_channels': 3,
            'out_ch': 3,
            'ch': 128,
            'ch_mult': [1, 2, 4, 4],
            'num_res_blocks': 2,
            'attn_resolutions': [],
            'dropout': 0.0
        }
        self.autoencoder = AutoencoderKL(
            ddconfig, 4,
            osp.join(model_dir, "VQGAN_autoencoder.pth"))
        self.autoencoder.to('cpu')
        self.autoencoder.eval()
        self.autoencoder.half()

        # Initialize Open clip
        self.clip_encoder = FrozenOpenCLIPEmbedder(
            version=osp.join(model_dir,
                             "open_clip_pytorch_model.bin"),
            layer='penultimate')
        self.clip_encoder.to('cpu')
        self.clip_encoder.eval()
        self.clip_encoder.half()

    def forward(self, input: Dict[str, Any]):
        """Run DDIM sampling and decode the resulting latent into frames.

        Args:
            input: dict with the keys
                ``'text_emb'``: embedding of the prompt (first guidance entry).
                ``'text_emb_zero'``: second embedding handed to the sampler
                    for guidance (presumably the unconditional one).
                ``'latent_input'`` (optional): starting latent shaped
                    ``b c f h w``. When missing or ``None`` a random latent of
                    shape ``(1, 4, 48, 32, 64)`` is drawn.
                ``'guidance'`` (optional): guide scale, default 50.
                ``'steps'`` (optional): number of DDIM steps, default 50.

        Returns:
            ``(video, x0)``: ``video`` is a float32 CPU tensor laid out as
            ``b c f h w`` built from the decoded frames; ``x0`` is the latent
            returned by the sampler.
        """

        y = input['text_emb']
        zero_y = input['text_emb_zero']
        context = torch.cat([zero_y, y], dim=0).to(self.device)

        # Optional keys fall back to defaults instead of raising KeyError.
        init_latent = input.get('latent_input')
        gs = input.get('guidance', 50)
        steps = input.get('steps', 50)

        # synthesis
        with torch.no_grad():
            num_sample = 1  # here let b = 1
            max_frames = 48
            latent_h, latent_w = 32, 64
            with amp.autocast(enabled=True):
                if init_latent is None:
                    init_latent = torch.randn(num_sample, 4, max_frames,
                                              latent_h, latent_w)
                # Also covers caller-supplied latents that live on another device.
                init_latent = init_latent.to(self.device)

                x0 = self.diffusion.ddim_sample_loop(
                    noise=init_latent,  # shape: b c f h w
                    model=self.sd_model,
                    model_kwargs=[{
                        'y':
                        context[1].unsqueeze(0).repeat(num_sample, 1, 1)
                    }, {
                        'y':
                        context[0].unsqueeze(0).repeat(num_sample, 1, 1)
                    }],
                    guide_scale=gs,
                    ddim_timesteps=steps,
                    eta=0.0)

                scale_factor = 0.18215
                video_data = 1. / scale_factor * x0
                bs_vd = video_data.shape[0]
                video_data = rearrange(video_data, 'b c f h w -> (b f) c h w')

                # Decode one frame at a time and collect the results on the
                # CPU. The output size is taken from the decoder itself rather
                # than assumed to be 256x512, so latents with a different
                # spatial size decode as well.
                self.autoencoder.to(self.device)
                try:
                    frames = [
                        self.autoencoder.decode(
                            frame_latent.unsqueeze(0)).detach().cpu().squeeze(0)
                        for frame_latent in video_data
                    ]
                finally:
                    # Always hand the device memory back, even if decoding fails.
                    self.autoencoder.to('cpu')

                video = torch.stack(frames, dim=0)
                video_data = rearrange(
                    video, '(b f) c h w -> b c f h w', b=bs_vd)
        return video_data.type(torch.float32), x0

class TextToVideoSynthesisPipeline():
    """Three-stage wrapper around ``TextToVideoSynthesis``.

    ``preprocess`` encodes the prompt, ``forward`` samples a video and
    ``postprocess`` writes the frames to an mp4 file.
    """

    def __init__(self, model: str, **kwargs):
        """Create the underlying model.

        Args:
            model: directory with the model checkpoints.
            **kwargs: forwarded unchanged to ``TextToVideoSynthesis``.
        """
        self.model = TextToVideoSynthesis(model, **kwargs)

    def preprocess(self, input) -> Dict[str, Any]:
        """Encode ``input['text']`` and the empty prompt with the CLIP encoder.

        Returns:
            dict with ``'text_emb'`` (the prompt) and ``'text_emb_zero'``
            (the empty string).
        """
        clip_encoder = self.model.clip_encoder
        clip_encoder.to(self.model.device)
        try:
            text_emb = clip_encoder(input['text'])
            text_emb_zero = clip_encoder('')
        finally:
            # Move the encoder back to the CPU even if encoding raised.
            clip_encoder.to('cpu')
        return {'text_emb': text_emb, 'text_emb_zero': text_emb_zero}

    def forward(self, input: Dict[str, Any]) -> tuple:
        """Run the model.

        Returns:
            ``({'video': video}, latent)`` - the decoded video wrapped in a
            dict, plus the latent returned by the model.
        """
        video, latent = self.model.forward(input)
        return {'video': video}, latent

    def postprocess(self, inputs: Dict[str, Any],
                    **post_params) -> str:
        """Write ``inputs['video']`` to an mp4 file and return its path.

        Args:
            inputs: dict holding the video tensor under ``'video'``.
            **post_params: ``output_video`` - destination path; a temporary
                ``.mp4`` file is created when it is omitted or ``None``.
                ``fps`` - frame rate of the written file, default 8.

        Returns:
            Path of the written video file.
        """
        video = tensor2vid(inputs['video'])
        output_video_path = post_params.get('output_video', None)
        if output_video_path is None:
            # delete=False keeps the reserved name on disk after closing, so
            # the path stays ours until the writer below overwrites it.
            with tempfile.NamedTemporaryFile(suffix='.mp4',
                                             delete=False) as tmp_file:
                output_video_path = tmp_file.name
        fps = post_params.get('fps', 8)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        h, w, c = video[0].shape
        video_writer = cv2.VideoWriter(
            output_video_path, fourcc, fps=fps, frameSize=(w, h))
        try:
            for frame in video:
                # tensor2vid yields RGB frames; OpenCV writes BGR.
                video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        finally:
            # Without release() the container may never be finalized/flushed.
            video_writer.release()
        return output_video_path

def tensor2vid(video, mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)):
    """Convert a normalized video tensor into a list of uint8 RGB frames.

    The input is left untouched: all arithmetic runs on a CPU copy, so the
    same tensor can be converted more than once and may live on any device.

    Args:
        video: tensor laid out as ``i c f h w`` (batch, channel, frame,
            height, width).
        mean: per-channel mean added back after scaling.
        std: per-channel standard deviation the values are multiplied by.

    Returns:
        List with one ``uint8`` numpy array per frame, each shaped
        ``h x (i * w) x c`` - batch items are placed side by side along the
        width. Values are clamped to [0, 1], scaled by 255 and truncated.
    """
    # detach + cpu so .numpy() below works for CUDA tensors and tensors that
    # track gradients; float() keeps the arithmetic in float32.
    video = video.detach().cpu().float()
    mean = torch.tensor(mean).reshape(1, -1, 1, 1, 1)  # ncfhw
    std = torch.tensor(std).reshape(1, -1, 1, 1, 1)  # ncfhw
    # Out-of-place ops: un-normalize back to [0, 1] without modifying the input.
    video = (video * std + mean).clamp(0, 1)
    images = rearrange(video, 'i c f h w -> f h (i w) c')
    return [(image.numpy() * 255).astype('uint8')
            for image in images.unbind(dim=0)]  # f h w c


# Build the pipeline once; the checkpoints are read from the local 'models' directory.
pipeline = TextToVideoSynthesisPipeline(model='models')


  from .autonotebook import tqdm as notebook_tqdm
2023-03-28 02:27:00.680151: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-03-28 02:27:02.676270: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/nepyope/.local/lib/python3.10/site-packages/cv2/../../lib64::/home/nepyope/.local/lib/python3.10/site-packages/nvidia/cuda_runtime/lib:/home/nepyope/.local/lib/python3.10/site-packages/nvidia/cuda_runtime/lib
2023-03-28 02:27:02.676392: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 

Text-to-video

In [2]:
# Encode the prompt and start from pure noise (no initial latent supplied).
inp = {
    **pipeline.preprocess({'text': 'A cow running a marathon'}),
    'latent_input': None,
}
# Keep the sampled latent around: the video-to-video cell below reuses it.
vid, latent_inp = pipeline.forward(inp)
out = pipeline.postprocess(vid, output_video='TTV.mp4')

Video-to-video

In [6]:
# Blend the latent from the text-to-video run with fresh noise, then denoise
# the mix under a new prompt.
LATENT_WEIGHT = 0.1  # share of the previous latent kept in the starting point
NOISE_WEIGHT = 0.9   # share of fresh Gaussian noise

# preprocess() already returns the 'text_emb' / 'text_emb_zero' dict, so use it
# directly rather than unpacking .values() by position.
inp = pipeline.preprocess({'text': 'A cow running a marathon in hell'})

# Take shape and device from the latent itself instead of hard-coding
# (1, 4, 48, 32, 64) and 'cuda'.
noise = torch.randn(latent_inp.shape).to(latent_inp.device)
inp['latent_input'] = latent_inp * LATENT_WEIGHT + NOISE_WEIGHT * noise
inp['guidance'] = 50
inp['steps'] = 50
vid, _ = pipeline.forward(inp)
out = pipeline.postprocess(vid, output_video='VTV.mp4')

Use a real video as input (not working yet)

In [22]:
import PIL
import requests
import torch
#open  vtv/chicken.mp4

import cv2
import numpy as np
from PIL import Image
VIDEO_PATH = 'vtv/chicken.mp4'
FRAME_STRIDE = 3  # keep every 3rd decoded frame
MAX_FRAMES = 48   # same frame count the sampler uses for its random latent

# --- Read the video into cropped RGB PIL images -----------------------------
frames = []
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise FileNotFoundError(f'Could not open video: {VIDEO_PATH}')
try:
    # NOTE(review): setting CAP_PROP_FPS on a file source is presumably ignored
    # by most backends; the actual frame decimation is the stride below.
    cap.set(cv2.CAP_PROP_FPS, 8)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # OpenCV decodes to BGR; convert to RGB before building the PIL image.
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frames.append(Image.fromarray(frame).crop((0, 0, 512, 256)))
finally:
    cap.release()

f = frames[::FRAME_STRIDE][:MAX_FRAMES]
if not f:
    raise ValueError(f'No frames could be read from {VIDEO_PATH}')

# --- Encode every selected frame with the autoencoder -----------------------
device = pipeline.model.device
autoencoder = pipeline.model.autoencoder.to(device)  # moved once, not per frame
encoded_frames = []
try:
    with torch.no_grad():
        for frame in f:
            # H x W x C uint8 -> 1 x C x H x W. (Previously `f[0]` was used
            # here, so every latent encoded the first frame.)
            # NOTE(review): pixels are passed as raw 0-255 values, while
            # tensor2vid un-normalizes decoder output with mean/std 0.5, so
            # the encoder presumably expects inputs in [-1, 1] - TODO confirm;
            # this may be why this section does not work yet.
            pixels = torch.tensor(np.array(frame)).unsqueeze(0).permute(0, 3, 1, 2)
            posterior = autoencoder.encode(pixels.to(device).half())
            frame_latent = posterior.sample().unsqueeze(2).cpu().detach()
            # normalize based on mean and std of this frame's latent
            frame_latent = (frame_latent - frame_latent.mean()) / frame_latent.std()
            encoded_frames.append(frame_latent)
finally:
    # Free device memory even if encoding failed part-way through.
    pipeline.model.autoencoder.to('cpu')
    torch.cuda.empty_cache()

# Concatenate along the frame axis: b c f h w.
latent_inp = torch.cat(encoded_frames, dim=2).to(device).half()
