<a href="https://colab.research.google.com/github/yumin8136/LM_No2/blob/main/Youtube_summary(Llama2_%2B_streamlit).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -qU transformers accelerate einops langchain xformers bitsandbytes youtube-transcript-api streamlit

In [2]:
%%writefile youtube_video_search.py
# Llama 2 model initialisation and GPU / quantisation setup.
import os

import torch
import transformers
from torch import cuda, bfloat16
from transformers import StoppingCriteria, StoppingCriteriaList

model_id = 'meta-llama/Llama-2-7b-chat-hf'
# Use the current CUDA device when one is available, otherwise fall back to CPU.
device = f'cuda:{cuda.current_device()}' if cuda.is_available() else 'cpu'

# 4-bit NF4 quantisation with double quantisation; compute dtype is bfloat16.
bub_config = transformers.BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type='nf4',
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=bfloat16
)

# SECURITY: never commit an access token to source control. The token is read
# from the environment instead; when no variable is set, `True` asks
# transformers to use the token stored by `huggingface-cli login`.
# Any token that was previously hard-coded in this file should be revoked.
hf_auth = (
    os.environ.get('HF_TOKEN')
    or os.environ.get('HUGGING_FACE_HUB_TOKEN')
    or True
)

model_config = transformers.AutoConfig.from_pretrained(
    model_id,
    use_auth_token=hf_auth
)
model = transformers.AutoModelForCausalLM.from_pretrained(
    model_id,
    trust_remote_code=True,
    config=model_config,
    quantization_config=bub_config,
    device_map='auto',
    use_auth_token=hf_auth
)
# Inference only: switch the model to evaluation mode.
model.eval()
tokenizer = transformers.AutoTokenizer.from_pretrained(
    model_id,
    use_auth_token=hf_auth
)

# Text sequences that mark the end of an answer, tokenised and moved to the
# same device string computed above so they can be compared with generated ids.
stop_list = ['\nHuman:', '\n"""\n']
stop_token_ids = [tokenizer(x)['input_ids'] for x in stop_list]
stop_token_ids = [torch.LongTensor(x).to(device) for x in stop_token_ids]
class StopOnTokens(StoppingCriteria):
    """Stopping criterion that fires when the output ends with a stop sequence.

    The stop sequences are read from the module-level ``stop_token_ids`` list
    of 1-D tensors.
    """

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        """Return True if the first sequence in ``input_ids`` ends with any stop sequence."""
        generated = input_ids[0]
        for stop_ids in stop_token_ids:
            stop_len = len(stop_ids)
            # A sequence shorter than the stop sequence cannot end with it.
            # Without this guard the tail slice has a different length and
            # torch.eq would either raise or silently broadcast.
            if stop_len == 0 or len(generated) < stop_len:
                continue
            if torch.eq(generated[-stop_len:], stop_ids).all():
                return True
        return False

from langchain.llms import HuggingFacePipeline

# Criteria list built from StopOnTokens. It is created here but deliberately
# NOT passed to the pipeline below (the hook is disabled in this version).
stopping_criteria = StoppingCriteriaList([StopOnTokens()])

# Hugging Face text-generation pipeline around the quantised model.
generate_text = transformers.pipeline(
    task='text-generation',
    model=model,
    tokenizer=tokenizer,
    # LangChain expects the prompt to be included in the returned text.
    return_full_text=True,
    # Generation parameters forwarded to the model.
    temperature=0.0,          # lowest "randomness" setting
    max_new_tokens=128,       # upper bound on generated tokens
    repetition_penalty=1.1,   # discourages the output from repeating itself
)

# LangChain LLM wrapper used by the summarisation helpers.
llm = HuggingFacePipeline(pipeline=generate_text)


def llm_sum(text):
  """Ask the module-level ``llm`` to summarise ``text`` and return its reply."""
  # The prompt is assembled piece by piece but keeps the exact layout of the
  # template, including the two-space indentation of its last two lines.
  instruction = "Summarize the full text in a few sentences.\n"
  quoted_text = f"  text: ```{text}```\n"
  request = instruction + quoted_text + "  "
  return llm(prompt=request)
# Fetch a video's subtitles (transcript) from its URL.
def get_video_subtitle(url):
  """Return the subtitle text of the YouTube video at ``url``.

  The transcript is loaded with LangChain's ``YoutubeLoader`` and the
  ``page_content`` of the first loaded document is returned.

  Returns:
    The subtitle text, or the literal string ``"None"`` when the transcript
    could not be loaded (the failure is reported on stdout).
  """
  try:
    from langchain.document_loaders import YoutubeLoader
    documents = YoutubeLoader.from_youtube_url(url).load()
    subtitle = documents[0].page_content
  # `Exception` rather than a bare `except:` so that KeyboardInterrupt and
  # SystemExit are no longer swallowed while a transcript is being fetched.
  except Exception:
    print("没有找到视频的字幕")
    return "None"
  return subtitle

# V3.1: split the text into chunks, summarise every chunk with Llama 2 and
# concatenate the chunk summaries. (A second summarisation pass over the merged
# text is currently disabled.)
def sum_subtitles(texts):
  """Summarise ``texts`` chunk by chunk and return the joined summaries.

  The text is split into chunks of at most 5000 characters with no overlap;
  each chunk is summarised with ``llm_sum``. The result contains every chunk
  summary followed by a newline, and is also printed to stdout.
  """
  from langchain.text_splitter import RecursiveCharacterTextSplitter

  splitter = RecursiveCharacterTextSplitter(chunk_size=5000, chunk_overlap=0)
  pieces = splitter.split_text(texts)

  partial_summaries = [llm_sum(piece) for piece in pieces]
  merged = "".join(summary + "\n" for summary in partial_summaries)
  print(merged)
  return merged

