In [None]:
import os
import json
import ast
import time, random
import pysrt
from tqdm import tqdm
from wtpsplit import SaT
# from gpt4api.api_wrap import OpenAIAPIWrapper

def fix_youtube_vtt(vtt_file_path) -> str:
    """Fix YouTube's autogenerated VTT subtitles and return an SRT-formatted string.

    The captions are read in order while remembering one "pending" caption
    (its text and start time):

    * a caption whose text equals the pending text emits one numbered entry
      running from the pending start time to this caption's end time;
    * a multi-line caption whose first line equals the pending text makes its
      second line the new pending text (the repeated first line is ignored);
    * any other caption simply becomes the new pending caption.

    Args:
        vtt_file_path: Path of the ``.vtt`` file, passed to ``webvtt.read``.

    Returns:
        The concatenated entries, each formatted as
        ``"<index>\\n<start> --> <end>\\n<text>\\n\\n"``; an empty string when
        no entry was emitted.
    """
    import webvtt  # kept as a function-local import, as in the original cell

    entries = []
    # `None` (not '') is the "nothing pending yet" sentinel: an empty first
    # caption must not look like a repeat of a caption that was never seen,
    # which previously hit an unbound start time.
    previous_caption_text = None
    previous_caption_start = None

    for caption in webvtt.read(vtt_file_path):
        text = caption.text.strip()
        lines = text.split("\n")

        if previous_caption_text == text:
            # Identical to the pending caption: span from the pending start
            # to the end of the current caption.
            entries.append(
                f"{len(entries) + 1}\n{previous_caption_start} --> {caption.end}\n{previous_caption_text}\n\n"
            )
        elif previous_caption_text == lines[0]:
            # Multi-line caption whose first line repeats the pending text:
            # skip that line and continue with the second one. `lines[1]`
            # exists here because `text` differs from its own first line.
            previous_caption_text = lines[1]
            previous_caption_start = caption.start.strip()
        else:
            previous_caption_text = text
            previous_caption_start = caption.start.strip()

    return "".join(entries)

In [None]:
def contained(ref, pred):
    """Return the fraction of words in ``pred`` that also appear in ``ref``.

    Both strings are lower-cased and split on whitespace. Every word of
    ``pred`` counts (duplicates included); this is a simple word-overlap
    ratio, not a BLEU score.

    Args:
        ref: Reference text to look words up in.
        pred: Text whose words are checked against ``ref``.

    Returns:
        A float in ``[0, 1]``; ``0.0`` when ``pred`` contains no words
        (previously this raised ``ZeroDivisionError``).
    """
    ref_words = set(ref.lower().split())  # set gives O(1) membership checks
    pred_words = pred.lower().split()
    if not pred_words:
        return 0.0
    hits = sum(1 for word in pred_words if word in ref_words)
    return hits / len(pred_words)
        
def openai_seg(subtitle_filter, client_factory=None, max_retries=5):
    """Ask an OpenAI chat model to merge subtitle fragments into full sentences.

    Args:
        subtitle_filter: Subtitle text that is embedded verbatim in the prompt.
        client_factory: Zero-argument callable returning an object with a
            ``get_completion(prompt)`` method whose result unpacks into
            ``(text, _)``. Defaults to ``OpenAIAPIWrapper``, which must be
            importable in the notebook (its import is commented out at the
            top of the first cell).
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The list parsed with ``ast.literal_eval`` from the model's reply.

    Raises:
        NameError: If ``client_factory`` is omitted and ``OpenAIAPIWrapper``
            is not defined (fails fast instead of sleeping through retries).
        RuntimeError: If every attempt failed; the last error is chained as
            the cause. Previously the function fell through to an unbound
            ``return`` variable.
    """
    # Built from adjacent literals so the example `["", ]` can be single-quoted;
    # the original one-line f-string contained unescaped double quotes and did
    # not parse.
    prompt = (
        "Hi, ChatGPT! I have a series of subtitles from a video broken down into timestamped segments. "
        "I would like to merge these subtitles to form complete sentence lists. "
        f"Here are the subtitles:\n\n{subtitle_filter}\n"
        "Please response with string list, with each element is a complete sentence, "
        "don't output any other things.\n"
        'For example, your response should look like ["", ]'
    )

    if client_factory is None:
        client_factory = OpenAIAPIWrapper

    last_error = None
    for attempts_left in range(max_retries - 1, -1, -1):
        try:
            gpt4 = client_factory()
            completion, _ = gpt4.get_completion(prompt)
            subtitle_merge = ast.literal_eval(completion)
            if not isinstance(subtitle_merge, list):
                # The prompt asks for a list; anything else is a bad reply.
                raise ValueError(f"expected a list, got {type(subtitle_merge).__name__}")
            return subtitle_merge
        except Exception as e:
            last_error = e
            print(f"Encounter Error {e}")
            print(f"Rest retry counts: {attempts_left}")
            if attempts_left:
                # Random back-off between attempts; pointless after the last one.
                time.sleep(random.randint(3, 5))

    raise RuntimeError(f"openai_seg failed after {max_retries} attempts") from last_error

