In [None]:
import os
import subprocess
import torch
from torchvision.models import resnet50, ResNet50_Weights
import cv2
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
import time

In [None]:
class MinMaxNormalize(nn.Module):
    """Linearly rescale a tensor into ``[min_val, max_val]``.

    The minimum and maximum are taken along dimension 0 (with ``keepdim=True``),
    so every position of the remaining dimensions is rescaled independently.

    Args:
        min_val: Lower bound of the target range.
        max_val: Upper bound of the target range.
    """

    def __init__(self, min_val=0.0, max_val=1.0):
        super().__init__()
        self.min_val = min_val
        self.max_val = max_val

    def forward(self, x):
        """Return ``x`` rescaled so its per-position extrema map onto the target range."""
        lowest = x.min(dim=0, keepdim=True).values
        highest = x.max(dim=0, keepdim=True).values

        # The small epsilon keeps the division finite when a slice is constant.
        unit_scaled = (x - lowest) / (highest - lowest + 1e-10)
        return unit_scaled * (self.max_val - self.min_val) + self.min_val

class VideoDataset(Dataset):
    """Dataset of ``.mkv`` clips yielding a fixed-length frame stack and a binary label.

    A clip is labelled 1 when its file name starts with ``"highlights"`` and 0
    otherwise.

    Args:
        directory: Folder scanned (non-recursively) for ``.mkv`` files.
        num_frames: Sampling stride; every ``num_frames``-th decoded frame is kept.
        max_frames: Number of frames every item contains. Decoding stops once
            this many frames were sampled, and shorter clips are zero-padded at
            the end so all items share the same length.

    NOTE: the random augmentations below are re-sampled for every frame
    independently, so frames of the same clip get different crops/rotations.
    """

    def __init__(self, directory, num_frames=5, max_frames=75):
        if num_frames < 1:
            raise ValueError(f"num_frames must be >= 1, got {num_frames}")
        if max_frames < 1:
            raise ValueError(f"max_frames must be >= 1, got {max_frames}")

        self.directory = directory
        self.num_frames = num_frames
        self.max_frames = max_frames

        # splitext keeps dots inside the stem ("a.b.mkv" -> "a.b"), so the path
        # rebuilt in __getitem__ always points at the file that was listed.
        # Sorting makes index -> file stable across runs (os.listdir order is
        # unspecified).
        self.file_names = sorted(
            os.path.splitext(f)[0]
            for f in os.listdir(directory)
            if f.endswith(".mkv")
        )

        self.labels = [
            1 if f.startswith("highlights") else 0 for f in self.file_names
        ]

        # Preprocessing
        self.video_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.RandomResizedCrop((224, 224), scale=(0.8, 1.0)),
            transforms.RandomRotation(degrees=10),
            transforms.GaussianBlur(kernel_size=(5, 5), sigma=(0.1, 2.0)),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])

    def __len__(self):
        """Return the number of ``.mkv`` clips found in the directory."""
        return len(self.file_names)

    def __getitem__(self, idx):
        """Return ``(video_tensor, label)`` for the clip at position ``idx``."""
        file_name = self.file_names[idx]
        label = self.labels[idx]

        video_path = os.path.join(self.directory, file_name + ".mkv")
        video_tensor = self._load_video_frames(video_path)

        return video_tensor, label

    def _load_video_frames(self, video_path):
        """Decode, sample and transform frames of one clip.

        Returns:
            Tensor with ``max_frames`` entries along dim 0; trailing entries are
            zeros when the clip yields fewer sampled frames.

        Raises:
            ValueError: If no frame could be read from ``video_path``.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        frame_count = 0

        try:
            while cap.isOpened() and len(frames) < self.max_frames:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % self.num_frames == 0:
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frames.append(self.video_transform(frame))
                frame_count += 1
        finally:
            # Release the decoder even when a transform raises mid-clip.
            cap.release()

        if not frames:
            raise ValueError(f"No frames extracted from video: {video_path}")

        video = torch.stack(frames)

        # Pad short clips with zero frames (the same padding value collate_fn
        # uses) so downstream code can rely on a fixed frames-per-clip count.
        missing = self.max_frames - video.shape[0]
        if missing > 0:
            padding = video.new_zeros((missing, *video.shape[1:]))
            video = torch.cat([video, padding], dim=0)

        return video

def collate_fn(batch):
    """Merge ``(video, label)`` samples into one flat frame batch.

    Videos are zero-padded along the frame axis to the longest clip in the
    batch, then the batch and frame axes are folded together.

    Returns:
        Tuple ``(frames, labels)`` where ``frames`` has shape
        ``(batch * padded_len, channel, height, width)`` and ``labels`` is a
        1-D tensor with one entry per clip.
    """
    clips, targets = zip(*batch)

    padded = pad_sequence(clips, batch_first=True, padding_value=0)
    n_clips, n_steps, n_channels, rows, cols = padded.shape
    flat_frames = padded.view(n_clips * n_steps, n_channels, rows, cols)

    return flat_frames, torch.tensor(targets)

In [None]:
class GRU(torch.nn.Module):
    """GRU encoder that summarises a sequence into one fixed-size vector.

    The final hidden state of the last layer (both directions concatenated when
    bidirectional) is projected to ``output_size`` by a linear layer.

    Args:
        input_size: Feature size of each sequence element.
        hidden_size: Hidden size of the GRU (per direction).
        output_size: Size of the returned vector.
        num_layers: Number of stacked GRU layers.
        bidirectional: Whether the GRU runs in both directions.
    """

    def __init__(self, input_size=1000, hidden_size=512, output_size=512, num_layers=1, bidirectional=True):
        super().__init__()

        # GRU layer
        self.gru = torch.nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional
        )

        # Size the projection from the actual number of directions so that
        # bidirectional=False yields a consistent module instead of a shape error.
        num_directions = 2 if bidirectional else 1
        self.fc = torch.nn.Linear(hidden_size * num_directions, output_size)

    def forward(self, x):
        """Encode ``x`` of shape (batch_size, seq_len, input_size) to (batch_size, output_size)."""
        _, hidden = self.gru(x)
        # hidden: (num_layers * num_directions, batch_size, hidden_size)
        if self.gru.bidirectional:
            # Bidirectional GRU -> concatenate the last layer's forward and
            # backward hidden states: (batch_size, hidden_size * 2)
            summary = torch.cat((hidden[-2], hidden[-1]), dim=-1)
        else:
            # Unidirectional GRU -> the last layer's hidden state only.
            summary = hidden[-1]

        return self.fc(summary)  # (batch_size, output_size)


class Football_Highlighter(torch.nn.Module):
    """Clip-level highlight classifier: ResNet-50 per frame, GRU over frames, MLP head.

    Args:
        frames_per_clip: Number of consecutive frames in the flat input that
            belong to one clip. Used to recover the batch dimension, which the
            flat ``(frames, C, H, W)`` input does not carry.

    ``forward`` returns class *probabilities* (softmax is already applied), so
    the output must not be passed to a loss that expects raw logits such as
    ``torch.nn.CrossEntropyLoss``.
    """

    def __init__(self, frames_per_clip=75):
        super().__init__()
        if frames_per_clip < 1:
            raise ValueError(f"frames_per_clip must be >= 1, got {frames_per_clip}")
        self.frames_per_clip = frames_per_clip

        self.GRU = GRU()
        self.resnet = resnet50(weights=ResNet50_Weights.DEFAULT)
        self.fc1 = torch.nn.Linear(512, 768)
        self.fc2 = torch.nn.Linear(768, 2)
        self.layer_norm = torch.nn.LayerNorm(512)
        self.relu = torch.nn.ReLU()
        self.dropout_vid = torch.nn.Dropout(p=0.2)
        self.dropout_fc = torch.nn.Dropout(p=0.3)

    def forward(self, input):
        """Classify the clips contained in a flat frame batch.

        Args:
            input: Tensor of shape ``(total_frames, channels, height, width)``
                holding the frames of all clips back to back.

        Returns:
            Tensor of shape ``(batch_size, 2)`` with per-clip class probabilities.

        Raises:
            ValueError: If ``input`` contains no frames.
        """
        num_frames, _channels, _height, _width = input.shape
        if num_frames == 0:
            raise ValueError("Football_Highlighter received an empty frame batch")

        # Inputs shorter than one clip are treated as a single (short) clip
        # instead of producing batch_size == 0 and a division by zero below.
        batch_size = max(1, num_frames // self.frames_per_clip)

        features = self.resnet(input)
        # Reshape: (batch_size, frames_in_clip, feature_size)
        features = features.view(batch_size, num_frames // batch_size, -1)
        features = self.GRU(features)
        features = self.layer_norm(features)
        features = self.relu(features)
        features = self.dropout_vid(features)

        x = self.fc1(features)
        x = self.relu(x)
        x = self.dropout_fc(x)
        x = self.fc2(x)
        # Probabilities, not logits — downstream inference thresholds on them.
        x = torch.softmax(x, dim=1)
        return x

In [None]:
# --- Training configuration -------------------------------------------------
video_dir = "/content/drive/MyDrive/CVA/highlight_extract"
checkpoint_dir = "/content/drive/MyDrive/CVA/checkpoints"
batch_size = 2
learning_rate = 1e-4
num_epochs = 40
validation_split = 0.2
split_seed = 42  # fixes the train/validation partition across runs
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# torch.save does not create missing directories.
os.makedirs(checkpoint_dir, exist_ok=True)

# --- Data -------------------------------------------------------------------
dataset = VideoDataset(video_dir)
val_size = int(len(dataset) * validation_split)
train_size = len(dataset) - val_size
# NOTE: both subsets wrap the same VideoDataset instance, so whatever
# transforms it applies are used for validation samples as well.
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(split_seed),
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, collate_fn=collate_fn)

# --- Model / optimisation ---------------------------------------------------
model = Football_Highlighter().to(device)
# The model already applies softmax and returns probabilities. CrossEntropyLoss
# would apply log-softmax a second time (squashing the loss and gradients), so
# use NLLLoss on the log of the probabilities, which is the true cross-entropy.
criterion = torch.nn.NLLLoss()
optimizer = Adam(model.parameters(), lr=learning_rate)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

# Best validation accuracy seen so far and the (1-based) epoch that reached it;
# max_ep selects the checkpoint used for inference later in the notebook.
max_acc = 0
max_ep = -1

for epoch in range(num_epochs):
    start_time = time.time()

    model.train()
    train_loss = 0.0
    train_correct = 0
    train_total = 0

    for video, labels in train_loader:
        video = video.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(video)
        # clamp_min guards log(0) when a probability underflows.
        loss = criterion(torch.log(outputs.clamp_min(1e-8)), labels)
        train_loss += loss.item()
        loss.backward()
        optimizer.step()

        _, predicted = torch.max(outputs, 1)
        train_correct += (predicted == labels).sum().item()
        train_total += labels.size(0)

    train_loss /= len(train_loader)
    train_accuracy = train_correct / train_total * 100

    model.eval()  # validation
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for video, labels in val_loader:
            video = video.to(device)
            labels = labels.to(device)
            outputs = model(video)
            loss = criterion(torch.log(outputs.clamp_min(1e-8)), labels)
            val_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            val_correct += (predicted == labels).sum().item()
            val_total += labels.size(0)

    val_loss /= len(val_loader)
    val_accuracy = val_correct / val_total * 100

    if val_accuracy > max_acc:
        max_acc = val_accuracy
        max_ep = epoch + 1

    scheduler.step(val_loss)
    end_time = time.time()
    epoch_time = end_time - start_time

    # One checkpoint per epoch, named by its 1-based epoch number.
    model_save_path = os.path.join(checkpoint_dir, f"epoch{epoch+1}.pth")
    torch.save(model.state_dict(), model_save_path)
    print(f"Epoch [{epoch+1}/{num_epochs}]")
    print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")
    print(f"Valid Loss: {val_loss:.4f}, Valid Accuracy: {val_accuracy:.2f}%")
    print(f"Epoch time: {epoch_time:.1f}s")

Epoch [1/40]
Train Loss: 0.6391, Train Accuracy: 62.78%
Valid Loss: 0.6061, Valid Accuracy: 69.03%
Epoch [2/40]
Train Loss: 0.5767, Train Accuracy: 74.23%
Valid Loss: 0.6175, Valid Accuracy: 69.03%
Epoch [3/40]
Train Loss: 0.5788, Train Accuracy: 72.25%
Valid Loss: 0.6320, Valid Accuracy: 68.14%
Epoch [4/40]
Train Loss: 0.5559, Train Accuracy: 75.11%
Valid Loss: 0.5514, Valid Accuracy: 76.11%
Epoch [5/40]
Train Loss: 0.5635, Train Accuracy: 74.67%
Valid Loss: 0.6451, Valid Accuracy: 66.37%
Epoch [6/40]
Train Loss: 0.5788, Train Accuracy: 71.81%
Valid Loss: 0.5427, Valid Accuracy: 76.11%
Epoch [7/40]
Train Loss: 0.5282, Train Accuracy: 77.31%
Valid Loss: 0.6992, Valid Accuracy: 60.18%
Epoch [8/40]
Train Loss: 0.5851, Train Accuracy: 72.69%
Valid Loss: 0.5540, Valid Accuracy: 76.11%
Epoch [9/40]
Train Loss: 0.5410, Train Accuracy: 76.65%
Valid Loss: 0.5790, Valid Accuracy: 70.80%
Epoch [10/40]
Train Loss: 0.5212, Train Accuracy: 79.07%
Valid Loss: 0.6067, Valid Accuracy: 69.03%
Epoch [11

In [None]:
from PIL import Image
import subprocess

video_path = r"/content/drive/MyDrive/CVA/input/2_224p.mkv"
video_tensor_path = r"/content/drive/MyDrive/CVA/output5/video_tensor.pt"  # where the frame tensor is saved
checkpoint_path = f"/content/drive/MyDrive/CVA/checkpoints/epoch{max_ep}.pth"

print(f"checkpoint_path: {checkpoint_path}")

# Deterministic preprocessing for inference (no random augmentation).
video_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

video = cv2.VideoCapture(video_path)
try:
    if not video.isOpened():
        raise IOError(f"Could not open video: {video_path}")

    fps = video.get(cv2.CAP_PROP_FPS)
    # Keep roughly 5 frames per second. round() (instead of truncation) keeps
    # the effective rate close to 5 fps for rates such as 29.97, and max(1, ...)
    # avoids a modulo-by-zero when the reported fps is below 5 or missing.
    frame_interval = max(1, round(fps / 5))

    frames = []
    frame_count = 0
    while True:
        ret, frame = video.read()
        if not ret:
            break
        if frame_count % frame_interval == 0:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = Image.fromarray(frame)
            transformed_frame = video_transform(frame)
            frames.append(transformed_frame)
        frame_count += 1
finally:
    # Always free the decoder, even if a frame fails to transform.
    video.release()

if not frames:
    raise ValueError(f"No frames extracted from video: {video_path}")

video_tensor = torch.stack(frames)

# torch.save does not create missing directories.
os.makedirs(os.path.dirname(video_tensor_path), exist_ok=True)
torch.save(video_tensor, video_tensor_path)


def checkpoint_loading(checkpoint_path, device):
    """Build a ``Football_Highlighter`` on ``device`` and restore its weights.

    The checkpoint may be either a bare state dict or a mapping that stores the
    state dict under the ``'model_state_dict'`` key.

    Returns:
        The model with weights loaded, switched to evaluation mode.
    """
    network = Football_Highlighter().to(device)
    saved = torch.load(checkpoint_path, map_location=device)

    # Unwrap the state dict when it is nested inside a larger checkpoint.
    weights = saved['model_state_dict'] if 'model_state_dict' in saved else saved
    network.load_state_dict(weights)

    network.eval()
    return network

def extract_time(video_tensor, model, device, window_size=75, stride=15, threshold=0.88, sample_fps=5):
    """Slide a window over the sampled frames and collect highlight time ranges.

    Each window of ``window_size`` frames is scored by ``model``; when the
    probability at index ``[0, 1]`` of the model output exceeds ``threshold``
    the window's time span is recorded. A span that starts before the previous
    recorded span ends is merged into it.

    Args:
        video_tensor: Frames stacked along dim 0, sampled at ``sample_fps``.
        model: Callable mapping a window of frames to a ``(1, 2)`` tensor of
            class probabilities.
        device: Device the window is moved to before calling ``model``.
        window_size: Frames per scored window.
        stride: Frames between the starts of consecutive windows.
        threshold: Minimum highlight probability for a window to count.
        sample_fps: Frames per second represented by ``video_tensor``; used to
            convert frame indices to seconds.

    Returns:
        List of ``(start_seconds, end_seconds)`` tuples in chronological order.
        Values are ``int`` whenever the conversion is exact.

    Raises:
        ValueError: If ``window_size``/``stride`` is < 1 or ``sample_fps`` <= 0.
    """
    if window_size < 1 or stride < 1:
        raise ValueError("window_size and stride must be >= 1")
    if sample_fps <= 0:
        raise ValueError("sample_fps must be positive")

    def to_seconds(frame_index):
        seconds = frame_index / sample_fps
        return int(seconds) if seconds.is_integer() else seconds

    # Use the tensor's own length rather than a frame count tied to one video.
    total_frames = video_tensor.shape[0]
    highlights_time = []

    # Only full windows are scored; a trailing partial window is skipped.
    for start_frame in range(0, total_frames - window_size + 1, stride):
        clip = video_tensor[start_frame:start_frame + window_size].to(device)

        with torch.no_grad():
            output = model(clip)

        if output[0, 1].item() > threshold:
            start_time = to_seconds(start_frame)
            end_time = to_seconds(start_frame + window_size)
            if highlights_time and (start_time < highlights_time[-1][1]):
                # Overlaps the previous highlight: extend it instead of adding.
                highlights_time[-1] = (highlights_time[-1][0], end_time)
            else:
                highlights_time.append((start_time, end_time))

    return highlights_time

def extract_clip(input_path, start_time, end_time, output_path):
    """Cut ``[start_time, end_time]`` out of ``input_path`` with ffmpeg.

    The clip is re-encoded (libx264 video, AAC audio) and written to
    ``output_path``, overwriting an existing file (``-y``). A non-zero ffmpeg
    exit status raises ``subprocess.CalledProcessError``.
    """
    source_args = ["ffmpeg", "-y", "-i", input_path]
    trim_args = ["-ss", str(start_time), "-to", str(end_time)]
    video_args = ["-c:v", "libx264", "-preset", "fast", "-crf", "23"]
    audio_args = ["-c:a", "aac", "-b:a", "128k"]

    ffmpeg_call = source_args + trim_args + video_args + audio_args + [output_path]
    subprocess.run(ffmpeg_call, check=True)

def save_clip(video_path, highlights_time, output_dir=r"/content/drive/MyDrive/CVA/output5"):
    """Write one ``highlights_<i>.mkv`` file per highlight range.

    Args:
        video_path: Source video the clips are cut from.
        highlights_time: Iterable of ``(start_time, end_time)`` pairs.
        output_dir: Folder receiving the clips; created when missing.

    Clips whose output file already exists are skipped, so re-running only
    produces the missing ones. When ``video_path`` does not exist nothing is
    extracted and a notice is printed.
    """
    if not os.path.exists(video_path):
        print(f"Video not found, no clips extracted: {video_path}")
        return

    # ffmpeg does not create the destination folder on its own.
    os.makedirs(output_dir, exist_ok=True)

    for i, (start_time, end_time) in enumerate(highlights_time):
        output_path = os.path.join(output_dir, f"highlights_{i}.mkv")
        if not os.path.exists(output_path):
            extract_clip(video_path, start_time, end_time, output_path)


# Run the pipeline: pick the device, load the cached frame tensor and the
# selected checkpoint, score the video, then cut the detected ranges into clips.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
video_tensor = torch.load(video_tensor_path)
model = checkpoint_loading(checkpoint_path, device)

highlights_time = extract_time(video_tensor, model, device)
save_clip(video_path, highlights_time)

checkpoint_path: /content/drive/MyDrive/CVA/checkpoints/epoch27.pth


In [None]:
# Concatenate the extracted highlight clips into a single video.
#
# The clips written by extract_clip are already encoded as libx264/AAC, so they
# can be joined with stream copy. The former per-clip re-encode step read from
# and wrote to the same path and only ran when that path did not exist, i.e. it
# could never produce anything, so it has been dropped.
output_dir = "/content/drive/MyDrive/CVA/output5"
concat_list_path = os.path.join(output_dir, "input.txt")
merged_path = os.path.join(output_dir, "output.mkv")

# Collect highlights_0.mkv, highlights_1.mkv, ... for as long as they exist,
# instead of assuming a fixed number of clips.
clip_paths = []
while True:
    clip_path = os.path.join(output_dir, f"highlights_{len(clip_paths)}.mkv")
    if not os.path.exists(clip_path):
        break
    clip_paths.append(clip_path)

if not clip_paths:
    print(f"No highlight clips found in {output_dir}; nothing to concatenate.")
else:
    # File list in the format expected by ffmpeg's concat demuxer.
    with open(concat_list_path, "w") as f:
        for clip_path in clip_paths:
            f.write(f"file '{clip_path}'\n")

    # -y overwrites a previous output.mkv instead of waiting for confirmation;
    # check=True surfaces ffmpeg failures as an exception.
    subprocess.run([
        "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", concat_list_path,
        "-c", "copy", merged_path
    ], check=True)

CompletedProcess(args=['ffmpeg', '-f', 'concat', '-safe', '0', '-i', '/content/drive/MyDrive/CVA/output5/input.txt', '-c', 'copy', '/content/drive/MyDrive/CVA/output5/output.mkv'], returncode=0)