In [1]:
import os
import subprocess
from collections import defaultdict
import cv2
from realesrgan import RealESRGANer
from PIL import Image, ImageEnhance, ImageFilter
from torchvision.transforms import transforms
from torchvision import transforms
from basicsr.archs.swinir_arch import SwinIR
from basicsr.archs.rrdbnet_arch import RRDBNet
from pathlib import Path
import torch
from pathlib import Path
import numpy as np
import re
import sys

In [None]:
os.environ["PATH"] = r"C:\ffmpeg\ffmpeg-7.1-full_build\bin" + ";" + os.environ["PATH"]
# rife_path = r"D:\oldMemories\RIFE"
# sys.path.append(rife_path)

In [8]:
# --- Archive locations -------------------------------------------------------
# Root of the archive that is scanned for videos.
SOURCE_FOLDER = r"F:\consolidated old photos"

# Destination for converted videos; it sits inside the archive root (change if needed).
OUTPUT_FOLDER = SOURCE_FOLDER + r"\converted_videos"

# --- Test sandbox ------------------------------------------------------------
# Every pipeline stage writes to its own sub-folder of the sandbox root.
TEST_SOURCE_FOLDER = r"D:\oldMemoriesTestArea"
TEST_OUTPUT_FOLDER = TEST_SOURCE_FOLDER + r"\converted_videos"
TEST_EXTRACTS_FOLDER = TEST_SOURCE_FOLDER + r"\extracts"
TEST_SWINIR_FOLDER = TEST_SOURCE_FOLDER + r"\swinir"
TEST_UPSCALED_FOLDER = TEST_SOURCE_FOLDER + r"\upscaled"
FINAL_VIDEO_FOLDER = TEST_SOURCE_FOLDER + r"\final_videos"

# --- Model weights -----------------------------------------------------------
# Real-ESRGAN weights (upscaling) and SwinIR weights (denoising).
REALESRGAN_MODEL_PATH = r"D:\oldMemories\RealESRGAN_x4plus.pth"
SWINIR_MODEL_PATH = r"D:\oldMemories\005_colorDN_DFWB_s128w8_SwinIR-M_noise15.pth"

# RIFE inference script.
RIFE_SCRIPT = r"D:\oldMemories\RIFE\inference_video.py"

# --- Formats -----------------------------------------------------------------
# Extensions (lower-case, with the dot) that must be converted to a standard format.
TARGET_EXTENSIONS = {
    ".avi",
    ".dat",
    ".mpg",
    ".vob",
}

# Info on the video files

In [None]:
def get_video_formats_and_paths(folder_path):
    """
    Recursively scan a folder and group the video files found by extension.

    Args:
    - folder_path (str): Path to the folder whose tree is scanned.

    Returns:
    - Dict keyed by lower-cased extension (e.g. ".mp4"). Each value is a dict
      with "count" (number of files found) and "paths" (list of their full paths).
      Extensions with no matching file do not appear in the result.
    """
    # Extensions treated as video; comparison is done on the lower-cased suffix.
    recognised_extensions = {
        ".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm", ".mpeg", ".3gp", ".mpg", ".m4v", ".vob", ".dat"
    }

    grouped = {}
    for current_dir, _subdirs, names in os.walk(folder_path):
        for name in names:
            suffix = os.path.splitext(name)[1].lower()
            if suffix not in recognised_extensions:
                continue
            # First sighting of an extension creates its bucket.
            bucket = grouped.setdefault(suffix, {"count": 0, "paths": []})
            bucket["count"] += 1
            bucket["paths"].append(os.path.join(current_dir, name))

    return grouped

# Example usage
checkVideoFormatsHere = r"F:\consolidated old photos"  # Replace with your folder path
video_format_data = get_video_formats_and_paths(checkVideoFormatsHere)

# Print results
print(f"Video formats and their counts/paths in '{checkVideoFormatsHere}':\n")
# The loop variable is not called `format`: at notebook top level that name
# would replace the builtin format() for every cell executed afterwards.
for video_ext, data in sorted(video_format_data.items()):
    print(f"Format: {video_ext}")
    print(f"Count: {data['count']}")
    # Only rare formats (fewer than 5 files) get their paths listed;
    # for common formats the count alone is printed.
    if data['count'] < 5:
        print("Paths:")
        for path in data["paths"]:
            print(f"  {path}")
    print()


