## Copyright 2022 Google LLC. Double-click for license information.

In [1]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Prompt-to-Prompt with Stable Diffusion

In [2]:
from typing import Optional, Union, Tuple, List, Callable, Dict
import torch
from diffusers import StableDiffusionPipeline
from diffusers.models.attention_processor import AttnProcessor, Attention
import torch.nn.functional as nnf
import numpy as np
import sys
import abc
import fastcore.all as fc
import math
from skimage.draw import disk
import torch.nn.functional as F
from functools import partial
from utils.guidance_functions import *
import ptp_utils
import numpy
from compel import Compel
import diffusers
import matplotlib.pyplot as plt
from diffusers import DPMSolverMultistepScheduler, TextToVideoSDPipeline, UNet3DConditionModel
from diffusers import DiffusionPipeline
from diffusers.utils import export_to_video
from einops import rearrange
from huggingface_hub import snapshot_download
import sys

from modelscope.pipelines import pipeline
from modelscope.outputs import OutputKeys
import pathlib

Initializing the conversion map


ModuleNotFoundError: No module named 'seq_aligner'

For loading the Stable Diffusion using Diffusers, follow the instuctions https://huggingface.co/blog/stable_diffusion and update ```MY_TOKEN``` with your token.
Set ```LOW_RESOURCE``` to ```True``` for running on 12GB GPU.

In [None]:
MY_TOKEN = '<me>'
print(sys.path)

numpy.set_printoptions(threshold=sys.maxsize)
LOW_RESOURCE = False 
NUM_DIFFUSION_STEPS = 50
GUIDANCE_SCALE = 7.5
MAX_NUM_WORDS = 77
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')

#ldm_stable = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4", use_auth_token=MY_TOKEN).to(device)
#tokenizer = ldm_stable.tokenizer
from PIL import Image
#pipe = DiffusionPipeline.from_pretrained("damo-vilab/text-to-video-ms-1.7b", torch_dtype=torch.float16, variant="fp16")

#pipe = DiffusionPipeline.from_pretrained("cerspense/zeroscope_v2_576w", torch_dtype=torch.float16)

#pipe.enable_model_cpu_offload()


pipe = DiffusionPipeline.from_pretrained("damo-vilab/text-to-video-ms-1.7b", torch_dtype=torch.float16, variant="fp16")
pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
pipe.enable_model_cpu_offload()

'''prompt = "darth vader surfing in the ocean"
video_frames = pipe(prompt, num_inference_steps=40, height=320, width=504, num_frames=20).frames
export_to_video(video_frames, "testit_shark.mp4")'''

['/home/smotamed/Desktop/Text-To-Video-Finetuning/prompt-to-prompt', '/home/smotamed/anaconda3/envs/text2video-finetune/lib/python310.zip', '/home/smotamed/anaconda3/envs/text2video-finetune/lib/python3.10', '/home/smotamed/anaconda3/envs/text2video-finetune/lib/python3.10/lib-dynload', '', '/home/smotamed/.local/lib/python3.10/site-packages', '/home/smotamed/anaconda3/envs/text2video-finetune/lib/python3.10/site-packages', '/tmp/tmp0j5dpecl']


'prompt = "darth vader surfing in the ocean"\nvideo_frames = pipe(prompt, num_inference_steps=40, height=320, width=504, num_frames=20).frames\nexport_to_video(video_frames, "testit_shark.mp4")'

## Prompt-to-Prompt Attnetion Controllers
Our main logic is implemented in the `forward` call in an `AttentionControl` object.
The forward is called in each attention layer of the diffusion model and it can modify the input attnetion weights `attn`.

`is_cross`, `place_in_unet in ("down", "mid", "up")`, `AttentionControl.cur_step` help us track the exact attention layer and timestamp during the diffusion iference.


In [None]:

def normalize(x): return (x - x.min()) / (x.max() - x.min())
def threshold_attention(attn, s=10):
    norm_attn = s * (normalize(attn) - 0.5)
    return normalize(norm_attn.sigmoid())

def get_shape(attn, s=20): 
    return threshold_attention(attn, s)

def get_size(attn): 
    return 1/attn.shape[-2] * threshold_attention(attn).sum((1,2)).mean()

