<a href="https://colab.research.google.com/github/kkrusere/EV_Market-Analysis-and-Consumer-Behavior/blob/main/EV_YouTube_video_SA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%shell
# Environment setup: system tools, Google Chrome plus a chromedriver binary for
# Selenium, and the Python packages imported by the cells below.
sudo apt -y update
sudo apt install -y wget curl unzip
# libu2f-udev is installed from a .deb right before Chrome.
# NOTE(review): presumably a Chrome package dependency - confirm it is still needed.
wget http://archive.ubuntu.com/ubuntu/pool/main/libu/libu2f-host/libu2f-udev_1.1.4-1_all.deb
dpkg -i libu2f-udev_1.1.4-1_all.deb
wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
dpkg -i google-chrome-stable_current_amd64.deb

# NOTE(review): Chrome above is "stable_current" while this chromedriver is pinned
# to 118.0.5993.70, so the two versions can drift apart. init_webdriver() also calls
# chromedriver_autoinstaller.install() - confirm which driver binary is actually used.
wget -N https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testing/118.0.5993.70/linux64/chromedriver-linux64.zip -P /tmp/
unzip -o /tmp/chromedriver-linux64.zip -d /tmp/
chmod +x /tmp/chromedriver-linux64/chromedriver
mv /tmp/chromedriver-linux64/chromedriver /usr/local/bin/chromedriver
pip install selenium chromedriver_autoinstaller

# Python dependencies for the model and AWS code. None of them are version-pinned.
pip install bitsandbytes
pip install accelerate
pip install trl peft
pip install datasets
pip install rouge-score
pip install huggingface_hub
pip install transformers
pip install boto3

In [None]:
# Standard library imports
import json
import os
import shutil
import sys
import random
import re
import time
import datetime
import warnings
# Suppress warnings
warnings.filterwarnings("ignore")

# Third-party imports
import numpy as np
import pandas as pd
import torch
import torch.nn as nn

from datasets import Dataset

from peft import (
    LoraConfig,
    PeftConfig,
    PeftModel,
    TaskType,
    get_peft_model,
)
from tqdm import tqdm
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BartForConditionalGeneration,
    BartTokenizer,
    BitsAndBytesConfig,
    EarlyStoppingCallback,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments,
    Trainer,
    TrainingArguments,
    logging,
    pipeline,
)


import boto3
from botocore.exceptions import ClientError


from google.colab import drive, userdata
from huggingface_hub import login
# Mount Google Drive and work from the NLP_Data folder, so the relative paths used
# later in the notebook (model folders, EV_videoID.json) resolve against it.
drive.mount('/content/drive')
os.chdir("/content/drive/MyDrive/NLP_Data")

# Authenticate with the Hugging Face Hub using the token stored in Colab secrets.
huggingface_token = userdata.get('Hugging_Face_Hub_API_TOKEN')
login(huggingface_token, add_to_git_credential=True)

# Export the AWS credentials and region from Colab secrets as environment variables.
for env_var_name, secret_name in (
    ('AWS_ACCESS_KEY_ID', 'aws_access_key_id'),
    ('AWS_SECRET_ACCESS_KEY', 'aws_secret_access_key'),
    ('AWS_DEFAULT_REGION', 'aws_region'),
):
    os.environ[env_var_name] = userdata.get(secret_name)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Token is valid (permission: fineGrained).
Your token has been saved in your configured git credential helpers (store).
Your token has been saved to /root/.cache/huggingface/token
Login successful


In [None]:
# def create_dynamodb_table(table_name='YouTubeVideoData'):
#     dynamodb = boto3.client('dynamodb')

#     try:
#         dynamodb.create_table(
#             TableName=table_name,
#             KeySchema=[
#                 {'AttributeName': 'video_id', 'KeyType': 'HASH'},   # Partition Key
#                 {'AttributeName': 'last-updated-on', 'KeyType': 'RANGE'}  # Sort Key
#             ],
#             AttributeDefinitions=[
#                 {'AttributeName': 'video_id', 'AttributeType': 'S'},
#                 {'AttributeName': 'last-updated-on', 'AttributeType': 'S'}
#             ],
#             ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5}
#         )
#         print(f"Table '{table_name}' created successfully with 'video_id' as Partition Key and 'last-updated-on' as Sort Key.")
#     except ClientError as e:
#         print(f"Error creating table: {e.response['Error']['Message']}")

# # Call the function to create the table
# create_dynamodb_table()