# Standardization of video files

In [None]:
def _plan_output_names(file_names, extensions):
    """Pair each convertible file of one directory with its .mp4 output name.

    Files that differ only by extension (e.g. "clip.avi" and "clip.mpg") would
    both map to "clip.mp4" and the later conversion would overwrite the earlier
    one. For such groups the original extension is folded into the output name
    ("clip_avi.mp4", "clip_mpg.mp4"); all other files keep "<stem>.mp4".
    """
    convertible = [
        name for name in file_names
        if os.path.splitext(name)[1].lower() in extensions
    ]

    # Count how many convertible files share each stem (normcase makes the
    # comparison case-insensitive on Windows).
    stem_uses = {}
    for name in convertible:
        key = os.path.normcase(os.path.splitext(name)[0])
        stem_uses[key] = stem_uses.get(key, 0) + 1

    plan = []
    for name in convertible:
        stem, ext = os.path.splitext(name)
        if stem_uses[os.path.normcase(stem)] > 1:
            output_name = f"{stem}_{ext[1:].lower()}.mp4"
        else:
            output_name = stem + ".mp4"
        plan.append((name, output_name))
    return plan


def standardize_videos(source_folder, output_folder, target_extensions=None):
    """
    Traverse all subfolders from source_folder, find target video formats,
    and convert them to MP4 (H.264 + AAC) in output_folder, mirroring the folder structure.

    Args:
        source_folder (str): Root of the tree to scan.
        output_folder (str): Root under which the mirrored .mp4 files are written.
        target_extensions (iterable of str, optional): Extensions (with dot) to
            convert. Defaults to the module-level TARGET_EXTENSIONS.

    Conversion failures reported by FFmpeg are printed and the walk continues.
    If the ffmpeg executable itself cannot be started, a message is printed and
    the function stops, since no further conversion could succeed.
    """
    if target_extensions is None:
        target_extensions = TARGET_EXTENSIONS
    extensions = {ext.lower() for ext in target_extensions}

    for root, _, files in os.walk(source_folder):
        for file_name, output_name in _plan_output_names(files, extensions):
            source_file_path = os.path.join(root, file_name)

            # Mirror the source sub-folder under output_folder
            relative_path = os.path.relpath(root, source_folder)
            output_subfolder = os.path.join(output_folder, relative_path)
            os.makedirs(output_subfolder, exist_ok=True)

            # e.g. "Video 4.avi" -> "Video 4.mp4"
            output_file_path = os.path.join(output_subfolder, output_name)

            print(f"\nConverting: {source_file_path}\n   to --> {output_file_path}")

            # ffmpeg command to convert video to H.264 (libx264) and AAC
            command = [
                "ffmpeg",
                "-y",                # Overwrite an existing output without prompting
                "-i", source_file_path,
                "-c:v", "libx264",
                "-crf", "15",        # Lower = higher quality, bigger file
                "-preset", "slow",   # Or 'medium' / 'slower' / 'veryslow'
                "-c:a", "aac",       # Encode audio with AAC
                "-b:a", "192k",      # Optional: set audio bitrate
                "-movflags", "+faststart",
                output_file_path
            ]

            try:
                subprocess.run(command, check=True, capture_output=True)
                print("Conversion successful.")
            except subprocess.CalledProcessError as e:
                print(f"Error converting {source_file_path}.\nFFmpeg error: {e.stderr.decode('utf-8', errors='replace')}")
            except FileNotFoundError:
                # The executable is missing, not the video: every later file would fail the same way.
                print("Error: the 'ffmpeg' executable was not found on PATH. Stopping conversion.")
                return

if __name__ == "__main__":
    # Trial run on the test area: the destination root is created first,
    # then every target video under the test source folder is converted.
    os.makedirs(TEST_OUTPUT_FOLDER, exist_ok=True)
    standardize_videos(source_folder=TEST_SOURCE_FOLDER, output_folder=TEST_OUTPUT_FOLDER)

    print("\nAll possible videos have been processed.")