# SaT (wtpsplit) model used for sentence segmentation: https://github.com/segment-any-text/wtpsplit
# Loaded once at cell level; the preprocessing loop below calls `sat.split(...)` on it.
sat = SaT("sat-3l-sm")
# Convert to half precision and move to the GPU. The device is hard-coded to "cuda";
# NOTE(review): this presumably fails on a machine without a CUDA device — confirm / add a fallback.
sat.half().to("cuda")
print('load model')


## Preprocess subtitles: sentence segmentation

Make sure each dialogue entry is a complete sentence.

In [26]:

# YouTube auto-caption files (located under videos/) to preprocess.
subtitles = [
    "--aVuWXcdTs.vtt.en.vtt",
    "--BL4qjDW8c.vtt.en.vtt",
    "--SbhVvgxf8.vtt.en.vtt",
    "-0aYDY57Thc.vtt.en.vtt",
    "-1syqqeLbHc.vtt.en.vtt"
]


def _to_seconds(stamp):
    """Convert a pysrt timestamp (hours/minutes/seconds/milliseconds) to float seconds."""
    return stamp.hours * 3600 + stamp.minutes * 60 + stamp.seconds + stamp.milliseconds / 1000


def _align_sentences(captions, sentences):
    """Attach a time span to each sentence by walking the captions in order.

    Args:
        captions: Iterable of ``(text, start_seconds, end_seconds)`` tuples.
        sentences: Sentences obtained by segmenting the joined caption text.

    Returns:
        A list of ``{"start", "end", "subtitle"}`` dicts, one per emitted sentence.

    A caption whose text is a (case-insensitive) substring of the current
    sentence extends that sentence's end time. Any other caption closes the
    current sentence and is taken as the first caption of the next one.
    """
    sentences = list(sentences)
    segments = []
    idx = 0
    # NOTE(review): the first sentence starts at 0.0 rather than at the first
    # caption's start time — kept from the original; confirm this is intended.
    seg_start = 0.0
    seg_end = 0.0
    pending = False  # True once the sentence at `idx` has a caption assigned to it
    for text, start, end in captions:
        if idx >= len(sentences):
            # More captions than sentences: nothing left to align against
            # (previously an IndexError).
            break
        if text.lower() in sentences[idx].lower():
            seg_end = end
        else:
            segments.append({
                "start": seg_start,
                "end": seg_end,
                "subtitle": sentences[idx]
            })
            seg_start = start
            seg_end = end
            idx += 1
        pending = True
    # Flush the sentence still being built. This also covers the case where the
    # very last caption opened a new sentence, which was previously dropped.
    if pending and idx < len(sentences):
        segments.append({
            "start": seg_start,
            "end": seg_end,
            "subtitle": sentences[idx]
        })
    return segments


os.makedirs("preprocessed_data", exist_ok=True)

for subtitle in tqdm(subtitles):
    srt_path = f"videos/{subtitle}_filter.srt"

    # 1) De-duplicate the rolling YouTube captions and store them as SRT.
    subtitle_filter = fix_youtube_vtt(f"videos/{subtitle}")
    with open(srt_path, "w", encoding="utf-8") as fp:
        fp.write(subtitle_filter)

    # 2) Re-read the SRT. Use a dedicated name so the `subtitles` list that
    #    drives this loop is not overwritten.
    srt_items = pysrt.open(srt_path, encoding="utf-8")
    subtitles_string = ' '.join(item.text for item in srt_items)

    # 3) Sentence segmentation with SaT.
    #    (Alternative, disabled: subtitle_merge = openai_seg(subtitle_filter))
    subtitle_merge = sat.split(subtitles_string, threshold=0.4)

    # 4) Map each sentence back to a time span and save it.
    captions = [
        (item.text, _to_seconds(item.start), _to_seconds(item.end))
        for item in srt_items
    ]
    res = _align_sentences(captions, subtitle_merge)
    with open("preprocessed_data/"+subtitle.split('.')[0]+'.json', "w", encoding="utf-8") as jp:
        json.dump(res, jp, indent=4)