In [None]:
# def delete_dynamodb_table(table_name):
#     """
#     Deletes a DynamoDB table programmatically.

#     Args:
#         table_name (str): The name of the DynamoDB table to delete.

#     Returns:
#         None
#     """
#     dynamodb = boto3.client('dynamodb')

#     try:
#         # Initiate table deletion
#         dynamodb.delete_table(TableName=table_name)
#         print(f"Deletion initiated for table '{table_name}'.")

#         # Wait until the table is fully deleted
#         waiter = dynamodb.get_waiter('table_not_exists')
#         waiter.wait(TableName=table_name)
#         print(f"Table '{table_name}' has been deleted successfully.")
#     except ClientError as e:
#         print(f"Error deleting table: {e.response['Error']['Message']}")


# table_name = 'YouTubeVideoData'
# delete_dynamodb_table(table_name)


In [None]:
# Third-party imports
import chromedriver_autoinstaller

from bs4 import BeautifulSoup

from selenium import webdriver
from selenium.common.exceptions import (
    ElementNotInteractableException,
    TimeoutException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

################################################################################
# Initialize and Close WebDriver
def init_webdriver():
    """
    Builds a headless Chrome WebDriver.

    Chrome is started with the ``--headless``, ``--no-sandbox`` and
    ``--disable-dev-shm-usage`` flags after ``chromedriver_autoinstaller.install()``
    has run.

    Returns:
        The newly created ``webdriver.Chrome`` instance.

    Raises:
        Exception: Any error raised during setup is printed and then re-raised.
    """
    startup_flags = ("--headless", "--no-sandbox", "--disable-dev-shm-usage")

    try:
        options = webdriver.ChromeOptions()
        for flag in startup_flags:
            options.add_argument(flag)
        chromedriver_autoinstaller.install()
        browser = webdriver.Chrome(options=options)
    except Exception as e:
        print(f"Failed to initialize WebDriver: {e}")
        raise
    else:
        print("WebDriver initialized successfully")
        return browser


def close_webdriver(driver):
    """
    Shuts down a WebDriver session and reports it on stdout.

    Args:
        driver: The WebDriver instance to quit.

    Returns:
        None
    """
    driver.quit()
    confirmation = "WebDriver successfully closed"
    print(confirmation)


################################################################################
# YouTube URL and Video ID Handling
def get_youtube_url(video_id):
    """
    Builds the watch-page URL for a YouTube video.

    Args:
        video_id (str): The YouTube video ID.

    Returns:
        str: ``https://www.youtube.com/watch?v=<video_id>``.
    """
    watch_url_template = "https://www.youtube.com/watch?v={}"
    return watch_url_template.format(video_id)


def get_youtube_videoID(youtube_url):
    """
    Extracts the YouTube video ID from a YouTube URL.

    Supported shapes (with or without a scheme):
        - ``.../watch?v=<id>`` with ``v`` at any position in the query string
        - ``youtu.be/<id>``
        - ``.../shorts/<id>``, ``.../embed/<id>``, ``.../live/<id>``

    Query parameters other than ``v`` and any ``#fragment`` are ignored, so they
    never leak into the returned ID.

    Args:
        youtube_url (str): The URL to parse.

    Returns:
        str or None: The video ID, or None when the input is empty, is not a
        string, or does not match a supported shape.
    """
    # Local import: the notebook's import cells do not bring urllib.parse into scope.
    from urllib.parse import parse_qs, urlsplit

    if not youtube_url or not isinstance(youtube_url, str):
        return None

    try:
        parts = urlsplit(youtube_url.strip())
    except ValueError as e:
        print(f"Error extracting video ID: {e}")
        return None

    # Without a scheme urlsplit leaves the host inside `path`; joining netloc and
    # path gives one uniform "host/segment/..." sequence for both cases.
    segments = [seg for seg in f"{parts.netloc}{parts.path}".split("/") if seg]
    if not segments:
        return None

    if segments[-1] == "watch":
        # parse_qs finds `v` wherever it sits and drops blank values.
        ids = parse_qs(parts.query).get("v")
        return ids[0] if ids else None

    id_prefixes = ("shorts", "embed", "live")
    short_hosts = ("youtu.be", "www.youtu.be")
    for index, segment in enumerate(segments[:-1]):
        if segment.lower() in short_hosts or segment in id_prefixes:
            return segments[index + 1]

    return None


################################################################################
# Fetch Comments HTML
def get_comments_html(video_url, driver, scroll_pause_time=2, max_scrolls=100, wait_timeout=10):
    """
    Scrolls the currently loaded YouTube page to load comments and returns the page HTML.

    The driver must already be on the video page: this function does not navigate,
    and ``video_url`` is accepted only for interface compatibility. The driver is
    left open; closing it is the responsibility of whoever created it.

    Args:
        video_url (str): The URL of the YouTube video (not used for navigation).
        driver: An initialized WebDriver instance (from Selenium) showing the video page.
        scroll_pause_time (int or float, optional): Seconds to wait after each scroll.
        max_scrolls (int, optional): Upper bound on the number of scroll iterations.
        wait_timeout (int or float, optional): Seconds to wait for ``ytd-comments`` to appear.

    Returns:
        str: The full page source after scrolling (comments are parsed out of it later).

    Raises:
        TimeoutException: If the comments section does not load within ``wait_timeout``.
    """
    scroll_to_bottom = "window.scrollTo(0, document.documentElement.scrollHeight);"
    read_page_height = "return document.documentElement.scrollHeight"

    # Wait until the comments container exists in the DOM.
    WebDriverWait(driver, wait_timeout).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, "ytd-comments"))
    )

    # First scroll triggers loading of the initial batch of comments.
    driver.execute_script(scroll_to_bottom)
    last_height = driver.execute_script(read_page_height)

    for _ in range(max_scrolls):
        driver.execute_script(scroll_to_bottom)
        time.sleep(scroll_pause_time)

        new_height = driver.execute_script(read_page_height)
        if new_height == last_height:
            # Give slow loads one more pause before concluding nothing is left.
            time.sleep(scroll_pause_time)
            new_height = driver.execute_script(read_page_height)
            if new_height == last_height:
                print("All comments have been loaded.")
                break

        last_height = new_height
    else:
        # The cap was reached while the page was still growing.
        print(
            f"Stopped scrolling after {max_scrolls} scrolls; "
            "some comments may not have been loaded."
        )

    return driver.page_source