def enlarge(x, scale_factor=1.0):
    x = x.view(1, -1, 1)
    assert scale_factor >= 1

    h = w = int(math.sqrt(x.shape[-2]))
    x = rearrange(x, 'n (h w) d -> n d h w', h=h)
    x = F.interpolate(x, scale_factor=scale_factor)
    new_h = new_w = x.shape[-1]
    x_l, x_r = (new_w//2) - w//2, (new_w//2) + w//2
    x_t, x_b = (new_h//2) - h//2, (new_h//2) + h//2
    x = x[:,:,x_t:x_b,x_l:x_r]
    return rearrange(x, 'n d h w -> n (h w) d', h=h) * scale_factor

def create_circular_mask(h, w, center=None, radius=None):

    if center is None: # use the middle of the image
        center = (int(w/2), int(h/2))
    if radius is None: # use the smallest distance between the center and image walls
        radius = min(center[0], center[1], w-center[0], h-center[1])

    Y, X = np.ogrid[:h, :w]
    dist_from_center = np.sqrt((X - center[0])**2 + (Y-center[1])**2)

    mask = dist_from_center <= radius
    return mask






In [None]:
def run_and_display(prompt, objects, guidance_func,  latent=None, run_baseline=False):
    videos, orig_video, x_t = ptp_utils.text2video(pipe, prompt, objects, guidance_func) #num_inference_steps=NUM_DIFFUSION_STEPS, guidance_scale=GUIDANCE_SCALE, low_resource=LOW_RESOURCE)
    
    return videos, orig_video, x_t

## Replacement edit

In [None]:
prompts = ["darth vader surfs in the ocean"]
tokens = [2]
objects = None
move = partial(roll_shape, direction='right', factor=.1)
guidance = partial(move_object_by_shape, shape_weight=0, appearance_weight = 0, position_weight=4, tau=move)
#objects = ['apple', 'banana']
object_to_edit = 'ball'
#resize = partial(resize, scale_factor=2)

#guidance = partial(resize_object_by_size, shape_weight=0.5, appearance_weight=0.5, size_weight=8, tau=8)
#ctl = AttentionStoreSam()
#controller = CustomAttnProcessor(attnstore=None, place_in_unet=None)
#show_cross_attention(controller, res=16, from_where=("up", "down"))
videos, orig_video, x_t  = run_and_display(prompts, objects = objects, guidance_func=guidance,  latent=None, run_baseline=False)
for video in [videos[0]]:

    video = rearrange(video.cpu(), "c f h w -> f h w c").clamp(-1, 1).add(1).mul(127.5)

    video = video.byte().cpu().numpy()
    export_to_video(video, "testit.mp4")
for video in [orig_video[0]]:

    video = rearrange(video.cpu(), "c f h w -> f h w c").clamp(-1, 1).add(1).mul(127.5)

    video = video.byte().cpu().numpy()
    
    export_to_video(video, "testit_orig.mp4")


    
    #export_to_video(video, "testit.mp4", 6)
'''for video in [videos[1]]:

    video = rearrange(video.cpu(), "c f h w -> f h w c").clamp(-1, 1).add(1).mul(127.5)

    video = video.byte().cpu().numpy()

    export_to_video(video, "testit.mp4", 6)'''

Lora successfully injected into UNet3DConditionModel.
tensor(999, device='cuda:0')
torch.Size([5, 1024])


  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
tensor([1])
frame None
20
20
20
20
20
20
20
20
20
20
20
20
20
20
20
20
loss : tensor(0.6992, device='cuda:0', dtype=torch.float16, grad_fn=<AddBackward0>)
tensor(979, device='cuda:0')
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
torch.Size([5, 1024])
tensor([1])
frame None
20
20
20
20
20
20
20
20
20
20
20
20
20
20
20
20
loss : tensor(0.6299, device='cuda:0', dtype=torch.float16, gra

Decoding to pixels...: 100%|██████████| 16/16 [00:01<00:00,  9.33frame/s]
Decoding to pixels...: 100%|██████████| 16/16 [00:00<00:00, 52.70frame/s]


'for video in [videos[1]]:\n\n    video = rearrange(video.cpu(), "c f h w -> f h w c").clamp(-1, 1).add(1).mul(127.5)\n\n    video = video.byte().cpu().numpy()\n\n    export_to_video(video, "testit.mp4", 6)'