100%|██████████| 5/5 [00:00<00:00, 14.88it/s]


In [None]:
def get_caption(video, start, end):
    """Caption the ``[start, end]`` portion of ``video`` (not implemented yet).

    Only the sub-clip is extracted with ``video.subclip(start, end)``; no
    caption is generated, so the function currently returns ``None``.
    """
    clip = video.subclip(start, end)
    # TODO: get caption for `clip`
    return None

In [None]:
import json
import random
from moviepy.editor import VideoFileClip

from .proxy_task import Tasks

# Per-video sentence files written by the preprocessing cell: each is a JSON
# list of {"start", "end", "subtitle"} entries.
dialogues = [
    "--aVuWXcdTs.json",
    "--BL4qjDW8c.json",
    "--SbhVvgxf8.json",
    "-0aYDY57Thc.json",
    "-1syqqeLbHc.json"
]

# Which entry a temporal tag refers to, relative to the sampled `timestamp` index.
TS_OFFSETS = {"past": -1, "current": 0}

instructions = []

for dialogue_file in dialogues:
    # Keep the file name and the loaded entries in separate variables: the
    # name is still needed below to derive the video id.
    with open(f"preprocessed_data/{dialogue_file}", encoding="utf-8") as fp:
        dias = json.load(fp)

    if len(dias) < 2:
        # `timestamp` is sampled from 1..len-1 so that "past" (timestamp-1) exists.
        print(f"Skip {dialogue_file}: need at least 2 dialogue entries, got {len(dias)}")
        continue

    timestamp = random.choice(range(1, len(dias)))
    vid = dialogue_file.split('.')[0]
    vid_path = f"videos/{vid}.mp4"
    # NOTE(review): nothing in this cell writes a clip to this path — confirm
    # where the trimmed video is produced.
    new_vid_path = f"preprocess/{vid}_{timestamp}.mp4"

    video = VideoFileClip(vid_path)
    try:
        task = random.choice(Tasks.tasks)

        if task == "t2d":
            # Temporal tag -> dialogue.
            ts = random.choice(list(Tasks.t2d.keys()))
            if ts not in TS_OFFSETS:
                raise ValueError(f"Unsupported t2d key: {ts!r}")
            entry = dias[timestamp + TS_OFFSETS[ts]]
            instruct = Tasks.t2d[ts]
            answer = entry["subtitle"]
        elif task == 't2c':
            # Temporal tag -> caption. The template comes from Tasks.t2c, the
            # same mapping `ts` was drawn from (it used to read Tasks.t2d).
            ts = random.choice(list(Tasks.t2c.keys()))
            if ts not in TS_OFFSETS:
                raise ValueError(f"Unsupported t2c key: {ts!r}")
            entry = dias[timestamp + TS_OFFSETS[ts]]
            instruct = Tasks.t2c[ts]
            answer = get_caption(video, entry["start"], entry["end"])
        elif task == 'd2c':
            # Dialogue -> caption.
            entry = dias[timestamp]
            dialogue = entry["subtitle"]
            instruct = random.choice(Tasks.d2c).format_map({"dialogue": dialogue})
            answer = get_caption(video, entry["start"], entry["end"])
        elif task == 'c2d':
            # Caption -> dialogue: the answer is the dialogue, not the caption
            # that is already part of the instruction.
            entry = dias[timestamp]
            caption = get_caption(video, entry["start"], entry["end"])
            instruct = random.choice(Tasks.c2d).format_map({"caption": caption})
            answer = entry["subtitle"]
        else:
            # Fail loudly instead of reusing values from a previous iteration.
            raise ValueError(f"Unsupported task: {task!r}")
    finally:
        # Release the reader resources held by the clip.
        video.close()

    instructions.append({
        "video": new_vid_path,
        "instruction": instruct,
        "answer": answer,
    })
    
    