In [1]:
import io
import logging
import numpy as np
import imageio.v3 as iio

# Init the logger.
log = logging.getLogger(__name__)

class MotionDetector():

    def get_frames_as_ndarray(self, fragment_bytes, one_in_frames_ratio):
        '''
        Parses fragment_bytes and returns a ratio of available frames in the MKV fragment as
        a list of numpy.ndarray's.

        e.g: Setting one_in_frames_ratio = 5 will return every 5th frame found in the fragment.
        (Starting with the first)

        To return all available frames just set one_in_frames_ratio = 1

        ### Parameters:

            fragment_bytes: bytearray
                A ByteArray with raw bytes from exactly one fragment.

            one_in_frames_ratio: Str
                Ratio of the available frames in the fragment to process and return.

        ### Return:

            frames: List<numpy.ndarray>
            A list of frames extracted from the fragment as numpy.ndarray
        
        '''

        # Parse all frames in the fragment to frames list
        frames = iio.imread(io.BytesIO(fragment_bytes), plugin="pyav", index=...)

        # Store and return frames in frame ratio of total available 
        ret_frames = []
        for i in range(0, len(frames), one_in_frames_ratio):
            ret_frames.append(frames[i])

        return ret_frames

    def save_frames_as_jpeg(self, ndarray_frames, jpg_file_base_path):
        '''
        Saves frames as
        JPEGs on the local disk.
        ### Parameters:

        ndarray_frames: List<numpy.ndarray>
            A ByteArray with raw bytes from exactly one fragment.

        ### Return
        jpeg_paths : List<Str>
            A list of file paths to the saved JPEN files. 
        
        '''

        # Write frames to disk as JPEG images
        jpeg_paths = []
        for i in range(len(ndarray_frames)):
            frame = ndarray_frames[i]
            image_file_path = '{}-{}.jpg'.format(jpg_file_base_path, i)
            iio.imwrite(image_file_path, frame, format=None)
            jpeg_paths.append(image_file_path)
        
        return jpeg_paths

    def pixel_difference(self, frames, threshold=50):
        significant_frames = [frames[0]]  # Almacena el primer frame como base
        for i in range(1, len(frames)):
            diff = np.abs(frames[i].astype(np.int16) - frames[i-1].astype(np.int16))  # Diferencia de píxeles
            if np.mean(diff) > threshold:  # Compara con el umbral
                significant_frames.append(frames[i])
        return significant_frames


Codigo para poder procesar los Key frames de forma independiente al flujo para optimizarlo

In [None]:
import cv2
import os
import numpy as np
import matplotlib.pyplot as plt  # Import for displaying images in Jupyter notebook

# Configuración
video_paths = [f"../videos/video{i}.mp4" for i in range(1, 7)]
frame_interval = 30  # Capturar 1 frame cada 30
pixel_diff_threshold = 10  # Umbral para considerar que hay movimiento significativo

# Procesar cada video
for video_path in video_paths:
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error al abrir el video: {video_path}")
        continue

    frames = []
    frame_count = 0
    key_frame_count = 0
    prev_frame = None
    significant_frames = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Extraer 1 de cada 30 frames
        if frame_count % frame_interval == 0:
            print(f"FRAME {frame_count}")

            key_frame_count +=1
            frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # Convertir a escala de grises

            if prev_frame is not None:
                # Calcular diferencia de píxeles
                diff = cv2.absdiff(frame_gray, prev_frame)
                mean_diff = np.mean(diff)
                print(f"Media {mean_diff} y Umbral {pixel_diff_threshold}" )
                # Mostrar frame si supera el umbral
                if mean_diff > pixel_diff_threshold:
                    print(f"Supera el umbral")
                    plt.imshow(frame, cmap='gray')  # Display grayscale image
                    plt.title(f"Frame {frame_count} - Video: {os.path.basename(video_path)}")
                    plt.show()  # Display the image
                    significant_frames.append(frame)

            prev_frame = frame_gray  # Actualizar el frame previo

        frame_count += 1

    cap.release()

    # Información del video procesado
    print(f"Procesado: {video_path}")
    print(f"Frames totales: {frame_count}")
    print(f"KEY Frames totales: {key_frame_count}")
    print(f"Frames significativos detectados: {len(significant_frames)}")

