<a href="https://colab.research.google.com/github/nyabingenorv/100-pandas-puzzles/blob/master/YtSummarizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **YouTube Video to Blog Post Generator**

This notebook provides a toolset for automating the generation of blog posts from YouTube videos. By integrating the YouTube Data API, Gemini AI, and Firebase, the notebook extracts video transcripts, summarizes content, and publishes blog posts to Firestore.

**Overview**

1. ***YouTube Data Retrieval:***
The notebook utilizes the YouTube Data API to fetch metadata for specific videos or recent uploads from a YouTube channel. This metadata includes details such as the video’s title, description, upload date, view count, and more.

2. ***Transcript Extraction:***
The YouTube Transcript API is employed to retrieve the transcript of the video. If transcripts are unavailable or disabled, the notebook will skip the processing of that particular video.

3. ***Content Summarization:***
The transcript is summarized using Gemini AI, which generates a detailed and structured summary of the video content. If Gemini AI considers the content unsafe, the notebook will skip processing that video to ensure only appropriate content is summarized.

4. ***Key Points Extraction:***
The notebook extracts the most frequent and relevant key points from the video transcript, summarizing the core topics discussed.

5. ***Blog Post Creation:***
The notebook automatically generates a blog post that includes the video’s summary, key points, and description. The blog post also contains a link to the original YouTube video and a disclaimer regarding the content generation process.

6. ***Firestore Integration:***
The generated blog post is saved to a Firestore database, along with additional metadata such as the video’s title, author, and thumbnail image.
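
Step 4 is described above, and `process_channel_videos` later calls `extract_key_points(transcript, num_points=5)`, but no definition appears in the cells shown here. The following is only a sketch of one way such a function could work, assuming a simple word-frequency score per sentence. The regex tokenizer and the small `STOPWORDS` set are assumptions made so the example runs without NLTK data; the NLTK tokenizers and stopword list imported by this notebook could be swapped in.

```python
import re
from collections import Counter
from typing import List

# Tiny illustrative stopword list (an assumption for this sketch);
# nltk.corpus.stopwords.words('english') could replace it.
STOPWORDS = {
    "the", "a", "an", "and", "or", "of", "to", "in", "is", "it", "that",
    "this", "for", "on", "with", "as", "are", "was", "be", "we", "you",
}


def _content_words(text: str) -> List[str]:
    """Lowercase the text and keep alphabetic tokens that are not stopwords."""
    return [w for w in re.findall(r"[a-z']+", text.lower()) if w not in STOPWORDS]


def extract_key_points(text: str, num_points: int = 5) -> List[str]:
    """Return up to num_points sentences with the highest average word frequency.

    The selected sentences are returned in their original order.
    """
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]
    freq = Counter(_content_words(text))

    def score(sentence: str) -> float:
        tokens = _content_words(sentence)
        return sum(freq[w] for w in tokens) / len(tokens) if tokens else 0.0

    # Rank sentence indices by score, keep the best ones, then restore document order
    ranked = sorted(range(len(sentences)), key=lambda i: score(sentences[i]), reverse=True)
    return [sentences[i] for i in sorted(ranked[:num_points])]
```

Averaging by sentence length keeps long sentences from winning only because they contain more words. Transcripts without punctuation would come back as a single "sentence", so a different splitter may be needed for auto-generated captions.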



In [None]:
# Importing necessary libraries
from transformers import pipeline
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from collections import Counter
import os
import json
from googleapiclient.discovery import build
from google.colab import userdata
import google.generativeai as genai
import firebase_admin
from firebase_admin import credentials, firestore
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled
import datetime
from typing import List, Dict, Optional

## Initialization

Next, we initialize the following:

1.   Google (YouTube Data API client)
2.   Gemini
3.   Firebase
4.   Natural Language Toolkit (NLTK) data

In [None]:
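# Point Google client libraries at the Firebase service account file
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "firebase.json"

# Initialize the Firebase app once; get_app() raises ValueError if no app exists yet,
# which also makes this cell safe to re-run
try:
    firebase_admin.get_app()
except ValueError:
    firebase_admin.initialize_app(credentials.Certificate("firebase.json"))

# Initialize Firebase Firestore
db = firestore.client()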

# Download necessary NLTK data
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)

# Set up API keys
GOOGLE_API_KEY = userdata.get('youtube_api')
GEMINI_KEY = userdata.get('Gemini')

# Initialize YouTube and Gemini API clients
youtube = build('youtube', 'v3', developerKey=GOOGLE_API_KEY)
genai.configure(api_key=GEMINI_KEY)

---

***Method to get the channel videos from Google's YouTube Data API***

Retrieve the most recent videos from a YouTube channel.

    Args:
        channel_id (str): The ID of the YouTube channel.
        video_count (int): The number of recent videos to retrieve.

    Returns:
        List[Dict[str, str]]: A list of video information dictionaries.
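
The YouTube Data API returns at most 50 search results per request, so asking for more recent videos than that means following `nextPageToken` across pages. The sketch below illustrates that loop. The function name `list_recent_video_ids` and the `client` parameter are hypothetical; `client` is assumed to behave like the `youtube` object built earlier.

```python
from typing import Any, List


def list_recent_video_ids(client: Any, channel_id: str, video_count: int) -> List[str]:
    """Collect up to video_count video IDs, following nextPageToken between pages."""
    video_ids: List[str] = []
    page_token = None
    while len(video_ids) < video_count:
        response = client.search().list(
            part="id",
            channelId=channel_id,
            type="video",  # skip channel and playlist results
            order="date",
            maxResults=min(50, video_count - len(video_ids)),  # per-request cap is 50
            pageToken=page_token,
        ).execute()

        for item in response.get("items", []):
            video_id = item.get("id", {}).get("videoId")
            if video_id:
                video_ids.append(video_id)

        # No token means there are no further pages
        page_token = response.get("nextPageToken")
        if not page_token:
            break
    return video_ids[:video_count]
```

Each returned ID could then be passed to `get_video_info`. Search requests are comparatively expensive in API quota, so paging through a large channel is worth doing sparingly.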


In [None]:
def get_channel_videos_youtube_api(channel_id: str, video_count: int) -> List[Dict[str, str]]:
    """
    Retrieve the most recent videos from a YouTube channel.

    Args:
        channel_id (str): The ID of the YouTube channel.
        video_count (int): The number of recent videos to retrieve.

    Returns:
        List[Dict[str, str]]: A list of video information dictionaries.
    """
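    # type='video' keeps channel and playlist results out, since those items carry no 'videoId'
    request = youtube.search().list(
        part='snippet',
        channelId=channel_id,
        type='video',
        maxResults=video_count,  # the API accepts at most 50 results per request
        order='date'
    )
    response = request.execute()

    videos = response.get('items', [])
    video_infos = []
    for video in videos:
        # Guard against items without a video ID instead of raising KeyError
        video_id = video.get('id', {}).get('videoId')
        if not video_id:
            continue
        video_info = get_video_info(video_id)
        if video_info:
            video_infos.append(video_info)
    return video_infos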


---
***Method to get and parse the YouTube video metadata***



Retrieve detailed information about a YouTube video.

    Args:
        video_id (str): The ID of the YouTube video.

    Returns:
        Optional[Dict[str, str]]: A dictionary containing video information or None if the video was not found.
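
The `duration` value in the returned metadata comes from `contentDetails` as an ISO 8601 duration string such as `PT1H2M3S`. If a numeric length is needed (for example, to skip very short videos), a small parser along these lines could convert it. The helper name `duration_to_seconds` is hypothetical, and the sketch only handles the day, hour, minute, and second fields.

```python
import re

# Matches durations such as "PT45S", "PT15M", "PT1H2M3S" or "P1DT1S"
_DURATION_RE = re.compile(r"^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$")


def duration_to_seconds(duration: str) -> int:
    """Convert an ISO 8601 duration (days/hours/minutes/seconds only) to seconds."""
    match = _DURATION_RE.match(duration)
    if not match:
        raise ValueError(f"Unrecognized duration: {duration!r}")
    days, hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds
```

For instance, `duration_to_seconds(video['duration'])` could be compared against a minimum length before requesting a transcript.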



In [None]:
def get_video_info(video_id: str) -> Optional[Dict[str, str]]:
    """
    Retrieve detailed information about a YouTube video.

    Args:
        video_id (str): The ID of the YouTube video.

    Returns:
        Optional[Dict[str, str]]: A dictionary containing video information or None if the video was not found.
    """
    request = youtube.videos().list(
        part='snippet,contentDetails,statistics',
        id=video_id
    )
    response = request.execute()

    if not response['items']:
        print(f"No video found with ID: {video_id}")
        return None

    video_info = response['items'][0]
    video = {
        'id': video_info['id'],
        'title': video_info['snippet']['title'],
        'description': video_info['snippet'].get('description', 'No description available.'),
        'upload_date': video_info['snippet']['publishedAt'],
        'uploader': video_info['snippet']['channelTitle'],
        'view_count': video_info['statistics'].get('viewCount', 0),
        'like_count': video_info['statistics'].get('likeCount', 0),
        'duration': video_info['contentDetails']['duration'],
        'categories': video_info['snippet'].get('categoryId', []),
        'tags': video_info['snippet'].get('tags', []),
        'webpage_url': f"https://www.youtube.com/watch?v={video_id}"
    }
    return video

---

***Method to retrieve the transcript of the YouTube video***


Retrieve the transcript of a YouTube video.

    Args:
        video_id (str): The ID of the YouTube video.

    Returns:
        Optional[str]: The transcript text or None if transcripts are disabled or unavailable.
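
Transcript entries arrive as a list of dictionaries, each with a `text` field, and auto-generated captions often contain line breaks and non-speech cues such as `[Music]`. The sketch below shows one possible way to join and tidy them before summarization. The helper name `join_transcript` and the decision to drop bracketed cues are assumptions, not part of the original notebook.

```python
import re
from typing import Dict, List, Optional


def join_transcript(entries: List[Dict], drop_cues: bool = True) -> Optional[str]:
    """Join transcript entries into one string, or return None if nothing is left."""
    parts = []
    for entry in entries:
        text = entry.get("text", "")
        if drop_cues:
            # Remove bracketed non-speech cues such as "[Music]" or "[Applause]"
            text = re.sub(r"\[[^\]]*\]", " ", text)
        parts.append(text)

    # Collapse newlines and repeated spaces left over from caption formatting
    joined = re.sub(r"\s+", " ", " ".join(parts)).strip()
    return joined or None
```

Returning `None` for an empty result matches the convention `get_transcript` uses, so the caller can skip the video in the same way.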



In [None]:
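from youtube_transcript_api import NoTranscriptFound, TranscriptsDisabled, YouTubeTranscriptApi


def get_transcript(video_id: str) -> Optional[str]:
    """
    Retrieve the transcript of a YouTube video.

    Args:
        video_id (str): The ID of the YouTube video.

    Returns:
        Optional[str]: The transcript text or None if transcripts are disabled or unavailable.
    """
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        # Each entry is a dict; only its 'text' field is needed here
        transcript_text = ' '.join([entry['text'] for entry in transcript])
        if not transcript_text.strip():
            print(f"Transcript is empty for video {video_id}")
            return None
        return transcript_text
    except TranscriptsDisabled:
        print(f"Transcripts are disabled for video {video_id}")
        return None
    except NoTranscriptFound:
        # Covers the "unavailable" case: transcripts are enabled but none matches
        print(f"No transcript is available for video {video_id}")
        return None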




---


***This method leverages the Gemini AI model to provide concise and informative summaries of lengthy texts.***





Summarize the given text using Gemini AI.

    Args:
        text (str): The text to be summarized.

    Returns:
        Optional[str]: The summarized text, or None if the content is blocked.
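
The skip-if-blocked behavior can be isolated into a small check on the response object. The sketch below assumes the response exposes `prompt_feedback.block_reason` (truthy when the prompt was blocked) and that reading `.text` raises `ValueError` when no usable text is present. The helper name `safe_summary_text` is hypothetical, and the stand-in objects are built with `SimpleNamespace` purely for illustration.

```python
from types import SimpleNamespace
from typing import Any, Optional


def safe_summary_text(response: Any) -> Optional[str]:
    """Return response.text, or None when the prompt was blocked or no text exists."""
    feedback = getattr(response, "prompt_feedback", None)
    if feedback is not None and getattr(feedback, "block_reason", None):
        return None
    try:
        return response.text
    except ValueError:
        # Assumed behavior: the text accessor raises when there is nothing to return
        return None


# Stand-in responses for illustration only (not real Gemini objects)
ok_response = SimpleNamespace(
    prompt_feedback=SimpleNamespace(block_reason=0), text="A short summary."
)
blocked_response = SimpleNamespace(
    prompt_feedback=SimpleNamespace(block_reason=1), text=""
)
```

Keeping the check separate from the API call makes it possible to exercise the skip logic without network access or an API key.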



In [None]:
def gemini_summarizer(text: str) -> Optional[str]:
    """
    Summarize the given text using Gemini AI, skipping if the content is blocked.

    Args:
        text (str): The text to be summarized.

    Returns:
        Optional[str]: The summarized text, or None if the content is blocked.
    """
    model = genai.GenerativeModel("gemini-1.5-flash")
    instruction = "Please provide a detailed summary of the following text. Ensure the summary includes concise summaries of each major section and key points."

    try:
        summary = model.generate_content([instruction, text])

        # prompt_feedback is a response object, not a dict, so it has no .get();
        # a truthy block_reason means the prompt itself was blocked
        if summary.prompt_feedback.block_reason:
            print("Content is considered unsafe by Gemini AI. Skipping this video.")
            return None

        # summary.text raises ValueError when the response holds no usable text (handled below)
        print(summary.text)
        return summary.text

    except Exception as e:
        print(f"An error occurred during summarization: {e}")
        return None




---



In [None]:
# Functions to summarize a text using a transformer-based model
from functools import lru_cache


@lru_cache(maxsize=1)
def get_summarizer():
    """Load the summarization pipeline once and reuse it across calls."""
    return pipeline("summarization", model="facebook/bart-large-cnn")


def summarize_text(text: str, max_length: int = 250, min_length: int = 0) -> str:
    """
    Summarize a text using a transformer-based summarization model.

    Args:
        text (str): The text to be summarized.
        max_length (int): The maximum length of the summary.
        min_length (int): The minimum length of the summary.

    Returns:
        str: The summary of the text.
    """
    if not text.strip():
        return "Summary not available due to lack of content."

    # Reuse the cached pipeline instead of reloading the model for every chunk
    summarizer = get_summarizer()
    try:
        summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
        return summary[0]['summary_text']
    except IndexError:
        return "Failed to generate summary due to content length."
# Function to summarize long texts by splitting them into chunks
def summarize_long_text(text: str, chunk_size: int = 1000, max_length: int = 250, min_length: int = 0) -> str:
    """
    Summarize a long text by splitting it into chunks and summarizing each chunk.

    Args:
        text (str): The long text to be summarized.
        chunk_size (int): The maximum size of each chunk in characters.
        max_length (int): The maximum length of each summary chunk.
        min_length (int): The minimum length of each summary chunk.

    Returns:
        str: The concatenated summaries of all text chunks.
    """
    if not text.strip():
        return "Summary not available due to lack of content."

    print(f"Text length before summarization: {len(text)} characters")
    chunks = chunk_text(text, chunk_size)
    print(f"Number of chunks: {len(chunks)}")

    summaries = []
    for i, chunk in enumerate(chunks):
        print(f"Processing chunk {i+1}/{len(chunks)} with length {len(chunk)} characters")
        summary = summarize_text(chunk, max_length, min_length)
        if summary != "Failed to generate summary due to content length.":
            summaries.append(summary)

    return ' '.join(summaries) if summaries else "No valid content to summarize."




---
***Method to chunk the text if it is longer than expected***

Split a large text into smaller chunks of approximately the specified size.

    Args:
        text (str): The text to be chunked.
        chunk_size (int): The maximum size of each chunk in characters.

    Returns:
        List[str]: A list of text chunks.
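
To make the greedy packing concrete, here is a self-contained sketch of the same idea. It is not the notebook's implementation: it splits sentences with a simple regex instead of NLTK's `sent_tokenize` so that it runs without downloaded data, and it counts the joining space toward the chunk size. The name `chunk_sentences` is hypothetical.

```python
import re
from typing import List


def chunk_sentences(text: str, chunk_size: int = 1000) -> List[str]:
    """Greedily pack whole sentences into chunks of at most chunk_size characters.

    A single sentence longer than chunk_size is kept whole in its own chunk.
    """
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks: List[str] = []
    current: List[str] = []
    size = 0

    for sentence in sentences:
        # +1 accounts for the joining space once the chunk is non-empty
        added = len(sentence) + (1 if current else 0)
        if current and size + added > chunk_size:
            # The sentence does not fit: close the current chunk and start a new one
            chunks.append(" ".join(current))
            current, size = [], 0
            added = len(sentence)
        current.append(sentence)
        size += added

    if current:
        chunks.append(" ".join(current))
    return chunks
```

With `chunk_size=20`, the text `"One two. Three four. Five six seven."` packs the first two sentences together (exactly 20 characters including the space) and places the third in its own chunk.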

In [None]:
# Function to split a large text into smaller chunks
def chunk_text(text: str, chunk_size: int = 1000) -> List[str]:
    """
    Split a large text into smaller chunks of approximately the specified size.

    Args:
        text (str): The text to be chunked.
        chunk_size (int): The maximum size of each chunk in characters.

    Returns:
        List[str]: A list of text chunks.
    """
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_size = 0

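    for sentence in sentences:
        sentence_len = len(sentence)
        if current_size + sentence_len <= chunk_size:
            current_chunk.append(sentence)
            current_size += sentence_len
        else:
            # Only flush a non-empty chunk; otherwise a first sentence longer than
            # chunk_size would add an empty string to the results
            if current_chunk:
                chunks.append(' '.join(current_chunk))
            current_chunk = [sentence]
            current_size = sentence_len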

    # Add the last chunk if it contains any content
    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks

---
***Method to process the videos retrieved***

Process videos from a YouTube channel and generate blog posts for each video.

    Args:
        channel_id (str): The ID of the YouTube channel.
        video_count (int): The number of recent videos to process.

    Returns:
        None
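
The overview says each post also carries the video description, a link to the original video, and a disclaimer. A sketch of how that content could be assembled as a pure function follows. The function name `build_blog_content`, the section headings beyond "Summary" and "Key Points", and the `DISCLAIMER` wording are all assumptions for illustration.

```python
from typing import Dict, List

# Hypothetical disclaimer wording; adjust to taste
DISCLAIMER = (
    "This post was generated automatically from the video transcript "
    "and may contain errors."
)


def build_blog_content(video: Dict[str, str], summary: str, key_points: List[str]) -> str:
    """Assemble Markdown blog content from a summary, key points, and video metadata."""
    lines = ["### Summary", summary, "", "### Key Points"]
    lines += [f"- {point}" for point in key_points]
    lines += [
        "",
        "### Description",
        video.get("description", "No description available."),
        "",
        f"[Watch the original video]({video['webpage_url']})",
        "",
        f"*{DISCLAIMER}*",
    ]
    return "\n".join(lines)
```

Because the function takes plain values and returns a string, it can be checked without calling YouTube, Gemini, or Firestore.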

In [None]:
def process_channel_videos(channel_id: str, video_count: int = 5) -> None:
    """
    Process videos from a YouTube channel and generate blog posts for each video.

    Args:
        channel_id (str): The ID of the YouTube channel.
        video_count (int): The number of recent videos to process.

    Returns:
        None
    """
    print(f"Processing channel {channel_id} with video count {video_count}")

    # Fetch recent videos
    videos = get_channel_videos_youtube_api(channel_id, video_count)

    # Process each video
    for video in videos:
        print(f"Processing video: {video['title']}")

        # Retrieve transcript
        transcript = get_transcript(video['id'])
        if transcript is None:
            print(f"Skipping video {video['id']} due to lack of transcript.")
            continue

        print(f"Transcript length: {len(transcript)} characters")

        # Summarize the transcript
        summary = gemini_summarizer(transcript)
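        if summary is None:
            # gemini_summarizer returns None both for blocked content and for API errors
            print(f"Skipping video {video['id']}: no summary was produced (content blocked or an error occurred).")
            continue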

        key_points = extract_key_points(transcript, num_points=5)

        # Create blog content
        blog_content = f"### Summary\n{summary}\n\n### Key Points\n" + '\n'.join([f"- {point}" for point in key_points])

        # Save blog post to Firebase
        save_blog_to_firebase(video, blog_content)

    print("Finished processing all videos.")



---


***Method to save the blog to Firebase***

Save the blog post to Firebase Firestore.

    Args:
        video_info (Dict[str, str]): Information about the video.
        blog_content (str): The content of the blog post.

    Returns:
        None
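
Adding a new document on every run creates duplicates when the same video is processed twice. One possible alternative, sketched here, is to use the video ID as the Firestore document ID so that a second run overwrites the earlier post. The helper name `upsert_blog` is hypothetical; `db` is assumed to be a Firestore client like the one created during initialization.

```python
from typing import Any, Dict


def upsert_blog(db: Any, video_id: str, blog: Dict[str, Any], collection: str = "blogs") -> None:
    """Write the blog under a document ID equal to the video ID.

    Re-running for the same video replaces the existing document
    instead of creating a second one.
    """
    db.collection(collection).document(video_id).set(blog)
```

A call such as `upsert_blog(db, video_info['id'], blog)` would take the place of `db.collection('blogs').add(blog)`. Whether overwriting is desirable depends on whether earlier versions of a post need to be kept.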

In [None]:
def save_blog_to_firebase(video_info: Dict[str, str], blog_content: str) -> None:
    """
    Save the blog post to Firebase Firestore.

    Args:
        video_info (Dict[str, str]): Information about the video.
        blog_content (str): The content of the blog post.

    Returns:
        None
    """
    blog = {
        'title': video_info['title'],
        'author': video_info['uploader'],
        'content': blog_content,
        'category': 'YouTube Summary',
        'thumbnailUrl': f"https://img.youtube.com/vi/{video_info['id']}/0.jpg",  # YouTube thumbnail is normally in this format
        'createdAt': datetime.datetime.now(datetime.timezone.utc).isoformat(),  # timezone-aware UTC timestamp
        'videoUrl': video_info['webpage_url'],
    }

    # Add the blog to Firestore
    try:
        db.collection('blogs').add(blog)
        print(f"Blog post '{video_info['title']}' saved to Firestore.")
    except Exception as e:
        print(f"An error occurred while saving to Firestore: {e}")