################################################################################
# Parse Comments
def get_comment_thread_renderers(comments_html):
    """
    Finds the comment-thread elements in a page of YouTube HTML.

    Args:
        comments_html (str): HTML containing the comments section of a YouTube video.

    Returns:
        list: Every ``ytd-comment-thread-renderer`` element whose class is
        ``style-scope ytd-item-section-renderer``, as parsed by BeautifulSoup.
    """
    thread_tag = "ytd-comment-thread-renderer"
    thread_class = "style-scope ytd-item-section-renderer"

    parsed_page = BeautifulSoup(comments_html, "html.parser")
    return parsed_page.find_all(thread_tag, class_=thread_class)


def get_comments(comment_thread_renderers):
    """
    Pulls the text, like count and reply count out of each comment thread.

    Args:
        comment_thread_renderers (list): 'ytd-comment-thread-renderer' elements.

    Returns:
        tuple: ``(comments, comments_data)`` where ``comments`` is the list of comment
        texts and ``comments_data`` is a list of dictionaries with the keys
        'comment_text', 'like_count' and 'reply_count'. Each value is the stripped
        text of the matching element, or None when the element is missing.
    """

    def _stripped_text(element):
        # A missing element is reported as None rather than as an empty string.
        return element.get_text(strip=True) if element else None

    comments = []
    comments_data = []

    for thread in comment_thread_renderers:
        record = {
            "comment_text": _stripped_text(
                thread.find("yt-attributed-string", id="content-text")
            ),
            "like_count": _stripped_text(
                thread.find("span", class_="style-scope ytd-comment-engagement-bar")
            ),
            "reply_count": _stripped_text(
                thread.find("ytd-button-renderer", id="more-replies")
            ),
        }
        comments.append(record["comment_text"])
        comments_data.append(record)

    return comments, comments_data


################################################################################
# Retrieve Video Comments
def get_video_comments(video_url, driver):
    """
    Collects the comment records for a YouTube video.

    Chains ``get_comments_html``, ``get_comment_thread_renderers`` and
    ``get_comments``, keeping only the per-comment dictionaries.

    Args:
        video_url (str): The URL of the YouTube video.
        driver: An initialized WebDriver instance (from Selenium).

    Returns:
        list: One dictionary per comment, as produced by ``get_comments``.
    """
    page_html = get_comments_html(video_url, driver)
    threads = get_comment_thread_renderers(page_html)
    return get_comments(threads)[1]


################################################################################
# Clean and Summarize Video Description
# Summarization pipelines keyed by (model_path, device), so each model is built only once.
_SUMMARIZER_CACHE = {}