In [None]:
def extract_audio_ffmpeg(input_video_path, output_audio_path):
    """
    Copy the audio track of a video into its own file with FFmpeg, without re-encoding.

    FFmpeg is invoked with:
      -y            overwrite the output file if it already exists
      -i            the input video
      -vn           drop the video stream
      -acodec copy  copy the existing audio stream as-is

    Args:
        input_video_path  (str): Path to the input video (e.g., .mp4)
        output_audio_path (str): Path to save the output audio file (e.g., .aac or .mp3)

    A non-zero FFmpeg exit status is reported by printing FFmpeg's stderr; it is not raised.
    """
    ffmpeg_args = ["ffmpeg", "-y", "-i", input_video_path, "-vn", "-acodec", "copy", output_audio_path]

    try:
        subprocess.run(ffmpeg_args, check=True, capture_output=True)
    except subprocess.CalledProcessError as failure:
        stderr_text = failure.stderr.decode('utf-8', errors='replace')
        print(f"Error extracting audio from {input_video_path}.\nFFmpeg error: {stderr_text}")
    else:
        print(f"Audio extracted to: {output_audio_path}")

In [None]:
def extract_frames_opencv(input_video_path, frames_output_folder):
    """
    Extracts frames from a video using OpenCV and saves them as PNG files.

    Frames are named frame_000000.png, frame_000001.png, ... in decode order.
    cv2.imwrite signals a failed write through its return value rather than an
    exception, so the result is checked and failures are reported instead of
    being counted as extracted frames.

    Args:
        input_video_path     (str): Path to the input .mp4 (or other) video.
        frames_output_folder (str): Folder to save extracted frames.
    """
    os.makedirs(frames_output_folder, exist_ok=True)

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        cap.release()
        print(f"Error: Could not open video {input_video_path}")
        return

    frame_count = 0      # frames decoded so far; also the index used in the file name
    failed_writes = 0
    try:
        while True:
            success, frame = cap.read()
            if not success:
                break
            frame_filename = os.path.join(frames_output_folder, f"frame_{frame_count:06d}.png")
            if not cv2.imwrite(frame_filename, frame):
                if failed_writes == 0:
                    # Report only the first failing path to avoid flooding the output.
                    print(f"Warning: Could not write {frame_filename}")
                failed_writes += 1
            frame_count += 1
    finally:
        # Release the capture even if reading or writing raised.
        cap.release()

    print(f"Extracted {frame_count - failed_writes} frames from {input_video_path} to {frames_output_folder}")
    if failed_writes:
        print(f"Warning: {failed_writes} of {frame_count} frames could not be written to {frames_output_folder}")

In [None]:
def process_standardized_videos(input_folder, output_folder):
    """
    Walk `input_folder` recursively and, for every .mp4 file found, write its
    audio track and its frames under the matching sub-folder of `output_folder`.

    For a video "<name>.mp4" the audio goes to "<name>.aac" and the frames to a
    "<name>_frames" folder, both inside the mirrored sub-folder.

    Args:
        input_folder  (str): Path with standardized videos (likely .mp4).
        output_folder (str): Path to store extracted audio and frames.
    """
    for current_dir, _subdirs, names in os.walk(input_folder):
        mp4_names = [name for name in names if name.lower().endswith(".mp4")]
        if not mp4_names:
            continue

        # Same relative location as the source folder, but under output_folder.
        mirrored_dir = os.path.join(output_folder, os.path.relpath(current_dir, input_folder))
        os.makedirs(mirrored_dir, exist_ok=True)

        for name in mp4_names:
            source_video = os.path.join(current_dir, name)
            stem = os.path.splitext(name)[0]

            # Audio first, then frames.
            extract_audio_ffmpeg(source_video, os.path.join(mirrored_dir, stem + ".aac"))
            extract_frames_opencv(source_video, os.path.join(mirrored_dir, f"{stem}_frames"))

    print("All videos have been processed!")