Codigo empleando SSIM

In [None]:
import cv2
import numpy as np
from skimage.metrics import structural_similarity as ssim
import matplotlib.pyplot as plt
import os

# Configuración
video_paths = [f"../videos/video{i}.mp4" for i in range(1, 7)]
output_dir = "../frames/ssim_motion/"
os.makedirs(output_dir, exist_ok=True)

# Umbral de SSIM
SSIM_THRESHOLD = 0.87

# Función para procesar un video
def process_video(video_path, frame_step=30):
    cap = cv2.VideoCapture(video_path)
    motion_frames = []  # Lista para guardar frames con movimiento
    key_frame = None
    frame_count = 0
    key_frame_count = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Extraer 1 de cada `frame_step` frames
        if frame_count % frame_step == 0:
            # Convertir el frame a escala de grises
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            key_frame_count += 1
            print(f"FRAME {frame_count}")

            if key_frame is None:
                # Primer frame siempre es el key frame
                key_frame = gray_frame
                motion_frames.append(frame)
                continue

            # Calcular SSIM con el key frame
            score, _ = ssim(key_frame, gray_frame, full=True)
            print(f"Score {score} y Umbral {SSIM_THRESHOLD}")
            if score < SSIM_THRESHOLD:
                # Cambio significativo: Guardar el frame actual
                motion_frames.append(frame)
                key_frame = gray_frame  # Actualizar el key frame

        frame_count += 1
    print(f"KEY frames {key_frame_count}")
    cap.release()
    return motion_frames

# Procesar cada video
for video_path in video_paths:
    video_name = os.path.basename(video_path).split('.')[0]
    motion_frames = process_video(video_path)

    print(f"Video: {video_name}, Motion Frames Detectados: {len(motion_frames)}")

    # Guardar y mostrar los frames con matplotlib
    for i, frame in enumerate(motion_frames):
        output_path = os.path.join(output_dir, f"{video_name}_motion_{i}.jpg")
        cv2.imwrite(output_path, frame)

        # Mostrar con matplotlib
        plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        plt.title(f"Motion Frame {i+1} - {video_name}")
        plt.axis('off')
        plt.show()


Modelo de Flujo optico

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os

# Configuración
video_paths = [f"../videos/video{i}.mp4" for i in range(1, 4)]

# Parámetros del flujo óptico y umbral
FRAME_STEP = 30
MOTION_THRESHOLD = 2.5  # Ajusta este valor para definir qué nivel de movimiento es significativo

def process_video_with_optical_flow(video_path, frame_step=FRAME_STEP):
    cap = cv2.VideoCapture(video_path)
    motion_frames = []
    prev_gray = None
    frame_count = 0
    key_frame_count = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Procesar solo frames seleccionados
        if frame_count % frame_step == 0:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            key_frame_count+=1
            print(f"FRAME {key_frame_count}")
            if prev_gray is None:
                prev_gray = gray
                continue

            # Calcular el flujo óptico con Farneback
            flow = cv2.calcOpticalFlowFarneback(
                prev_gray, gray, None,
                pyr_scale=0.5, levels=3, winsize=15,
                iterations=3, poly_n=5, poly_sigma=1.2, flags=0
            )

            # Magnitud del movimiento (norma de los vectores de flujo)
            magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])

            # Promedio de la magnitud del movimiento
            mean_motion = np.mean(magnitude)
            print(f"MEAN {mean_motion} y Umbral {MOTION_THRESHOLD}")
            if mean_motion > MOTION_THRESHOLD:
                # Cambio significativo detectado
                motion_frames.append(frame)

            # Actualizar el frame previo
            prev_gray = gray

        frame_count += 1
    print(f"KEY frames {key_frame_count}")
    cap.release()
    return motion_frames

# Procesar cada video
for video_path in video_paths:
    video_name = os.path.basename(video_path).split('.')[0]
    motion_frames = process_video_with_optical_flow(video_path)

    print(f"Video: {video_name}, Motion Frames Detectados: {len(motion_frames)}")

    # Guardar y mostrar los frames con matplotlib
    for i, frame in enumerate(motion_frames):

        # Mostrar con matplotlib
        plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        plt.title(f"Motion Frame {i+1} - {video_name}")
        plt.axis('off')
        plt.show()