def _get_summarizer(model_path):
    """Returns the cached summarization pipeline for ``model_path``, building it on first use."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    cache_key = (model_path, str(device))
    if cache_key not in _SUMMARIZER_CACHE:
        _SUMMARIZER_CACHE[cache_key] = pipeline(
            "summarization", model=model_path, tokenizer=model_path, device=device
        )
    return _SUMMARIZER_CACHE[cache_key]


def clean_description(video_data_dict, model_path="./Bart-Desc-Sum-fine-tuned-lora-model"):
    """
    Cleans and summarizes a YouTube video description using a fine-tuned summarization model.

    The pipeline is built once per model path and reused on later calls instead of
    being reloaded for every video.

    Args:
        video_data_dict (dict): Video data with the keys 'channel_name', 'video_title'
            and 'video_description'.
        model_path (str, optional): Path to the fine-tuned summarization model.

    Returns:
        str: The cleaned and summarized description.
    """
    pipe = _get_summarizer(model_path)

    # Construct the input text
    input_text = (
        f"Channel: {video_data_dict['channel_name']}, "
        f"Title: {video_data_dict['video_title']}, "
        f"Description: {video_data_dict['video_description']}"
    )

    # Count tokens with the pipeline's own tokenizer rather than loading a second one,
    # and cap the summary at half the input length.
    input_length = len(pipe.tokenizer.encode(input_text))
    max_length = int(input_length * 0.5)

    # truncation=True makes the tokenizer cut inputs that are too long for the model
    # instead of passing them through unchanged.
    summary = pipe(input_text, max_length=max_length, truncation=True)
    return summary[0]["summary_text"]


################################################################################
# Fetch Video Data
def get_video_data(video_id):
    """
    Scrapes the channel name, title, summarized description and comments for one video.

    A fresh WebDriver is created for the call and always closed before returning.
    Errors are printed rather than raised.

    Args:
        video_id (str): The ID of the YouTube video to fetch data for.

    Returns:
        dict: The video data with the keys 'channel_name', 'video_title',
        'video_description' and 'comments'. The dictionary is empty when the page
        metadata could not be read, and can be missing later keys when a later step
        fails after the metadata was collected.
    """

    def _wait_for(condition, timeout):
        # Block until `condition` holds or `timeout` seconds pass (TimeoutException).
        return WebDriverWait(driver, timeout).until(condition)

    def _click_if_clickable(xpath, timeout, absent_message):
        # Optional dialogs: click the button when it shows up, otherwise just report it.
        try:
            _wait_for(EC.element_to_be_clickable((By.XPATH, xpath)), timeout).click()
        except TimeoutException:
            print(absent_message)

    driver = init_webdriver()
    video_url = f"https://www.youtube.com/watch?v={video_id}"
    video_data = {}

    try:
        driver.get(video_url)

        # Consent dialog first, then any other modal with a "Close" button.
        _click_if_clickable(
            '//button[contains(., "I agree")]',
            10,
            "No consent dialog found or already handled.",
        )
        _click_if_clickable(
            '//button[@aria-label="Close"]',
            5,
            "No additional modal dialogs found.",
        )

        try:
            # The metadata row must exist before the description can be expanded.
            _wait_for(
                EC.presence_of_element_located((By.XPATH, '//*[@id="bottom-row"]')),
                20,
            )

            try:
                expand_button = _wait_for(
                    EC.presence_of_element_located(
                        (By.XPATH, '//tp-yt-paper-button[@id="expand"]')
                    ),
                    10,
                )
                # Click through JavaScript so overlays cannot intercept the click.
                driver.execute_script("arguments[0].scrollIntoView();", expand_button)
                driver.execute_script("arguments[0].click();", expand_button)
            except TimeoutException:
                pass  # No expand button: the description is read as shown.

            description_element = _wait_for(
                EC.visibility_of_element_located((By.ID, "description-inline-expander")),
                10,
            )
            title_element = _wait_for(
                EC.presence_of_element_located(
                    (
                        By.XPATH,
                        '//h1[@class="style-scope ytd-watch-metadata"]//yt-formatted-string',
                    )
                ),
                10,
            )
            channel_element = _wait_for(
                EC.presence_of_element_located(
                    (
                        By.XPATH,
                        '//ytd-channel-name[@id="channel-name"]//yt-formatted-string//a',
                    )
                ),
                10,
            )

            video_data = {
                "channel_name": channel_element.text,
                "video_title": title_element.text,
                "video_description": description_element.text,
            }

            # Replace the raw description with its summarized form, then add comments.
            video_data["video_description"] = clean_description(video_data)
            video_data["comments"] = get_video_comments(video_url, driver)

        except TimeoutException:
            print(
                f"Error processing {video_url}: Elements not found within timeout."
            )

    except Exception as e:
        print(f"Error processing {video_url}: {e}")

    finally:
        # The browser is closed on every path, including failures.
        close_webdriver(driver)

    return video_data


################################################################################
# Data Conversion and Cleaning
def convert_to_int(count_str):
    """
    Converts a count such as '1.2K', '3M', '1,234' or '12 replies' into an integer.

    For strings, the first number found is used. Thousands separators (',') are
    removed, and a 'K', 'M' or 'B' suffix (any case) directly after the number
    multiplies it by 1e3, 1e6 or 1e9. The suffix must be a standalone letter, so
    words that merely contain those letters ('12 messages') are not treated as
    multipliers. Fractions are truncated toward zero.

    Args:
        count_str (str or int): The count to convert.

    Returns:
        int: The numerical value, or 0 when the input is None, contains no number,
        or cannot be converted.
    """
    # Local import: decimal is not imported at the top of the notebook.
    from decimal import Decimal, InvalidOperation

    if count_str is None:
        return 0

    if not isinstance(count_str, str):
        try:
            return int(count_str)
        except (TypeError, ValueError, OverflowError):
            # e.g. lists, NaN or infinity
            return 0

    match = re.search(
        r"([-+]?(?:\d[\d,]*(?:\.\d+)?|\.\d+))\s*([KMB]\b)?",
        count_str,
        flags=re.IGNORECASE,
    )
    if match is None:
        return 0

    number_text, suffix = match.groups()
    multipliers = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}
    multiplier = multipliers.get((suffix or "").upper(), 1)

    try:
        # Decimal keeps values like '1.005K' exact; float multiplication can land
        # just below the intended integer and truncate to the wrong value.
        return int(Decimal(number_text.replace(",", "")) * multiplier)
    except InvalidOperation:
        return 0


def convert_video_comments_to_dataframe(video_data):
    """
    Flattens video data into one record per comment.

    Every record repeats the video-level fields and adds the comment text together
    with its like and reply counts converted to integers.

    Args:
        video_data (dict): The video data, including a 'comments' list.

    Returns:
        list: A list of dictionaries with comment data; empty when there are no comments.
    """
    if not video_data.get("comments"):
        print("No comments to convert.")
        return []

    rows = [
        {
            "channel_name": video_data["channel_name"],
            "video_title": video_data["video_title"],
            "video_description": video_data["video_description"],
            "comment_text": entry["comment_text"],
            "like_count": convert_to_int(entry["like_count"]),
            "reply_count": convert_to_int(entry["reply_count"]),
        }
        for entry in video_data["comments"]
    ]

    comments_frame = pd.DataFrame(rows)
    print(f"Converted comments to DataFrame with {len(comments_frame)} rows.")
    return comments_frame.to_dict(orient="records")


################################################################################
# Prepare the Test Data for Inference
def prepare_test_data_from_dict(data_dict, for_inference=True):
    """
    Builds the prompt dataset used for sentiment inference.

    Each record becomes an 'input' prompt holding the channel name, video title,
    description and comment text, paired with a fixed 'output' template.

    Args:
        data_dict (list): List of dictionaries containing comment data.
        for_inference (bool, optional): Accepted for compatibility; the function
            does not read it.

    Returns:
        Dataset: A Hugging Face Dataset with 'input' and 'output' columns.
    """
    comments_frame = pd.DataFrame(data_dict)
    output_template = "Sentiment: , Explanation: "

    prompts = []
    for _, row in comments_frame.iterrows():
        prompt = (
            f"Channel Name: {row['channel_name']}\n"
            f"Video Title: {row['video_title']}\n"
            f"Description: {row['video_description']}\n"
            f"Comment Text: {row['comment_text']}\n"
        )
        prompts.append({"input": prompt, "output": output_template})

    return Dataset.from_list(prompts)


def tokenize_dataset(dataset, tokenizer):
    """
    Applies the tokenizer to the 'input' column of a dataset in batches.

    Inputs are truncated and padded to a fixed length of 512 tokens.

    Args:
        dataset (Dataset): The dataset to tokenize.
        tokenizer: The tokenizer to use.

    Returns:
        Dataset: The result of ``dataset.map`` with the tokenizer outputs added.
    """
    encode_options = {
        "max_length": 512,
        "padding": "max_length",
        "truncation": True,
    }

    def _encode_batch(batch):
        return tokenizer(batch["input"], **encode_options)

    return dataset.map(_encode_batch, batched=True)


def run_inference(model, tokenizer, tokenized_dataset):
    """
    Generates one prediction per example with the given model.

    The model is moved to the GPU when one is available and put in eval mode. Each
    example's 'input' text is tokenized on its own, generated with beam search and
    decoded without special tokens.

    Args:
        model: The fine-tuned model.
        tokenizer: The tokenizer to use.
        tokenized_dataset (Dataset): Examples exposing an 'input' text field.

    Returns:
        list: The decoded predictions, in dataset order.
    """
    target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(target_device)
    model.eval()

    def _predict(text):
        encoded = tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            padding=True,
            max_length=512,
        )
        encoded = {name: tensor.to(target_device) for name, tensor in encoded.items()}

        # Inference only, so gradient tracking is switched off.
        with torch.no_grad():
            generated = model.generate(
                **encoded, max_length=150, num_beams=5, early_stopping=True
            )

        return tokenizer.decode(generated[0], skip_special_tokens=True)

    return [_predict(example["input"]) for example in tokenized_dataset]


def get_predictions_from_data_dict(data_dict, model, tokenizer):
    """
    Runs the full inference pipeline on comment records.

    The records are turned into prompts, tokenized, and passed to ``run_inference``.

    Args:
        data_dict (list): List of dictionaries containing comment data.
        model: The fine-tuned model.
        tokenizer: The tokenizer to use.

    Returns:
        list: A list of predictions, one per record.
    """
    prompt_dataset = prepare_test_data_from_dict(data_dict, for_inference=True)
    encoded_dataset = tokenize_dataset(prompt_dataset, tokenizer)
    return run_inference(model, tokenizer, encoded_dataset)


################################################################################
# Sentiment and Explanation Processing
def split_sentiment_explanation(item):
    """
    Splits the model's output into sentiment and explanation.

    The text is split at the first ', Explanation: ' marker. Model output is
    free-form, so the marker may be missing or repeated: a missing marker yields an
    empty explanation, and anything after the first marker (including further
    markers) is kept as the explanation.

    Args:
        item (str): The model's output string.

    Returns:
        tuple: A tuple containing the sentiment and explanation.
    """
    # partition never raises on a missing or repeated separator, unlike unpacking split().
    sentiment_part, _, explanation_part = item.partition(", Explanation: ")
    sentiment = sentiment_part.replace("Sentiment: ", "").strip()
    explanation = explanation_part.strip()
    print(f"Sentiment: {sentiment}, Explanation: {explanation}")
    return sentiment, explanation



################################################################################
# Saving Data to S3
def save_data_to_s3(video_id, processed_video_data, bucket_name='experiment-api-data-bucket'):
    """
    Uploads the processed video data to S3 as a JSON object.

    The object is stored under the key ``<video_id>.json``.

    Args:
        video_id (str): The YouTube video ID.
        processed_video_data (list): The processed video data to save.
        bucket_name (str): The name of the S3 bucket.

    Returns:
        str: The S3 object key where the data is stored, or None when the upload
        fails with a ClientError.
    """
    s3_client = boto3.client('s3')
    payload = json.dumps(processed_video_data)
    object_key = f"{video_id}.json"

    try:
        s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=payload)
    except ClientError as e:
        print(f"Error saving data to S3: {e.response['Error']['Message']}")
        return None

    print(f"Data for video_id '{video_id}' saved to S3 bucket '{bucket_name}'.")
    return object_key

################################################################################
# Saving S3 Reference to DynamoDB
def save_s3_reference_to_dynamodb(video_id, s3_key, table_name='YouTubeVideoData'):
    """
    Stores the S3 object key and last-updated-on timestamp in DynamoDB, indexed by video_id and last-updated-on.

    Args:
        video_id (str): The YouTube video ID.
        s3_key (str): The S3 object key where the data is stored.
        table_name (str): The name of the DynamoDB table.

    Returns:
        None
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)

    # Current UTC time in ISO 8601 format. datetime.utcnow() is deprecated, so an
    # aware "now" is taken and the tzinfo dropped, which keeps the stored string in
    # the same offset-free format as before.
    current_time = (
        datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()
    )

    try:
        table.put_item(
            Item={
                'video_id': video_id,
                'last-updated-on': current_time,
                's3_key': s3_key
            }
        )
        print(f"S3 reference for video_id '{video_id}' saved to DynamoDB with timestamp '{current_time}'.")
    except ClientError as e:
        print(f"Error saving S3 reference to DynamoDB: {e.response['Error']['Message']}")