In [None]:
# Create the extraction root up front, then pull audio and frames out of
# every standardized video in the test output folder.
os.makedirs(TEST_EXTRACTS_FOLDER, exist_ok=True)
process_standardized_videos(input_folder=TEST_OUTPUT_FOLDER, output_folder=TEST_EXTRACTS_FOLDER)

# Frame Processing

## Denoising and sharpening

In [None]:
# Load the SwinIR model for denoising
def load_swinir_model(model_path, device):
    """
    Build a SwinIR network, load its weights from `model_path` and return it
    in eval mode on `device`.

    Args:
        model_path (str): Path to the checkpoint file.
        device (torch.device): Device used for loading and inference.

    Returns:
        The SwinIR model with weights loaded (strict key matching).
    """
    model = SwinIR(
        upscale=1,
        in_chans=3,
        img_size=128,
        window_size=8,
        img_range=1.0,
        depths=[6, 6, 6, 6, 6, 6],
        embed_dim=180,
        num_heads=[6, 6, 6, 6, 6, 6],
        mlp_ratio=2,
        upsampler='',
        resi_connection='1conv'
    )

    # Load weights
    # NOTE(security): torch.load unpickles the file, which can execute arbitrary
    # code; only load checkpoints from a source you trust.
    pretrained = torch.load(model_path, map_location=device)

    # Handle the different checkpoint layouts:
    #   {'params': state_dict}        -> use the 'params' entry
    #   {<other key>: state_dict}     -> use the first entry (as before)
    #   bare state_dict (name->tensor)-> use the checkpoint itself; indexing it
    #                                    by its first key would yield a tensor
    if 'params' in pretrained:
        state_dict = pretrained['params']
    else:
        first_value = next(iter(pretrained.values()), None)
        state_dict = first_value if isinstance(first_value, dict) else pretrained
    model.load_state_dict(state_dict, strict=True)

    model.eval()
    model.to(device)
    return model

# Use the GPU when CUDA is available, otherwise stay on the CPU, and load the
# SwinIR weights onto that device.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = load_swinir_model(model_path=SWINIR_MODEL_PATH, device=device)

