<a href="https://colab.research.google.com/github/kkrusere/youTube-comments-Analyzer/blob/main/YT_comments_collection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%shell
sudo apt -y update
sudo apt install -y wget curl unzip
wget http://archive.ubuntu.com/ubuntu/pool/main/libu/libu2f-host/libu2f-udev_1.1.4-1_all.deb
dpkg -i libu2f-udev_1.1.4-1_all.deb
wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
dpkg -i google-chrome-stable_current_amd64.deb

wget -N https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testing/118.0.5993.70/linux64/chromedriver-linux64.zip -P /tmp/
unzip -o /tmp/chromedriver-linux64.zip -d /tmp/
chmod +x /tmp/chromedriver-linux64/chromedriver
mv /tmp/chromedriver-linux64/chromedriver /usr/local/bin/chromedriver
pip install selenium chromedriver_autoinstaller

In [None]:
!pip install peft
!pip install datasets
!pip install rouge-score

In [3]:
import os
from google.colab import drive

In [4]:
#mounting google drive
drive.mount('/content/drive')

########################################

#changing the working directory
os.chdir("/content/drive/MyDrive/NLP_Data")

!pwd


Mounted at /content/drive
/content/drive/MyDrive/NLP_Data


In [5]:
import sys
sys.path.insert(0,'/usr/lib/chromium-browser/chromedriver')


from selenium import webdriver
import chromedriver_autoinstaller
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import TimeoutException, ElementNotInteractableException

import random
import time


chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chromedriver_autoinstaller.install()
driver = webdriver.Chrome(options=chrome_options)


from bs4 import BeautifulSoup
import requests
import json

import pandas as pd
import numpy as np

import operator
from google.colab import userdata
api_key = userdata.get('YouTubeAPI_key')

import googleapiclient.discovery
from googleapiclient.discovery import build
import datetime

from datasets import Dataset

from sklearn.model_selection import train_test_split
from datasets import Dataset, load_metric
from transformers import BartTokenizer, BartForConditionalGeneration, TrainingArguments, Trainer
from peft import LoraConfig, get_peft_model

import torch
import re
import string

This Jupyter Notebook collects comments from YouTube videos. The comments will be used to create, train, and validate a Sentiment Analysis model. The videos were chosen with no particular criteria other than coming from my favorite channels.

Comment collection relies on the YouTube API, Selenium, BeautifulSoup, and a set of custom functions.

The process:

1. Create a list of favorite channels.
2. Use the YouTube API to select the top 10 most-watched videos from each channel and store them in a master list of videos from which we will collect comments.
3. For each video in the master video list, use Selenium and BeautifulSoup to collect the comments and store them in a pandas DataFrame.
4. Clean and sanitize the comments in the DataFrame and prepare the data for Sentiment Analysis.
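
Step 2 can be sketched roughly as follows. This is a minimal sketch rather than the notebook's actual implementation: the helper name `top_video_ids` is hypothetical, and it assumes that `youtube` is a client created with `build("youtube", "v3", developerKey=api_key)` and that the Data API's `search.list` endpoint is queried with `order="viewCount"`.

```python
def top_video_ids(youtube, channel_id, n=10):
    """Return up to `n` video IDs for a channel, most-viewed first.

    Hypothetical helper: `youtube` is assumed to be a YouTube Data API v3
    client, e.g. build("youtube", "v3", developerKey=api_key).
    """
    response = youtube.search().list(
        part="snippet",
        channelId=channel_id,
        type="video",        # skip playlists and channels in the results
        order="viewCount",   # most-viewed videos first
        maxResults=n,        # the API allows at most 50 results per page
    ).execute()
    return [item["id"]["videoId"] for item in response.get("items", [])]
```

Calling it once per channel and concatenating the results would give the master video list.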

In [6]:
# Functions for reading and writing JSON files in the current working directory

def save_to_json(data, filename):
    with open(filename, 'w') as json_file:
        json.dump(data, json_file, indent=4)

def load_from_json(filename):
    with open(filename, 'r') as json_file:
        comments = json.load(json_file)
    return comments

In [7]:
# Function to convert count strings such as '9.7K', '1.2M', or '199 replies' to integers.
def convert_to_int(value):
  """
    - If the value is NaN or an empty string, return 0.
    - If the value is a string:
      - Extract the first number from the string, keeping any decimal part
        and ignoring thousands separators (e.g. '9.7K' -> 9.7, '1,234' -> 1234).
      - If the number is followed by 'K', multiply it by 1,000.
      - If the number is followed by 'M', multiply it by 1,000,000.
      - If the number is followed by 'B', multiply it by 1,000,000,000.
      - If the string contains no number, return 0.
    - Return the converted integer value.

  """
  if pd.isna(value) or value == '':
      return 0
  if isinstance(value, str):
      # Match a number with an optional decimal part and an optional K/M/B suffix.
      # Joining all digit groups (the previous approach) turned '9.7K' into 97,000.
      match = re.search(r'(\d+(?:\.\d+)?)([KMB])?', value.replace(',', ''))
      if not match:
          return 0
      multipliers = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
      return int(round(float(match.group(1)) * multipliers.get(match.group(2), 1)))
  return int(value)

In [8]:
# # Reading the channel list from the saved json file
# json_data = load_from_json("channels.json")
# channel_list = json_data['channels']

In [9]:
# Initialize YouTube API client
def initialize_youtube_api(api_key):
    return build("youtube", "v3", developerKey=api_key)

def init_webdriver():
    """Initializes and returns a Chrome WebDriver instance with options."""
    try:
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chromedriver_autoinstaller.install()
        driver = webdriver.Chrome(options=chrome_options)

        print("WebDriver initialized successfully")  # Confirm initialization
        return driver
    except Exception as e:
        print(f"Failed to initialize WebDriver: {e}")
        raise

def close_webdriver(driver):
    """Closes the provided WebDriver instance."""
    driver.quit()
    print("WebDriver successfully closed")  # Printed only after quit() returns


In [10]:
# The function will accept a single parameter, the video_id.
# It will construct the YouTube URL using the standard base URL and the provided video_id.
# The completed URL will be returned.

def get_youtube_url(video_id):
  """
  Constructs a YouTube URL from a given video ID.

  Args:
      video_id: The unique identifier for a YouTube video.

  Returns:
      The full URL of the YouTube video.
  """

  video_url = f"https://www.youtube.com/watch?v={video_id}"

  return video_url


In [11]:
def get_comments_html(video_url, driver):
    """
    Loads the comments of a YouTube video by scrolling and returns the page HTML.

    The driver is expected to have already opened `video_url`; this function does not
    navigate to it. It waits for the comments container, scrolls to the bottom of the
    page until no new comments load, and returns the resulting page source.

    Args:
        video_url (str): The URL of the YouTube video whose comments are being fetched.
        driver: An initialized WebDriver instance (from Selenium) with the video page open.

    Returns:
        str: The page HTML, including the loaded comments section.

    Raises:
        TimeoutException: If the comments section does not load within the specified time.
    """

    # Wait until the comments section is loaded
    WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, 'ytd-comments')))

    # Scroll to the comments section to load initial comments
    driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")

    # Set initial values for dynamic loading
    last_height = driver.execute_script("return document.documentElement.scrollHeight")
    scroll_pause_time = 2  # Time to wait between scrolls
    max_scrolls = 100  # Increase the max number of scrolls to ensure all comments are loaded
    scroll_count = 0

    while scroll_count < max_scrolls:
        # Scroll down to the bottom
        driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")

        # Wait for new comments to load dynamically
        time.sleep(scroll_pause_time)  # Simple wait to allow comments to load

        # Check the new scroll height and compare it with the last height
        new_height = driver.execute_script("return document.documentElement.scrollHeight")
        if new_height == last_height:
            # If the height hasn't changed, wait once more in case comments are still loading
            time.sleep(scroll_pause_time)
            new_height = driver.execute_script("return document.documentElement.scrollHeight")
            if new_height == last_height:
                # If the height still hasn't changed, we've reached the end
                print("All comments have been loaded.")
                break

        last_height = new_height
        scroll_count += 1

    # Get the full page HTML, which now includes the loaded comments
    comments_html = driver.page_source

    # The driver is left open here: the caller created it and is responsible for closing it

    return comments_html
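
The stopping rule in the scroll loop above is easier to see in isolation. Below is a minimal sketch of the same "scroll until the page height stops growing" logic with the browser calls replaced by plain callables. The names `scroll_until_stable`, `get_height`, and `scroll_to_bottom` are hypothetical and only stand in for the two `driver.execute_script(...)` calls.

```python
import time

def scroll_until_stable(get_height, scroll_to_bottom, pause=2.0,
                        max_scrolls=100, sleep=time.sleep):
    """Scroll until the page height stops growing; return the scroll count.

    `get_height` and `scroll_to_bottom` are hypothetical callables standing in
    for the two driver.execute_script(...) calls in get_comments_html.
    """
    last_height = get_height()
    for scroll_count in range(max_scrolls):
        scroll_to_bottom()
        sleep(pause)                      # give lazily loaded comments time to render
        new_height = get_height()
        if new_height == last_height:
            sleep(pause)                  # one extra wait before giving up
            new_height = get_height()
            if new_height == last_height:
                return scroll_count       # height is stable: nothing more to load
        last_height = new_height
    return max_scrolls                    # hit the safety cap
```

With Selenium, `get_height` could be `lambda: driver.execute_script("return document.documentElement.scrollHeight")` and `scroll_to_bottom` could wrap the `window.scrollTo(...)` call. Passing `sleep` as a parameter keeps the loop testable without real delays.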

In [12]:
def get_comment_thread_renderers(comments_html):
    """
    Parses the provided HTML content to extract YouTube comment threads and their counts.

    This function uses BeautifulSoup to parse the HTML content of a YouTube video's comments section.
    It reads the displayed comment count and counts the comment thread renderers (`ytd-comment-thread-renderer`);
    both values are only used by the commented-out debug prints.
    It then returns a list of all the `ytd-comment-thread-renderer` elements found in the HTML.

    Args:
        comments_html (str): The HTML content of the comments section of a YouTube video.

    Returns:
        list: A list of `ytd-comment-thread-renderer` elements found in the HTML.
    """

    soup = BeautifulSoup(comments_html, 'html.parser')

    # Find the first span element with the specified class
    comment_count_span = soup.find('span', class_='style-scope yt-formatted-string')

    # Extract the text content of the span element (None if the span was not found)
    comment_count = comment_count_span.text.strip() if comment_count_span else None

    # # Print or use the comment count
    # print("Comment Count:", comment_count)

    # Find all occurrences of the ytd-comment-thread-renderer element
    comment_thread_renderers = soup.find_all('ytd-comment-thread-renderer', class_='style-scope ytd-item-section-renderer')

    # Count the number of occurrences
    comment_thread_count = len(comment_thread_renderers)

    # # Print or use the comment thread count
    # print("Number of ytd-comment-thread-renderer elements:", comment_thread_count)

    return comment_thread_renderers

In [13]:
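def get_comments(comment_thread_renderers):
    """
    Extracts the text, like count, and reply count from each comment thread.

    Args:
        comment_thread_renderers (list): `ytd-comment-thread-renderer` elements
            returned by `get_comment_thread_renderers`.

    Returns:
        tuple: A list of comment texts and a list of dicts with the keys
            'comment_text', 'like_count', and 'reply_count'. Values are raw
            strings (e.g. '9.7K', '199 replies') or None when an element is missing.
    """
    comments = list()
    comments_data = list()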
    # Iterate through each comment thread renderer
    for comment_thread_renderer in comment_thread_renderers:

        # Extracting the comment text
        comment_text_element = comment_thread_renderer.find('yt-attributed-string', id='content-text')
        comment_text = comment_text_element.get_text(strip=True) if comment_text_element else None

        # Extracting the number of likes
        like_count_element = comment_thread_renderer.find('span', class_='style-scope ytd-comment-engagement-bar')
        like_count = like_count_element.get_text(strip=True) if like_count_element else None

        # Extracting the number of replies
        reply_count_element = comment_thread_renderer.find('ytd-button-renderer', id='more-replies')
        reply_count = reply_count_element.get_text(strip=True) if reply_count_element else None

        comments.append(comment_text)

        comments_data.append(
            {
            "comment_text": comment_text,
            "like_count": like_count,
            "reply_count": reply_count

            }
        )

    return comments, comments_data


In [14]:
def clean_description(description_data, model_path="./fine-tuned-lora-model"):
    """
    Cleans and summarizes YouTube video descriptions using a fine-tuned LoRA model.

    Args:
        description_data: A list of dictionaries containing video details (channel_name, video_title, video_description).
        model_path: The path to the fine-tuned LoRA model.

    Returns:
        The cleaned and summarized description (a string) of the first item in description_data.
    """
    # Configure LoRA
    lora_config = LoraConfig(
        r=16,  # Rank of the LoRA matrix
        lora_alpha=32,  # Scaling factor for LoRA
        target_modules=["q_proj", "v_proj"],  # Target attention layers to apply LoRA
        lora_dropout=0.05,  # Dropout rate for LoRA
        bias="none",  # No bias
    )

    # Preparing input for inference
    formatted_inputs = [
        f"Channel: {item['channel_name']}, Title: {item['video_title']}, Description: {item['video_description']}"
        for item in description_data
    ]


    # Loading the fine-tuned LoRA model and tokenizer
    model = BartForConditionalGeneration.from_pretrained(model_path)

    # Tokenize input
    tokenizer = BartTokenizer.from_pretrained(model_path)
    inputs = tokenizer(formatted_inputs, max_length=512, return_tensors="pt", truncation=True, padding="max_length")

    # Move inputs to the same device as the model
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    inputs = {key: value.to(device) for key, value in inputs.items()}


    # LoRA configuration applied to the model
    lora_model = get_peft_model(model, lora_config)
    lora_model.to(device)

    # Generate Cleaned Descriptions
    with torch.no_grad():
        outputs = lora_model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_length=128,
            num_beams=4,
            early_stopping=True
        )

    # Decode the generated summaries
    cleaned_descriptions = tokenizer.batch_decode(outputs, skip_special_tokens=True)

    return cleaned_descriptions[0]

In [15]:
# Retrieve video comments
def get_video_comments(video_url, driver):
    """
    Retrieves comments from the provided YouTube video URL.

    Args:
        video_url (str): The URL of the YouTube video.
        driver: An initialized WebDriver instance (from Selenium).

    Returns:
        list: A list of dicts, one per comment, with the keys
            'comment_text', 'like_count', and 'reply_count'.
    """
    #print("\nwe are now in the get_video_comments function\n")
    comments_html = get_comments_html(video_url, driver)  # Get HTML of comments section
    comment_thread_renderers = get_comment_thread_renderers(comments_html)  # Extract comment renderers
    _, comments_data = get_comments(comment_thread_renderers)  # Extract comment data

    return comments_data

In [16]:
def get_video_data(video_id):
    """Fetches video data from YouTube given a video ID.

    Args:
        video_id (str): The ID of the YouTube video to fetch data for.

    Returns:
        dict: A dictionary containing the video data with the following keys:
            - 'channel_name': The name of the channel that uploaded the video.
            - 'video_title': The title of the video.
            - 'video_description': The cleaned description of the video.
            - 'comments': A list of dicts with the text, like count, and reply count of each comment.

        Errors are caught and printed rather than raised, so the dictionary
        may be empty or incomplete if the page could not be processed.
    """
    driver = init_webdriver()
    video_url = f"https://www.youtube.com/watch?v={video_id}"
    video_data = {}

    try:
        driver.get(video_url)

        try:
            # Wait for the bottom-row element to be present
            bottom_row = WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.XPATH, '//*[@id="bottom-row"]'))
            )

            # Locate and click the expand button if it exists
            try:
                expand_button = WebDriverWait(driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, '/html/body/ytd-app/div[1]/ytd-page-manager/ytd-watch-flexy/div[5]/div[1]/div/div[2]/ytd-watch-metadata/div/div[4]/div[1]/div/ytd-text-inline-expander/tp-yt-paper-button[1]'))
                )
                expand_button.click()
            except TimeoutException:
                pass  # Ignore if the expand button is not found

            # Wait for elements to be visible and extract data
            expanded_description = WebDriverWait(driver, 10).until(
                EC.visibility_of_element_located((By.ID, 'description-inline-expander'))
            )
            title_element = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, '//h1[@class="style-scope ytd-watch-metadata"]//yt-formatted-string'))
            )
            channel_name_element = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, '//ytd-channel-name[@id="channel-name"]//yt-formatted-string//a'))
            )

            video_data = {
                'channel_name': channel_name_element.text,
                'video_title': title_element.text,
                'video_description': expanded_description.text
            }
            # print("\nwe got the raw video_data")
            temp_list = list()
            temp_list.append(video_data)
            cleaned_description = clean_description(temp_list)
            video_data['video_description'] = cleaned_description

            comments_data = get_video_comments(video_url, driver)
            video_data['comments'] = comments_data

        except TimeoutException:
            print(f"Error processing {video_url}: Elements not found within timeout.")

    except Exception as e:
        print(f"Error processing {video_url}: {e}")



    finally:
        # Close the browser when done
        close_webdriver(driver)

    return video_data




In [17]:
def read_and_shuffle_video_ids(filename):
  """Reads video IDs from a JSON file and returns a shuffled list."""
  video_id_list = load_from_json(filename)
  random.shuffle(video_id_list)
  return video_id_list


video_id_list = read_and_shuffle_video_ids("video_Id_list.json")


In [18]:
len(video_id_list)

1674
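
Running the whole list through `get_video_data` takes a while with this many IDs, so it may help to save progress along the way. Below is a minimal sketch of a resumable loop. The helper `collect_all`, its `save_every` parameter, and the output path are all hypothetical and not part of the original notebook.

```python
import json
import os

def collect_all(video_ids, fetch, out_path, save_every=25):
    """Fetch data for each video ID, saving progress to `out_path` as JSON.

    `fetch` is any callable mapping a video ID to a dict (for example
    get_video_data). IDs already present in `out_path` are skipped.
    """
    results = {}
    if os.path.exists(out_path):
        with open(out_path, "r") as f:
            results = json.load(f)        # resume from an earlier run

    def _save():
        with open(out_path, "w") as f:
            json.dump(results, f, indent=4)

    pending = [vid for vid in video_ids if vid not in results]
    for i, vid in enumerate(pending, start=1):
        data = fetch(vid)
        if data:                          # an empty dict is treated as a failed fetch
            results[vid] = data
        if i % save_every == 0:
            _save()                       # periodic checkpoint
    _save()
    return results
```

A call such as `collect_all(video_id_list, get_video_data, "video_data.json")` would then pick up where a previous session stopped, which matters on Colab where sessions can disconnect.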

In [19]:
video_id = video_id_list[5]
video_id

'f7NpnBPtilI'

In [20]:
# Test
video_data = get_video_data(video_id)
print(video_data)


WebDriver initialized successfully


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.58k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.63G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/363 [00:00<?, ?B/s]

All comments have been loaded.




WebDriver successfully closed
{'channel_name': 'Breakfast Club Power 105.1 FM', 'video_title': 'The Breakfast Club Reacts to Dave Chapelle\'s Netflix Special "The Closer"', 'video_description': 'The Breakfast Club Reacts to Dave Chapelle\'s Netflix Special "The Closer" The Breakfast Club features celebrity interviews, Charlamagne tha God’s Donkey of the Day, Angela Yee\'s Rumor Reports, DJ Envy\'s mixes and so much more! Every guest visiting the world\'s most dangerous morning show is grilled with a blend of honesty and humor.', 'comments': [{'comment_text': "It is impossible to cancel Dave Chappelle. You cannot cancel someone who doesn't need your validation & would happily walk away from it all.", 'like_count': '9.7K', 'reply_count': '199 replies'}, {'comment_text': 'The Closer is a masterclass of comedic mastery, man. Probably the strongest of his Netflix specials. Even with the two (clearly LGBTQ) ladies in front middle section lookin like they wanted to shank him in the parking lo

In [36]:
video_data.keys()

dict_keys(['channel_name', 'video_title', 'video_description', 'comments'])
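
Step 3 of the process stores the comments in a pandas DataFrame, but `video_data` nests them under the `'comments'` key, so the dictionary needs flattening first. Below is a minimal sketch, assuming a hypothetical helper named `flatten_video_data` that repeats the channel name and title on every comment row.

```python
def flatten_video_data(video_data):
    """Turn one `video_data` dict into a list of per-comment row dicts."""
    rows = []
    for comment in video_data.get("comments", []):
        rows.append({
            "channel_name": video_data.get("channel_name"),
            "video_title": video_data.get("video_title"),
            "comment_text": comment.get("comment_text"),
            "like_count": comment.get("like_count"),
            "reply_count": comment.get("reply_count"),
        })
    return rows
```

`pd.DataFrame(flatten_video_data(video_data))` would then give one row per comment, with the like and reply counts still in their raw string form.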

In [22]:
video_data['video_description']

'The Breakfast Club Reacts to Dave Chapelle\'s Netflix Special "The Closer" The Breakfast Club features celebrity interviews, Charlamagne tha God’s Donkey of the Day, Angela Yee\'s Rumor Reports, DJ Envy\'s mixes and so much more! Every guest visiting the world\'s most dangerous morning show is grilled with a blend of honesty and humor.'

In [21]:


# # Initialize YouTube API client
# api_key = userdata.get('YouTubeAPI_key')
# youtube = initialize_youtube_api(api_key)

# # # Display video info
# # print("\nVideo Information:\n")
# # title, extracted_video_url, view_count, like_count, commentCount, date_posted, first_paragraph, thumbnail_url = get_video_info(video_id)
# # print(f"Title: {title}")
# # print(f"Video URL: {extracted_video_url}")
# # print(f"View count: {view_count}")
# # print(f"Like count: {like_count}")
# # print(f"Number of Comments: {commentCount}")
# # print(f"Date posted: {date_posted}")
# # print(f"Description: {first_paragraph}")
# # print(f"Thumbnail URL: {thumbnail_url}")
# # print("\n")

