<a href="https://colab.research.google.com/github/martinopiaggi/summarize/blob/main/Summarize.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# https://github.com/martinopiaggi/summarize



In [None]:
Source = "" #@param {type:"string"}
Type_of_source = "Local file" #@param ['Youtube video or playlist', 'Google Drive video link','Dropbox video link', 'Local file']
Type = Type_of_source
URL = Source                                                                                                                                                                                                    

#@markdown ---
#@markdown Insert your API key depending on which endpoint you want to use
# API Configuration
api_key = "lm-studio"  # @param {type:"string"}
api_endpoint = "Custom"  # @param ['Groq', 'OpenAI', 'Custom'] 

endpoints = {
    "Groq": "https://api.groq.com/openai/v1",
    "OpenAI": "https://api.openai.com/v1",
    "Custom": "http://localhost:1234/v1"  # Example custom endpoint
}
base_url = endpoints.get(api_endpoint)

models = {                                  
    "Groq": "llama3-8b-8192",
    "OpenAI": "gpt-4o-mini",
    "Custom": "Meta-Llama-3-8B-Instruct-GGUF"  # Placeholder for any custom model
}
model = models.get(api_endpoint)

use_Youtube_captions = True #@param {type:"boolean"}


*Remember to change runtime type to T4 GPU in case you are planning to use Whisper and not youtube autogenerated captions*

## Installation of libraries
Re-run if you change settings in the previous cell:

In [None]:
# %%capture
import subprocess
import re
import os 

if use_Youtube_captions:
  !pip install youtube-transcript-api
  from youtube_transcript_api import YouTubeTranscriptApi

if (not Type == "Youtube video or playlist") or (not use_Youtube_captions):
  !pip install openai-whisper
  import whisper

!pip install openai
import openai
client = openai.OpenAI(api_key=api_key, base_url=base_url)


if Type == "Youtube video or playlist":
  !pip install git+https://github.com/pytube/pytube
  from pytube import YouTube

if Type == "Google Drive video link":
  from google.colab import drive
  drive.mount('/content/drive')

## Video fetching
Re-run cell if you change the source URL:

In [None]:
skip_transcription=False
text = ""
textTimestamps = ""

