In [None]:
import os
import pandas as pd
import requests
import re
from sentence_transformers import SentenceTransformer
import easyocr
import cv2
import subprocess
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from PIL import Image
from tqdm.notebook import tqdm

In [None]:
# Load the dataset CSV into a DataFrame; later cells read its 'link' and 'description' columns.
data = pd.read_csv("yappy_hackaton_2024_400k.csv")

In [None]:
# Preview the first 100 rows of the loaded table.
data.head(100)

In [None]:
# Print the DataFrame summary (columns, dtypes, non-null counts).
data.info()

In [None]:
def download_video(url, filename, timeout=60):
    """Download the resource at ``url`` and write it to ``filename``.

    The response is streamed to disk in chunks so the whole file is never
    held in memory at once.

    Args:
        url: Address of the file to fetch.
        filename: Destination path; overwritten if it already exists.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook forever.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (otherwise the error page would be saved as if it were a video).
        requests.RequestException: On connection problems or timeouts.
    """
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)

In [None]:
def trim_video(input_path, output_path, duration=15):
    """Cut the first ``duration`` seconds of a video with ffmpeg (stream copy, no re-encode).

    The command is passed as an argument list instead of a shell string, so
    paths containing spaces or shell metacharacters are handled safely.

    Args:
        input_path: Path of the source video.
        output_path: Path of the trimmed video; overwritten if it exists (``-y``).
        duration: Length of the kept segment, in seconds.

    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    command = [
        'ffmpeg', '-y',
        '-i', str(input_path),
        '-t', str(duration),
        '-c', 'copy',
        str(output_path),
    ]
    result = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    if result.returncode != 0:
        # A failed trim leaves whatever was previously at output_path, so make the failure visible.
        print('Something went wrong (trim_video): ffmpeg exited with code', result.returncode)

In [None]:
def clear_string(text):
    """Normalize text to lowercase Latin/Cyrillic letters and digits separated by single spaces.

    Args:
        text: Value to clean. Missing values (``None``/NaN) yield ``''``;
            other non-string values are converted with ``str`` first.

    Returns:
        The cleaned string: every character that is not a Latin letter,
        a Russian letter or a digit is replaced by a space, whitespace runs
        are collapsed, and the result is stripped and lowercased.
    """
    if pd.isna(text):
        return ''
    # 'ё'/'Ё' lie outside the contiguous а-я / А-Я code-point ranges, so they are listed explicitly;
    # without them words such as 'ёлка' would be split apart.
    text = re.sub(r'[^a-zA-Zа-яА-ЯёЁ0-9]', ' ', str(text))
    # split()/join collapses runs of spaces and trims both ends in one step.
    return ' '.join(text.split()).lower()

In [None]:
# Multilingual sentence-embedding model; get_embedding() calls its encode() method.
sentence_transformer = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')

In [None]:
def get_embedding(text):
    """Encode ``text`` with the shared sentence-transformer model and return the result."""
    embedding = sentence_transformer.encode(text)
    return embedding

In [None]:
# EasyOCR reader configured for Russian and English; used by ocr_from_video().
reader = easyocr.Reader(['ru', 'en'])

In [None]:
def ocr_from_video(video_path, target_second=1):
    """Run OCR on a single frame of a video and return the cleaned text.

    Args:
        video_path: Path of the video file to open.
        target_second: Timestamp (in seconds) of the frame to read; the frame
            index is computed from the FPS value reported for the video.

    Returns:
        The recognized text fragments joined with spaces and normalized by
        ``clear_string``; ``''`` when the frame cannot be read.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        target_frame = int(fps * target_second)
        cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        ret, frame = cap.read()
    finally:
        # Release the capture even if seeking/reading raises, and before the OCR model runs.
        cap.release()

    if ret:
        texts = reader.readtext(frame, detail=0, paragraph=True)
    else:
        texts = []
        print('Something went wrong (ocr_from_video): cannot read video frame')
    return clear_string(' '.join(texts))

In [None]:
# Choose device and precision together: half precision on GPU, full precision on CPU.
if torch.cuda.is_available():
    device, torch_dtype = 'cuda:0', torch.float16
else:
    device, torch_dtype = 'cpu', torch.float32

model_id = 'openai/whisper-large-v3'

# Load the speech-to-text checkpoint and move it to the selected device.
speech_model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    low_cpu_mem_usage=True,
    use_safetensors=True,
)
speech_model.to(device)