################################################################################
# Retrieving S3 Reference from DynamoDB

def get_latest_s3_reference_from_dynamodb(video_id, table_name='YouTubeVideoData'):
    """
    Retrieves the latest S3 object key and last-updated-on timestamp from DynamoDB using the video_id.

    Args:
        video_id (str): The YouTube video ID.
        table_name (str): The name of the DynamoDB table.

    Returns:
        dict: A dictionary containing 's3_key' and 'last-updated-on', or None when
        no item exists for the video or the query fails with a ClientError.
    """
    # Imported here because no import cell in the notebook brings `Key` into scope;
    # without it the query below raises NameError.
    from boto3.dynamodb.conditions import Key

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)

    try:
        response = table.query(
            KeyConditionExpression=Key('video_id').eq(video_id),
            ScanIndexForward=False,  # Sorts results in descending order
            Limit=1  # Get only the latest item
        )
        items = response.get('Items')
        if items:
            item = items[0]
            print(f"S3 reference for video_id '{video_id}' retrieved from DynamoDB.")
            return {
                's3_key': item.get('s3_key'),
                'last-updated-on': item.get('last-updated-on')
            }
        else:
            print(f"No data found for video_id '{video_id}'.")
            return None
    except ClientError as e:
        print(f"Error retrieving S3 reference from DynamoDB: {e.response['Error']['Message']}")
        return None

