## First Order Model

This is a demonstration of ["First Order Model"](https://github.com/AliaksandrSiarohin/first-order-model), which animates a still source image with the motion from a driving video.

**Even on CPU, this will run for about 15-30 minutes**. I hope this makes it accessible to all.

### Environment

#### General
* For video creation, make sure ffmpeg is installed (`apt install ffmpeg` on Debian/Ubuntu, `brew install ffmpeg` on macOS; on Windows, install a build from ffmpeg.org and add it to your `PATH`)
* Git needs to be installed so the notebook can clone the code
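
A quick way to confirm that both tools are reachable before running the notebook is to look them up on `PATH`. The following is a minimal sketch using only the standard library; the helper name `check_tools` is made up for this illustration and is not part of the notebook.

```python
import shutil

def check_tools(names=("ffmpeg", "git")):
    """Return {tool name: full path, or None if the tool is not on PATH}."""
    return {name: shutil.which(name) for name in names}

for tool, location in check_tools().items():
    status = location if location else "NOT FOUND, please install it"
    print("{}: {}".format(tool, status))
```

If a tool is reported as not found right after installing it, restarting the terminal (and the notebook server) usually makes the updated `PATH` visible.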

**Make sure to run this notebook in the correct environment:**

#### Linux and Mac
```
conda env create -f fom.yml
conda activate fom
jupyter notebook
```

#### Windows
```
conda env create -f fom_windows.yml
conda activate fom
jupyter notebook
```

### Troubleshooting
If you run into trouble, make sure to *include your OS, OS version and Python version in the bug report in the forum.*
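
The details requested for a bug report can be gathered with the standard library. This is a small sketch; `environment_report` is a hypothetical helper name, not something the notebook defines.

```python
import platform
import sys

def environment_report():
    """Collect OS name, OS version and Python version for a bug report."""
    return {
        "os": platform.system(),
        "os_release": platform.release(),
        "os_version": platform.version(),
        "python": platform.python_version(),
        "interpreter": sys.executable,
    }

for key, value in environment_report().items():
    print("{}: {}".format(key, value))
```

Copying the printed lines into the forum post covers everything asked for above.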

In [None]:
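# This cell only has an effect once: NB_PATH saves the path the notebook is started in.
# It is needed further down to build absolute paths to resources in the res/ folder.
from pathlib import Path

# Only set NB_PATH on the first run, so that re-running this cell after the
# working directory has changed does not overwrite it
if 'NB_PATH' not in globals():
    NB_PATH = Path.cwd()
    print("Notebook path: {}".format(NB_PATH))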

In [None]:
import sys
assert 'fom' in sys.executable, 'You are not running this notebook in the fom environment'

print("Running the correct environment.\nPython interpreter: {}".format(sys.executable))

In [None]:
# Download pretrained model on celebrity faces (VoxCeleb)
import requests
from tqdm import tqdm_notebook as tqdm

url = 'https://myshare.uni-osnabrueck.de/f/86c0195eb2e74845b77d/?dl=1'
filename = NB_PATH / 'res/vox-cpk.pth.tar'

def download(url, fname):
    """Stream `url` to the file `fname` while showing a progress bar."""
    response = requests.get(url, stream=True)
    response.raise_for_status()  # fail early on HTTP errors
    total_size_in_bytes = int(response.headers.get('content-length', 0))
    block_size = 1024  # 1 kibibyte
    progress_bar = tqdm(total=total_size_in_bytes, unit='iB', unit_scale=True)

    # Write to the path passed in as `fname`, not to the global `filename`
    with open(fname, 'wb') as file:
        for data in response.iter_content(block_size):
            progress_bar.update(len(data))
            file.write(data)

    progress_bar.close()

    if total_size_in_bytes != 0 and progress_bar.n != total_size_in_bytes:
        print("ERROR, something went wrong. Please manually delete any residual download files")

if not filename.exists():
    print("Downloading pretrained weights trained on VoxCeleb. Be patient... It's 300 MB in size.")
    download(url, filename)

print("Download complete")

In [None]:
import torch

# torch.cuda.device(...) returns a context manager that is always truthy,
# so checking availability alone is sufficient
USE_GPU = torch.cuda.is_available()
if USE_GPU:
    print("Found GPU: {}".format(torch.cuda.get_device_name(torch.cuda.current_device())))
else:
    print("Using CPU. It will bleed. RIP")

In [None]:
from IPython.display import HTML

HTML("""
<div>
<h2>Choose a Driving Video and Source Image in the next section(s)</h2>
<table>
<tr>
  <th>leo.mp4</th>
  <th>jon_fookin_snow.png</th>
</tr>
<tr>
  <td>
  <video width="350" height="350" controls autoplay loop>
    <source src="{}" type="video/mp4">
  </video>
  </td>
  <td>
  <img src="{}" alt="jon_fookin_snow.png" width="350" height="350">
  </td>
</tr>
<tr>
  <th>the_donald.mp4</th>
  <th>still.png</th>
</tr>
<tr>

  <td>
  <video width="350" height="350" controls autoplay loop>
    <source src="{}" type="video/mp4">
  </video>
  </td>
  <td>
  <img src="{}" alt="still.png" width="350" height="350">
  </td>
</tr>
</table>
</div>
""".format('res/leo.mp4', 'res/jon_fookin_snow.png', 'res/the_donald.mp4', 'res/still.png'))

## Configuration
The next cell contains configuration options for this notebook.

Leaving everything unchanged will create a demo with the default driving video and source image. 

**Change `DRIVING_VIDEO` and `SOURCE_IMAGE` (but leave `RES_PREFIX` untouched) in the `Input Data` section of the next cell** if you want to experiment. Map Leonardo DiCaprio onto Jon Snow, for example.

There are many options, but the vast majority of them should stay untouched. You can experiment with the model parameters in the `config.yml` file in the `res` folder.

#### Input Data Extraction
If you want to use your own input data (not recommended, since you will need additional software), follow [these steps](https://github.com/AliaksandrSiarohin/first-order-model#animation-demo) and adjust the parameters `FIND_BEST_FRAME` and `BEST_FRAME`.
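
When a best frame is used, the main cell further down animates the driving video in two passes that both start at that frame, one running forward and one running backward, and then joins the results so the output keeps the original frame order. The sketch below illustrates only this splitting and joining. Integers stand in for frames and an identity function stands in for the model; `stitch_from_best_frame` and `fake_animate` are hypothetical names used for illustration.

```python
def fake_animate(frames):
    """Stand-in for the real animation step: one output per input frame."""
    return list(frames)

def stitch_from_best_frame(frames, best):
    """Animate outward from index `best` and restore the original order."""
    forward = frames[best:]              # best frame ... last frame
    backward = frames[:best + 1][::-1]   # best frame ... first frame
    out_forward = fake_animate(forward)
    out_backward = fake_animate(backward)
    # Reverse the backward pass and drop the duplicated best frame
    return out_backward[::-1] + out_forward[1:]

frames = [0, 1, 2, 3, 4, 5]
print(stitch_from_best_frame(frames, best=2))  # prints [0, 1, 2, 3, 4, 5]
```

Both passes start from the same frame, so the first element of the forward pass duplicates the last element of the reversed backward pass and is dropped.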

In [None]:
# Last check for existing files
from pathlib import Path

# Prefix to all images and videos in /res folder
# This needs to be relative, so the HTML elements can load the data
RES_PREFIX= Path('res')
OUT_PREFIX= Path('out')

# Output path, absolute (Note: can't use .absolute() in case working directory changed)
OUT=NB_PATH / 'out'
OUT.mkdir(exist_ok=True)

# Input Data
DRIVING_VIDEO=RES_PREFIX / 'the_donald.mp4' # Driving Video to use
SOURCE_IMAGE=RES_PREFIX / 'jon_fookin_snow.png' # Source image to use

# Configuration
CONFIG=NB_PATH / RES_PREFIX / 'config.yml' # Path to model config
CKPT=NB_PATH / RES_PREFIX / 'vox-cpk.pth.tar' # path to checkpoint to restore
CPU=not USE_GPU

# Output
OUTPUT_VIDEO=OUT_PREFIX / 'result.mp4'
OVERWRITE=True # Overwrite existing OUTPUT_VIDEO

# Input Data Extraction
FIND_BEST_FRAME=False
BEST_FRAME=None

RELATIVE=True # use relative or absolute keypoint coordinates
ADAPT_SCALE=True # adapt movement scale based on convex hull of keypoints

def path_assert(path):
    # Parameter is lowercase so it does not shadow the imported pathlib.Path class
    assert path.exists(), "{} does not exist!".format(path)

path_assert(NB_PATH / SOURCE_IMAGE)
path_assert(NB_PATH / DRIVING_VIDEO)
path_assert(NB_PATH / RES_PREFIX / 'leo.mp4')
path_assert(NB_PATH / RES_PREFIX / 'jon_fookin_snow.png')
path_assert(NB_PATH / RES_PREFIX / 'the_donald.mp4')
path_assert(NB_PATH / RES_PREFIX / 'still.png')
path_assert(OUT)
path_assert(CONFIG)
path_assert(CKPT)
if not OVERWRITE and (NB_PATH / OUTPUT_VIDEO).exists():
    raise Exception("Config would overwrite existing output file!")
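
The `RELATIVE` and `ADAPT_SCALE` options can be illustrated on toy data. With relative coordinates, the source keypoints are moved by the displacement of the driving keypoints relative to the first driving frame. With scale adaptation, that displacement is rescaled by the ratio of the keypoint spreads, measured here as the square root of the convex hull area. This is a sketch of the idea under those assumptions, not the repository's `normalize_kp`; the function names `movement_scale` and `relative_keypoints` are hypothetical.

```python
import numpy as np
from scipy.spatial import ConvexHull

def movement_scale(kp_source, kp_driving_initial):
    """Ratio of keypoint spreads, using sqrt of the convex hull area."""
    source_area = ConvexHull(kp_source).volume            # for 2D points, .volume is the area
    driving_area = ConvexHull(kp_driving_initial).volume
    return np.sqrt(source_area) / np.sqrt(driving_area)

def relative_keypoints(kp_source, kp_driving, kp_driving_initial, adapt_scale=True):
    """Move the source keypoints by the (optionally rescaled) driving displacement."""
    scale = movement_scale(kp_source, kp_driving_initial) if adapt_scale else 1.0
    return kp_source + scale * (kp_driving - kp_driving_initial)

# Toy data: a unit square as source keypoints, a square twice as large as driving keypoints
kp_source = np.array([[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]])
kp_driving_initial = 2.0 * kp_source
kp_driving = kp_driving_initial + np.array([0.4, 0.0])  # driving keypoints shift right by 0.4

print(movement_scale(kp_source, kp_driving_initial))
print(relative_keypoints(kp_source, kp_driving, kp_driving_initial))
```

In this toy case the driving keypoints span twice the width of the source keypoints, so the shift of 0.4 is halved before it is applied to the source.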

In [None]:
# Function used to determine whether change of working directory has already happened
def in_repo():
    return Path('sync_batchnorm').exists() and Path('modules').exists() and Path('data').exists()

In [None]:
# Clone necessary code
assert(not in_repo())
!git clone https://github.com/AliaksandrSiarohin/first-order-model

In [None]:
# change working directory to git repo (windows supported too, !cd only works in unix based systems )
import os
from pathlib import Path

# if not in repo, change working directory of notebook
if not in_repo():
    NB_PATH = Path.cwd()
    CODE_PATH = NB_PATH / 'first-order-model'
    os.chdir(CODE_PATH)
    print("Working directory changed to {}".format(CODE_PATH))    

assert(in_repo() and NB_PATH and CODE_PATH)

In [None]:
from IPython.display import HTML

HTML("""
<div>

<h2>You chose</h2>

<table>
<tr>
  <th>Driving Video</th>
  <th>Source Image</th>
</tr>
<tr>
  <td>
  <video width="350" height="350" controls autoplay loop>
    <source src="{}" type="video/mp4">
  </video>
  </td>
  <td>
  <img src="{}" alt="Source image" width="350" height="350">
  </td>
</tr>
</table>

<h4>Run the next cell to map the movement of the driving video onto the source image. The cell after that handles visualizing the results</h4>
<div>Note: there will be deprecation warnings. These are normal</div>

</div>
""".format(DRIVING_VIDEO, SOURCE_IMAGE))

In [None]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning) 

import matplotlib
matplotlib.use('Agg')
import os, sys
import yaml
from argparse import ArgumentParser
from tqdm import tqdm

import imageio
import numpy as np
from skimage.transform import resize
from skimage import img_as_ubyte
import torch
from sync_batchnorm import DataParallelWithCallback

from modules.generator import OcclusionAwareGenerator
from modules.keypoint_detector import KPDetector
from animate import normalize_kp
from scipy.spatial import ConvexHull

ABS_SOURCE_IMAGE = NB_PATH / SOURCE_IMAGE
ABS_DRIVING_VIDEO = NB_PATH / DRIVING_VIDEO
ABS_OUTPUT_VIDEO = NB_PATH / OUTPUT_VIDEO

if sys.version_info[0] < 3:
    raise Exception("You must use Python 3 or higher. Recommended version is Python 3.7")

def load_checkpoints(config_path, checkpoint_path, cpu=False):

    with open(config_path) as f:
        # safe_load avoids the missing-Loader warning/error of plain yaml.load
        config = yaml.safe_load(f)

    generator = OcclusionAwareGenerator(**config['model_params']['generator_params'],
                                        **config['model_params']['common_params'])
    if not cpu:
        generator.cuda()

    kp_detector = KPDetector(**config['model_params']['kp_detector_params'],
                             **config['model_params']['common_params'])
    if not cpu:
        kp_detector.cuda()
    
    if cpu:
        checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'))
    else:
        checkpoint = torch.load(checkpoint_path)
 
    generator.load_state_dict(checkpoint['generator'])
    kp_detector.load_state_dict(checkpoint['kp_detector'])
    
    if not cpu:
        generator = DataParallelWithCallback(generator)
        kp_detector = DataParallelWithCallback(kp_detector)

    generator.eval()
    kp_detector.eval()
    
    return generator, kp_detector


def make_animation(source_image, driving_video, generator, kp_detector, relative=True, adapt_movement_scale=True, cpu=False):
    with torch.no_grad():
        predictions = []
        source = torch.tensor(source_image[np.newaxis].astype(np.float32)).permute(0, 3, 1, 2)
        if not cpu:
            source = source.cuda()
        driving = torch.tensor(np.array(driving_video)[np.newaxis].astype(np.float32)).permute(0, 4, 1, 2, 3)
        kp_source = kp_detector(source)
        kp_driving_initial = kp_detector(driving[:, :, 0])

        for frame_idx in tqdm(range(driving.shape[2])):
            driving_frame = driving[:, :, frame_idx]
            if not cpu:
                driving_frame = driving_frame.cuda()
            kp_driving = kp_detector(driving_frame)
            kp_norm = normalize_kp(kp_source=kp_source, kp_driving=kp_driving,
                                   kp_driving_initial=kp_driving_initial, use_relative_movement=relative,
                                   use_relative_jacobian=relative, adapt_movement_scale=adapt_movement_scale)
            out = generator(source, kp_source=kp_source, kp_driving=kp_norm)

            predictions.append(np.transpose(out['prediction'].data.cpu().numpy(), [0, 2, 3, 1])[0])
    return predictions

def find_best_frame(source, driving, cpu=False):
    import face_alignment

    def normalize_kp(kp):
        kp = kp - kp.mean(axis=0, keepdims=True)
        area = ConvexHull(kp[:, :2]).volume
        area = np.sqrt(area)
        kp[:, :2] = kp[:, :2] / area
        return kp

    fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, flip_input=True,
                                      device='cpu' if cpu else 'cuda')
    kp_source = fa.get_landmarks(255 * source)[0]
    kp_source = normalize_kp(kp_source)
    norm  = float('inf')
    frame_num = 0
    for i, image in tqdm(enumerate(driving)):
        kp_driving = fa.get_landmarks(255 * image)[0]
        kp_driving = normalize_kp(kp_driving)
        new_norm = (np.abs(kp_source - kp_driving) ** 2).sum()
        if new_norm < norm:
            norm = new_norm
            frame_num = i
    return frame_num

if __name__ == "__main__":
    source_image = imageio.imread(ABS_SOURCE_IMAGE)
    video_reader = imageio.get_reader(ABS_DRIVING_VIDEO)
    fps = video_reader.get_meta_data()['fps']
    driving_video = []
    try:
        for im in video_reader:
            driving_video.append(im)
    except RuntimeError:
        pass
    video_reader.close()

    source_image = resize(source_image, (256, 256))[..., :3]
    driving_video = [resize(frame, (256, 256))[..., :3] for frame in driving_video]
    
    generator, kp_detector = load_checkpoints(config_path=CONFIG, checkpoint_path=CKPT, cpu=CPU)

    if FIND_BEST_FRAME or BEST_FRAME is not None:
        i = BEST_FRAME if BEST_FRAME is not None else find_best_frame(source_image, driving_video, cpu=CPU)
        print("Best frame: {}".format(i))
        driving_forward = driving_video[i:]
        driving_backward = driving_video[:(i+1)][::-1]
        predictions_forward = make_animation(source_image, driving_forward, generator, kp_detector, relative=RELATIVE, adapt_movement_scale=ADAPT_SCALE, cpu=CPU)
        predictions_backward = make_animation(source_image, driving_backward, generator, kp_detector, relative=RELATIVE, adapt_movement_scale=ADAPT_SCALE, cpu=CPU)
        predictions = predictions_backward[::-1] + predictions_forward[1:]
    else:
        predictions = make_animation(source_image, driving_video, generator, kp_detector, relative=RELATIVE, adapt_movement_scale=ADAPT_SCALE, cpu=CPU)
    
    imageio.mimsave(ABS_OUTPUT_VIDEO, [img_as_ubyte(frame) for frame in predictions], format='.mp4', fps=fps)
    print("============================\nOutput video saved to: {}. Visualize the result in the next cell".format(ABS_OUTPUT_VIDEO))

Run the next cell to visualize the results!

In [None]:
from IPython.display import HTML
 
HTML("""
<div>
<table>
<tr>
  <th>Driving Video</th>
  <th>Source Image</th>
  <th>Result Video</th>
</tr>
<tr>
  <td>
  <video width="250" height="250" controls autoplay loop>
    <source src="{}" type="video/mp4">
  </video>
  </td>
  <td>
  <img src="{}" width="250" height="250">
  </td>
  <td>
  <video width="250" height="250" controls autoplay loop>
    <source src="{}" type="video/mp4">
  </video>
  </td>
  </tr>
</table>
<h5>Note: There might be a slight delay between videos</h5>
</div>
""".format(DRIVING_VIDEO, SOURCE_IMAGE, OUTPUT_VIDEO))