# The processor supplies both the tokenizer and the feature extractor for the pipeline.
processor = AutoProcessor.from_pretrained(model_id)

# Speech-recognition pipeline used by speech_to_text().
speech_pipe = pipeline(
    "automatic-speech-recognition",
    model=speech_model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    device=device,
    torch_dtype=torch_dtype,
    chunk_length_s=30,
    batch_size=16,
    max_new_tokens=128,
    return_timestamps=True,
)

In [None]:
def speech_to_text(video_path):
    """Extract the audio track of a video with ffmpeg and transcribe it.

    The audio is written to ``temp_audio.mp3`` in the working directory. The
    ffmpeg command is passed as an argument list (no shell), so paths with
    spaces or shell metacharacters are safe.

    Args:
        video_path: Path of the video whose audio should be transcribed.

    Returns:
        The transcript normalized by ``clear_string``, or ``''`` when ffmpeg
        fails to extract audio (for example, the video has no audio stream).

    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    audio_path = 'temp_audio.mp3'
    command = ['ffmpeg', '-y', '-i', str(video_path), '-q:a', '0', '-map', 'a', audio_path]
    result = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    if result.returncode != 0:
        # On failure audio_path may still hold the previous video's audio;
        # transcribing it would attach the wrong speech to this video.
        return ''

    text = speech_pipe(audio_path)['text']
    return clear_string(text)

In [None]:
# Image-to-text pipeline (ViT-GPT2 captioning checkpoint); used by image_caption().
caption_pipe = pipeline('image-to-text', model='nlpconnect/vit-gpt2-image-captioning')

In [None]:
def frame_to_pil(frame):
    """Convert a frame from BGR channel order to RGB and wrap it in a PIL image."""
    return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

In [None]:
def image_caption(video_path, target_second=1):
    """Generate a caption for a single frame of a video.

    Args:
        video_path: Path of the video file to open.
        target_second: Timestamp (in seconds) of the frame to caption; the
            frame index is computed from the FPS value reported for the video.

    Returns:
        The generated caption normalized by ``clear_string``; ``''`` when the
        frame cannot be read.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        target_frame = int(fps * target_second)
        cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        ret, frame = cap.read()
    finally:
        # Release the capture even if seeking/reading raises, and before the captioning model runs.
        cap.release()

    if ret:
        text = caption_pipe(frame_to_pil(frame))[0]['generated_text']
    else:
        text = ''
        # Report the failure the same way ocr_from_video does.
        print('Something went wrong (image_caption): cannot read video frame')
    return clear_string(text)

In [None]:
def extract_embeddings(row):
    """Build the four text embeddings for one dataset row.

    The video behind ``row['link']`` is downloaded and trimmed, then the
    description, the on-screen text (OCR), the speech transcript and a frame
    caption are each embedded with ``get_embedding``.

    Returns:
        A 4-tuple: (description, OCR text, speech, caption) embeddings.
    """
    link, description = row['link'], row['description']

    source_path = 'temp_video.mp4'
    clip_path = 'trimmed_video.mp4'
    download_video(link, source_path)
    trim_video(source_path, clip_path)

    # Each source is embedded right after its text is produced, in a fixed order.
    description_vec = get_embedding(clear_string(description))
    ocr_vec = get_embedding(ocr_from_video(clip_path))
    speech_vec = get_embedding(speech_to_text(clip_path))
    caption_vec = get_embedding(image_caption(clip_path))

    return description_vec, ocr_vec, speech_vec, caption_vec

In [None]:
# Compute the per-row embeddings for the whole dataset.
embedding_list = []

for _, row in tqdm(data.iterrows(), total=len(data)):
    embedding_list.append(extract_embeddings(row))

# SentenceTransformer.encode returns NumPy arrays by default, which torch.cat rejects;
# torch.as_tensor converts them (and passes tensors through unchanged) so the four
# embeddings of a row can be concatenated into one vector.
embedding_tensor = torch.stack([
    torch.cat([torch.as_tensor(part) for part in embeddings])
    for embeddings in embedding_list
])
# Save before cleanup so a cleanup problem can never cost the computed result.
torch.save(embedding_tensor, 'yappy_tensor.pt')

# Remove the scratch files; any of them may be absent (e.g. no video had an audio track).
for temp_path in ('temp_video.mp4', 'trimmed_video.mp4', 'temp_audio.mp3'):
    if os.path.exists(temp_path):
        os.remove(temp_path)