################################################################################
# Retrieve Data from S3

def get_data_from_s3(video_id, bucket_name='experiment-api-data-bucket'):
    """
    Downloads and decodes the processed data stored for a video.

    The object is read from the key ``<video_id>.json`` and parsed as UTF-8 JSON.

    Args:
        video_id (str): The YouTube video ID.
        bucket_name (str): The name of the S3 bucket.

    Returns:
        list: The processed_video_data associated with the video_id, or None when
        the download fails with a ClientError.
    """
    s3_client = boto3.client('s3')
    object_key = f"{video_id}.json"

    try:
        s3_object = s3_client.get_object(Bucket=bucket_name, Key=object_key)
        raw_json = s3_object['Body'].read().decode('utf-8')
        stored_data = json.loads(raw_json)
    except ClientError as e:
        print(f"Error retrieving data from S3: {e.response['Error']['Message']}")
        return None

    print(f"Data for video_id '{video_id}' retrieved from S3.")
    return stored_data


################################################################################
# Main Execution Flow
def main(youtube_url, model_path="./SA-bart-fine-tuned-lora-model", model=None, tokenizer=None):
    """
    Main function to execute all steps: scrape a video, then run sentiment analysis on its comments.

    Args:
        youtube_url (str): The YouTube video URL.
        model_path (str, optional): Path to the fine-tuned sentiment model. Used to
            load whichever of ``model`` / ``tokenizer`` is not supplied.
        model (optional): An already loaded model. Passing one avoids reloading the
            weights on every call when many videos are processed.
        tokenizer (optional): An already loaded tokenizer matching ``model``.

    Returns:
        list or None: One dictionary per comment (the comment record plus a
        'sentiment_&_explanations' entry), or None when the URL is invalid, the
        video data could not be retrieved, or there are no comments.
    """
    video_id = get_youtube_videoID(youtube_url)
    if not video_id:
        print("Invalid YouTube URL.")
        return None

    # Step 1 & 2: Collect comments and clean description
    video_data = get_video_data(video_id)
    if not video_data:
        print("Failed to retrieve video data.")
        return None

    # Convert comments to a list of per-comment records
    data_dict = convert_video_comments_to_dataframe(video_data)
    if not data_dict:
        print("No comments data available.")
        return None

    # Step 3: Sentiment Analysis (load only what the caller did not provide)
    if model is None:
        model = BartForConditionalGeneration.from_pretrained(model_path)
    if tokenizer is None:
        tokenizer = BartTokenizer.from_pretrained(model_path)
    model.eval()

    predictions = get_predictions_from_data_dict(data_dict, model, tokenizer)
    print("Sentiment analysis completed.")

    processed_video_data = [
        {**data_entry, "sentiment_&_explanations": prediction}
        for data_entry, prediction in zip(data_dict, predictions)
    ]

    return processed_video_data