In [None]:
def denoise_image_swinir(model, image_path, device):
    """
    Denoise an image using SwinIR and return the output image.

    Args:
        model: Loaded SwinIR model (already on `device`, in eval mode).
        image_path (str or Path): Image file to read.
        device (torch.device): Device the input tensor is moved to.

    Returns:
        numpy.ndarray: uint8 image in BGR channel order with the same height
        and width as the input.

    Raises:
        FileNotFoundError: If OpenCV cannot read `image_path`.
    """
    # Load and convert image
    img = cv2.imread(str(image_path))
    if img is None:
        # cv2.imread returns None instead of raising for missing/unreadable files
        raise FileNotFoundError(f"Could not read image: {image_path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0  # Normalize to [0,1]

    # Pad image to be a multiple of window size (8)
    h, w, _ = img.shape
    h_pad = (8 - h % 8) % 8
    w_pad = (8 - w % 8) % 8
    img = np.pad(img, ((0, h_pad), (0, w_pad), (0, 0)), 'reflect')

    # Convert to tensor: HWC -> CHW, then add the batch dimension
    img_tensor = torch.from_numpy(np.transpose(img, (2, 0, 1))).float().unsqueeze(0).to(device)

    # Run SwinIR model
    with torch.no_grad():
        output = model(img_tensor).clamp_(0, 1)  # Clamp values to valid range

    # Convert back to image; squeeze only the batch dimension
    output = output.squeeze(0).cpu().numpy().transpose(1, 2, 0)  # CHW → HWC
    # Remove padding & scale back to [0,255]. Round before the cast: a bare
    # astype(uint8) truncates and would darken the result by up to one level.
    output = (output[:h, :w] * 255.0).round().astype(np.uint8)

    return cv2.cvtColor(output, cv2.COLOR_RGB2BGR)  # Convert back to BGR for OpenCV saving

In [None]:
def sharpen_image_opencv(image):
    """Sharpen an image by convolving it with a fixed 3x3 kernel via OpenCV.

    The kernel weights the centre pixel by 5 and its four direct neighbours by -1.
    """
    sharpening_kernel = np.array([
        [0, -1, 0],
        [-1, 5, -1],
        [0, -1, 0],
    ])
    # ddepth=-1: the result keeps the depth of the input image.
    return cv2.filter2D(image, -1, sharpening_kernel)

def sharpen_image_pil(image):
    """Sharpen an image with PIL's unsharp mask (radius 2, 150 percent, threshold 3).

    The input is converted from BGR to RGB for PIL and the result is converted
    back to BGR before it is returned.
    """
    rgb_pixels = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    unsharp_mask = ImageFilter.UnsharpMask(radius=2, percent=150, threshold=3)
    sharpened_rgb = Image.fromarray(rgb_pixels).filter(unsharp_mask)
    return cv2.cvtColor(np.array(sharpened_rgb), cv2.COLOR_RGB2BGR)

In [None]:
# def process_images(input_dir, output_dir, model, device):
#     """ Recursively denoise images while maintaining folder structure. """
#     input_path = Path(input_dir)
#     output_path = Path(output_dir)

#     for img_path in input_path.rglob('*'):
#         if img_path.suffix.lower() in ['.png', '.jpg', '.jpeg', '.bmp']:
#             relative_path = img_path.relative_to(input_path)
#             target_path = output_path / relative_path
#             target_path.parent.mkdir(parents=True, exist_ok=True)  # Ensure folder exists

#             # Apply denoising
#             denoised_img = denoise_image_swinir(model, img_path, device)
#             cv2.imwrite(str(target_path), denoised_img)  # Save output
#             print(f"Denoised and saved: {target_path}")

def process_images_with_sharpening(input_dir, output_dir, model, device, sharpening_method="opencv"):
    """Denoise and sharpen every image under input_dir, mirroring the tree in output_dir.

    Files with a .png, .jpg, .jpeg or .bmp suffix (case-insensitive) are
    processed; each result is written to the same relative path below
    output_dir. "opencv" selects the kernel-based sharpener; any other value
    selects the PIL unsharp mask.
    """
    source_root = Path(input_dir)
    destination_root = Path(output_dir)
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp')

    # Lazily pick out the image files while walking the tree.
    image_files = (
        candidate for candidate in source_root.rglob('*')
        if candidate.suffix.lower() in image_suffixes
    )

    for source_image in image_files:
        destination = destination_root / source_image.relative_to(source_root)
        destination.parent.mkdir(parents=True, exist_ok=True)  # Ensure folder exists

        # Denoise first, then sharpen with the requested method.
        cleaned = denoise_image_swinir(model, source_image, device)
        use_opencv = sharpening_method == "opencv"
        result = sharpen_image_opencv(cleaned) if use_opencv else sharpen_image_pil(cleaned)

        cv2.imwrite(str(destination), result)
        print(f"Denoised, sharpened, and saved: {destination}")

In [None]:
# Create the destination for the denoised frames before the batch starts.
os.makedirs(TEST_SWINIR_FOLDER, exist_ok=True)

# Denoise and sharpen every extracted frame with the PIL unsharp mask.
# Alternative using the OpenCV kernel sharpener:
# process_images_with_sharpening(TEST_EXTRACTS_FOLDER, TEST_SWINIR_FOLDER, model, device, sharpening_method="opencv")
process_images_with_sharpening(
    input_dir=TEST_EXTRACTS_FOLDER,
    output_dir=TEST_SWINIR_FOLDER,
    model=model,
    device=device,
    sharpening_method="pil",
)

## Upscaling

In [None]:
def upscale_extracted_frames_official(
    input_folder,
    output_folder,
    realesrgan_model_path,
    scale=4,
    tile=0,
    half=False
):
    """
    Recursively upscale extracted frames in input_folder using the official Real-ESRGAN approach.
    Saves upscaled frames to the corresponding structure in output_folder.

    Args:
        input_folder (str): Root of the tree containing the frames to upscale.
        output_folder (str): Root under which the upscaled frames are written.
        realesrgan_model_path (str): Path to the Real-ESRGAN .pth weights.
        scale (int): Upscaling factor used for the network and the output size.
        tile (int): Tile size passed to RealESRGANer; 0 = no tiling (set a
            larger tile if you run out of memory).
        half (bool): Passed to RealESRGANer's `half` option; if you have low
            VRAM, try True.
    """

    # 1) Define the architecture that matches your .pth file
    model = RRDBNet(
        num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=scale
    )

    # 2) Create RealESRGANer, passing the custom model
    upsampler = RealESRGANer(
        scale=scale,
        model_path=realesrgan_model_path,
        model=model,          # ← provide the RRDBNet model
        tile=tile,
        tile_pad=10,
        pre_pad=0,
        half=half,
        gpu_id=0
    )

    for root, _, files in os.walk(input_folder):
        for file_name in files:
            # Check if this is an image
            if file_name.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
                input_path = os.path.join(root, file_name)
                rel_path = os.path.relpath(root, input_folder)
                target_subfolder = os.path.join(output_folder, rel_path)
                os.makedirs(target_subfolder, exist_ok=True)

                output_path = os.path.join(target_subfolder, file_name)
                img = cv2.imread(input_path, cv2.IMREAD_COLOR)
                if img is None:
                    print(f"Warning: Could not read {input_path}. Skipping.")
                    continue

                # Upscale
                output, _ = upsampler.enhance(img, outscale=scale)
                # cv2.imwrite reports failure through its return value, not an exception
                if not cv2.imwrite(output_path, output):
                    print(f"Warning: Could not write {output_path}.")

    print("All frames have been upscaled!")

# Create the destination root, then upscale every denoised frame 4x.
os.makedirs(TEST_UPSCALED_FOLDER, exist_ok=True)

upscale_extracted_frames_official(
    TEST_SWINIR_FOLDER,
    TEST_UPSCALED_FOLDER,
    REALESRGAN_MODEL_PATH,
    scale=4,
)

# Restitching the video back together

In [10]:
# Create the folder for the finished videos (including missing parents) if needed.
Path(FINAL_VIDEO_FOLDER).mkdir(parents=True, exist_ok=True)

In [None]:
def _parse_frame_rate(text):
    """Convert an ffprobe rate such as "30000/1001" or "25" to a float; None if unusable."""
    numerator, _, denominator = text.strip().partition("/")
    try:
        num = float(numerator)
        den = float(denominator) if denominator else 1.0
    except ValueError:
        return None
    if num <= 0 or den <= 0:
        # ffprobe reports "0/0" when a rate is unknown
        return None
    return num / den


def get_video_fps(video_path):
    """
    Determine the frame rate of a video.

    ffprobe is asked first for the first video stream's rational frame rate
    (e.g. 24000/1001), because the "fps" figure in FFmpeg's banner is rounded
    for display (23.98). If ffprobe is unavailable or gives no usable rate, the
    banner printed by `ffmpeg -i` is parsed as before.

    Args:
        video_path (str or Path): Video file to inspect.

    Returns:
        float: Detected frames per second, or 30 when nothing could be detected.
    """
    video_path = str(video_path)
    # Decode tool output explicitly so unexpected bytes cannot raise UnicodeDecodeError.
    run_options = dict(capture_output=True, text=True, encoding="utf-8", errors="replace")

    # 1) Preferred: exact rational rate from ffprobe (argument list, no shell).
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=avg_frame_rate,r_frame_rate",
        "-of", "default=noprint_wrappers=1",
        video_path,
    ]
    try:
        probe = subprocess.run(probe_cmd, **run_options)
    except OSError:
        probe = None  # ffprobe not installed / not on PATH
    if probe is not None and probe.returncode == 0:
        rates = {}
        for line in probe.stdout.splitlines():
            key, sep, value = line.partition("=")
            if sep:
                rates.setdefault(key.strip(), value)
        for key in ("avg_frame_rate", "r_frame_rate"):
            fps = _parse_frame_rate(rates.get(key, ""))
            if fps:
                print(f"✅ Extracted FPS: {fps}")
                return fps

    # 2) Fallback: parse the stream description FFmpeg prints on stderr.
    #    `ffmpeg -i <file>` without an output always exits non-zero, so the
    #    exit status is deliberately not checked.
    try:
        banner = subprocess.run(["ffmpeg", "-hide_banner", "-i", video_path], **run_options).stderr
    except OSError as e:
        print(f"❌ FFmpeg Error:\n{e}")
        banner = ""

    match = re.search(r'(\d+(?:\.\d+)?)\s*fps', banner or "")
    if match:
        fps = float(match.group(1))
        print(f"✅ Extracted FPS: {fps}")
        return fps

    print("⚠ Warning: Could not detect FPS, using default 30 FPS.")
    return 30  # Fallback to 30 FPS