# Search YouTube for a keyword and collect, for the first few results, the
# video title, URL, subtitles and a summary of the subtitles.
def get_video_info_from_youtube_crawler(keyword, max_results=2):
    """Scrape YouTube search results for ``keyword``.

    Args:
        keyword: Search text. It is passed as a query parameter so that
            spaces and characters such as ``&`` or ``#`` are URL-encoded.
        max_results: Maximum number of videos to process. Defaults to 2,
            the limit that was previously hard-coded.

    Returns:
        Four parallel lists ``(video_names, video_urls, video_subtitles,
        summ_subtitles)``. All four are empty when the request fails, the
        response status is not 200, the embedded ``ytInitialData`` JSON
        cannot be found or parsed, or the JSON lacks the expected structure.
    """
    import json
    import re

    import requests

    print("通过爬虫获取数据")
    try:
        resp = requests.get(
            "https://m.youtube.com/results",
            params={"search_query": keyword},
            timeout=30,  # do not hang forever on a stalled connection
        )
    except requests.RequestException:
        return [], [], [], []

    if resp.status_code != 200:
        return [], [], [], []

    # The search results are embedded in the page as a JSON blob.
    found = re.search(r'ytInitialData = (.*);</script>', resp.text)
    if found is None:
        return [], [], [], []
    try:
        result_obj = json.loads(found.group(1))
    except ValueError:
        return [], [], [], []

    video_names = []
    video_urls = []
    video_subtitles = []
    summ_subtitles = []

    try:
        contents = result_obj['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents']
        for content in contents:
            if 'itemSectionRenderer' not in content or 'contents' not in content['itemSectionRenderer']:
                continue
            for video in content['itemSectionRenderer']['contents']:
                # Checked before each video so the limit holds across
                # sections, not only inside the current one.
                if len(video_names) >= max_results:
                    return video_names, video_urls, video_subtitles, summ_subtitles
                if 'videoRenderer' not in video:
                    continue
                renderer = video['videoRenderer']
                video_name = renderer['title']['runs'][0]['text']
                video_url = "https://www.youtube.com/watch?v=" + renderer['videoId']
                video_subtitle = get_video_subtitle(video_url)
                sum_subtitle = sum_subtitles(video_subtitle)

                video_names.append(video_name)
                video_urls.append(video_url)
                video_subtitles.append(video_subtitle)
                summ_subtitles.append(sum_subtitle)
    except (KeyError, IndexError, TypeError):
        # Unexpected page layout: report "no results" rather than crash.
        return [], [], [], []

    return video_names, video_urls, video_subtitles, summ_subtitles

# Save each video's name, URL, summary and subtitles to a text file on disk.
def store_video_info_to_files(video_names, video_urls, video_subtitles, folder_path ,sum_subtitles):
    """Write one ``<video name>.txt`` file per video into ``folder_path``.

    Args:
        video_names: Video titles; also used (sanitised) as file names.
        video_urls: Video URLs, parallel to ``video_names``.
        video_subtitles: Subtitle texts, parallel to ``video_names``.
        folder_path: Destination directory; created if it does not exist.
        sum_subtitles: Subtitle summaries, parallel to ``video_names``.

    The lists are combined with ``zip``, so extra items in longer lists are
    ignored. Name, URL and summary of every video are also printed.
    """
    import os
    import re

    if folder_path:
        os.makedirs(folder_path, exist_ok=True)

    for name, url, subtitle, sum_subtitle in zip(video_names, video_urls, video_subtitles, sum_subtitles):
        # Replace characters that are not allowed in file names. The backslash
        # is escaped as `\\` so that it is matched as well (the previous
        # pattern `\/` only matched the forward slash).
        file_name = re.sub(r'[\\/:*?"<>|]', '_', name) + ".txt"
        file_path = os.path.join(folder_path, file_name)
        print(name)
        print(url)
        with open(file_path, "w", encoding="utf-8") as file:
            file.write("Video Name: " + name + "\n")
            file.write("Video URL: " + url + "\n")
            file.write("Video summary: " + sum_subtitle + "\n")
            file.write("Video Subtitle: " + subtitle + "\n")
        print(sum_subtitle)



Writing youtube_video_search.py


In [8]:
%%writefile app.py
from youtube_video_search import get_video_info_from_youtube_crawler
import streamlit as st

st.header("Find your videos from Youtube")
# Search box and submit button.
search_query = st.text_input("Enter your search query:")
submit_button = st.button("Search")

query = search_query.strip()

if submit_button and not query:
    # Clicking "Search" with an empty box used to trigger a search for an
    # empty keyword; ask for input instead.
    st.warning("Please enter a search query.")
elif query:
    # Runs when the user presses Enter in the text box or clicks the button.
    video_names, video_urls, video_subtitles, sum_subtitles = get_video_info_from_youtube_crawler(query)

    if not video_names:
        st.info("No videos found.")

    # Show one numbered record per video.
    records = zip(video_names, video_urls, sum_subtitles)
    for position, (video_name, video_url, sum_subtitle) in enumerate(records, start=1):
        st.write(position)
        st.write("Video Name:")
        st.write(video_name)

        st.write("Video URL:")
        st.write(video_url)

        st.write("Video Subtitle summary:")
        st.write(sum_subtitle)

        # Separator line between records.
        st.write("---------------------------")

Overwriting app.py


In [None]:
!streamlit run app.py & npx localtunnel --port 8501