In [None]:
# # # Example Usage
# #input youtube_video_url
# youtube_video_url = "https://www.youtube.com/watch?v=MOreJAyfUUU" #@param {type:"string"}




In [None]:
# video_id = get_youtube_videoID(youtube_video_url)

# if not video_id:
#     print("Failed to extract video ID. Exiting.")
#     sys.exit(1)

# # Process the video data
# processed_video_data = main(youtube_video_url)

# if not processed_video_data:
#     print("No processed data to save. Exiting.")
#     sys.exit(1)

# bucket_name = 'experiment-api-data-bucket'
# # Save data to S3
# s3_key = save_data_to_s3(video_id, processed_video_data)

# if s3_key:
#     # Save S3 reference and timestamp to DynamoDB
#     save_s3_reference_to_dynamodb(video_id, s3_key)
# else:
#     print("Failed to save data to S3. Skipping DynamoDB update.")

In [None]:
# from googleapiclient.discovery import build


# API_KEY = userdata.get('YouTubeAPI_key')

# # Define the search queries related to electric vehicles
# search_queries = [
#     "electric vehicles", "EV charging infrastructure", "Tesla vs BYD",
#     "electric car reviews", "EV market trends", "future of electric cars",
#     "EV adoption barriers", "EV battery technology", "EV incentives and policies",
#     "news about EVs", "electric car comparisons", "EV pros and cons", "EV supply chain",
#     "EV maintenance costs", "EV safety and features", "affordable electric cars",
#     "luxury electric cars", "EV growth and market share", "self-driving electric vehicles"
# ]

