In [None]:
# Install PyYAML 5.3.1 with ONE of the two commands below (pip or conda):
# !pip install PyYAML==5.3.1

# conda install -y -c conda-forge pyyaml=5.3.1

# ⭐ Clone repository
# git clone https://github.com/AliaksandrSiarohin/first-order-model

In [None]:
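# conda activate deepfake   # the "deepfake" environment is created in the `conda create` cell further down; run that one first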

In [None]:
# # Use conda-forge for most scientific packages
# conda install -c conda-forge numpy matplotlib pandas scikit-image ipython -y

# # Install OpenCV
# conda install -c conda-forge opencv -y

# # Install plotly and wbdata via pip
# pip install plotly wbdata


In [None]:
# # Create a new environment
# conda create -n deepfake python=3.12 -y

# # Activate it
# conda activate deepfake

# # Install PyTorch + torchvision + torchaudio (CPU-only for simplicity)
# conda install pytorch torchvision torchaudio cpuonly -c pytorch

# # Install dlib
# conda install -c conda-forge dlib


In [23]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import dlib
import urllib.request
import requests
from io import BytesIO
import warnings
warnings.filterwarnings('ignore')

# =========================
# DEVICE SETUP
# =========================
# Pick the GPU when PyTorch reports one is usable; otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

# =========================
# HAAR CASCADE LOADER
# =========================
def get_haar_cascade(local_file='haarcascade_frontalface_default.xml'):
    """Return a loaded OpenCV Haar cascade, downloading the XML if it is missing.

    Args:
        local_file: Path where the cascade XML is (or will be) stored. Only the
            file-name part is used to build the download URL, so a path such as
            ``models/haarcascade_eye.xml`` still maps to the matching file in
            the OpenCV repository.

    Returns:
        The ``cv2.CascadeClassifier`` built from ``local_file``.

    Raises:
        IOError: If OpenCV reports the classifier as empty after loading.
        Any exception raised by ``urllib.request.urlretrieve`` when the
        download fails (no partial file is left behind in that case).
    """
    if not os.path.exists(local_file):
        print("Haar cascade not found locally. Downloading...")
        cascade_name = os.path.basename(local_file)
        url = f'https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/{cascade_name}'
        # Download under a temporary name and rename on success, so an
        # interrupted transfer never leaves a truncated XML that the
        # os.path.exists() check above would accept on the next run.
        partial_file = f"{local_file}.part"
        try:
            urllib.request.urlretrieve(url, partial_file)
            os.replace(partial_file, local_file)
        finally:
            if os.path.exists(partial_file):
                os.remove(partial_file)
        print(f"Haar cascade downloaded to: {local_file}")
    cascade = cv2.CascadeClassifier(local_file)
    if cascade.empty():
        raise IOError(f"Failed to load Haar cascade XML at {local_file}")
    print(f"Haar cascade loaded ✅ from {local_file}")
    return cascade

# =========================
# FACE PROCESSOR
# =========================
class FaceProcessor:
    """Haar-cascade face detection plus cropping of a padded face region."""

    def __init__(self):
        # The classifier is loaded once here and reused by every detect_faces call.
        self.face_cascade = get_haar_cascade()

    def detect_faces(self, image):
        """Run the cascade on the grayscale version of a BGR ``image``.

        Returns:
            Whatever ``detectMultiScale`` returns for the configured options;
            callers in this file unpack each entry as ``(x, y, w, h)``.
        """
        grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        detector_options = dict(scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
        return self.face_cascade.detectMultiScale(grayscale, **detector_options)

    def extract_face(self, image, face_coords, padding_ratio=0.2):
        """Crop ``face_coords`` out of ``image`` with a margin on every side.

        Args:
            image: Array indexed as ``[row, column, ...]``.
            face_coords: ``(x, y, w, h)`` box.
            padding_ratio: Margin as a fraction of the box WIDTH; the same
                pixel margin is applied horizontally and vertically.

        Returns:
            ``(crop, (x1, y1, crop_w, crop_h))`` where the box is the padded
            region after clamping to the image bounds.
        """
        left, top, box_w, box_h = face_coords
        margin = int(box_w * padding_ratio)

        # Clamp the padded rectangle so the slice never leaves the image.
        x1, y1 = max(0, left - margin), max(0, top - margin)
        x2 = min(image.shape[1], left + box_w + margin)
        y2 = min(image.shape[0], top + box_h + margin)

        padded_box = (x1, y1, x2 - x1, y2 - y1)
        return image[y1:y2, x1:x2], padded_box

# =========================
# FACE ALIGNER (OPTIONAL)
# =========================
class FaceAligner:
    """Rotate an image so the eyes of the first dlib-detected face are level."""

    def __init__(self, predictor_path='shape_predictor_68_face_landmarks.dat'):
        """Load dlib's frontal face detector and the landmark predictor.

        Raises:
            FileNotFoundError: If ``predictor_path`` does not exist (a download
                hint is printed first).
        """
        predictor_available = os.path.exists(predictor_path)
        if not predictor_available:
            print(f"Landmark predictor not found: {predictor_path}")
            print("Download: http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2")
            raise FileNotFoundError(predictor_path)
        self.detector = dlib.get_frontal_face_detector()
        self.predictor = dlib.shape_predictor(predictor_path)

    def align_face(self, image):
        """Return ``image`` rotated about the midpoint between the eyes.

        Returns:
            The rotated image (same size as the input), or ``None`` when the
            detector finds no face. Only the first detection is used.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        detections = self.detector(gray, 1)
        if len(detections) == 0:
            return None
        shape = self.predictor(gray, detections[0])

        def centroid(indices):
            # Mean (x, y) of the given landmark indices.
            return np.mean([(shape.part(i).x, shape.part(i).y) for i in indices], axis=0)

        # Landmarks 36-41 and 42-47 are treated as the left and right eye.
        left_eye = centroid(range(36, 42))
        right_eye = centroid(range(42, 48))

        delta_x = right_eye[0] - left_eye[0]
        delta_y = right_eye[1] - left_eye[1]
        angle = np.degrees(np.arctan2(delta_y, delta_x))

        # Floor-divided midpoint between the two eye centroids.
        eyes_center = tuple((left_eye + right_eye) // 2)
        rotation = cv2.getRotationMatrix2D(eyes_center, angle, 1.0)
        output_size = (image.shape[1], image.shape[0])
        return cv2.warpAffine(image, rotation, output_size, flags=cv2.INTER_CUBIC)

# =========================
# AUTOENCODER MODEL
# =========================
class FaceSwapAutoencoder(nn.Module):
    """Convolutional autoencoder for 3-channel images.

    The encoder is four 5x5 stride-2 convolutions (each followed by batch norm
    and ReLU) plus a 3x3 stride-1 bottleneck. The decoder mirrors it with four
    5x5 stride-2 transposed convolutions; the last one has no batch norm and
    is passed through ``tanh``.
    """

    def __init__(self):
        super().__init__()
        down = dict(kernel_size=5, stride=2, padding=2)
        up = dict(kernel_size=5, stride=2, padding=2, output_padding=1)

        # Encoder
        self.enc_conv1 = nn.Conv2d(3, 64, **down)
        self.enc_conv2 = nn.Conv2d(64, 128, **down)
        self.enc_conv3 = nn.Conv2d(128, 256, **down)
        self.enc_conv4 = nn.Conv2d(256, 512, **down)
        self.bottleneck = nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1)

        # Decoder
        self.dec_conv1 = nn.ConvTranspose2d(512, 256, **up)
        self.dec_conv2 = nn.ConvTranspose2d(256, 128, **up)
        self.dec_conv3 = nn.ConvTranspose2d(128, 64, **up)
        self.dec_conv4 = nn.ConvTranspose2d(64, 3, **up)

        # BatchNorm for encoder stages 1-4
        self.norm1 = nn.BatchNorm2d(64)
        self.norm2 = nn.BatchNorm2d(128)
        self.norm3 = nn.BatchNorm2d(256)
        self.norm4 = nn.BatchNorm2d(512)
        # BatchNorm for decoder stages 1-3 (the output layer has none)
        self.dnorm1 = nn.BatchNorm2d(256)
        self.dnorm2 = nn.BatchNorm2d(128)
        self.dnorm3 = nn.BatchNorm2d(64)

    def encode(self, x):
        """Apply the four conv/norm/ReLU stages, then the ReLU'd bottleneck."""
        stages = (
            (self.enc_conv1, self.norm1),
            (self.enc_conv2, self.norm2),
            (self.enc_conv3, self.norm3),
            (self.enc_conv4, self.norm4),
        )
        for conv, norm in stages:
            x = F.relu(norm(conv(x)))
        return F.relu(self.bottleneck(x))

    def decode(self, x):
        """Apply three deconv/norm/ReLU stages and a final tanh'd deconv."""
        stages = (
            (self.dec_conv1, self.dnorm1),
            (self.dec_conv2, self.dnorm2),
            (self.dec_conv3, self.dnorm3),
        )
        for deconv, norm in stages:
            x = F.relu(norm(deconv(x)))
        return torch.tanh(self.dec_conv4(x))

    def forward(self, x):
        """Encode then decode ``x``."""
        latent = self.encode(x)
        return self.decode(latent)

# =========================
# DEEPFAKE GENERATOR
# =========================
class DeepfakeGenerator:
    """Demo pipeline that blends a source face crop into the largest face of each video frame.

    Note: ``self.model`` (a ``FaceSwapAutoencoder``) is instantiated but none
    of the methods in this class call it; the "swap" is the fixed 30/70 tensor
    blend implemented in ``blend_faces``.
    """

    def __init__(self):
        self.model = FaceSwapAutoencoder().to(device)
        self.face_processor = FaceProcessor()
        # Face crops are resized to 256x256 and scaled to [-1, 1] per channel.
        self.transform = transforms.Compose([
            transforms.Resize((256,256)),
            transforms.ToTensor(),
            transforms.Normalize([0.5]*3,[0.5]*3)
        ])

    def _load_bgr(self, image):
        """Return ``image`` (URL, local path, or array) as a BGR array."""
        if not isinstance(image, str):
            return image.copy()
        if image.startswith('http'):
            response = requests.get(image, timeout=30)
            # Fail loudly on 4xx/5xx instead of handing an error page to PIL.
            response.raise_for_status()
            pil_img = Image.open(BytesIO(response.content))
        else:
            pil_img = Image.open(image)
        # Force three channels: grayscale/palette files decode to 2-D arrays,
        # which COLOR_RGB2BGR rejects.
        return cv2.cvtColor(np.array(pil_img.convert('RGB')), cv2.COLOR_RGB2BGR)

    def _crop_largest_face(self, img):
        """Detect faces in ``img`` and crop the largest one.

        Returns ``(face_crop, detection_box, padded_box)``; ``padded_box`` is
        the region of ``img`` that ``face_crop`` was actually taken from.
        """
        faces = self.face_processor.detect_faces(img)
        if len(faces)==0:
            raise ValueError("No faces detected")
        face_coords = max(faces, key=lambda f: f[2]*f[3])
        face_img, padded_box = self.face_processor.extract_face(img, face_coords)
        return face_img, face_coords, padded_box

    def _face_to_tensor(self, face_img):
        """Convert a BGR face crop into a normalized ``(1, 3, 256, 256)`` tensor on ``device``."""
        rgb = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
        return self.transform(Image.fromarray(rgb)).unsqueeze(0).to(device)

    def prepare_face(self, image):
        """Load an image, find its largest face and return it as a tensor.

        Args:
            image: URL (``http...``), local file path, or an OpenCV BGR array.

        Returns:
            ``(face_tensor, face_img, face_coords)`` where ``face_img`` is the
            padded BGR crop and ``face_coords`` is the unpadded detection box.

        Raises:
            ValueError: If no face is detected.
        """
        img = self._load_bgr(image)
        face_img, face_coords, _ = self._crop_largest_face(img)
        return self._face_to_tensor(face_img), face_img, face_coords

    def blend_faces(self, source_tensor, target_tensor):
        """Simple blending: 30% source, 70% target."""
        return 0.3*source_tensor + 0.7*target_tensor

    def tensor_to_image(self, tensor):
        """Convert a normalized ``(3, H, W)`` tensor back to a uint8 BGR image."""
        # Undo Normalize(0.5, 0.5); clamp so values never wrap when cast to uint8.
        tensor = (tensor.cpu().detach()*0.5 + 0.5).clamp(0, 1)
        img = (tensor.permute(1,2,0).numpy()*255).astype(np.uint8)
        return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)

    def swap_faces_video(self, source_img, target_video, output_file='deepfake_output.mp4'):
        """Blend the source face into every frame of ``target_video``.

        Frames where no face is found, or where the swap fails, are written
        unchanged; the reasons are summarized at the end instead of being
        silently discarded.

        Args:
            source_img: URL, path or BGR array containing the source face.
            target_video: Path of the video to process.
            output_file: Path of the ``mp4v``-encoded result.

        Returns:
            ``output_file``.

        Raises:
            ValueError: If the source image contains no detectable face.
            IOError: If the input video or the output writer cannot be opened.
        """
        print("Processing deepfake video...")
        source_tensor, _, _ = self.prepare_face(source_img)
        cap = cv2.VideoCapture(target_video)
        out = None
        frame_num = 0
        skipped = {}
        try:
            if not cap.isOpened():
                raise IOError(f"Could not open target video: {target_video}")
            # Keep the fractional rate (e.g. 29.97); truncating to int changes
            # the playback speed of the output.
            fps = cap.get(cv2.CAP_PROP_FPS)
            w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            out = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w,h))
            if not out.isOpened():
                raise IOError(f"Could not open video writer for: {output_file}")
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame_num += 1
                try:
                    face_img, _, padded_box = self._crop_largest_face(frame)
                    target_tensor = self._face_to_tensor(face_img)
                    blended = self.blend_faces(source_tensor, target_tensor)
                    swapped = self.tensor_to_image(blended[0])
                    # Paste back into the padded region the crop came from,
                    # not the tighter detection box, so geometry is preserved.
                    x, y, w1, h1 = (int(v) for v in padded_box)
                    frame[y:y+h1, x:x+w1] = cv2.resize(swapped, (w1, h1))
                except Exception as e:
                    # Best effort: keep the original frame but remember why.
                    reason = f"{type(e).__name__}: {e}"
                    skipped[reason] = skipped.get(reason, 0) + 1
                out.write(frame)
        finally:
            cap.release()
            if out is not None:
                out.release()
        for reason, count in skipped.items():
            print(f"{count} of {frame_num} frame(s) left unmodified ({reason})")
        print(f"Deepfake video saved: {output_file}")
        return output_file

# =========================
# DEMO EXECUTION
# =========================
if __name__=="__main__":
    print("=== Online-ready Deepfake Demo ===")
    generator = DeepfakeGenerator()

    # Example online files (replace with any URLs)
    source_image_url = "https://raw.githubusercontent.com/opencv/opencv/master/samples/data/lena.jpg"
    target_video_url = "https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi"

    # Download target video if URL
    if target_video_url.startswith('http'):
        print("Downloading target video...")
        r = requests.get(target_video_url, timeout=60)
        # Stop on 4xx/5xx; otherwise the error page would be saved to disk and
        # passed on as if it were a video.
        r.raise_for_status()
        # NOTE(review): the sample URL is an .avi but is stored under an .mp4
        # name; the name is kept because other cells look for this exact path.
        target_video_path = 'target_video.mp4'
        with open(target_video_path, 'wb') as f:
            f.write(r.content)
    else:
        target_video_path = target_video_url

    generator.swap_faces_video(source_image_url, target_video_path, 'deepfake_demo.mp4')


Using device: cpu
=== Online-ready Deepfake Demo ===
Haar cascade loaded ✅ from haarcascade_frontalface_default.xml
Downloading target video...
Processing deepfake video...
Deepfake video saved: deepfake_demo.mp4


In [None]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import dlib
import urllib.request
import requests
from io import BytesIO
import warnings
warnings.filterwarnings('ignore')

# =========================
# DEVICE SETUP
# =========================
# Pick the GPU when PyTorch reports one is usable; otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

# =========================
# HAAR CASCADE SETUP
# =========================
def get_haar_cascade(local_file='haarcascade_frontalface_default.xml'):
    """Return a loaded OpenCV Haar cascade, downloading the XML if it is missing.

    The download URL is ``.../opencv/master/data/haarcascades/<file name>``,
    so other cascades from that directory can be requested by name, e.g.:

        haarcascade_frontalface_default.xml  (face)
        haarcascade_eye.xml                  (eye)
        haarcascade_smile.xml                (smile)
        haarcascade_fullbody.xml             (full body)

    Args:
        local_file: Path where the cascade XML is (or will be) stored. Only the
            file-name part is used to build the download URL.

    Returns:
        The ``cv2.CascadeClassifier`` built from ``local_file``.

    Raises:
        IOError: If OpenCV reports the classifier as empty after loading.
        Any exception raised by ``urllib.request.urlretrieve`` when the
        download fails (no partial file is left behind in that case).
    """
    if not os.path.exists(local_file):
        print("Haar cascade not found locally. Downloading...")
        cascade_name = os.path.basename(local_file)
        url = f'https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/{cascade_name}'
        # Download under a temporary name and rename on success, so an
        # interrupted transfer never leaves a truncated XML that the
        # os.path.exists() check above would accept on the next run.
        partial_file = f"{local_file}.part"
        try:
            urllib.request.urlretrieve(url, partial_file)
            os.replace(partial_file, local_file)
        finally:
            if os.path.exists(partial_file):
                os.remove(partial_file)
        print(f"Haar cascade downloaded to: {local_file}")
    cascade = cv2.CascadeClassifier(local_file)
    if cascade.empty():
        raise IOError(f"Failed to load Haar cascade XML at {local_file}")
    print(f"Haar cascade loaded ✅ from {local_file}")
    return cascade

# =========================
# FACE PROCESSOR
# =========================
class FaceProcessor:
    """Haar-cascade face detection plus cropping of a padded face region."""

    def __init__(self):
        # The classifier is loaded once here and reused by every detect_faces call.
        self.face_cascade = get_haar_cascade()

    def detect_faces(self, image):
        """Run the cascade on the grayscale version of a BGR ``image``.

        Returns:
            Whatever ``detectMultiScale`` returns for the configured options;
            callers in this file unpack each entry as ``(x, y, w, h)``.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(
            gray, scaleFactor=1.1, minNeighbors=5, minSize=(30,30)
        )
        return faces

    def extract_face(self, image, face_coords, padding_ratio=0.2):
        """Crop ``face_coords`` out of ``image`` with a margin on every side.

        Args:
            image: Array indexed as ``[row, column, ...]``.
            face_coords: ``(x, y, w, h)`` box.
            padding_ratio: Margin as a fraction of the box WIDTH; the same
                pixel margin is applied horizontally and vertically. The
                default of 0.2 is the value that used to be hard-coded.

        Returns:
            ``(crop, (x1, y1, crop_w, crop_h))`` where the box is the padded
            region after clamping to the image bounds.
        """
        x, y, w, h = face_coords
        padding = int(w * padding_ratio)
        # Clamp the padded rectangle so the slice never leaves the image.
        x1 = max(0, x - padding)
        y1 = max(0, y - padding)
        x2 = min(image.shape[1], x + w + padding)
        y2 = min(image.shape[0], y + h + padding)
        return image[y1:y2, x1:x2], (x1, y1, x2-x1, y2-y1)

# =========================
# OPTIONAL: FACE ALIGNER
# =========================
class FaceAligner:
    """Rotate an image so the eyes of the first dlib-detected face are level."""

    def __init__(self, predictor_path='shape_predictor_68_face_landmarks.dat'):
        """Load dlib's frontal face detector and the landmark predictor.

        Raises:
            FileNotFoundError: If ``predictor_path`` does not exist (a download
                hint is printed first).
        """
        predictor_available = os.path.exists(predictor_path)
        if not predictor_available:
            print(f"Landmark predictor not found: {predictor_path}")
            print("Download from: http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2")
            raise FileNotFoundError(predictor_path)
        self.detector = dlib.get_frontal_face_detector()
        self.predictor = dlib.shape_predictor(predictor_path)

    def align_face(self, image):
        """Return ``image`` rotated about the midpoint between the eyes.

        Returns:
            The rotated image (same size as the input), or ``None`` when the
            detector finds no face. Only the first detection is used.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        detections = self.detector(gray, 1)
        if len(detections) == 0:
            return None
        shape = self.predictor(gray, detections[0])

        def centroid(indices):
            # Mean (x, y) of the given landmark indices.
            return np.mean([(shape.part(i).x, shape.part(i).y) for i in indices], axis=0)

        # Landmarks 36-41 and 42-47 are treated as the left and right eye.
        left_eye = centroid(range(36, 42))
        right_eye = centroid(range(42, 48))

        delta_x = right_eye[0] - left_eye[0]
        delta_y = right_eye[1] - left_eye[1]
        angle = np.degrees(np.arctan2(delta_y, delta_x))

        # Floor-divided midpoint between the two eye centroids.
        eyes_center = tuple((left_eye + right_eye) // 2)
        rotation = cv2.getRotationMatrix2D(eyes_center, angle, 1.0)
        output_size = (image.shape[1], image.shape[0])
        return cv2.warpAffine(image, rotation, output_size, flags=cv2.INTER_CUBIC)

# =========================
# AUTOENCODER MODEL
# =========================
class FaceSwapAutoencoder(nn.Module):
    """Convolutional autoencoder for 3-channel images.

    The encoder is four 5x5 stride-2 convolutions (each followed by batch norm
    and ReLU) plus a 3x3 stride-1 bottleneck. The decoder mirrors it with four
    5x5 stride-2 transposed convolutions; the last one has no batch norm and
    is passed through ``tanh``.
    """

    def __init__(self):
        super().__init__()
        down = dict(kernel_size=5, stride=2, padding=2)
        up = dict(kernel_size=5, stride=2, padding=2, output_padding=1)

        # Encoder
        self.enc_conv1 = nn.Conv2d(3, 64, **down)
        self.enc_conv2 = nn.Conv2d(64, 128, **down)
        self.enc_conv3 = nn.Conv2d(128, 256, **down)
        self.enc_conv4 = nn.Conv2d(256, 512, **down)
        self.bottleneck = nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1)

        # Decoder
        self.dec_conv1 = nn.ConvTranspose2d(512, 256, **up)
        self.dec_conv2 = nn.ConvTranspose2d(256, 128, **up)
        self.dec_conv3 = nn.ConvTranspose2d(128, 64, **up)
        self.dec_conv4 = nn.ConvTranspose2d(64, 3, **up)

        # BatchNorm for encoder stages 1-4
        self.norm1 = nn.BatchNorm2d(64)
        self.norm2 = nn.BatchNorm2d(128)
        self.norm3 = nn.BatchNorm2d(256)
        self.norm4 = nn.BatchNorm2d(512)
        # BatchNorm for decoder stages 1-3 (the output layer has none)
        self.dnorm1 = nn.BatchNorm2d(256)
        self.dnorm2 = nn.BatchNorm2d(128)
        self.dnorm3 = nn.BatchNorm2d(64)

    def encode(self, x):
        """Apply the four conv/norm/ReLU stages, then the ReLU'd bottleneck."""
        stages = (
            (self.enc_conv1, self.norm1),
            (self.enc_conv2, self.norm2),
            (self.enc_conv3, self.norm3),
            (self.enc_conv4, self.norm4),
        )
        for conv, norm in stages:
            x = F.relu(norm(conv(x)))
        return F.relu(self.bottleneck(x))

    def decode(self, x):
        """Apply three deconv/norm/ReLU stages and a final tanh'd deconv."""
        stages = (
            (self.dec_conv1, self.dnorm1),
            (self.dec_conv2, self.dnorm2),
            (self.dec_conv3, self.dnorm3),
        )
        for deconv, norm in stages:
            x = F.relu(norm(deconv(x)))
        return torch.tanh(self.dec_conv4(x))

    def forward(self, x):
        """Encode then decode ``x``."""
        latent = self.encode(x)
        return self.decode(latent)

# =========================
# DEEPFAKE GENERATOR
# =========================
class DeepfakeGenerator:
    """Demo pipeline that blends a source face crop into the largest face of each video frame.

    Note: ``self.model`` (a ``FaceSwapAutoencoder``) is instantiated but none
    of the methods in this class call it; the "swap" is the fixed 30/70 tensor
    blend implemented in ``simple_blend``.
    """

    def __init__(self):
        self.model = FaceSwapAutoencoder().to(device)
        self.face_processor = FaceProcessor()
        # Face crops are resized to 256x256 and scaled to [-1, 1] per channel.
        self.transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize([0.5]*3, [0.5]*3)
        ])

    def _load_bgr(self, image):
        """Return ``image`` (URL, local path, or array) as a BGR array."""
        if not isinstance(image, str):
            return image.copy()
        if image.startswith('http'):
            response = requests.get(image, timeout=30)
            # Fail loudly on 4xx/5xx instead of handing an error page to PIL.
            response.raise_for_status()
            pil_img = Image.open(BytesIO(response.content))
        else:
            pil_img = Image.open(image)
        # Force three channels: grayscale/palette files decode to 2-D arrays,
        # which COLOR_RGB2BGR rejects.
        return cv2.cvtColor(np.array(pil_img.convert('RGB')), cv2.COLOR_RGB2BGR)

    def _crop_largest_face(self, img):
        """Detect faces in ``img`` and crop the largest one.

        Returns ``(face_crop, detection_box, padded_box)``; ``padded_box`` is
        the region of ``img`` that ``face_crop`` was actually taken from.
        """
        faces = self.face_processor.detect_faces(img)
        if len(faces) == 0:
            raise ValueError("No faces detected in image/video frame")
        face_coords = max(faces, key=lambda f: f[2]*f[3])
        face_img, padded_box = self.face_processor.extract_face(img, face_coords)
        return face_img, face_coords, padded_box

    def _face_to_tensor(self, face_img):
        """Convert a BGR face crop into a normalized ``(1, 3, 256, 256)`` tensor on ``device``."""
        rgb = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
        return self.transform(Image.fromarray(rgb)).unsqueeze(0).to(device)

    def prepare_face(self, image):
        """Load an image, find its largest face and return it as a tensor.

        Args:
            image: URL (``http...``), local file path, or an OpenCV BGR array.

        Returns:
            ``(face_tensor, face_img, face_coords)`` where ``face_img`` is the
            padded BGR crop and ``face_coords`` is the unpadded detection box.

        Raises:
            ValueError: If no face is detected.
        """
        img = self._load_bgr(image)
        face_img, face_coords, _ = self._crop_largest_face(img)
        return self._face_to_tensor(face_img), face_img, face_coords

    def simple_blend(self, source_tensor, target_tensor):
        """Demo blending: 30% source, 70% target."""
        return 0.3*source_tensor + 0.7*target_tensor

    def tensor_to_image(self, tensor):
        """Convert a normalized ``(3, H, W)`` tensor back to a uint8 BGR image."""
        # Undo Normalize(0.5, 0.5); clamp so values never wrap when cast to uint8.
        tensor = (tensor.cpu().detach()*0.5 + 0.5).clamp(0, 1)
        img = (tensor.permute(1,2,0).numpy()*255).astype(np.uint8)
        return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)

    def swap_faces_video(self, source_image, target_video, output='deepfake_output.mp4'):
        """Blend the source face into every frame of ``target_video``.

        Frames where no face is found, or where the swap fails, are written
        unchanged; the reasons are summarized at the end instead of being
        silently discarded.

        Args:
            source_image: URL, path or BGR array containing the source face.
            target_video: Path of the video to process.
            output: Path of the ``mp4v``-encoded result.

        Returns:
            ``output``.

        Raises:
            ValueError: If the source image contains no detectable face.
            IOError: If the input video or the output writer cannot be opened.
        """
        source_tensor, _, _ = self.prepare_face(source_image)
        cap = cv2.VideoCapture(target_video)
        out = None
        frame_count = 0
        skipped = {}
        try:
            if not cap.isOpened():
                raise IOError(f"Could not open target video: {target_video}")
            # Keep the fractional rate (e.g. 29.97); truncating to int changes
            # the playback speed of the output.
            fps = cap.get(cv2.CAP_PROP_FPS)
            w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            out = cv2.VideoWriter(output, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w,h))
            if not out.isOpened():
                raise IOError(f"Could not open video writer for: {output}")
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame_count += 1
                try:
                    face_img, _, padded_box = self._crop_largest_face(frame)
                    target_tensor = self._face_to_tensor(face_img)
                    blended = self.simple_blend(source_tensor, target_tensor)
                    swapped = self.tensor_to_image(blended[0])
                    # Paste back into the padded region the crop came from,
                    # not the tighter detection box, so geometry is preserved.
                    x, y, w1, h1 = (int(v) for v in padded_box)
                    frame[y:y+h1, x:x+w1] = cv2.resize(swapped, (w1, h1))
                except Exception as e:
                    # Best effort: keep the original frame but remember why.
                    reason = f"{type(e).__name__}: {e}"
                    skipped[reason] = skipped.get(reason, 0) + 1
                out.write(frame)
        finally:
            cap.release()
            if out is not None:
                out.release()
        for reason, count in skipped.items():
            print(f"{count} of {frame_count} frame(s) left unmodified ({reason})")
        print(f"✅ Deepfake saved: {output}")
        return output

# =========================
# DEMO / SAMPLE OUTPUT
# =========================
def create_sample_video(output='deepfake_sample.mp4', width=640, height=480, fps=30, duration=5):
    """Write a placeholder video of black frames with a caption and frame counter.

    Args:
        output: Path of the ``mp4v``-encoded file to create.
        width: Frame width in pixels.
        height: Frame height in pixels.
        fps: Frames per second passed to the writer.
        duration: Length in seconds; ``int(fps * duration)`` frames are written.

    Returns:
        ``output``.

    Raises:
        IOError: If the video writer cannot be opened for ``output``.
    """
    out = cv2.VideoWriter(output, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width,height))
    if not out.isOpened():
        raise IOError(f"Could not open video writer for: {output}")
    try:
        for i in range(int(fps*duration)):
            frame = np.zeros((height,width,3),dtype=np.uint8)
            cv2.putText(frame,"Deepfake Output Sample",(50,200),cv2.FONT_HERSHEY_SIMPLEX,1,(0,255,255),2)
            cv2.putText(frame,f"Frame {i+1}",(50,250),cv2.FONT_HERSHEY_SIMPLEX,0.7,(255,255,255),2)
            out.write(frame)
    finally:
        # Release even if a frame fails so the file handle is not leaked.
        out.release()
    print(f"Sample video created: {output}")
    return output

# =========================
# MAIN EXECUTION
# =========================
if __name__=="__main__":
    print("=== Deepfake System Initialized ===")
    generator = DeepfakeGenerator()

    # Expected inputs, resolved relative to the current working directory.
    source_image = "source_face.jpg"
    target_video = "target_video.mp4"

    # Run the real pipeline only when both inputs exist; otherwise fall back
    # to writing the placeholder sample video.
    inputs_present = os.path.exists(source_image) and os.path.exists(target_video)
    if inputs_present:
        generator.swap_faces_video(source_image, target_video, "deepfake_demo_output.mp4")
    else:
        print("Source image or target video missing. Generating sample output instead...")
        create_sample_video()


Using device: cpu
=== Deepfake System Initialized ===
Haar cascade loaded ✅ from haarcascade_frontalface_default.xml
Source image or target video missing. Generating sample output instead...
Sample video created: deepfake_sample.mp4