In [None]:
def frames_to_video_with_audio(video_name):
    """
    Stitch frames into a video and add the original audio.

    Args:
        video_name (str): Name of the standardized video, e.g. "flying car.mp4".
            A relative sub-folder ("trip/clip.mp4") is honoured: frames, audio
            and the final video are then looked up / written under the same
            sub-folder of their respective roots.

    The frame rate is taken from the standardized video. If no extracted audio
    file exists, the silent video is kept as the result.

    Raises:
        subprocess.CalledProcessError: If an FFmpeg invocation fails.
    """
    video_rel = Path(video_name)
    video_stem = video_rel.stem  # Extract "flying car" from "flying car.mp4"

    # Sub-folder of the video relative to the standardized-video root ("." at top level).
    rel_parent = video_rel.parent
    if video_rel.is_absolute():
        try:
            rel_parent = video_rel.parent.relative_to(TEST_OUTPUT_FOLDER)
        except ValueError:
            rel_parent = Path(".")  # outside the root: fall back to the top level

    frames_folder = Path(TEST_UPSCALED_FOLDER) / rel_parent / f"{video_stem}_frames"
    original_video_path = Path(TEST_OUTPUT_FOLDER) / video_rel
    audio_file = Path(TEST_EXTRACTS_FOLDER) / rel_parent / f"{video_stem}.aac"
    output_dir = Path(FINAL_VIDEO_FOLDER) / rel_parent
    output_video = output_dir / f"{video_stem}_final.mp4"
    temp_video = output_dir / f"{video_stem}_no_audio.mp4"

    # Ensure frames exist (checked before probing the FPS to avoid needless work)
    if not frames_folder.exists() or not any(frames_folder.glob("*.png")):
        print(f"❌ Frames folder is missing or empty: {frames_folder}")
        return

    output_dir.mkdir(parents=True, exist_ok=True)

    # Extract original FPS
    fps = get_video_fps(original_video_path)

    # FFmpeg command to stitch frames into a video.
    # Arguments are passed as a list (no shell), and -y overwrites the output of
    # a previous run instead of stopping at FFmpeg's interactive overwrite prompt.
    # NOTE(review): -crf 0 with PNG input may produce a 4:4:4 stream that some
    # players cannot decode — confirm whether "-pix_fmt yuv420p" is wanted.
    cmd = [
        "ffmpeg", "-y",
        "-framerate", str(fps),
        "-i", str(frames_folder / "frame_%06d.png"),
        "-c:v", "libx264", "-crf", "0", "-preset", "slow",
        str(temp_video),
    ]
    print(f"Running FFmpeg command: {subprocess.list2cmdline(cmd)}")
    subprocess.run(cmd, check=True)

    # If audio exists, merge it
    if audio_file.exists():
        cmd_audio = [
            "ffmpeg", "-y",
            "-i", str(temp_video),
            "-i", str(audio_file),
            "-c:v", "copy", "-c:a", "aac", "-b:a", "256k",
            str(output_video),
        ]
        subprocess.run(cmd_audio, check=True)
        temp_video.unlink()  # Remove temp video without audio
        print(f"✅ Final video with audio saved: {output_video}")
    else:
        print(f"⚠ No audio found, saving video without audio: {temp_video}")
# Rebuild the sample video from its upscaled frames and extracted audio.
frames_to_video_with_audio(video_name="flying car.mp4")