In [1]:
import torch
import cv2
import numpy as np

from unet import UNet, temporal_smooth

DIR_MODEL_WEIGHTS = "./best_model.pth"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Usando dispositivo:", DEVICE)

def generate_circular_stain(h, w, radius=25, opacity=1, hardness=0.8):
    # Centro aleatorio
    cx = np.random.randint(radius, w - radius)
    cy = np.random.randint(radius, h - radius)

    y, x = np.ogrid[:h, :w]
    dist = np.sqrt((x - cx)**2 + (y - cy)**2)

    # Distancia normalizada 0..1
    norm = dist / radius

    # "hardness" controla qué tan abrupto es el borde
    mask = np.clip(1 - norm, 0, 1)**hardness

    # opacidad de la mancha
    mask = mask * opacity

    return mask[..., None]

def apply_stain_to_video(frames, stain_mask):
    stained = []
    for frame in frames:
        frame_f = frame / 255.0
        stain = np.ones_like(frame_f) * stain_mask

        # Combinar (mancha oscurece + teñido)
        corrupted = frame_f * (1 - stain_mask) + (0.3 * stain)
        corrupted = np.clip(corrupted, 0, 1)
        
        stained.append((corrupted * 255).astype(np.uint8))

    return np.array(stained)

def save_video(frames, filename="video_con_manchas.mp4", fps=24):
    h, w = frames[0].shape[:2]
    writer = cv2.VideoWriter(
        filename,
        cv2.VideoWriter_fourcc(*"mp4v"),
        fps,
        (w, h)
    )
    for f in frames:
        writer.write(cv2.cvtColor(f, cv2.COLOR_RGB2BGR))
    writer.release()

def read_video(filename):
    cap = cv2.VideoCapture(filename)
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    cap.release()
    return frames

Usando dispositivo: cuda


In [7]:
frames = read_video("./mmnist-medium/batch_12_video_33.mp4")
frames = np.array(frames)

mask1 = generate_circular_stain(frames.shape[1], frames.shape[2], opacity=0.8)
mask2 = generate_circular_stain(frames.shape[1], frames.shape[2], opacity=1) 
frames_corrupted = apply_stain_to_video(frames, mask1)
frames_corrupted = apply_stain_to_video(frames_corrupted, mask2)

save_video(frames_corrupted, filename="video_test.mp4", fps=24)

In [2]:
video_test = cv2.VideoCapture("./video_test.mp4")

if not video_test.isOpened():
    print("No se pudo abrir el video")
    exit()

while True:
    ret, frame = video_test.read()
    if not ret:
        break

    # Opcional: ventana completa
    cv2.namedWindow("Video", cv2.WINDOW_NORMAL)
    cv2.setWindowProperty("Video", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)

    cv2.imshow("Video", frame)

    if cv2.waitKey(25) & 0xFF == ord('q'):
        break

video_test.release()
cv2.destroyAllWindows()

In [2]:
CHANNELS_IN = 3
CHANNELS = 64
    
model = UNet(CHANNELS_IN, CHANNELS).to(DEVICE)
model.load_state_dict(torch.load(DIR_MODEL_WEIGHTS))
model.eval()

UNet(
  (initial_conv): DoubleConv(
    (double_conv): Sequential(
      (0): Conv3K(
        (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      )
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): Conv3K(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      )
      (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): ReLU()
    )
  )
  (down_conv1): DownConv(
    (encoder): Sequential(
      (0): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (1): DoubleConv(
        (double_conv): Sequential(
          (0): Conv3K(
            (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          )
          (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU()
          (3): Conv3K(
            (conv1): Conv

In [4]:
test_frames = cv2.VideoCapture("./video_test.mp4")

frames_out = []

while True:
    ret, frame = test_frames.read()
    if not ret:
        break

    # Convertir a RGB y normalizar
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0

    # De (H, W, C) a (C, H, W) y añadir batch dimension
    frame_tensor = torch.from_numpy(frame_rgb).permute(2, 0, 1).unsqueeze(0).to(DEVICE)

    _, H, W = frame_tensor.shape[1:]
    pad_h = (16 - H % 16) % 16
    pad_w = (16 - W % 16) % 16
    if pad_h > 0 or pad_w > 0:
        # (left, right, top, bottom)
        pad = (0, pad_w, 0, pad_h)
        frame_tensor = torch.nn.functional.pad(frame_tensor, pad)

    # 3️⃣ Inferencia
    with torch.no_grad():
        output = model(frame_tensor)

    # Devolver a formato imagen
    output_frame = output.squeeze(0).permute(1, 2, 0).cpu().numpy()
    output_frame = np.clip(output_frame * 255, 0, 255).astype(np.uint8)

    frames_out.append(output_frame)

test_frames.release()

frames_array = np.array(frames_out)
frames_smooth = temporal_smooth(frames_array, strength=0.6)

# 4️⃣ Guardar video reconstruido
out_path = "video_reconstruido.mp4"
h, w, _ = frames_smooth[0].shape
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(out_path, fourcc, 24, (w, h))

for f in frames_smooth:
    out.write(cv2.cvtColor(f, cv2.COLOR_RGB2BGR))

out.release()

In [5]:
video_resolucion = cv2.VideoCapture("./video_reconstruido.mp4")

if not video_resolucion.isOpened():
    print("No se pudo abrir el video")
    exit()

while True:
    ret, frame = video_resolucion.read()
    if not ret:
        break

    # Opcional: ventana completa
    cv2.namedWindow("Video", cv2.WINDOW_NORMAL)
    cv2.setWindowProperty("Video", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)

    cv2.imshow("Video", frame)

    if cv2.waitKey(25) & 0xFF == ord('q'):
        break

video_resolucion.release()
cv2.destroyAllWindows()