# # Initialize the YouTube API client
# youtube = build('youtube', 'v3', developerKey=API_KEY)

# # Function to fetch videos from YouTube based on a search query
# def fetch_youtube_videos(query, max_results=50):
#     request = youtube.search().list(
#         part="snippet",
#         q=query,
#         type="video",
#         maxResults=max_results
#     )
#     response = request.execute()

#     videos = []
#     for item in response['items']:
#         video_details = {
#             'title': item['snippet']['title'],
#             'channel': item['snippet']['channelTitle'],
#             'published_at': item['snippet']['publishedAt'],
#             'description': item['snippet']['description'],
#             'video_id': item['id']['videoId'],
#             'url': f"https://www.youtube.com/watch?v={item['id']['videoId']}"
#         }
#         videos.append(video_details)

#     return videos

# # List to store the URLs of all videos
# video_urls = []
# video_channels = []

# # Iterate over the search queries and fetch video URLs
# for query in search_queries:
#     ev_videos = fetch_youtube_videos(query)
#     for video in ev_videos:
#         print(f"Title: {video['title']}")
#         print(f"Channel: {video['channel']}")
#         print(f"URL: {video['url']}")
#         video_urls.append(video['url'])
#         video_channels.append(video['channel'])


In [None]:
# Load the list of video IDs to process; the path is relative to the working directory.
with open('EV_videoID.json', 'r') as video_ids_file:
    video_IDs_list = json.load(video_ids_file)


In [None]:
# Process every video: scrape + sentiment analysis, store the result in S3, then
# record the S3 key in DynamoDB. A failure on one video is reported and skipped
# instead of aborting the whole batch.
bucket_name = 'experiment-api-data-bucket'
failed_video_ids = []  # IDs that produced no data or could not be saved

for video_id in video_IDs_list:
    if not video_id:
        print("Empty video ID encountered. Skipping.")
        continue

    youtube_video_url = get_youtube_url(video_id)

    # Process the video data
    processed_video_data = main(youtube_video_url)

    if not processed_video_data:
        print(f"No processed data to save for video_id '{video_id}'. Skipping.")
        failed_video_ids.append(video_id)
        continue

    # Save data to S3
    s3_key = save_data_to_s3(video_id, processed_video_data, bucket_name=bucket_name)

    if s3_key:
        # Save S3 reference and timestamp to DynamoDB
        save_s3_reference_to_dynamodb(video_id, s3_key)
    else:
        print("Failed to save data to S3. Skipping DynamoDB update.")
        failed_video_ids.append(video_id)



WebDriver initialized successfully
No consent dialog found or already handled.
No additional modal dialogs found.


config.json:   0%|          | 0.00/1.58k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.63G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/363 [00:00<?, ?B/s]

WebDriver successfully closed




WebDriver successfully closed
Converted comments to DataFrame with 2000 rows.




Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

In [None]:
# Number of video IDs loaded from EV_videoID.json.
len(video_IDs_list)