def seconds_to_time_format(seconds):
    hours, remainder = divmod(seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"

def download_youtube_audio_only(url):
    yt = YouTube(url)
    audio_stream = yt.streams.get_audio_only()
    saved_path = audio_stream.download(output_path=".", skip_existing=True)
    return saved_path

def download_youtube_captions(url):
    regex = r'(?:https?:\/\/)?(?:www\.)?(?:youtube\.com\/(?:[^\/\n\s]+\/\S+\/|(?:v|e(?:mbed)?)\/|\S*?[?&]v=)|youtu\.be\/)([a-zA-Z0-9_-]{11})'
    video_id =  re.search(regex, url).group(1)
    transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

    try:
      transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'])
    except:
      for available_transcript in transcript_list:
        if available_transcript.is_translatable:
          transcript = available_transcript.translate('en').fetch()
          break

    text = ""
    for entry in transcript:
            start_time = seconds_to_time_format(entry['start'])
            text += f"{start_time} {entry['text'].strip()}\n"

    transcript_file_name = f"{video_id}_captions.md"

    with open(transcript_file_name, 'w', encoding='utf-8') as f:
      f.write(text)

    return text,transcript_file_name

if Type == "Youtube video or playlist":
    #clean youtube url from timestamp
    URL = re.sub('\&t=\d+s?', '', URL)
    if use_Youtube_captions:
      text, transcript_file_name = download_youtube_captions(URL)
      skip_transcription=True
    else:
      video_path_local =  download_youtube_audio_only(URL)

elif Type == "Google Drive video link":
  subprocess.run(['ffmpeg', '-y', '-i', "drive/MyDrive/" + URL, '-vn', '-acodec', 'pcm_s16le',
                  '-ar', '16000', '-ac', '1', 'gdrive_audio.wav'], check=True)
  video_path_local = "gdrive_audio.wav"

elif Type == "Dropbox video link":
    subprocess.run(['wget', URL, '-O', 'dropbox_video.mp4'], check=True)
    subprocess.run(['ffmpeg', '-y', '-i', 'dropbox_video.mp4', '-vn', '-acodec', 'pcm_s16le',
                    '-ar', '16000', '-ac', '1', 'dropbox_video_audio.wav'], check=True)
    video_path_local = "dropbox_video_audio.wav"

elif Type == "Local file":
    local_file_path = Source 
    subprocess.run(['ffmpeg', '-y', '-i', local_file_path, '-vn', '-acodec', 'pcm_s16le',
                    '-ar', '16000', '-ac', '1', 'local_file_audio.wav'], check=True)
    video_path_local = "local_file_audio.wav"

## Transcription using Whisper
***Only run this cell if the source is not YouTube or you decided not to use YouTube captions.***
You can specify the language and initial prompt to increase speed

In [None]:
#%%capture
if not skip_transcription:
  language = "en" # @param {type:"string"}
  initial_prompt = "" # @param {type:"string"}

  modelWhisper = whisper.load_model('tiny', device="cuda")
  transcription = modelWhisper.transcribe(str(video_path_local), beam_size=5,
                                    language=None if language == "auto" else language,
                                    task="translate",
                                    initial_prompt=initial_prompt)
  
  transcript_file_name = video_path_local.replace(".mp4", ".md") #to check
  transcript_file_name = video_path_local.replace(".wav", ".md")

with open(transcript_file_name, 'w') as f:
    for segment in transcription["segments"]:
        start_time = seconds_to_time_format(segment['start'])
        text += f"{start_time} {segment['text'].strip()} "

    f.write(text)

## Summarization and elaboration

In [None]:
prompt_type = "Summarization"  # @param ['Summarization', 'Only grammar correction with highlights']
# @markdown Set the number of parallel API calls (be mindful of usage rate limits)
parallel_api_calls = 1 # @param


# Define your prompts using a dictionary for easier management
prompts = {
    'Summarization': """Summarize the video transcript excerpt including a concise title that reflects the content. Wrap the title with **markdown bold notation**. Write the summary as if you are continuing a conversation without needing to signal a beginning. Here is the transcript: """,
    'Only grammar correction with highlights': """Repeat the following text correcting any grammatical errors and formatting error. Highlight only the important quote (if there are any) with **markdown bold notation**. Focus solely on the essence of the content as if you are continuing a conversation without using any form of introduction like 'Here's the corrected text:'. Here is the text to fix: """
}

# Select the appropriate prompt
summary_prompt = prompts[prompt_type]


def extract_and_clean_timestamps(text_chunks):
    timestamp_pattern = re.compile(r'(\d{2}:\d{2}:\d{2})')
    cleaned_texts = []
    timestamp_ranges = []
    for chunk in text_chunks:
        timestamps = timestamp_pattern.findall(chunk)
        if timestamps:
            for timestamp in timestamps:
                # Remove each found timestamp from the chunk
                chunk = chunk.replace(timestamp, "")
            timestamp_ranges.append(timestamps[0])  # Assuming you want the first timestamp per chunk
        else:
            timestamp_ranges.append("")
        cleaned_texts.append(chunk.strip())  # Strip to remove any leading/trailing whitespace
    return cleaned_texts, timestamp_ranges

def format_timestamp_link(timestamp):
    if Type == "Youtube video or playlist":
      hours, minutes, seconds = map(int, timestamp.split(':'))
      total_seconds = hours * 3600 + minutes * 60 + seconds
      return f"{timestamp} - {URL}&t={total_seconds}"
    else:
      return f"{timestamp}"

import concurrent.futures
import time

def summarize(prompt):
    completion = client.chat.completions.create(
            model=model,
            messages=[
            {"role": "system", "content": summary_prompt},
            {"role": "user", "content": prompt}
            ],
            max_tokens=4096
    )
    return completion.choices[0].message.content

def process_and_summarize(text):
    chunk_size, overlap_size = 4096, 20
    texts = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size - overlap_size)]
    cleaned_texts, timestamp_ranges = extract_and_clean_timestamps(texts)
    summaries = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_api_calls) as executor:
        future_to_chunk = {executor.submit(summarize, text_chunk): idx for idx, text_chunk in enumerate(cleaned_texts)}
        for future in concurrent.futures.as_completed(future_to_chunk):
            idx = future_to_chunk[future]
            try:
                summarized_chunk = future.result()
                summary_piece = format_timestamp_link(timestamp_ranges[idx]) + " " + summarized_chunk
                summary_piece += "\n"
                summaries.append((idx, summary_piece))
            except Exception as exc:
                print(f'Chunk {idx} generated an exception: {exc}')
                # Resubmit the task with the new model
                time.sleep(10)
                future_to_chunk[executor.submit(summarize, texts[idx])] = idx

    summaries.sort()  # Ensure summaries are in the correct order
    final_summary = "\n\n".join([summary for _, summary in summaries])

    # Save the final summary
    final_name = transcript_file_name.replace(".md", "_FINAL.md") if Type != "Dropbox video link" else "final_dropbox_video.md"
    with open(final_name, 'w') as f:
        f.write(final_summary)


process_and_summarize(text)