In [None]:
# !pip install torch torchvision timm transformers opencv-python
# !apt-get install ffmpeg -y

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from PIL import Image
from transformers import BertTokenizer
import os
import timm

In [8]:
class LipReadingDataset(Dataset):
    """Dataset pairing chunks of video frames with tokenized transcript text.

    All samples are built eagerly at construction time (or restored from a
    file previously written by ``save_dataset``) and held in ``self.data`` as
    dicts with the keys ``'video'``, ``'input_ids'`` and ``'attention_mask'``.

    Args:
        frame_paths: One list of frame image paths per video.
        annotations: One list of ``(start_time, end_time, text)`` tuples per
            video. Times are multiplied by ``fps`` to obtain frame indices.
        tokenizer: Callable invoked as ``tokenizer(text, return_tensors="pt", ...)``
            whose result exposes ``'input_ids'`` and ``'attention_mask'``.
        transform: Callable applied to each RGB ``PIL.Image``; it must return
            a tensor because the results are stacked with ``torch.stack``.
        fps: Frame rate used to convert annotation times to frame indices.
        frames_per_chunk: Minimum number of frames per sample (shorter chunks
            are zero-padded). It is also used as the number of per-word spans
            grouped into one sample by ``map_frames_to_text``.
        load_from_file: Optional path of a dataset saved with ``save_dataset``.
            When given, nothing is recomputed from ``frame_paths``.
    """

    def __init__(self, frame_paths, annotations, tokenizer, transform, fps, frames_per_chunk=30, load_from_file=None):
        self.frame_paths = frame_paths  # list (per video) of lists of frame paths
        self.annotations = annotations  # list (per video) of annotation lists
        self.tokenizer = tokenizer
        self.transform = transform
        self.fps = fps  # frame rate
        self.frames_per_chunk = frames_per_chunk  # frames per sequence

        if load_from_file is not None:
            self.load_dataset(load_from_file)
        else:
            self.data = self.prepare_dataset()

    def prepare_dataset(self):
        """Load, preprocess and tokenize every chunk of every video.

        Returns:
            A list of dicts with ``'video'`` (stacked frame tensor),
            ``'input_ids'`` and ``'attention_mask'``.
        """
        data_list = []
        for video_frames, video_annotations in zip(self.frame_paths, self.annotations):
            num_frames = len(video_frames)
            frame_text_mapping = self.map_frames_to_text(
                video_frames, video_annotations, self.fps, num_frames, self.frames_per_chunk
            )
            for start_frame, end_frame, text in frame_text_mapping:
                # Slicing clips at the end of the list, so indices past the
                # last available frame are simply ignored.
                chunk_paths = video_frames[max(start_frame, 0):end_frame + 1]
                frames = [self.preprocess_video_frame(path) for path in chunk_paths]
                if not frames:
                    # The chunk lies entirely outside the available frames
                    # (e.g. annotation times beyond the end of the video).
                    # There is no frame to derive the padding shape from, so
                    # the chunk is skipped instead of failing on frames[0].
                    continue

                # Zero-pad short chunks up to frames_per_chunk.
                missing = self.frames_per_chunk - len(frames)
                if missing > 0:
                    frames.extend(torch.zeros_like(frames[0]) for _ in range(missing))
                # NOTE(review): chunks longer than frames_per_chunk are kept at
                # full length, so samples may differ in their first dimension
                # and then cannot be batched by the default collate function.
                frames_tensor = torch.stack(frames)  # (num_frames_in_chunk, ...)

                # Tokenize the chunk text to a fixed length of 50 tokens.
                tokens = self.tokenizer(
                    text,
                    return_tensors="pt",
                    padding="max_length",
                    max_length=50,
                    truncation=True
                )
                data_list.append({
                    'video': frames_tensor,
                    'input_ids': tokens['input_ids'].squeeze(0),  # (max_length,)
                    'attention_mask': tokens['attention_mask'].squeeze(0)  # (max_length,)
                })
        return data_list

    def __len__(self):
        """Return the total number of prepared samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the prepared sample dict at position ``idx``."""
        return self.data[idx]

    def preprocess_video_frame(self, frame_path):
        """Open one frame image as RGB and apply ``self.transform`` to it."""
        img = Image.open(frame_path).convert('RGB')
        img_tensor = self.transform(img)
        return img_tensor

    def map_frames_to_text(self, frame_paths, annotations, fps, num_frames, frames_per_chunk):
        """Split annotations into per-word frame spans and group them into chunks.

        Each annotation's frame budget is divided evenly between its
        whitespace-separated words (every word gets at least one frame, and
        leftover frames are handed out round-robin). The resulting per-word
        spans are then grouped ``frames_per_chunk`` spans at a time.

        Args:
            frame_paths: Unused; kept for interface compatibility.
            annotations: Iterable of ``(start_time, end_time, text)``.
            fps: Frame rate used to convert times to frame indices.
            num_frames: Number of frames in the video; span ends are clamped
                to ``num_frames - 1``.
            frames_per_chunk: Number of word spans merged into one chunk.

        Returns:
            A list of ``(start_frame, end_frame, text)`` tuples, one per chunk.
        """
        frame_text_mapping = []
        for start_time, end_time, text in annotations:
            total_text_frames = int((end_time - start_time) * fps)
            if total_text_frames <= 0:
                # Zero-length or inverted annotation: nothing to map.
                continue
            words = text.split()
            if not words:
                # Empty / whitespace-only text would otherwise divide by zero.
                continue
            total_words = len(words)
            frames_per_word = [max(1, total_text_frames // total_words)] * total_words
            # Negative when there are more words than frames; range() is then
            # empty and every word keeps its single frame.
            leftover_frames = total_text_frames - sum(frames_per_word)
            for i in range(leftover_frames):
                frames_per_word[i % total_words] += 1
            current_frame = int(start_time * fps)
            for word, frame_count in zip(words, frames_per_word):
                end_frame = min(current_frame + frame_count - 1, num_frames - 1)
                frame_text_mapping.append((current_frame, end_frame, word))
                current_frame += frame_count

        # Group consecutive word spans into chunks.
        sequences = []
        for i in range(0, len(frame_text_mapping), frames_per_chunk):
            seq_frames = frame_text_mapping[i:i + frames_per_chunk]
            start_frame = seq_frames[0][0]
            end_frame = seq_frames[-1][1]
            texts = ' '.join(span[2] for span in seq_frames)
            sequences.append((start_frame, end_frame, texts))
        return sequences

    # Dataset save helper
    def save_dataset(self, file_path):
        """Serialize ``self.data`` to ``file_path`` with ``torch.save``."""
        torch.save(self.data, file_path)
        print(f"Dataset saved to {file_path}")

    # Dataset load helper
    def load_dataset(self, file_path):
        """Replace ``self.data`` with the contents of ``file_path``.

        NOTE: ``torch.load`` relies on pickle; only load files from a trusted
        source.
        """
        self.data = torch.load(file_path)
        print(f"Dataset loaded from {file_path}")


In [9]:
class Frontend(nn.Module):
    """Per-frame visual feature extractor backed by a timm model.

    Args:
        model_type: Backbone identifier. Only ``"convnext"`` is supported; it
            builds a pretrained ``convnext_base`` with its classifier removed.

    Attributes:
        feature_dim: Input width of the backbone's original classifier, read
            before the classifier is removed.

    Raises:
        ValueError: If ``model_type`` is not a supported backbone.
    """

    def __init__(self, model_type="convnext"):
        super(Frontend, self).__init__()
        if model_type != "convnext":
            # Previously an unknown type left `feature_dim` unbound and
            # surfaced as a confusing UnboundLocalError.
            raise ValueError(
                f"Unsupported frontend model_type: {model_type!r} (supported: 'convnext')"
            )
        self.model = timm.create_model('convnext_base', pretrained=True)
        # Read the classifier's input width before stripping the classifier.
        self.feature_dim = self.model.get_classifier().in_features
        self.model.reset_classifier(0)

    def forward(self, x):
        """Run the backbone on ``x`` and return its output features."""
        return self.model(x)  # (batch_size * seq_len, feature_dim)

class LipReadingModel(nn.Module):
    """Frame encoder followed by a 2-layer LSTM and a per-step vocabulary head.

    Args:
        vocab_size: Number of output classes of the final linear layer.
        feature_dim: Input size of the LSTM; must equal the feature width
            reported by the frontend.
        frontend_type: Backbone identifier forwarded to ``Frontend``.

    Raises:
        ValueError: If ``feature_dim`` disagrees with the frontend's
            ``feature_dim``.
    """

    def __init__(self, vocab_size, feature_dim, frontend_type="convnext"):
        super(LipReadingModel, self).__init__()
        hidden_size = 512
        self.frontend = Frontend(model_type=frontend_type)
        if self.frontend.feature_dim != feature_dim:
            # Fail at construction instead of with a shape error inside the
            # LSTM on the first forward pass.
            raise ValueError(
                f"feature_dim={feature_dim} does not match the frontend output "
                f"width {self.frontend.feature_dim}"
            )
        self.lstm = nn.LSTM(input_size=feature_dim, hidden_size=hidden_size, num_layers=2, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, video):
        """Map a batch of frame sequences to per-frame vocabulary logits.

        Args:
            video: Tensor of shape ``(batch_size, seq_len, C, H, W)``.

        Returns:
            Tensor of shape ``(batch_size, seq_len, vocab_size)``.
        """
        if video.dim() != 5:
            raise ValueError(
                f"expected video of shape (batch, seq_len, C, H, W), got {tuple(video.shape)}"
            )
        batch_size, seq_len, C, H, W = video.size()
        # reshape (unlike view) also accepts non-contiguous inputs, e.g. a
        # batch that was transposed or permuted upstream.
        frames = video.reshape(batch_size * seq_len, C, H, W)
        features = self.frontend(frames)  # (batch_size * seq_len, feature_dim)
        features = features.reshape(batch_size, seq_len, -1)  # (batch_size, seq_len, feature_dim)
        outputs, _ = self.lstm(features)  # (batch_size, seq_len, hidden_size)
        return self.fc(outputs)  # (batch_size, seq_len, vocab_size)


In [None]:
from torch.utils.data import Dataset
import torch
from transformers import BertTokenizer
from torch.utils.data import DataLoader

class SavedLipReadingDataset(Dataset):
    """Dataset view over a list of previously saved sample dicts.

    Two record layouts are accepted:

    * ``'video'``: an already preprocessed frame tensor, which is returned
      as-is (this is the layout written by ``LipReadingDataset.save_dataset``).
    * ``'video_paths'``: a list of frame image paths, which are opened,
      converted to RGB, passed through ``transform`` and stacked.

    Args:
        data_list: List of sample dicts; each must also carry ``'input_ids'``
            and ``'attention_mask'``.
        transform: Callable applied to each RGB ``PIL.Image``. Required only
            for records that store ``'video_paths'``.
    """

    def __init__(self, data_list, transform=None):
        self.data = data_list
        self.transform = transform

    def __len__(self):
        """Return the number of stored samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the sample at ``idx`` with its video as a tensor.

        Raises:
            ValueError: If the record stores frame paths but no ``transform``
                was supplied.
        """
        record = self.data[idx]
        if 'video' in record:
            # Frames were preprocessed when the dataset was saved.
            video_tensor = record['video']
        else:
            if self.transform is None:
                raise ValueError(
                    "a transform is required to load frames from 'video_paths'"
                )
            frames = []
            for frame_path in record['video_paths']:
                # Close each image file as soon as it has been converted.
                with Image.open(frame_path) as img:
                    frames.append(self.transform(img.convert('RGB')))
            video_tensor = torch.stack(frames)
        return {
            'video': video_tensor,
            'input_ids': record['input_ids'],
            'attention_mask': record['attention_mask']
        }

# Dataset loading helper
def load_dataset(file_path):
    """Read a dataset saved with ``torch.save`` and return it.

    Args:
        file_path: Path of the serialized dataset file.

    Returns:
        Whatever object was stored in the file.

    NOTE: ``torch.load`` relies on pickle; only load files from a trusted
    source.
    """
    records = torch.load(file_path)
    print(f"Dataset loaded from {file_path}")
    return records

# Tokenizer setup
tokenizer = BertTokenizer.from_pretrained("beomi/kobert")

# SavedLipReadingDataset calls `transform` on each frame image it opens, so it
# must be a callable rather than the constructor's default of None.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load the saved dataset and wrap it in a DataLoader
save_path = 'lip_reading_dataset.pt'
loaded_data = load_dataset(save_path)
dataset = SavedLipReadingDataset(loaded_data, transform=transform)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, num_workers=4)


In [None]:
import torchvision.transforms as transforms
from PIL import Image

# Frame preprocessing needed at inference time: resize to 224x224, convert to
# a tensor, then normalize each channel (these mean/std values are the ones
# commonly used with ImageNet-pretrained backbones).
transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Inference function
def infer(model, video_frames, tokenizer, transform, device='cpu'):
    """Predict text for one video given its frame image paths.

    The model is switched to eval mode, every frame is opened as RGB and
    passed through ``transform``, and the per-frame argmax over the model
    output is decoded with ``tokenizer``.

    Args:
        model: Module mapping ``(1, seq_len, C, H, W)`` to
            ``(1, seq_len, vocab_size)``. It is expected to already live on
            ``device``; this function only moves the input tensor.
        video_frames: Iterable of frame image paths, in temporal order.
        tokenizer: Object providing ``decode(ids, skip_special_tokens=True)``.
        transform: Callable turning an RGB ``PIL.Image`` into a tensor.
        device: Device the input tensor is moved to.

    Returns:
        The decoded prediction string.

    Raises:
        ValueError: If ``video_frames`` is empty.
    """
    video_frames = list(video_frames)
    if not video_frames:
        # torch.stack([]) would otherwise fail with an opaque RuntimeError.
        raise ValueError("video_frames is empty; at least one frame path is required")

    model.eval()
    processed_frames = []
    for frame in video_frames:
        # Close each image file as soon as it has been converted.
        with Image.open(frame) as img:
            processed_frames.append(transform(img.convert('RGB')))
    video_tensor = torch.stack(processed_frames).unsqueeze(0).to(device)  # (1, seq_len, C, H, W)
    with torch.no_grad():
        outputs = model(video_tensor)  # (1, seq_len, vocab_size)
        predicted_ids = torch.argmax(outputs, dim=-1).squeeze(0)  # (seq_len,)
    predicted_text = tokenizer.decode(predicted_ids.tolist(), skip_special_tokens=True)
    return predicted_text


In [None]:
# Model, loss function and optimizer setup
device = 'cuda' if torch.cuda.is_available() else 'cpu'
vocab_size = tokenizer.vocab_size
feature_dim = 1024  # output width of convnext_base
model = LipReadingModel(
    vocab_size=vocab_size,
    feature_dim=feature_dim,
    frontend_type="convnext",
).to(device)

# Padding token positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Start training
train_model(model, dataloader, criterion, optimizer, device, num_epochs=10)


In [None]:
# Reload the saved dataset and build the DataLoader
save_path = 'lip_reading_dataset.pt'
loaded_data = load_dataset(save_path)

# Inspect the first few records (at most three; slicing is safe on short datasets)
for record in loaded_data[:3]:
    print(f"Video Shape: {record['video'].shape}, Input IDs: {record['input_ids']}, Attention Mask: {record['attention_mask']}")

dataloader = DataLoader(dataset, batch_size=8, shuffle=True, num_workers=4)

# Model, loss function and optimizer setup.
# LipReadingModel requires vocab_size and feature_dim and has no
# `backend_type` parameter; 1024 is the convnext_base feature width.
model = LipReadingModel(
    vocab_size=tokenizer.vocab_size,
    feature_dim=1024,
    frontend_type="convnext",
)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = model.to(device)

# Targets are padded to a fixed length, so padding positions are ignored.
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Start training
# NOTE(review): `train_model` is not defined in the visible cells — confirm it
# is defined before this cell runs.
train_model(model, dataloader, criterion, optimizer, device, num_epochs=10)

In [None]:
def infer(model, video_frames, tokenizer, transform, device='cpu'):
    """Predict text for one video given its frame image paths.

    NOTE: this redefines the `infer` from an earlier cell and shadows it;
    keep a single definition when tidying the notebook.

    Args:
        model: Module mapping ``(1, seq_len, C, H, W)`` to
            ``(1, seq_len, vocab_size)``. It is expected to already live on
            ``device``; this function only moves the input tensor.
        video_frames: Iterable of frame image paths, in temporal order.
        tokenizer: Object providing ``decode(ids, skip_special_tokens=True)``.
        transform: Callable turning an RGB ``PIL.Image`` into a tensor.
        device: Device the input tensor is moved to.

    Returns:
        The decoded prediction string.

    Raises:
        ValueError: If ``video_frames`` is empty.
    """
    video_frames = list(video_frames)
    if not video_frames:
        # torch.stack([]) would otherwise fail with an opaque RuntimeError.
        raise ValueError("video_frames is empty; at least one frame path is required")

    model.eval()
    processed_frames = []
    for frame in video_frames:
        # Close each image file as soon as it has been converted.
        with Image.open(frame) as img:
            processed_frames.append(transform(img.convert('RGB')))
    video_tensor = torch.stack(processed_frames).unsqueeze(0).to(device)  # (1, seq_len, C, H, W)
    with torch.no_grad():
        outputs = model(video_tensor)  # (1, seq_len, vocab_size)
        predicted_ids = torch.argmax(outputs, dim=-1).squeeze(0)  # (seq_len,)
    predicted_text = tokenizer.decode(predicted_ids.tolist(), skip_special_tokens=True)
    return predicted_text


In [None]:
# Tokenizer used both for building targets and for decoding predictions
tokenizer = BertTokenizer.from_pretrained("beomi/kobert")

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# NOTE(review): `list_of_frame_paths` and `list_of_annotations` are
# placeholders that are not defined in the visible cells — supply real data.
dataset = LipReadingDataset(
    frame_paths=[list_of_frame_paths],  # list (per video) of lists of frame paths
    annotations=[list_of_annotations],  # list (per video) of annotation lists
    tokenizer=tokenizer,
    transform=transform,
    fps=25,
    frames_per_chunk=30
)

dataloader = DataLoader(dataset, batch_size=8, shuffle=True, num_workers=4)


# Example directory holding the frames of the video to test
test_video_dir = './videos/video1'
# NOTE(review): `load_test_video_frames` is not defined in the visible cells.
video_frames = load_test_video_frames(test_video_dir)  # list of test frame paths

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# LipReadingModel requires vocab_size and feature_dim and has no
# `backend_type` parameter; 1024 is the convnext_base feature width.
# TODO: no checkpoint is loaded here — restore the trained weights (e.g. with
# model.load_state_dict) before running inference.
model = LipReadingModel(
    vocab_size=tokenizer.vocab_size,
    feature_dim=1024,
    frontend_type="convnext",
).to(device)

predicted_text = infer(model, video_frames, tokenizer, transform, device)
print("Predicted Text:", predicted_text)