Frame Differencig

In [None]:
import cv2
import matplotlib.pyplot as plt
import numpy as np

# Lista de videos
video_paths = [f"../videos/video{i}.mp4" for i in range(1, 4)]

def process_video_with_frame_diff(video_path):
    cap = cv2.VideoCapture(video_path)
    prev_frame = None
    frame_count = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_count += 1
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Calcular diferencia de frames
        if prev_frame is not None:
            diff = cv2.absdiff(prev_frame, gray)
            _, diff_thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
            
            # Mostrar resultados cada 30 frames
            if frame_count % 30 == 0:
                plt.figure(figsize=(10, 5))
                plt.subplot(1, 2, 1)
                plt.imshow(gray, cmap='gray')
                plt.title("Frame Original")
                plt.axis("off")
                
                plt.subplot(1, 2, 2)
                plt.imshow(diff_thresh, cmap='gray')
                plt.title("Detección de Movimiento (Frame Differencing)")
                plt.axis("off")
                
                plt.show()
        
        # Actualizar el frame previo
        prev_frame = gray
    
    cap.release()

# Procesar cada video
for video in video_paths:
    process_video_with_frame_diff(video)


Background substraction

In [None]:
import cv2
import matplotlib.pyplot as plt
import numpy as np

# Lista de videos
video_paths = [f"../videos/video{i}.mp4" for i in range(1, 4)]

# Configuración de background subtraction
bg_subtractor = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=50, detectShadows=True)

def process_video_with_bg_subtraction(video_path):
    cap = cv2.VideoCapture(video_path)
    frame_count = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_count += 1
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Aplicar background subtraction
        fg_mask = bg_subtractor.apply(gray)
        
        # Mostrar resultados cada 30 frames
        if frame_count % 30 == 0:
            plt.figure(figsize=(10, 5))
            plt.subplot(1, 2, 1)
            plt.imshow(gray, cmap='gray')
            plt.title("Frame Original")
            plt.axis("off")
            
            plt.subplot(1, 2, 2)
            plt.imshow(fg_mask, cmap='gray')
            plt.title("Detección de Movimiento (BG Subtraction)")
            plt.axis("off")
            
            plt.show()
    
    cap.release()

# Procesar cada video
for video in video_paths:
    process_video_with_bg_subtraction(video)


Diferencia entre difference y BG

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

# Lista de videos
# video_paths = [f"../videos/video{i}.mp4" for i in range(1, 4)]
video_paths = [f"../videos/video{i}.mp4" for i in range(1, 2)]

# Configuración de background subtraction
bg_subtractor = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=50, detectShadows=True)

def process_video_combined(video_path):
    cap = cv2.VideoCapture(video_path)
    frame_count = 0
    prev_frame = None  # Para frame differencing

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count % 30 == 0:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # **Frame Differencing**
            if prev_frame is not None:
                diff = cv2.absdiff(prev_frame, gray)
                _, diff_thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
                print(f"EL umbral es {diff_thresh}")
            else:
                diff_thresh = np.zeros_like(gray)

            # Actualizar el frame previo
            prev_frame = gray

            # **Background Subtraction**
            fg_mask = bg_subtractor.apply(gray)

            # Mostrar resultados cada 30 frames
            plt.figure(figsize=(15, 5))
            
            # Frame Original
            plt.subplot(1, 3, 1)
            plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            plt.title(f"Frame {frame_count} - Original")
            plt.axis("off")
            
            # Frame Differencing
            plt.subplot(1, 3, 2)
            plt.imshow(diff_thresh, cmap="gray")
            plt.title("Frame Differencing")
            plt.axis("off")
            
            # Background Subtraction
            plt.subplot(1, 3, 3)
            plt.imshow(fg_mask, cmap="gray")
            plt.title("Background Subtraction")
            plt.axis("off")
            
            plt.tight_layout()
            plt.show()
        frame_count += 1

    cap.release()

# Procesar cada video
for video in video_paths:
    process_video_combined(video)

cv2.destroyAllWindows()
