<a href="https://colab.research.google.com/github/soheilpaper/-tft-2.4-ili9341-STM32/blob/master/youtube_subtitle/Transcribing_YT_Videos_2_Text.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transcribing YouTube videos 📹 to text ✍

Here is the basic idea for this workflow:   

1. Get a URL from YT
2. Use pytube to download the video locally as .MP4
3. Use Whisper by OpenAI to transcribe the audio
4. Generate subtitles for YT
5. Analyze the extracted text:  
  - Summarize it
  - Extract questions
  - Answer the above questions
  - Answer any question

---
If you are interested in [SEO automation](https://wordlift.io/blog/en/seo-automation/) you might want to read this blog post.


<table align="left">
  <td>
  <a href="https://wordlift.io">
    <img width=130px src="https://wordlift.io/wp-content/uploads/2018/07/logo-assets-510x287.png" />
    </a>
    </td>
    <td>
      by
      <a href="https://wordlift.io/blog/en/entity/andrea-volpini">
        Andrea Volpini
      </a>
      MIT License
      <br/>
      <br/>
      <i>Last updated: <b>January 19, 2023</b></i>
  </td>
</table>
<br/>
<br/>




In [None]:

from google.colab import drive
import os
from pathlib import Path

# Choose the folder that holds every file this notebook produces: either a
# sub-folder of the mounted Google Drive or a plain local folder.
drive_mounted = False
gdrive_fpath = '.'
local_path = '/content/'

mount_gdrive = True # @param{type:"boolean"}
if mount_gdrive:
    from google.colab import drive

    gdrive_mountpoint = '/content/drive/' #@param{type:"string"}
    gdrive_subdirectory = 'MyDrive/YouTube_Auto_Subtitle' #@param{type:"string"}
    gdrive_fpath = str(Path(gdrive_mountpoint) / gdrive_subdirectory)
    print ("gdrive path is :",gdrive_fpath)
    # Only mount when the mount point directory is not present yet.
    if not os.path.isdir(gdrive_mountpoint):
        drive.mount(gdrive_mountpoint)
    else:
        print("Drive is already mounted.")
else:
    Folder_fpath ='/content/' #@param{type:"string"}
    gdrive_fpath = Folder_fpath

# Create the working folder and switch into it on every run. Previously this
# happened only on the very first mount, so re-running the cell left the
# current directory (and possibly a missing folder) unchanged.
os.makedirs(gdrive_fpath, exist_ok=True)
os.chdir(gdrive_fpath)
folder_path = gdrive_fpath

## 0. Install libraries

In [None]:
!pip install --upgrade pytube
!pip install -U mock
#!pip install git+https://github.com/openai/whisper.git
!pip install --upgrade git+https://github.com/openai/whisper.git
!pip install jiwer
!pip install sentencepiece

from pytube import YouTube
import whisper
import os
from typing import Iterator, TextIO
from datetime import datetime
import pandas as pd

## 1. Get the URL

In [None]:
#@title Add here the YT URL

# URL of the YouTube video to process (editable through the Colab form field).
link = 'https://m.youtube.com/watch?v=LI-_HpIDyoI' #@param {type:"string"}

# Destination of the downloaded media; folder_path must already have been set
# by the folder-setup cell.
file_path = f'{folder_path}/WL_video.mp4'
# @ param {type:"string"}

## 2. Download the video

In [None]:
import re
import mock

from pytube.cipher import get_throttling_function_code

def patched_throttling_plan(js: str):
    """Collect throttling transform steps from the player JavaScript.

    Stand-in for ``pytube.cipher.get_throttling_plan``, based on the
    workaround in https://github.com/pytube/pytube/issues/1498. Rather than
    isolating the transform plan first, it scans the whole of ``js`` for
    step calls.

    Args:
        js: Source text of the player JavaScript.

    Returns:
        A list with one tuple of digit strings per matched step:
        ``(x, y)`` for ``c[x](c[y])`` and ``(x, y, z)`` for
        ``c[x](c[y],c[z])``.
    """
    # The return value is not needed, but the call is kept so that any
    # exception it raises still propagates exactly as before.
    get_throttling_function_code(js)

    # Steps are either c[x](c[y]) or c[x](c[y],c[z])
    step_regex = re.compile(r"c\[(\d+)\]\(c\[(\d+)\](,c(\[(\d+)\]))?\)")

    transform_steps = []
    for groups in step_regex.findall(js):
        # groups = (x, y, ",c[z]", "[z]", z); the last three are empty
        # strings when the step has a single argument.
        if groups[4] != '':
            transform_steps.append((groups[0], groups[1], groups[4]))
        else:
            transform_steps.append((groups[0], groups[1]))

    return transform_steps


# While this block runs, pytube.cipher.get_throttling_plan is replaced by
# patched_throttling_plan.
# NOTE(review): get_video() is called from a later cell, outside this
# with-block, so it does not run under the patch — confirm that is intended.
with mock.patch('pytube.cipher.get_throttling_plan', patched_throttling_plan):
    from pytube import YouTube
    url = link

    video = YouTube(url)
    # Take the first MP4 audio-only stream; [0] raises if the filter matched nothing.
    audio = video.streams.filter(only_audio=True, file_extension='mp4')[0]
    # No arguments are passed, so pytube chooses the destination and file name
    # (presumably the current directory — confirm against pytube's docs).
    audio.download()

In [None]:
# here is the function to get the video
def get_video(_link, _path):
    """Download one stream of a YouTube video and return the video title.

    Args:
        _link: URL of the YouTube video.
        _path: File name (or path) handed to pytube as the download file name.

    Returns:
        The video title reported by pytube.

    Raises:
        Exception: whatever ``YouTube(_link)`` raised, re-raised after
            printing "Connection Error".
        ValueError: if neither itag 139 nor any MP4 audio stream is available.
    """
    try:
        yt = YouTube(_link)
    except Exception:
        # Previously execution continued here with `yt` unbound, which hid
        # the real error behind an UnboundLocalError.
        print("Connection Error")
        raise

    # Preferred stream; presumably a low-bitrate MP4 audio track — confirm
    # against pytube's itag table.
    stream = yt.streams.get_by_itag(139)
    if stream is None:
        # Not every video offers itag 139: fall back to any MP4 audio stream.
        stream = yt.streams.filter(only_audio=True, file_extension='mp4').first()
    if stream is None:
        raise ValueError(f"No downloadable MP4 audio stream found for {_link}")

    stream.download('', _path)
    title = yt.title
    print("video downloaded")
    return title

In [None]:
# Download the media behind `link` to `file_path`; get_video returns the video title.
title = get_video(link, file_path)

## 3. Transcribe the audio


In [None]:
# transcribe the audio track using Whisper by OpenAI set _translate=False to transcribe it in the original language

def transcribe_me(_video, _translate=False, _model_name="large-v2"):
    """Run Whisper on a media file and return the transcription.

    Args:
        _video: Path of the audio/video file passed to ``model.transcribe``.
        _translate: When true, use Whisper's "translate" task; otherwise the
            "transcribe" task, which keeps the original language.
        _model_name: Name passed to ``whisper.load_model``. Defaults to
            "large-v2", the value that used to be hard-coded. The original
            author's note: for English the ``.en`` models tend to perform
            better, otherwise "base" is enough.

    Returns:
        A tuple ``(result, result["text"], result["segments"])``.
    """
    # The two tasks differ only in the task name; decoding options are shared.
    task = "translate" if _translate else "transcribe"
    _options = dict(task=task, beam_size=5, best_of=5)

    model = whisper.load_model(_model_name)
    result = model.transcribe(_video, **_options)

    return result, result["text"], result["segments"]

In [None]:
# Store the entire result (output), the extracted text (extracted_text) and the segments (res).
# transcribe_me is called with its default _translate=False.

output, extracted_text, res  = transcribe_me(file_path)
# Bare expression on the last line so the notebook displays the transcript.
extracted_text

### 3.1 Create the subtitles in English for YouTube (optional)

By running the cells below you can automatically generate the subtitle file for any YouTube video. Whisper translates the audio to English only when you pass `_translate=True` to `transcribe_me()` above; with the default `_translate=False` the subtitles stay in the original language.

In [None]:
def format_timestamp(seconds: float):
    """Render a non-negative offset in seconds as ``[H:]MM:SS.mmm``.

    The value is rounded to whole milliseconds. The hour field (not
    zero-padded) is emitted only when it is greater than zero.

    Raises:
        AssertionError: if ``seconds`` is negative.
    """
    assert seconds >= 0, "non-negative timestamp expected"
    remaining_ms = round(seconds * 1000.0)

    # Peel off hours, minutes and seconds; what is left is the millisecond part.
    hours, remaining_ms = divmod(remaining_ms, 3_600_000)
    minutes, remaining_ms = divmod(remaining_ms, 60_000)
    whole_seconds, millis = divmod(remaining_ms, 1_000)

    hour_prefix = f"{hours}:" if hours > 0 else ""
    return f"{hour_prefix}{minutes:02d}:{whole_seconds:02d}.{millis:03d}"


def write_vtt(format,transcript: Iterator[dict], file: TextIO):
    """Write transcript segments to ``file`` as WebVTT.

    Args:
        format: Ignored. Kept only so existing positional callers keep
            working. It used to be spliced into the header ("WEB" + format),
            which produced an invalid first line whenever the argument was
            anything other than "VTT".
        transcript: Iterable of segment dicts with "start" and "end" (seconds)
            and "text" keys.
        file: Writable text stream that receives the output.
    """
    # A WebVTT file must start with exactly this signature line.
    print("WEBVTT\n", file=file)
    for segment in transcript:
        print(
            f"{format_timestamp(segment['start'])} --> {format_timestamp(segment['end'])}\n"
            # "-->" is the cue-timing separator, so it must not appear in cue text.
            f"{segment['text'].replace('-->', '->')}\n",
            file=file,
            flush=True,
        )


def slugify(title):
    """Turn ``title`` into a file-name-friendly string.

    Every character for which ``str.isalnum`` is false becomes an underscore;
    trailing underscores are then removed.
    """
    cleaned = []
    for ch in title:
        cleaned.append(ch if ch.isalnum() else "_")
    return "".join(cleaned).rstrip("_")

In [None]:
# Write the Whisper segments to "<folder_path>/<slugified title>.vtt".
vtt_path = os.path.join(f"{folder_path}", f"{slugify(title)}.vtt")
with open(vtt_path, 'w', encoding="utf-8") as vtt:
  # Pass "VTT" as the format tag so the header line is the required "WEBVTT"
  # signature (the previous tag 'vvt' was a typo).
  write_vtt('VTT',output["segments"], file=vtt)
  # print saved message with absolute path
  print("Saved VTT to", os.path.abspath(vtt_path))

In [None]:
# @title Download subtitles
# @markdown You can download:
# @markdown 1. The plaintext transcript (txt)
# @markdown 2. Multiple subtitles formats (srt, vtt, tsv)
# @markdown 3. Whisper format? (JSON)

from google.colab import files

format = "srt" # @param ["txt", "srt", "vtt", "tsv", "json"]
vtt_path = os.path.join(".", f"{slugify(title)}.{format}")

import json


def _srt_timestamp(seconds):
    """Render seconds as the SubRip timestamp HH:MM:SS,mmm."""
    millis = round(seconds * 1000.0)
    hours, millis = divmod(millis, 3_600_000)
    minutes, millis = divmod(millis, 60_000)
    secs, millis = divmod(millis, 1_000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"


# The file handle gets its own name: it used to be bound to `format`, which
# replaced the chosen format string and was then written into the header.
# The content now also matches the selected format; previously every choice
# produced WebVTT text.
with open(vtt_path, 'w', encoding="utf-8") as subtitle_file:
  segments = output["segments"]
  if format == "vtt":
    write_vtt("VTT", segments, file=subtitle_file)
  elif format == "srt":
    # SubRip: numbered cues with comma-separated milliseconds.
    for cue_number, segment in enumerate(segments, start=1):
      print(
          f"{cue_number}\n"
          f"{_srt_timestamp(segment['start'])} --> {_srt_timestamp(segment['end'])}\n"
          f"{segment['text'].strip().replace('-->', '->')}\n",
          file=subtitle_file,
      )
  elif format == "tsv":
    # Tab-separated: start/end in integer milliseconds, then the text.
    print("start\tend\ttext", file=subtitle_file)
    for segment in segments:
      print(
          round(1000 * segment['start']),
          round(1000 * segment['end']),
          segment['text'].strip().replace("\t", " "),
          sep="\t",
          file=subtitle_file,
      )
  elif format == "json":
    # Whisper's full result dictionary.
    json.dump(output, subtitle_file, ensure_ascii=False)
  else:
    # "txt": plain transcript, one segment per line.
    for segment in segments:
      print(segment['text'].strip(), file=subtitle_file)
  # print saved message with absolute path
  print(f"Saved {format.upper()} to", os.path.abspath(vtt_path))
files.download(f"{vtt_path}")

## 4. Analyze the extracted text

### 4.1 Run the summarizer

We will use GPT-3 to summarize the text we previously extracted from the video.

In [None]:
#@title Add here your GPT-3 key

# OpenAI API key entered through the Colab form; empty by default.
key = '' #@param {type:"string"}

# Publish the key in the environment: the summarization and question-extraction
# functions read os.environ['OPENAI_API_KEY'].
os.environ['OPENAI_API_KEY'] = key

##### Install langchain and other libraries

In [None]:
%%capture

!pip install openai
!pip install langchain
!pip install tiktoken
!pip install faiss-cpu
!pip install --upgrade faiss-gpu==1.7.1

import re
import openai
from langchain import OpenAI, PromptTemplate, LLMChain
from langchain.chains.mapreduce import MapReduceChain
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from langchain.docstore.document import Document
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores.faiss import FAISS
from langchain.chains import VectorDBQAWithSourcesChain
from langchain.chains import RetrievalQAWithSourcesChain

import faiss

# Module-level LLM handle; gp3_summarize_new passes it to load_summarize_chain.
llm = OpenAI(temperature=0)

##### Summarization

In [None]:
def gp3_summarize_new(_text, _chunk_size, _max_chunks=15):
    """Summarize ``_text`` with a map-reduce summarization chain.

    Args:
        _text: Text to summarize.
        _chunk_size: Maximum chunk length, in characters, given to the splitter.
        _max_chunks: Upper bound on the number of chunks sent to the chain.
            Defaults to 15, the limit that used to be hard-coded; pass
            ``None`` to summarize every chunk.

    Returns:
        The summary with surrounding whitespace stripped, or an empty string
        when ``_text`` is empty or only whitespace.
    """
    # Nothing to summarize: skip the API round-trip entirely.
    if not _text or not _text.strip():
        return ""

    openai.api_key = os.environ['OPENAI_API_KEY']

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=_chunk_size,
        chunk_overlap=20,
        length_function=len,
    )
    texts = text_splitter.split_text(_text)

    if _max_chunks is not None and len(texts) > _max_chunks:
        # Make the truncation visible; it used to happen silently.
        print(f"Summarizing only the first {_max_chunks} of {len(texts)} chunks")
        texts = texts[:_max_chunks]

    docs = [Document(page_content=t) for t in texts]
    chain = load_summarize_chain(llm, chain_type="map_reduce")
    out = chain.run(docs)
    return out.strip()

In [None]:
# Summarize the transcript using chunks of at most 1600 characters.
summary = gp3_summarize_new(extracted_text, 1600)
# Bare expression on the last line so the notebook displays the summary.
summary

### 4.2 Extract the list of Questions and Answer them




In [None]:
# extract the questions
def gp3_extract_questions(_text_chunks):
    """Ask the LLM for the questions contained in the given text chunks.

    The chunks are indexed in a FAISS store and the chunks most similar to the
    query are fed to a "refine" question-answering chain. Only answer lines
    that start with "<number>." are returned.

    Args:
        _text_chunks: List of text chunks.

    Returns:
        List of the numbered lines found in the chain's output text.
    """
    openai.api_key = os.environ['OPENAI_API_KEY']

    query = "Extract the questions from this document"

    # Fallback context: the first three chunks are used if the search below fails.
    docs = [Document(page_content=chunk) for chunk in _text_chunks[:3]]

    index = FAISS.from_texts(_text_chunks, OpenAIEmbeddings())
    try:
        docs = index.similarity_search(query)
    except Exception as e:
        # Best effort: report the problem and continue with the fallback docs.
        print(e)

    refine_chain = load_qa_chain(OpenAI(temperature=0), chain_type="refine")
    response = refine_chain(
        {"input_documents": docs, "question": query},
        return_only_outputs=True,
    )

    numbered_line = re.compile(r"^\d+\.")
    return [
        line
        for line in response["output_text"].splitlines()
        if numbered_line.match(line)
    ]

# split the transcription into segments and return text and timecodes
def store_segments(segments):
    """Split transcription segments into parallel text and start-time lists.

    Args:
        segments: Iterable of dicts with a "text" entry and a "start" entry
            (offset from the beginning of the media, in seconds).

    Returns:
        ``(texts, start_times)`` where ``start_times`` holds each offset
        formatted as "HH:MM:SS" (fractions of a second are dropped).
    """
    texts = []
    start_times = []

    for segment in segments:
        # Format the offset arithmetically. Going through
        # datetime.fromtimestamp() treated it as a wall-clock time, which
        # shifted it by the local UTC offset and wrapped after 24 hours.
        whole_seconds = int(segment['start'])
        hours, remainder = divmod(whole_seconds, 3600)
        minutes, secs = divmod(remainder, 60)

        # For a plain string the join is a no-op; it also accepts an iterable
        # of string pieces, as before.
        texts.append("".join(segment['text']))
        start_times.append(f"{hours:02d}:{minutes:02d}:{secs:02d}")

    return texts, start_times

# find the segment that answer best the question
def answer_question(_texts, _start_times, _question):
    """Answer a question from the transcript and report the source timecodes.

    Each text is split into chunks, every chunk is tagged with the start time
    of the text it came from, and the chunks are indexed in a FAISS store that
    backs a question-answering-with-sources chain.

    Side effect: the FAISS index is written to "docs.index" in the current
    directory.

    Args:
        _texts: List of transcript texts.
        _start_times: Start time of each text, parallel to ``_texts``.
        _question: The question to answer.

    Returns:
        ``(answer, sources)`` taken from the chain result.
    """
    text_splitter = CharacterTextSplitter(chunk_size=1500)
    # Created once up front: it used to be re-created on every loop iteration
    # and was left unbound when _texts was empty.
    embeddings = OpenAIEmbeddings()

    docs = []
    metadatas = []
    for i, d in enumerate(_texts):
        splits = text_splitter.split_text(d)
        docs.extend(splits)
        # Every chunk of a text carries that text's start time as its source.
        metadatas.extend([{"source": _start_times[i]}] * len(splits))

    store = FAISS.from_texts(docs, embeddings, metadatas=metadatas)
    faiss.write_index(store.index, "docs.index")

    chain = VectorDBQAWithSourcesChain.from_llm(llm=OpenAI(temperature=0), vectorstore=store)
    result = chain({"question": _question})

    answer = result['answer']
    sources = result['sources']

    return answer, sources

# create a link that directs the user to the provided timecode
def add_timecode_to_url(url, timecode):
    """Return ``url`` with a ``t=<seconds>s`` start-time parameter appended.

    Args:
        url: Video URL, with or without an existing query string.
        timecode: "HH:MM:SS", "MM:SS" or plain seconds; surrounding
            whitespace is ignored.

    Returns:
        The URL followed by "&t=<n>s" when it already contains a "?",
        otherwise by "?t=<n>s".

    Raises:
        ValueError: if a field is not an integer or there are more than
            three fields.
    """
    fields = timecode.strip().split(':')
    if len(fields) > 3:
        raise ValueError(f"Unsupported timecode: {timecode!r}")

    # Fold left to right so shorter forms (MM:SS, SS) are handled as well.
    seconds = 0
    for field in fields:
        seconds = seconds * 60 + int(field)

    # "&" is only valid once a query string has been started.
    separator = "&" if "?" in url else "?"
    return f"{url}{separator}t={seconds}s"

#### 4.2.1 Here are the questions


In [None]:
# Split the Whisper segments (res) into parallel lists of texts and start times.
texts, start_times = store_segments(res)

# Ask the LLM for the list of questions found in the transcript.
questions = gp3_extract_questions(texts)
# Bare expression on the last line so the notebook displays the list.
questions

#### 4.2.2 Here are the answers


In [None]:
# extract the answers for the generated questions and provide the links to the video

def get_output_df(questions, texts, start_times):
    """Answer each question and collect the results in a DataFrame.

    Args:
        questions: Iterable of question strings.
        texts: Transcript texts, forwarded to ``answer_question``.
        start_times: Start times parallel to ``texts``.

    Returns:
        A DataFrame with the columns 'Question', 'Answer', 'Timecodes' and
        'URLs'. Questions whose processing raised an exception are left out,
        and the reason is printed.
    """
    data = []
    for q in questions:
        try:
            answer, sources = answer_question(texts, start_times, q)
            # Sources arrive comma-separated; drop padding and empty entries
            # so an answer without sources no longer discards the whole row.
            timecodes = [t.strip() for t in sources.split(",") if t.strip()]
            urls = [add_timecode_to_url(link, t) for t in timecodes]
        except Exception as e:
            # Still best effort, but no longer silent.
            print(f"Skipping question {q!r}: {e}")
            continue
        data.append([q, answer, sources, urls])
    df = pd.DataFrame(data, columns=['Question', 'Answer', 'Timecodes', 'URLs'])
    return df

In [None]:
#@title 5.1.1 Ask any question

# Free-form question to ask about the transcript (editable through the Colab form).
question = 'What are the most interesting change?' #@param {type:"string"}


In [None]:
# Answer the free-form question and print deep links to the source timecodes.
r = answer_question(texts, start_times, question)
print(question)
print(r[0])
# Sources arrive comma-separated; strip padding and skip empty entries so an
# answer without sources does not crash the timecode parsing below.
timecodes = [t.strip() for t in r[1].split(",") if t.strip()]
print(r[1])

for t in timecodes:
  print(add_timecode_to_url(link, t))

In [None]:
from IPython.display import YouTubeVideo
# Embed a YouTube player that starts at second 965.
# NOTE(review): the video id and start offset are hard-coded and do not follow
# `link` or the timecodes computed earlier — confirm this is intended.
YouTubeVideo('tjupXBKObXs', start=965)