In [None]:
import cv2
import numpy as np
import time
import os
from dataclasses import dataclass
from typing import Tuple, Optional, List
from scipy.ndimage import gaussian_filter
from skimage.restoration import estimate_sigma
import threading
import queue

@dataclass
class VideoMetadata:
    """Frame size and frame rate of a video stream.

    VideoProcessor.initialize fills this from the opened capture, and
    VideoProcessor.create_writer reads it to configure the output writer.
    """
    # Frame width, read from cv2.CAP_PROP_FRAME_WIDTH and converted to int.
    width: int
    # Frame height, read from cv2.CAP_PROP_FRAME_HEIGHT and converted to int.
    height: int
    # Frame rate, read from cv2.CAP_PROP_FPS and converted to int.
    fps: int

class VideoProcessingError(Exception):
    """Signals a failure while setting up or processing a video.

    Raised, for example, when the input file is missing or cannot be opened.
    """

class VideoProcessor:
    """Reads frames from an input video and writes frames to an output video.

    Typical use: call ``initialize()``, then ``create_writer()``, iterate over
    the instance to obtain frames, pass results to ``write_frame()``, and
    finally call ``release()``.

    Iterating to the end of the stream closes only the *reader*. The writer
    stays open until ``release()`` so that frames still being processed when
    the input runs out can be written.
    """

    def __init__(self, input_path: str, output_path: str):
        self.input_path = input_path
        self.output_path = output_path
        self.video_capture = None
        self.output_writer = None
        self.metadata = None

    def initialize(self) -> "VideoMetadata":
        """Open the input video and read its metadata.

        Returns:
            The VideoMetadata describing the opened stream (also stored on
            ``self.metadata``).

        Raises:
            VideoProcessingError: if the file does not exist or cannot be
                opened.
        """
        if not os.path.exists(self.input_path):
            raise VideoProcessingError(f"Video file not found: {self.input_path}")

        self.video_capture = cv2.VideoCapture(self.input_path)
        if not self.video_capture.isOpened():
            raise VideoProcessingError(f"Error opening video file: {self.input_path}")

        width = int(self.video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(self.video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Round instead of truncating: fractional rates such as 29.97 would
        # otherwise become 29 and noticeably change the output duration.
        fps = int(round(self.video_capture.get(cv2.CAP_PROP_FPS)))

        self.metadata = VideoMetadata(width, height, fps)
        return self.metadata

    def create_writer(self):
        """Create the output writer using the metadata read by ``initialize()``.

        Raises:
            VideoProcessingError: if ``initialize()`` has not been called or
                the output file cannot be opened for writing.
        """
        if self.metadata is None:
            raise VideoProcessingError("initialize() must be called before create_writer()")

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(
            self.output_path,
            fourcc,
            self.metadata.fps,
            (self.metadata.width, self.metadata.height)
        )
        # A writer that failed to open drops every frame silently, so fail
        # loudly here instead.
        if not writer.isOpened():
            writer.release()
            raise VideoProcessingError(f"Error opening output video for writing: {self.output_path}")
        self.output_writer = writer

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next frame, or stop when the input is exhausted.

        Raises:
            VideoProcessingError: if ``initialize()`` has not been called.
            StopIteration: when no further frame can be read.
        """
        if self.video_capture is None:
            raise VideoProcessingError("initialize() must be called before reading frames")

        ret, frame = self.video_capture.read()
        if not ret:
            # Release only the reader. Closing the writer here would discard
            # every frame written after the last read.
            self.video_capture.release()
            raise StopIteration
        return frame

    def write_frame(self, frame):
        """Write ``frame`` to the output; a no-op when no writer exists."""
        if self.output_writer is not None:
            self.output_writer.write(frame)

    def release(self):
        """Release the reader and the writer, whichever of them exist."""
        if self.video_capture is not None:
            self.video_capture.release()
        if self.output_writer is not None:
            self.output_writer.release()

class FrameBuffer:
    """Sliding window holding at most ``size`` of the most recent frames."""

    def __init__(self, size: int):
        self.size = size
        self.buffer = []

    def add(self, frame):
        """Append ``frame``, first evicting the oldest entry when at capacity."""
        window = self.buffer
        at_capacity = len(window) == self.size
        if at_capacity:
            window.pop(0)
        window.append(frame)

    def is_full(self):
        """Return True when the window holds exactly ``size`` frames."""
        return self.size == len(self.buffer)

    def get_current(self):
        """Return the frame at the middle index of the window."""
        window = self.buffer
        middle = len(window) // 2
        return window[middle]

class ImprovedFaceDetector:
    """Haar-cascade frontal-face detector that reports one face per frame.

    After each ``detect`` call, ``confidence`` holds the area of the returned
    box as a fraction of the frame area, or 0.0 when no face was found.
    """

    def __init__(self):
        """Load the frontal-face cascade bundled with OpenCV.

        Raises:
            RuntimeError: if the cascade file could not be loaded.
        """
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        # An empty classifier only fails later, on the first detection call;
        # surface the problem at construction time instead.
        if self.face_cascade.empty():
            raise RuntimeError("Failed to load Haar cascade: haarcascade_frontalface_default.xml")
        self.confidence = 0.0

    def detect(self, frame: np.ndarray) -> Optional[Tuple[int, int, int, int]]:
        """Return the largest detected face as ``(x, y, w, h)``, or None.

        Args:
            frame: BGR image, or an already-grayscale 2-D image.

        Returns:
            A tuple of plain Python ints, or None when nothing is detected.
        """
        gray = frame if frame.ndim == 2 else cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
        if len(faces) == 0:
            # Do not leave the previous frame's value behind.
            self.confidence = 0.0
            return None

        # The detector's result order is not tied to box size; prefer the
        # largest box, which also matches the area-based confidence metric.
        largest = max(faces, key=lambda box: box[2] * box[3])
        # Plain ints keep numpy scalar types out of downstream arithmetic.
        x, y, w, h = (int(value) for value in largest)
        self.confidence = (w * h) / (frame.shape[0] * frame.shape[1])  # Simple confidence metric
        return x, y, w, h

    def crop_face_with_padding(self, frame: np.ndarray, face_coords: Tuple[int, int, int, int],
                               padding_ratio: float = 0.1) -> np.ndarray:
        """Return the face region of ``frame`` enlarged by a margin.

        Args:
            frame: image to crop from.
            face_coords: ``(x, y, w, h)`` of the face box.
            padding_ratio: margin on every side, as a fraction of the shorter
                box side (default 0.1, i.e. 10%).

        Returns:
            A slice of ``frame`` (not a copy), clamped to the frame bounds.
        """
        x, y, w, h = face_coords
        padding = int(min(w, h) * padding_ratio)
        x1, y1 = max(0, x - padding), max(0, y - padding)
        x2, y2 = min(frame.shape[1], x + w + padding), min(frame.shape[0], y + h + padding)
        return frame[y1:y2, x1:x2]

class NoiseEstimator:
    """Estimates the noise standard deviation of an image."""

    def estimate_noise_level(self, image: np.ndarray) -> float:
        """Return the estimated noise sigma of ``image`` on a [0, 1] scale.

        Integer images are rescaled to [0, 1] before estimation, so the
        result is comparable with the 0.1 fallback returned below. Float
        images are used as given.

        Args:
            image: 3-D BGR image or 2-D grayscale image.

        Returns:
            The estimated sigma as a Python float, or 0.1 when estimation
            fails for any reason (a warning is printed in that case).
        """
        try:
            if len(image.shape) == 3:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            else:
                gray = image
            # estimate_sigma reports sigma in the units of its input; without
            # rescaling, 8-bit frames give values on a 0-255 scale.
            if np.issubdtype(gray.dtype, np.integer):
                gray = gray.astype(np.float64) / np.iinfo(gray.dtype).max
            sigma_est = estimate_sigma(gray, channel_axis=None)
            return float(sigma_est)
        except Exception as e:
            # Deliberate best effort: fall back to a nominal level rather
            # than aborting the frame.
            print(f"Warning: Noise estimation failed: {e}")
            return 0.1

class NoiseReducer:
    """Bilateral-filter denoiser whose strength follows the noise level."""

    def __init__(self):
        self.bilateral_diameter = 9
        self.sigma_color = 75
        self.sigma_space = 75

    def reduce_noise(self, image: np.ndarray, noise_level: float) -> np.ndarray:
        """Return a denoised version of ``image``.

        Both bilateral sigmas are the configured values multiplied by
        ``noise_level / 0.1``. When ``noise_level`` exceeds 0.2 the filtered
        image is additionally smoothed with a 5x5 Gaussian blur.
        """
        strength = noise_level / 0.1
        filtered = cv2.bilateralFilter(
            image,
            self.bilateral_diameter,
            self.sigma_color * strength,
            self.sigma_space * strength,
        )

        if not noise_level > 0.2:
            return filtered
        return cv2.GaussianBlur(filtered, (5, 5), 0.5)

class LightingEnhancer:
    """Applies CLAHE to the lightness channel of a BGR image."""

    def __init__(self):
        self.clip_limit = 3.0
        self.tile_grid_size = (8, 8)

    def enhance_lighting(self, image: np.ndarray) -> np.ndarray:
        """Return ``image`` with its LAB L channel contrast-equalized.

        The image is converted BGR -> LAB, CLAHE is applied to channel 0
        only, and the result is converted back to BGR.
        """
        lab_image = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
        equalizer = cv2.createCLAHE(
            clipLimit=self.clip_limit,
            tileGridSize=self.tile_grid_size,
        )
        lab_image[:, :, 0] = equalizer.apply(lab_image[:, :, 0])
        return cv2.cvtColor(lab_image, cv2.COLOR_LAB2BGR)

class AdaptiveFaceEnhancer:
    """Upscales face crops, using super-resolution for low-quality input.

    ``select_model`` scores the image and stores the score in
    ``quality_score``; ``enhance`` then reads that score. The two calls are
    therefore meant to be used as a pair on the same image, and an instance
    should not be shared between threads.
    """

    def __init__(self, model_path: str = "EDSR_x4.pb", model_name: str = "edsr", scale: int = 4):
        """Try to load a super-resolution model; fall back to resizing only.

        Args:
            model_path: path to the model file (the default EDSR_x4.pb has to
                be downloaded separately).
            model_name: model identifier passed to ``setModel``.
            scale: upscaling factor passed to ``setModel``.
        """
        self.quality_score = 0.0
        try:
            # Creation is inside the try as well: when the super-resolution
            # module is unavailable, the enhancer degrades to bicubic
            # resizing instead of failing to construct.
            sr = cv2.dnn_superres.DnnSuperResImpl_create()
            sr.readModel(model_path)
            sr.setModel(model_name, scale)
            self.sr = sr
        except Exception as e:
            print(f"Warning: Could not load super-resolution model: {e}")
            self.sr = None

    def select_model(self, image: np.ndarray) -> 'AdaptiveFaceEnhancer':
        """Score ``image``, remember the score, and return ``self``."""
        self.quality_score = self.assess_quality(image)
        return self

    def assess_quality(self, image: np.ndarray) -> float:
        """Return a sharpness score in [0, 1] from the Laplacian variance.

        Accepts a BGR image or an already-grayscale 2-D image.
        """
        gray = image if image.ndim == 2 else cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
        return float(min(laplacian_var / 500, 1.0))  # Normalize to [0, 1]

    def enhance(self, image: np.ndarray) -> np.ndarray:
        """Return an upscaled version of ``image``.

        Super-resolution is used when a model is loaded and the stored
        quality score is below 0.5. Otherwise, or when super-resolution
        fails, the image is doubled in size with bicubic interpolation.
        """
        if self.sr is not None and self.quality_score < 0.5:  # Only apply SR if quality is low
            try:
                return self.sr.upsample(image)
            except Exception as e:
                # Best effort: report and fall through to the plain resize.
                print(f"Super-resolution failed: {e}")
        return cv2.resize(image, (image.shape[1] * 2, image.shape[0] * 2), interpolation=cv2.INTER_CUBIC)

class FaceReintegrator:
    """Blends an enhanced face back into the frame it was taken from."""

    def blend(self, original: np.ndarray, enhanced_face: np.ndarray, face_coords: Tuple[int, int, int, int]) -> np.ndarray:
        """Paste ``enhanced_face`` over the face box with a feathered ellipse.

        The enhanced face is resized to the box size and mixed with the
        existing pixels using a blurred elliptical mask, so the result fades
        into the frame towards the edge of the box.

        Args:
            original: frame to modify; it is changed in place.
            enhanced_face: enhanced face image of any size.
            face_coords: ``(x, y, w, h)`` of the target box in ``original``.

        Returns:
            ``original``, for call chaining. It is returned unchanged when
            the box is empty or lies entirely outside the frame.
        """
        x, y, w, h = (int(value) for value in face_coords)
        frame_h, frame_w = original.shape[:2]

        # Part of the box that actually lies inside the frame.
        left, top = max(0, x), max(0, y)
        right, bottom = min(frame_w, x + w), min(frame_h, y + h)
        if w <= 0 or h <= 0 or right <= left or bottom <= top or enhanced_face.size == 0:
            return original

        face = cv2.resize(enhanced_face, (w, h))

        # The mask is built at box size, not frame size, so it lines up with
        # the resized face and the frame region during blending.
        mask = np.zeros((h, w), dtype=np.uint8)
        cv2.ellipse(mask, center=(w // 2, h // 2), axes=(w // 2, h // 2), angle=0,
                    startAngle=0, endAngle=360, color=255, thickness=-1)
        # GaussianBlur needs positive odd kernel sizes; "| 1" forces oddness.
        kernel = (max(1, w // 15) | 1, max(1, h // 15) | 1)
        mask = cv2.GaussianBlur(mask, kernel, 0)
        alpha = mask.astype(np.float64) / 255.0
        if face.ndim == 3:
            alpha = alpha[:, :, np.newaxis]  # broadcast over colour channels

        # Restrict face and mask to the in-frame part of the box.
        window = (slice(top - y, bottom - y), slice(left - x, right - x))
        region = original[top:bottom, left:right]
        blended = alpha[window] * face[window] + (1.0 - alpha[window]) * region
        original[top:bottom, left:right] = np.clip(np.rint(blended), 0, 255).astype(np.uint8)

        return original

class EnhancementStatsCollector:
    """Accumulates per-frame enhancement metrics and reports their averages.

    ``update`` may be called concurrently from several worker threads; a lock
    keeps the counters consistent.
    """

    def __init__(self):
        self.frame_count = 0
        self.total_noise_level = 0
        self.total_face_confidence = 0
        self.total_quality_score = 0
        self.start_time = time.time()
        # Guards the read-modify-write updates of the counters above.
        self._lock = threading.Lock()

    def update(self, noise_level: float, face_confidence: float, quality_score: float):
        """Record the metrics of one processed frame."""
        # Convert once so numpy scalars do not leak into the reported stats.
        noise_level = float(noise_level)
        face_confidence = float(face_confidence)
        quality_score = float(quality_score)
        with self._lock:
            self.frame_count += 1
            self.total_noise_level += noise_level
            self.total_face_confidence += face_confidence
            self.total_quality_score += quality_score

    def get_stats(self) -> dict:
        """Return frame count, metric averages, elapsed time and frames/second.

        Averages are 0 when no frame has been recorded. Elapsed time is
        measured from construction of the collector.
        """
        with self._lock:
            # Take one consistent snapshot of all counters.
            frame_count = self.frame_count
            total_noise = self.total_noise_level
            total_confidence = self.total_face_confidence
            total_quality = self.total_quality_score

        processing_time = time.time() - self.start_time
        divisor = max(1, frame_count)
        return {
            "processed_frames": frame_count,
            "average_noise_level": total_noise / divisor,
            "average_face_confidence": total_confidence / divisor,
            "average_quality_score": total_quality / divisor,
            "processing_time": processing_time,
            "fps": frame_count / processing_time if processing_time > 0 else 0
        }

def process_frame(frame, face_detector, noise_estimator, noise_reducer, lighting_enhancer, face_enhancer, face_reintegrator, stats_collector):
    """Enhance the detected face in ``frame`` and return the resulting frame.

    When the detector reports no face, ``frame`` is returned as is and no
    statistics are recorded. Otherwise the face crop is denoised, relit and
    upscaled, blended back through ``face_reintegrator``, and the noise level,
    detector confidence and enhancer quality score are sent to
    ``stats_collector``.

    NOTE(review): the crop passed through the pipeline includes the detector's
    padding, while blending targets the unpadded box -- confirm this is
    intended.
    """
    detected_box = face_detector.detect(frame)
    if not detected_box:
        return frame

    face_crop = face_detector.crop_face_with_padding(frame, detected_box)

    estimated_noise = noise_estimator.estimate_noise_level(face_crop)
    cleaned = noise_reducer.reduce_noise(face_crop, estimated_noise)
    relit = lighting_enhancer.enhance_lighting(cleaned)
    upscaled = face_enhancer.select_model(relit).enhance(relit)

    result = face_reintegrator.blend(frame, upscaled, detected_box)

    stats_collector.update(
        estimated_noise,
        face_detector.confidence,
        face_enhancer.quality_score,
    )
    return result

def process_video(input_path: str, output_path: str, num_threads: int = 4):
    """Enhance faces in every frame of a video and write the result.

    Frames are read in the calling thread, enhanced by ``num_threads`` worker
    threads, and written back in their original order. A frame whose
    enhancement fails is written unmodified, so one bad frame cannot stall
    the pipeline.

    Args:
        input_path: path of the video to read.
        output_path: path of the video to write.
        num_threads: number of worker threads (at least one is used).

    Returns:
        The statistics dictionary produced by EnhancementStatsCollector.

    A VideoProcessingError raised while opening the input or output is
    reported with ``print`` and the (empty) statistics are still returned.
    """
    num_threads = max(1, int(num_threads))
    video_processor = VideoProcessor(input_path, output_path)

    # These stages keep no per-call state and can be shared by all workers.
    noise_estimator = NoiseEstimator()
    noise_reducer = NoiseReducer()
    lighting_enhancer = LightingEnhancer()
    face_reintegrator = FaceReintegrator()
    # The detector and the enhancer store per-call results on themselves
    # (confidence / quality_score), so every worker gets its own pair. They
    # are built here so that construction errors surface in this thread.
    worker_stages = [(ImprovedFaceDetector(), AdaptiveFaceEnhancer()) for _ in range(num_threads)]
    stats_collector = EnhancementStatsCollector()

    frame_queue = queue.Queue(maxsize=10)   # (index, frame) items, or None to stop
    result_queue = queue.Queue()            # (index, processed_frame) items
    # Upper bound on frames read but not yet written; it exceeds what the
    # queue plus the workers can hold, so waiting on it cannot deadlock.
    max_backlog = frame_queue.maxsize + 2 * num_threads
    pending = {}                            # finished frames waiting for their turn
    next_to_write = 0

    def worker(face_detector, face_enhancer):
        while True:
            item = frame_queue.get()
            if item is None:
                break
            index, frame = item
            try:
                processed = process_frame(frame, face_detector, noise_estimator, noise_reducer,
                                          lighting_enhancer, face_enhancer, face_reintegrator, stats_collector)
            except Exception as e:
                # Keep the worker alive and the frame sequence complete.
                print(f"Warning: enhancement failed for frame {index}: {e}")
                processed = frame
            result_queue.put((index, processed))

    def collect(block):
        """Move one finished frame into ``pending``; False if none is ready."""
        try:
            index, processed = result_queue.get(block=block)
        except queue.Empty:
            return False
        pending[index] = processed
        return True

    def write_ready():
        """Write every finished frame that is next in sequence."""
        nonlocal next_to_write
        while next_to_write in pending:
            video_processor.write_frame(pending.pop(next_to_write))
            next_to_write += 1

    try:
        video_processor.initialize()
        video_processor.create_writer()

        threads = [threading.Thread(target=worker, args=stages, daemon=True) for stages in worker_stages]
        for t in threads:
            t.start()

        # Read from the capture directly so that reaching the end of the
        # input cannot close the writer while processed frames are pending.
        capture = video_processor.video_capture
        submitted = 0
        try:
            while True:
                ok, frame = capture.read()
                if not ok:
                    break
                frame_queue.put((submitted, frame))
                submitted += 1

                while collect(block=False):
                    pass
                write_ready()

                # Workers finish out of order; wait here if too many frames
                # are outstanding so memory use stays bounded.
                while submitted - next_to_write > max_backlog:
                    collect(block=True)
                    write_ready()
        finally:
            # Always stop the workers, even when reading failed.
            for _ in threads:
                frame_queue.put(None)
            for t in threads:
                t.join()

        # Write whatever finished after the last read.
        while collect(block=False):
            pass
        write_ready()

    except VideoProcessingError as e:
        print(f"Error processing video: {e}")
    finally:
        video_processor.release()

    return stats_collector.get_stats()

def main():
    """Enhance the configured input video and print the resulting statistics.

    Prints a message and returns when the input file does not exist. Any
    exception raised during processing is reported with ``print``.
    """
    source_path = "/content/D01_20240705162954 (online-video-cutter.com) (1).mp4"
    target_path = "enhanced_output.mp4"

    if os.path.exists(source_path):
        try:
            summary = process_video(source_path, target_path)
            print("Enhancement Statistics:", summary)
        except Exception as e:
            print(f"An error occurred: {e}")
    else:
        print(f"Input video not found: {source_path}")


if __name__ == "__main__":
    main()




Exception in thread Exception in thread Thread-13 (worker):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
Thread-12 (worker):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
        self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
self.run()    
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-1-012741f6092a>", line 267, in worker
self._target(*self._args, **self._kwargs)
  File "<ipython-input-1-012741f6092a>", line 267, in worker
  File "<ipython-input-1-012741f6092a>", line 238, in process_frame
  File "<ipython-input-1-012741f6092a>", line 196, in blend
ValueError: operands could not be broadcast together with shapes (1080,1920,3) (106,106,3) 
  File "<ipython-input-1-012741f6092a>", line 238, in process_frame
  File "<ipython-input-1-012741f6092a>", line 196,

