# Face Swapping Technique with Auto Encoder

### What is Face Swapping?

Face swapping is the technique of cropping or copying one person's face and superimposing it on another person's face.
Something like this:

![face-swap-baby-adult](images/01.jpg)

### Why do we use Data Science for this instead of image editing software?

A face swap could be done by hand: open any image editing software, crop one person's face, and superimpose it over another's.<br>
But assume each image takes 5 minutes. A 1-minute video at 24 fps (frame rates typically range from 23 to 30 fps) contains 1440 frames (60 seconds x 24 fps), so replacing the face in every frame would take **5 days** of nonstop work (1440 images x 5 minutes = 7200 minutes = 120 hours = 5 days), not counting time lost to human error.
<br><br>
Wouldn't it be better for a software program to do this?
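
The estimate above can be reproduced with a few lines of Python. This is only a back-of-the-envelope sketch: the 5 minutes per image is the assumption stated above, and `manual_edit_days` is a hypothetical helper name.

```python
def manual_edit_days(video_seconds, fps, minutes_per_image):
    """Return (frame count, days of nonstop work) for editing every frame by hand."""
    frames = video_seconds * fps
    total_minutes = frames * minutes_per_image
    return frames, total_minutes / 60 / 24


frames, days = manual_edit_days(video_seconds=60, fps=24, minutes_per_image=5)
print(frames, days)  # 1440 frames, 5.0 days
```

At 30 fps the same minute of video grows to 1800 frames, or 6.25 days of manual work.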

### A few good and bad applications of this technique:
- Fun picture memes: swapping your friend's face with their dog's can be funny at times
- News videos: fake news, with people getting worked up over something a person never actually said
- Deepfakes (needless to say)
- Movies: an actor's face could be swapped onto a stunt double's, saving movie makers a lot of money
- Fun video memes: what would Chris Hemsworth's (Thor) face look like in Robert Downey Jr.'s role in Iron Man?

Our focus is going to be the last application.

### Why Auto Encoder?
![mushroom encoder](images/02.png)

### Dataset:
There are two datasets for training our model: one of Chris Hemsworth's face images and one of Robert Downey Jr.'s. Each dataset contains between 350 and 450 images. Each image is 256 x 256 pixels and was picked for clarity from a set of 2000 images obtained by scraping the internet. Many of the scraped images also contained other people, so the faces had to be extracted.


![dataset](images/03.jpg)

### Code for extracting faces
```python
import cv2
from pathlib import Path
from lib.cli import DirectoryProcessor
from plugins.PluginLoader import PluginLoader

class ExtractTrainingData(DirectoryProcessor):
    def create_parser(self, subparser, command, description):
        self.parser = subparser.add_parser(
            command,
            help="Extract the faces from pictures.",
            description=description)
        
    def process(self):
        extractor_name = "Align"  # extractor plugin; its code is in the alignment section below
        extractor = PluginLoader.get_extractor(extractor_name)()

        for filename in self.read_directory():
            # handle each file separately so one bad image does not stop the run
            try:
                image = cv2.imread(filename)
                # write one aligned 256 x 256 crop per detected face
                for idx, face in self.get_faces(image):
                    resized_image = extractor.extract(image, face, 256)
                    output_file = self.output_dir / Path(filename).stem
                    cv2.imwrite(str(output_file) + str(idx) + Path(filename).suffix, resized_image)
            except Exception as e:
                print('Failed to extract from image: {}. Reason: {}'.format(filename, e))
```       
            
### Code for Alignment            
```python            
import cv2
from lib.aligner import get_align_mat

class Extract(object):
    def extract(self, image, face, size):
        # without landmarks there is nothing to align to, so just resize the crop
        if face.landmarks is None:
            return cv2.resize(face.image, (size, size))
        alignment = get_align_mat( face )
        return self.transform( image, alignment, size, 48 )
    
    def transform( self, image, mat, size, padding=0 ):
        mat = mat * (size - 2 * padding)
        mat[:,2] += padding
        return cv2.warpAffine( image, mat, ( size, size ) )
     
```
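
To see what `transform` does to the alignment matrix, here is a small standalone sketch in plain Python. It assumes (this is not confirmed by the code above) that `get_align_mat` returns a 2 x 3 affine matrix whose output coordinates are normalised so that the face spans roughly 0 to 1. `scale_alignment` and `apply_affine` are hypothetical helper names, and the example matrix is made up.

```python
def scale_alignment(mat, size, padding=0):
    """Scale a normalised 2x3 affine matrix to pixels and add a border offset."""
    scale = size - 2 * padding
    scaled = [[value * scale for value in row] for row in mat]
    for row in scaled:
        row[2] += padding  # shift the translation column, like mat[:, 2] += padding
    return scaled


def apply_affine(mat, x, y):
    """Map a source point (x, y) through a 2x3 affine matrix."""
    return (mat[0][0] * x + mat[0][1] * y + mat[0][2],
            mat[1][0] * x + mat[1][1] * y + mat[1][2])


# Hypothetical alignment: a face occupying a 200 x 200 pixel square whose
# top-left corner sits at (100, 50) in the source image, mapped to the unit square.
normalised = [[1 / 200, 0.0, -100 / 200],
              [0.0, 1 / 200, -50 / 200]]

pixel_mat = scale_alignment(normalised, size=256, padding=48)
print(apply_affine(pixel_mat, 100, 50))   # top-left corner, close to (48, 48)
print(apply_affine(pixel_mat, 300, 250))  # bottom-right corner, close to (208, 208)
```

Under that assumption, the face lands in the centre of the 256 x 256 crop with a 48-pixel border on every side, which leaves some context around the face for training.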

### Training:
We train a single shared Encoder and two Decoders, one per face. The Encoder uses 4 convolutional layers with LeakyReLU activation, with the number of filters doubling at each layer (128 to 1024), followed by Flatten and Dense layers. Each Decoder uses 3 convolutional upscaling layers with LeakyReLU activation, followed by a final convolutional layer with sigmoid activation. We use Adam as the optimizer with a learning rate of 0.00005. The loss function is mean absolute error.
![autoencoderworking](images/04.png)
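
As a reminder of what the loss measures, here is a minimal pure-Python sketch of mean absolute error between a target image and its reconstruction. Keras computes this internally during training; the function name and the tiny flattened "images" below are made up for illustration, and pixel values are assumed to be scaled to [0, 1].

```python
def mean_absolute_error(target, prediction):
    """Average of |target - prediction| over all pixel values."""
    if len(target) != len(prediction):
        raise ValueError("inputs must have the same number of values")
    return sum(abs(t - p) for t, p in zip(target, prediction)) / len(target)


# Pixel values scaled to [0, 1], matching a sigmoid output range.
target = [0.0, 0.5, 1.0, 0.25]
prediction = [0.0, 0.25, 0.5, 0.25]
print(mean_absolute_error(target, prediction))  # 0.1875
```

With values in [0, 1], a loss of 0.05 means each pixel value is off by about 5% of the full intensity range on average.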

### Code for trainer:
```python
import cv2  # used below for the live preview and for writing sample images
from lib.utils import get_image_paths
from lib.cli import FullPaths
from plugins.PluginLoader import PluginLoader

class TrainingProcessor(object):
    arguments = None

    def __init__(self, subparser, command, description='default'):
        self.parse_arguments(description, subparser, command)

    def process_arguments(self, arguments):
        self.arguments = arguments
        print("Model A Directory: {}".format(self.arguments.input_A))
        print("Model B Directory: {}".format(self.arguments.input_B))
        print("Training data directory: {}".format(self.arguments.model_dir))

        self.process()

    def parse_arguments(self, description, subparser, command):
        parser = subparser.add_parser(
            command,
            help="This command trains the model for the two faces A and B.",
            description=description
        )

        parser.add_argument('-A', '--input-A',
                            action=FullPaths,
                            dest="input_A",
                            default="input_A",
                            help="Input directory. A directory containing training images for face A.\
                             Defaults to 'input_A'")
        parser.add_argument('-B', '--input-B',
                            action=FullPaths,
                            dest="input_B",
                            default="input_B",
                            help="Input directory. A directory containing training images for face B.\
                             Defaults to 'input_B'")
        parser.add_argument('-m', '--model-dir',
                            action=FullPaths,
                            dest="model_dir",
                            default="models",
                            help="Model directory. This is where the trained model will \
                                be stored. Defaults to 'models'")
        parser.add_argument('-p', '--preview',
                            action="store_true",
                            dest="preview",
                            default=False,
                            help="Show preview output. If not specified, write progress \
                            to file.")
        parser.add_argument('-v', '--verbose',
                            action="store_true",
                            dest="verbose",
                            default=False,
                            help="Show verbose output")
        parser.add_argument('-s', '--save-interval',
                            type=int,
                            dest="save_interval",
                            default=100,
                            help="Sets the number of iterations before saving the model.")
        parser.add_argument('-w', '--write-image',
                            action="store_true",
                            dest="write_image",
                            default=False,
                            help="Writes the training result to a file even on preview mode.")
        parser.add_argument('-t', '--trainer',
                            type=str,
                            choices=("Original", "LowMem"),
                            default="Original",
                            help="Select which trainer to use, LowMem for cards < 2gb.")
        parser.add_argument('-bs', '--batch-size',
                            type=int,
                            default=64,
                            help="Batch size, as a power of 2 (64, 128, 256, etc)")
        parser = self.add_optional_arguments(parser)
        parser.set_defaults(func=self.process_arguments)

    def add_optional_arguments(self, parser):
        # Override this for custom arguments
        return parser

    def process(self):
        import threading
        self.stop = False
        self.save_now = False

        thr = threading.Thread(target=self.processThread, args=(), kwargs={})
        thr.start()

        if self.arguments.preview:
            print('Using live preview')
            while True:
                try:
                    for name, image in self.preview_buffer.items():
                        cv2.imshow(name, image)

                    key = cv2.waitKey(1000)
                    if key == ord('\n') or key == ord('\r'):
                        break
                    if key == ord('s'):
                        self.save_now = True
                except KeyboardInterrupt:
                    break
        else:
            input() 

        print("Exit requested! The trainer will complete its current cycle, save the models and quit (this can take up to a couple of seconds depending on your training speed). If you want to kill it now, press Ctrl + c")
        self.stop = True
        thr.join() # waits until thread finishes

    def processThread(self):
        print('Loading data, this may take a while...')
        # this is so that you can enter case insensitive values for trainer
        trainer = self.arguments.trainer
        trainer = trainer if trainer != "Lowmem" else "LowMem"
        model = PluginLoader.get_model(trainer)(self.arguments.model_dir)
        model.load(swapped=False)

        images_A = get_image_paths(self.arguments.input_A)
        images_B = get_image_paths(self.arguments.input_B)
        trainer = PluginLoader.get_trainer(trainer)(model,
                                                                   images_A,
                                                                   images_B,
                                                                   batch_size=self.arguments.batch_size)

        try:
            print('Starting. Press "Enter" to stop training and save model')

            for epoch in range(0, 1000000):

                save_iteration = epoch % self.arguments.save_interval == 0

                trainer.train_one_step(epoch, self.show if (save_iteration or self.save_now) else None)

                if save_iteration:
                    model.save_weights()

                if self.stop:
                    model.save_weights()
                    exit()

                if self.save_now:
                    model.save_weights()
                    self.save_now = False

        except KeyboardInterrupt:
            try:
                model.save_weights()
            except KeyboardInterrupt:
                print('Saving model weights has been cancelled!')
            exit(0)

    preview_buffer = {}

    def show(self, image, name=''):
        try:
            if self.arguments.preview:
                self.preview_buffer[name] = image
            elif self.arguments.write_image:
                cv2.imwrite('_sample_{}.jpg'.format(name), image)
        except Exception as e:
            print("could not preview sample")
            print(e)
```
### Code for AutoEncoder
```python
from keras.models import Model as KerasModel
from keras.layers import Input, Dense, Flatten, Reshape
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import Conv2D
from keras.optimizers import Adam
from lib.ModelAE import ModelAE, TrainerAE
from lib.PixelShuffler import PixelShuffler

IMAGE_SHAPE = (64, 64, 3)
ENCODER_DIM = 1024

class Model(ModelAE):
    def initModel(self):
        optimizer = Adam(lr=5e-5, beta_1=0.5, beta_2=0.999)
        x = Input(shape=IMAGE_SHAPE)

        self.autoencoder_A = KerasModel(x, self.decoder_A(self.encoder(x)))
        self.autoencoder_B = KerasModel(x, self.decoder_B(self.encoder(x)))

        self.autoencoder_A.compile(optimizer=optimizer, loss='mean_absolute_error')
        self.autoencoder_B.compile(optimizer=optimizer, loss='mean_absolute_error')

    def converter(self, swap):
        autoencoder = self.autoencoder_B if not swap else self.autoencoder_A 
        return lambda img: autoencoder.predict(img)

    def conv(self, filters):
        def block(x):
            x = Conv2D(filters, kernel_size=5, strides=2, padding='same')(x)
            x = LeakyReLU(0.1)(x)
            return x
        return block

    def upscale(self, filters):
        def block(x):
            x = Conv2D(filters * 4, kernel_size=3, padding='same')(x)
            x = LeakyReLU(0.1)(x)
            x = PixelShuffler()(x)
            return x
        return block

    def Encoder(self):
        input_ = Input(shape=IMAGE_SHAPE)
        x = input_
        x = self.conv(128)(x)
        x = self.conv(256)(x)
        x = self.conv(512)(x)
        x = self.conv(1024)(x)
        x = Dense(ENCODER_DIM)(Flatten()(x))
        x = Dense(4 * 4 * 1024)(x)
        x = Reshape((4, 4, 1024))(x)
        x = self.upscale(512)(x)
        return KerasModel(input_, x)

    def Decoder(self):
        input_ = Input(shape=(8, 8, 512))
        x = input_
        x = self.upscale(256)(x)
        x = self.upscale(128)(x)
        x = self.upscale(64)(x)
        x = Conv2D(3, kernel_size=5, padding='same', activation='sigmoid')(x)
        return KerasModel(input_, x)

class Trainer(TrainerAE):
    """Empty inheritance"""
```
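
The layer stack is easier to follow with the tensor shapes written out. The sketch below traces them in plain Python, without Keras. It assumes that `PixelShuffler` performs the usual 2x depth-to-space rearrangement (height and width doubled, channels divided by 4), which is not shown in this document; `conv_shape` and `upscale_shape` are hypothetical helpers.

```python
import math


def conv_shape(shape, filters):
    """Conv2D with strides=2 and padding='same' halves height and width (rounding up)."""
    h, w, _ = shape
    return (math.ceil(h / 2), math.ceil(w / 2), filters)


def upscale_shape(shape, filters):
    """Conv2D to filters * 4 channels, followed by an assumed 2x pixel shuffle."""
    h, w, _ = shape
    return (h * 2, w * 2, filters)


shape = (64, 64, 3)  # IMAGE_SHAPE
encoder_shapes = [shape]
for filters in (128, 256, 512, 1024):
    shape = conv_shape(shape, filters)
    encoder_shapes.append(shape)

# Flatten -> Dense(1024) -> Dense(4 * 4 * 1024) -> Reshape((4, 4, 1024)) -> upscale(512)
encoder_output = upscale_shape((4, 4, 1024), 512)

shape = encoder_output
decoder_shapes = []
for filters in (256, 128, 64):
    shape = upscale_shape(shape, filters)
    decoder_shapes.append(shape)
decoder_output = (shape[0], shape[1], 3)  # the final Conv2D keeps the spatial size

print(encoder_shapes)
print(encoder_output, decoder_shapes, decoder_output)
```

Under these assumptions the encoder output is 8 x 8 x 512, which matches the decoder's `Input(shape=(8, 8, 512))`, and the decoder returns a 64 x 64 x 3 image, the same shape as the input.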
### Training Progress
![Training](images/05.png)


### Extract frames from Video 
We use ffmpeg to extract frames from the video
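
One way to do this from Python is to call the `ffmpeg` command-line tool through `subprocess`. The sketch below is an illustration, not the exact command used in this project: the file names, the output pattern, and the helper names are assumptions. The `fps` video filter resamples the video to the given frame rate, and `%04d` numbers the output files.

```python
import subprocess
from pathlib import Path


def build_extract_command(video_path, output_dir, fps=24):
    """Build an ffmpeg command that writes one numbered PNG per frame."""
    pattern = Path(output_dir) / "frame_%04d.png"
    return ["ffmpeg", "-i", str(video_path), "-vf", f"fps={fps}", str(pattern)]


def extract_frames(video_path, output_dir, fps=24):
    """Run ffmpeg; requires the ffmpeg executable to be on the PATH."""
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    subprocess.run(build_extract_command(video_path, output_dir, fps), check=True)


command = build_extract_command("input.mp4", "frames", fps=24)
print(" ".join(command))
# extract_frames("input.mp4", "frames")  # uncomment to actually extract the frames
```

Numbering the frames this way keeps the frame number as the last number in each file name, which is what the `--frame-ranges` option of the conversion step expects.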

### Code for performing Swap on the extracted frames
```python
import cv2
import re
from pathlib import Path
from lib.cli import DirectoryProcessor, FullPaths
from lib.utils import BackgroundGenerator
from lib.faces_detect import detect_faces
from plugins.PluginLoader import PluginLoader

class ConvertImage(DirectoryProcessor):
    filename = ''
    def create_parser(self, subparser, command, description):
        self.parser = subparser.add_parser(
            command,
            help="Convert a source image to a new one with the face swapped.",
            description=description     
        )

    def add_optional_arguments(self, parser):
        parser.add_argument('-m', '--model-dir',
                            action=FullPaths,
                            dest="model_dir",
                            default="models",
                            help="Model directory. A directory containing the trained model \
                    you wish to process. Defaults to 'models'")

        parser.add_argument('-s', '--swap-model',
                            action="store_true",
                            dest="swap_model",
                            default=False,
                            help="Swap the model. Instead of A -> B, swap B -> A.")

        parser.add_argument('-c', '--converter',
                            type=str,
                            choices=("Masked", "Adjust"), # case sensitive because this is used to load a plugin.
                            default="Masked",
                            help="Converter to use.")

        parser.add_argument('-fr', '--frame-ranges',
                            nargs="+",
                            type=str,
                            help="""frame ranges to apply transfer to. eg for frames 10 to 50 and 90 to 100 use --frame-ranges 10-50 90-100.
                            Files must have the framenumber as the last number in the name!"""
                            )

        parser.add_argument('-d', '--discard-frames',
                            action="store_true",
                            dest="discard_frames",
                            default=False,
                            help="When used with --frame-ranges, discards frames that are not processed instead of writing them out unchanged."
                            )

        parser.add_argument('-b', '--blur-size',
                            type=int,
                            default=2,
                            help="Blur size. (Masked converter only)")

        parser.add_argument('-S', '--seamless',
                            action="store_true",
                            dest="seamless_clone",
                            default=False,
                            help="Seamless mode. (Masked converter only)")

        parser.add_argument('-M', '--mask-type', 
        # Based on https://gist.github.com/anonymous/d3815aba83a8f79779451262599b0955
                            type=str.lower, #lowercase this, because its just a string later on.
                            dest="mask_type",
                            choices=["rect", "facehull", "facehullandrect"],
                            default="facehullandrect",
                            help="Mask to use to replace faces. (Masked converter only)")

        parser.add_argument('-e', '--erosion-kernel-size',
                            dest="erosion_kernel_size",
                            type=int,
                            default=None,
                            help="Erosion kernel size. (Masked converter only)")

        parser.add_argument('-sm', '--smooth-mask',
                            action="store_true",
                            dest="smooth_mask",
                            default=True,
                            help="Smooth mask (Adjust converter only)")

        parser.add_argument('-aca', '--avg-color-adjust',
                            action="store_true",
                            dest="avg_color_adjust",
                            default=True,
                            help="Average color adjust. (Adjust converter only)")

        return parser
    
    def process(self):
        # Original model goes with Adjust or Masked converter
        # does the LowMem one work with only one?
        model_name = "Original" # TODO Pass as argument
        conv_name = self.arguments.converter
        
        model = PluginLoader.get_model(model_name)(self.arguments.model_dir)
        if not model.load(self.arguments.swap_model):
            print('Model Not Found! A valid model must be provided to continue!')
            exit(1)

        converter = PluginLoader.get_converter(conv_name)(model.converter(False),
            blur_size=self.arguments.blur_size,
            seamless_clone=self.arguments.seamless_clone,
            mask_type=self.arguments.mask_type,
            erosion_kernel_size=self.arguments.erosion_kernel_size,
            smooth_mask=self.arguments.smooth_mask,
            avg_color_adjust=self.arguments.avg_color_adjust
        )

        batch = BackgroundGenerator(self.prepare_images(), 1)

        # frame ranges stuff...
        self.frame_ranges = None
        # split out the frame ranges and parse out "min" and "max" values
        minmax = {
            "min": 0, # never any frames less than 0
            "max": float("inf")
        }
        if self.arguments.frame_ranges:
            self.frame_ranges = [tuple(map(lambda q: minmax[q] if q in minmax.keys() else int(q), v.split("-"))) for v in self.arguments.frame_ranges]

        # last number regex. I know regex is hacky, but its reliablyhacky(tm).
        self.imageidxre = re.compile(r'(\d+)(?!.*\d)')

        for item in batch.iterator():
            self.convert(converter, item)
    
    def check_skip(self, filename):
        try:
            idx = int(self.imageidxre.findall(filename)[0])
            return not any(map(lambda b: b[0]<=idx<=b[1], self.frame_ranges))
        except Exception:
            # no frame ranges given, or no number in the filename: process the frame
            return False


    def convert(self, converter, item):
        try:
            (filename, image, faces) = item
            
            skip = self.check_skip(filename)

            if not skip: # process as normal
                for idx, face in faces:
                    image = converter.patch_image(image, face)

            output_file = self.output_dir / Path(filename).name

            if self.arguments.discard_frames and skip:
                return
            cv2.imwrite(str(output_file), image)
        except Exception as e:
            print('Failed to convert image: {}. Reason: {}'.format(filename, e))

    def prepare_images(self):
        for filename in self.read_directory():
            image = cv2.imread(filename)
            yield filename, image, self.get_faces(image)

```
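
The `--frame-ranges` logic is compact, so here is a standalone sketch of the same idea: parse ranges such as `10-50`, pull the last number out of a file name, and decide whether the frame should be skipped. The function names and file names are hypothetical; the regular expression is the one used above.

```python
import re

LAST_NUMBER = re.compile(r'(\d+)(?!.*\d)')  # last run of digits in a string
BOUNDS = {"min": 0, "max": float("inf")}


def parse_frame_ranges(specs):
    """Turn ['10-50', '90-max'] into [(10, 50), (90, inf)]."""
    ranges = []
    for spec in specs:
        low, high = (BOUNDS[part] if part in BOUNDS else int(part)
                     for part in spec.split("-"))
        ranges.append((low, high))
    return ranges


def should_skip(filename, ranges):
    """Return True when the frame number falls outside every range."""
    match = LAST_NUMBER.search(filename)
    if match is None or not ranges:
        return False  # nothing to filter on, so process the frame
    idx = int(match.group(1))
    return not any(low <= idx <= high for low, high in ranges)


ranges = parse_frame_ranges(["10-50", "90-100"])
print(should_skip("clip2_frame_0042.png", ranges))  # False: 42 is inside 10-50
print(should_skip("clip2_frame_0070.png", ranges))  # True: 70 is in neither range
```

Note that the `2` in `clip2` is ignored: only the last number in the name counts as the frame index.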
### Masked Conversion:
```python
import cv2
import numpy

from lib.aligner import get_align_mat

class Convert():
    def __init__(self, encoder, blur_size=2, seamless_clone=False, mask_type="facehullandrect", erosion_kernel_size=None, **kwargs):
        self.encoder = encoder

        self.erosion_kernel = None
        if erosion_kernel_size is not None:
            self.erosion_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(erosion_kernel_size,erosion_kernel_size))

        self.blur_size = blur_size
        self.seamless_clone = seamless_clone
        self.mask_type = mask_type.lower() # one of 'facehullandrect', 'facehull', 'rect'

    def patch_image( self, image, face_detected ):
        size = 64
        image_size = image.shape[1], image.shape[0]

        mat = numpy.array(get_align_mat(face_detected)).reshape(2,3) * size

        new_face = self.get_new_face(image,mat,size)

        image_mask = self.get_image_mask( image, new_face, face_detected, mat, image_size )

        return self.apply_new_face(image, new_face, image_mask, mat, image_size, size)

    def apply_new_face(self, image, new_face, image_mask, mat, image_size, size):
        base_image = numpy.copy( image )
        new_image = numpy.copy( image )

        cv2.warpAffine( new_face, mat, image_size, new_image, cv2.WARP_INVERSE_MAP, cv2.BORDER_TRANSPARENT )

        outimage = None
        if self.seamless_clone:
            masky,maskx = cv2.transform( numpy.array([ size/2,size/2 ]).reshape(1,1,2) ,cv2.invertAffineTransform(mat) ).reshape(2).astype(int)
            outimage = cv2.seamlessClone(new_image.astype(numpy.uint8),base_image.astype(numpy.uint8),(image_mask*255).astype(numpy.uint8),(masky,maskx) , cv2.NORMAL_CLONE )
        else:
            foreground = cv2.multiply(image_mask, new_image.astype(float))
            background = cv2.multiply(1.0 - image_mask, base_image.astype(float))
            outimage = cv2.add(foreground, background)

        return outimage

    def get_new_face(self, image, mat, size):
        face = cv2.warpAffine( image, mat, (size,size) )
        face = numpy.expand_dims( face, 0 )
        new_face = self.encoder( face / 255.0 )[0]

        return numpy.clip( new_face * 255, 0, 255 ).astype( image.dtype )

    def get_image_mask(self, image, new_face, face_detected, mat, image_size):

        face_mask = numpy.zeros(image.shape,dtype=float)
        if 'rect' in self.mask_type:
            face_src = numpy.ones(new_face.shape,dtype=float)
            cv2.warpAffine( face_src, mat, image_size, face_mask, cv2.WARP_INVERSE_MAP, cv2.BORDER_TRANSPARENT )

        hull_mask = numpy.zeros(image.shape,dtype=float)
        if 'hull' in self.mask_type:
            hull = cv2.convexHull( numpy.array( face_detected.landmarksAsXY() ).reshape((-1,2)).astype(int) ).flatten().reshape( (-1,2) )
            cv2.fillConvexPoly( hull_mask,hull,(1,1,1) )

        if self.mask_type == 'rect':
            image_mask = face_mask
        elif self.mask_type == 'facehull':  # mask_type is lowercased in __init__
            image_mask = hull_mask
        else:
            image_mask = ((face_mask*hull_mask))


        if self.erosion_kernel is not None:
            image_mask = cv2.erode(image_mask,self.erosion_kernel,iterations = 1)

        if self.blur_size!=0:
            image_mask = cv2.blur(image_mask,(self.blur_size,self.blur_size))

        return image_mask

```
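
When seamless cloning is off, `apply_new_face` is a per-pixel alpha blend: `output = mask * new + (1 - mask) * base`. Here is a minimal sketch on single pixel values, in plain Python rather than OpenCV arrays (the helper name is made up):

```python
def blend_pixel(mask, new_value, base_value):
    """Alpha-blend one pixel value; mask is in [0, 1]."""
    return mask * new_value + (1.0 - mask) * base_value


print(blend_pixel(1.0, 200, 100))   # 200.0: inside the mask, the swapped face wins
print(blend_pixel(0.0, 200, 100))   # 100.0: outside the mask, the original frame is kept
print(blend_pixel(0.25, 200, 100))  # 125.0: a blurred mask edge mixes the two
```

Blurring the mask (`blur_size`) produces these fractional values along its edge, which softens the seam between the swapped face and the original frame.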



<h3 align='center'> Image before conversion</h3>

![stark_before.png](images/06.png)

<h3 align='center'> Image after conversion</h3>

![stark_after.png](images/07.png)


<h3 align='center'> Model Results </h3>
<table>
  <tr>
    <th>Error for A</th>
    <th>Error for B</th>
    <th>Batch Size</th>
    <th>Epoch</th>
  </tr>
  <tr>
    <td>0.05862</td>
    <td>0.07512</td>
    <td>64</td>
    <td>1000</td>
  </tr>
  <tr>
    <td>0.05322</td>
    <td>0.07353</td>
    <td>64</td>
    <td>1500</td>
  </tr>
  <tr>
    <td>0.05226</td>
    <td>0.07171</td>
    <td>128</td>
    <td>1000</td>
  </tr>
  <tr>
    <td>0.04723</td>
    <td>0.06410</td>
    <td>128</td>
    <td>1500</td>
  </tr>
</table>

### Conclusion

Even our best run ended with a mean absolute error of 0.04723 for A (0.06410 for B), and the swapped faces did not come out with significant clarity.
A few ways to improve the model would be to enlarge the dataset, add more convolutional layers, or use GANs. Adding a bit of noise to the training images and choosing higher-resolution images may help as well.

#### Github link for the project
https://github.com/sushrutt12/faceswappingtechnique

#### Link for the video
https://youtu.be/FZXQtbHG6-o

### License
The text in the document by Sushrut Tadwalkar is licensed under CC BY 3.0 https://creativecommons.org/licenses/by/3.0/us/