# Settings and supporting functions

In [4]:
import os
from dotenv import load_dotenv
from googleapiclient.discovery import build
from youtube_transcript_api import YouTubeTranscriptApi
import pandas as pd
import time
import json
from typing import List, Dict, Optional
from yt_dlp import YoutubeDL
# Load environment variables
load_dotenv()

# Get YouTube API key
# Fail fast with a clear message when the key is missing or empty, rather
# than building a client without one.
API_KEY = os.getenv("J_YOUTUBE_API_KEY")
if not API_KEY:
    raise ValueError("API key not found. Please ensure the '.env' file is set up correctly.")
# Module-level YouTube Data API v3 client; the helper functions below read
# this global (get_video_stats, get_channel_stats, get_comments).
youtube = build("youtube", "v3", developerKey=API_KEY)

# Constants
# Page size sent as maxResults by fetch_video_page; also used to turn a
# requested video count into a number of pages.
MAX_RESULTS_PER_PAGE = 50
# Directory that receives every JSON cache and CSV/XLSX output written by
# this notebook; created if missing when this cell runs.
CACHE_DIR = "cache"
os.makedirs(CACHE_DIR, exist_ok=True)

def get_video_stats(video_id: str) -> Optional[Dict]:
    """
    Look up a single video via the YouTube Data API and flatten the result.

    Returns a flat dict of snippet, content, status, statistics and topic
    fields. Fields absent from the response fall back to "", [] or None.
    Returns None (after printing a message) when the request raises or when
    the response contains no items.
    """
    try:
        lookup = youtube.videos().list(
            part="snippet,statistics,contentDetails,liveStreamingDetails,paidProductPlacementDetails,status,topicDetails",
            id=video_id
        )
        response = lookup.execute()
    except Exception as e:
        print(f"Error fetching video data: {e}")
        return None

    items = response.get("items")
    if not items:
        print(f"No video found with ID: {video_id}")
        return None

    # Pull each response section once instead of re-reading it per field.
    video = response["items"][0]
    snippet = video.get("snippet", {})
    content = video.get("contentDetails", {})
    status = video.get("status", {})
    stats = video.get("statistics", {})
    topics = video.get("topicDetails", {})
    placement = video.get("paidProductPlacementDetails", {})

    # Key order is kept as-is because it becomes the CSV column order.
    return {
        "title": snippet.get("title", ""),
        "description": snippet.get("description", ""),
        "channelTitle": snippet.get("channelTitle", ""),
        "publishTime": snippet.get("publishedAt", ""),
        "channelId": snippet.get("channelId", ""),
        "thumbnail": snippet.get("thumbnails", {}).get("standard", {}).get("url", ""),
        "tags": snippet.get("tags", []),
        "topicCategories": topics.get("topicCategories", []),
        "categoryId": snippet.get("categoryId"),
        "liveBroadcastContent": snippet.get("liveBroadcastContent"),
        "duration": content.get("duration"),
        "has_caption": content.get("caption"),
        "licensedContent": content.get("licensedContent"),
        "definition": content.get("definition"),
        "contentRating": content.get("contentRating"),
        "madeForKids": status.get("madeForKids"),
        "privacyStatus": status.get("privacyStatus"),
        "viewCount": stats.get("viewCount"),
        "likeCount": stats.get("likeCount"),
        "commentCount": stats.get("commentCount"),
        "dislikeCount": stats.get("dislikeCount"),
        "favoriteCount": stats.get("favoriteCount"),
        "paidProductPlacement": placement.get("hasPaidProductPlacement"),
    }

def get_channel_stats(channel_id: str) -> Optional[Dict]:
    """
    Fetch statistics for a given channel ID.

    Args:
        channel_id: ID of the channel to look up.

    Returns:
        A flat dict of channel snippet, statistics and topic fields, or None
        (after printing a message) when the request raises or the response
        contains no items. Fields absent from the response fall back to "",
        [] or None instead of raising.
    """
    try:
        response = youtube.channels().list(
            part="snippet,statistics,contentDetails,topicDetails",
            id=channel_id
        ).execute()
    except Exception as e:
        print(f"Error fetching channel data: {e}")
        return None

    if not response.get("items"):
        print(f"No channel found with ID: {channel_id}")
        return None

    channel_data = response["items"][0]
    # Read every section defensively: sections such as "topicDetails" and
    # fields such as "customUrl" may be absent from a response, and hard
    # indexing here would raise KeyError outside the try block above.
    snippet = channel_data.get("snippet", {})
    statistics = channel_data.get("statistics", {})
    topic_details = channel_data.get("topicDetails", {})

    return {
        "channel_title": snippet.get("title", ""),
        "channel_description": snippet.get("description", ""),
        "channel_custom_url": snippet.get("customUrl"),
        "channel_thumbnail": snippet.get("thumbnails", {}).get("default", {}).get("url", ""),
        "channel_published_at": snippet.get("publishedAt", ""),
        "channel_view_count": statistics.get("viewCount"),
        "channel_subscriber_count": statistics.get("subscriberCount"),
        "channel_video_count": statistics.get("videoCount"),
        "channel_playlist_count": statistics.get("playlistCount"),
        "channel_comment_count": statistics.get("commentCount"),
        "channel_live_count": statistics.get("liveCount"),
        "channel_topic_categories": topic_details.get("topicCategories", []),
        "channel_topic_ids": topic_details.get("topicIds", []),
        "channel_country": snippet.get("country"),
    }

def get_comments(video_id: str, cmt_count: int = 200, max_results: int = 100) -> Dict:
    """
    Collect top-level comments for a video, ordered by relevance.

    Requests ceil(cmt_count / max_results) pages of max_results threads each
    and stops early when there is no next page or a request fails (the error
    is printed). Returns {"top_comments": [...]} with whatever was gathered.
    """
    whole_pages, remainder = divmod(cmt_count, max_results)
    page_total = whole_pages + 1 if remainder else whole_pages

    collected = []
    page_token = None

    for page_index in range(page_total):
        try:
            payload = youtube.commentThreads().list(
                part="snippet",
                videoId=video_id,
                pageToken=page_token,
                textFormat="plainText",
                maxResults=max_results,
                order="relevance"
            ).execute()

            for thread in payload['items']:
                thread_info = thread['snippet']
                comment = thread_info['topLevelComment']['snippet']
                row = {
                    'Timestamp': comment['publishedAt'],
                    'Username': comment['authorDisplayName'],
                    'VideoID': video_id,
                    'Comment': comment['textDisplay'],
                    # Falls back to the publish time when no update time is present.
                    'Date': comment.get('updatedAt', comment['publishedAt']),
                    "likeCount": comment["likeCount"],
                    "totalReplyCount": thread["snippet"]["totalReplyCount"],
                }
                collected.append(row)

            print(f"Page {page_index + 1} of comments fetched for video {video_id}")
            page_token = payload.get('nextPageToken')
            if not page_token:
                break
        except Exception as e:
            print(f"Error fetching comments for video {video_id}: {e}")
            break

    return {"top_comments": collected}

def get_transcript(video_id: str) -> Dict:
    """
    Retrieve the English transcript of a video as one space-joined string.

    Returns {"transcript": <text>, "transcript_is_generated": <flag>}; both
    values are None when anything goes wrong (the error is printed).
    """
    try:
        available = YouTubeTranscriptApi.list_transcripts(video_id)
        english = available.find_transcript(['en'])
        segments = english.fetch()
        text = " ".join(segment['text'] for segment in segments)
        return {"transcript": text, "transcript_is_generated": english.is_generated}
    except Exception as e:
        print(f"Error fetching transcript for video {video_id}: {e}")
        return {"transcript": None, "transcript_is_generated": None}

def fetch_video_page(youtube, keyword: str, order: str, region: str, language: str, video_duration: str,
                     published_start: str, published_end: str, video_quality: str, token: Optional[str] = None) -> Dict:
    """
    Request one page of video search results for a keyword.

    Returns the raw API response dict, or {} when the request raises (the
    error is printed).
    """
    try:
        search_api = youtube.search()
        query = {
            "part": "snippet",
            "maxResults": MAX_RESULTS_PER_PAGE,
            "q": keyword,
            "order": order,
            "type": "video",
            "regionCode": region,
            "relevanceLanguage": language,
            "videoDuration": video_duration,
            "publishedAfter": published_start,
            "publishedBefore": published_end,
            "pageToken": token,
            "videoDefinition": video_quality,
        }
        page = search_api.list(**query).execute()
    except Exception as e:
        print(f"Error fetching video page: {e}")
        return {}
    return page

def extract_video_data(item: Dict, product_type: str, product_name: str) -> Dict:
    """
    Flatten one search-result item into a row tagged with the product info.

    Raises KeyError when the item lacks any of the fields read here.
    """
    video_id = item["id"]["videoId"]
    snippet = item["snippet"]

    row = {"product_type": product_type, "product_name": product_name}
    row["video_id"] = video_id
    row["video_url"] = f"https://www.youtube.com/watch?v={video_id}"
    row["video_title"] = snippet["title"]
    row["video_description"] = snippet["description"]
    row["video_channel"] = snippet["channelTitle"]
    row["video_channel_id"] = snippet["channelId"]
    row["video_published"] = snippet["publishedAt"]
    row["video_thumbnail"] = snippet["thumbnails"]["default"]["url"]
    return row

def save_to_cache(cache_file: str, keyword: str, response_list: List[Dict]) -> None:
    """
    Save the API response to a cache file.

    Args:
        cache_file: Path of the JSON file to (over)write.
        keyword: Search keyword that produced the responses.
        response_list: Raw API response pages to store.

    The file holds {"keyword": ..., "response_list": [...]} as indented JSON.
    """
    cache_json = {'keyword': keyword, 'response_list': response_list}
    # ensure_ascii=False writes non-ASCII characters verbatim, so the file
    # must be opened as UTF-8 explicitly; the platform default encoding may
    # not be able to encode them.
    with open(cache_file, 'w', encoding='utf-8') as f:
        json.dump(cache_json, f, indent=4, ensure_ascii=False)

def search_videos_by_keyword(youtube, product_type: str, product_name: str, region: str = 'US', language: str = "en",
                            video_quality: str = "any", video_duration: str = "any", videos_count: int = 100,
                            order: str = "viewCount", published_start: str = '2023-01-01T00:00:00Z',
                            published_end: str = '2024-12-31T23:59:59Z', token: Optional[str] = None) -> List[Dict]:
    """
    Run a paged "<product_type> review" search and return one row per video.

    Fetches ceil(videos_count / MAX_RESULTS_PER_PAGE) pages at most, stopping
    early on an empty page or a missing next-page token. The raw pages are
    written to a JSON cache file before the rows are extracted.
    """
    keyword = f"{product_type} review"
    cache_file = os.path.join(CACHE_DIR, f'ytb_search_cache_{product_type}.json')

    whole_pages, remainder = divmod(videos_count, MAX_RESULTS_PER_PAGE)
    page_total = whole_pages + 1 if remainder else whole_pages

    raw_pages = []
    for page_index in range(page_total):
        page = fetch_video_page(youtube, keyword, order, region, language, video_duration,
                                published_start, published_end, video_quality, token)
        if not page:
            break

        raw_pages.append(page)
        print(f"Page {page_index+1}: {page['pageInfo']['totalResults']} videos found. Next page token: {page.get('nextPageToken')}")

        # The token returned by this page selects the next one.
        token = page.get("nextPageToken")
        if not token:
            break

    save_to_cache(cache_file, keyword, raw_pages)

    return [
        extract_video_data(item, product_type, product_name)
        for page in raw_pages
        for item in page.get("items", [])
    ]

def process_video_results(all_search_results: List[Dict], cache_file: str) -> List[Dict]:
    """
    Enrich search rows with video statistics, using a JSON cache on disk.

    Each row is updated in place with the stats for its "video_id". Stats
    come from the cache when present, otherwise from the API (and are then
    written to the cache). Rows whose stats cannot be fetched are skipped.
    """
    stats_by_id = load_cache(cache_file)
    enriched = []

    for row in all_search_results:
        vid = row["video_id"]

        if video_is_cached := vid in stats_by_id:
            print(f"Using cached data for video ID: {vid}")
            stats = stats_by_id[vid]

        if not video_is_cached:
            stats = get_video_stats(vid)
            if not stats:
                print(f"Failed to fetch data for video ID: {vid}")
                continue
            update_cache(stats_by_id, vid, stats, cache_file)
            print(f"Fetched and cached data for video ID: {vid}")

        row.update(stats)
        enriched.append(row)

    return enriched

def fetch_and_process_videos(product_type: str, product_name: str = "", videos_count: int = 1000) -> pd.DataFrame:
    """
    Search videos for a product, enrich them with stats and transcripts,
    write the result to a CSV in the cache directory and return it.

    Stats (including the transcript) are cached per video in a JSON file
    that is rewritten after every newly fetched video. Videos whose stats
    cannot be fetched are left out of the result.
    """
    # File names are derived from the lower-cased, underscore-joined product type.
    slug = product_type.replace(" ", "_").lower()
    cache_file = os.path.join(CACHE_DIR, f"video_stats_cache_{slug}.json")
    output_file = os.path.join(CACHE_DIR, f"ytb_search_results_{slug}.csv")

    # Start from the cache on disk when there is one.
    video_stats_cache = {}
    if os.path.exists(cache_file):
        with open(cache_file, "r") as f:
            video_stats_cache = json.load(f)

    print(f"Searching for videos related to '{product_type}'...")
    search_options = {
        "youtube": youtube,
        "product_type": product_type,
        "product_name": product_name,
        "videos_count": videos_count,
        "order": "relevance",
        "region": "US",
        "language": "en",
        "video_duration": "medium",
        "published_start": '2023-01-01T00:00:00Z',
        "published_end": '2024-12-31T23:59:59Z',
    }
    search_results = search_videos_by_keyword(**search_options)
    print(f"Found {len(search_results)} videos for {product_type}.")

    all_search_results = []
    for result in search_results:
        video_id = result["video_id"]

        if video_id in video_stats_cache:
            print(f"Using cached data for video ID: {video_id}")
            video_stats = video_stats_cache[video_id]
        else:
            video_stats = get_video_stats(video_id)
            if not video_stats:
                print(f"Failed to fetch data for video ID: {video_id}")
                continue

            # The transcript is only fetched for videos that were not cached.
            video_stats.update(get_transcript(video_id))

            # Persist immediately so an interrupted run keeps its progress.
            video_stats_cache[video_id] = video_stats
            with open(cache_file, "w") as f:
                json.dump(video_stats_cache, f)
            print(f"Fetched and cached data for video ID: {video_id}")

        result.update(video_stats)
        all_search_results.append(result)

    print(f"Total videos processed: {len(all_search_results)}")

    ytb_search_df = pd.DataFrame(all_search_results)
    ytb_search_df.to_csv(output_file, index=False)
    print(f"Data saved to {output_file}")

    return ytb_search_df

def load_cache(cache_file: str) -> Dict:
    """
    Read a JSON cache file; a missing file yields an empty dict.
    """
    if not os.path.exists(cache_file):
        return {}
    with open(cache_file, "r") as handle:
        cached = json.load(handle)
    return cached

def update_cache(video_stats_cache: Dict, video_id: str, video_stats: Dict, cache_file: str) -> None:
    """
    Store one video's stats in the in-memory cache and rewrite the cache file.

    The dict passed in is modified in place; the whole cache is then dumped
    as JSON to cache_file, replacing its previous content.
    """
    video_stats_cache[video_id] = video_stats
    with open(cache_file, mode="w") as sink:
        json.dump(video_stats_cache, sink)
    return None

def save_results_to_csv(testing_search: List[Dict], keyword: str) -> None:
    """
    Write result rows to "ytb_search_results_<keyword>.csv" in the cache
    directory, with the keyword lower-cased and spaces turned into underscores.
    """
    file_stem = keyword.replace(" ", "_").lower()
    frame = pd.DataFrame(testing_search)
    output_file = os.path.join(CACHE_DIR, f"ytb_search_results_{file_stem}.csv")
    frame.to_csv(output_file, index=False)
    print(f"Data saved to {output_file}")


def fetch_video_metadata(video_id: str, ydl: YoutubeDL, cache_file: str, video_info_cache: Dict) -> Optional[Dict]:
    """
    Return yt-dlp metadata for a video, consulting the in-memory cache first.

    A cache hit is returned without calling yt-dlp. On a miss the info is
    extracted (no download), stored in video_info_cache and returned. Any
    extraction error is printed and results in None. The cache_file argument
    is not used here; writing the cache to disk is left to the caller.
    """
    if video_id not in video_info_cache:
        watch_url = f"https://www.youtube.com/watch?v={video_id}"
        try:
            print(f"Fetching info for {video_id}...")
            fetched = ydl.extract_info(watch_url, download=False)
            video_info_cache[video_id] = fetched
        except Exception as e:
            print(f"Error fetching info for {video_id}: {e}")
            return None
        return fetched

    print(f"Skipping {video_id}: Already in cache.")
    return video_info_cache[video_id]

In [43]:
# Product names to collect review videos for. Each entry is passed as
# `product_type` to fetch_and_process_videos, which searches for
# "<name> review" and derives its cache/output file names from the name.
products = [
    "COSRX Snail Mucin Serum",
    "Tesla Model Y",
    "Samsung OLED TV",
    "Liquid Death",
    "Stanley tumbler",
    "Apple AirTag",
    "Meta quest 3",
    "Samsung Galaxy Watch 7",
    "Macbook Pro M4",
    "Lululemon Align Pants",
]

In [44]:
# Only products[8:] is processed here, i.e. the last two entries of the list
# ("Macbook Pro M4" and "Lululemon Align Pants").
# NOTE(review): presumably the first eight were fetched in an earlier run — confirm.
for keyword in products[8:]:
    # videos_count=500 asks for up to 10 result pages of 50 videos each.
    ytb_search_df = fetch_and_process_videos(product_type=keyword, videos_count=500)
    print(ytb_search_df.head())
    print(f"Finished processing {keyword} videos. Total videos: {len(ytb_search_df)}")
    

Searching for videos related to 'Macbook Pro M4'...
Page 1: 378396 videos found. Next page token: CDIQAA
Page 2: 378397 videos found. Next page token: CGQQAA
Page 3: 378396 videos found. Next page token: CJYBEAA
Page 4: 378395 videos found. Next page token: CMgBEAA
Page 5: 378395 videos found. Next page token: CPoBEAA
Page 6: 378395 videos found. Next page token: CKwCEAA
Page 7: 378395 videos found. Next page token: CN4CEAA
Page 8: 378395 videos found. Next page token: CJADEAA
Page 9: 378395 videos found. Next page token: CMIDEAA
Page 10: 378395 videos found. Next page token: CPQDEAA
Found 500 videos for Macbook Pro M4.
Error fetching transcript for video anIVewgtbFc: 
Could not retrieve a transcript for the video https://www.youtube.com/watch?v=anIVewgtbFc! This is most likely caused by:

Subtitles are disabled for this video

If you are sure that the described cause is not responsible for this error and that a transcript should be retrievable, please create an issue at https://github

In [None]:
def fetch_metadata_for_dataframe(df: pd.DataFrame, category: str) -> pd.DataFrame:
    """
    Attach yt-dlp metadata to every row of a DataFrame of videos.

    Metadata is looked up per "video_id" through fetch_video_metadata and
    kept in a per-category JSON cache under the cache directory. The cache
    file is rewritten on every tenth row (starting with the first) and once
    more at the end. The input DataFrame gains a "metadata" column in place
    and is returned.
    """
    metadata_cache_file = os.path.join(CACHE_DIR, f"video_info_cache_{category}.json")

    # Resume from an existing cache file when there is one.
    video_info_cache = {}
    if os.path.exists(metadata_cache_file):
        with open(metadata_cache_file, "r") as f:
            video_info_cache = json.load(f)

    ydl = YoutubeDL({"quiet": True})

    def _write_cache():
        # NOTE(review): assumes the yt-dlp info dicts are JSON-serializable — confirm.
        with open(metadata_cache_file, "w") as f:
            json.dump(video_info_cache, f, indent=4)

    metadata_list = []
    for position, video_id in enumerate(df["video_id"]):
        info = fetch_video_metadata(video_id, ydl, metadata_cache_file, video_info_cache)
        metadata_list.append(info)

        # Periodic checkpoint so an interrupted run keeps most of its work.
        if position % 10 == 0:
            _write_cache()

    _write_cache()

    df["metadata"] = metadata_list
    return df

# Example usage
# End-to-end run for one product: search and stats, then yt-dlp metadata,
# then a combined CSV.
if __name__ == "__main__":
    product_type = "lululemon align pants"
    ytb_search_df = fetch_and_process_videos(product_type=product_type, videos_count=500)

    # Fetch metadata as a post-processing step
    # The category only names the metadata cache file
    # (video_info_cache_fashion_50.json in the cache directory).
    ytb_search_df = fetch_metadata_for_dataframe(ytb_search_df, category="fashion_50")

    # Save the updated DataFrame
    # The file name reuses the lower-cased, underscore-joined product type.
    output_file = os.path.join(CACHE_DIR, f"ytb_search_results_with_metadata_{product_type.replace(' ', '_').lower()}.csv")
    ytb_search_df.to_csv(output_file, index=False)
    print(f"Data with metadata saved to {output_file}")

# Clean text dataset 

In [40]:
import pandas as pd

def _duration_to_seconds(value):
    """Convert one ISO 8601 duration (e.g. "PT4M20S") to whole seconds, or None if missing/unparseable."""
    try:
        delta = pd.to_timedelta(value)
    except (ValueError, TypeError):
        return None
    if pd.isna(delta):
        return None
    # total_seconds() includes the days component; `.seconds` alone would
    # wrap around for durations of 24 hours or more.
    return int(delta.total_seconds())


def add_cols(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add new columns to the DataFrame and clean existing ones.

    Adds "duration_seconds" (from the ISO 8601 "duration" column) and
    "publish_month" ("YYYY-MM"), converts "video_published" to datetime and
    the count columns to numeric.

    Args:
        df: Search results with "duration", "video_published", "viewCount",
            "likeCount" and "commentCount" columns. It is modified in place.

    Returns:
        A copy of the modified frame sorted by "viewCount", descending.
    """
    # Convert duration to seconds (e.g., PT4M20S to 260). Values are
    # converted one by one so a single malformed duration only blanks its
    # own row instead of the whole column.
    try:
        df["duration_seconds"] = df["duration"].apply(_duration_to_seconds)
    except Exception as e:
        print(f"Error converting duration to seconds: {e}")
        df["duration_seconds"] = None  # Fallback to None if conversion fails

    # Convert 'video_published' to datetime format
    df["video_published"] = pd.to_datetime(df["video_published"], errors="coerce")

    # Check for invalid dates
    if df["video_published"].isnull().any():
        print("Warning: Some values in 'video_published' could not be converted to datetime!")

    # Extract 'publish_month' in 'YYYY-MM' format
    df["publish_month"] = df["video_published"].dt.strftime("%Y-%m")

    # Convert numeric columns to numeric type; unparseable values become NaN
    numeric_cols = ["viewCount", "likeCount", "commentCount", "duration_seconds"]
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # Sort the DataFrame by 'viewCount' in descending order
    return df.sort_values(by=["viewCount"], ascending=False)

def drop_cols_rows(df: pd.DataFrame) -> pd.DataFrame:
    """
    Drop unnecessary columns and rows, and clean the DataFrame.

    Steps, in order: drop all-null columns, drop all-null rows, drop
    duplicate rows, sort by "viewCount" (descending), drop a fixed set of
    unneeded columns, drop rows without a transcript, and drop repeated
    column labels.

    Args:
        df: Frame produced by add_cols.

    Returns:
        The cleaned frame; the input is not modified.

    Raises:
        KeyError: If "viewCount" or "transcript" is not a column at the point
            it is used (including when it was removed for being entirely null).
    """
    # Drop columns with all null values
    df = df.dropna(axis=1, how="all")

    # Drop rows with all null values
    df = df.dropna(how="all")

    # Drop duplicates, reset index, and sort by 'viewCount'
    df = df.drop_duplicates().reset_index(drop=True).sort_values(by=["viewCount"], ascending=False)

    # Drop unnecessary columns. errors="ignore" skips names that are absent
    # (e.g. already removed above for being all-null) while still dropping
    # the rest; a plain drop() would raise and remove none of them.
    unneeded_cols = ['liveBroadcastContent', 'contentRating', 'video_channel', 'title', 'video_channel_id', 'privacyStatus']
    df = df.drop(columns=unneeded_cols, errors="ignore")

    # Drop rows with null values in 'transcript'
    df = df.dropna(subset=["transcript"])

    # Drop duplicate columns (keep the first occurrence of each label)
    df = df.loc[:, ~df.columns.duplicated()]
    return df

In [41]:
# load the data
import glob
import os

# Collect every search-results CSV found in the cache directory.
# NOTE(review): the pattern also matches "ytb_search_results_with_metadata_*.csv"
# files if any exist there — confirm that is intended.
csv_search_files = glob.glob(os.path.join(CACHE_DIR, "ytb_search_results*.csv"))
# Bare expression so the notebook displays the matched paths.
csv_search_files

['cache\\ytb_search_results_apple_airtag.csv',
 'cache\\ytb_search_results_cosrx_snail_mucin_serum.csv',
 'cache\\ytb_search_results_liquid_death.csv',
 'cache\\ytb_search_results_meta_quest_3.csv',
 'cache\\ytb_search_results_samsung_galaxy_watch_7.csv',
 'cache\\ytb_search_results_samsung_oled_tv.csv',
 'cache\\ytb_search_results_stanley_tumbler.csv',
 'cache\\ytb_search_results_tesla_model_y.csv']

In [38]:
# Show the first matched file path.
csv_search_files[0]

'cache\\ytb_search_results_apple_airtag.csv'

In [39]:
# List the columns of the second matched CSV to check its schema.
pd.read_csv(csv_search_files[1]).columns

Index(['product_type', 'video_id', 'video_url', 'video_title',
       'video_description', 'video_channel', 'video_channel_id',
       'video_published', 'video_thumbnail', 'title', 'description',
       'channelTitle', 'publishTime', 'channelId', 'thumbnail', 'tags',
       'topicCategories', 'categoryId', 'liveBroadcastContent', 'duration',
       'has_caption', 'licensedContent', 'definition', 'contentRating',
       'madeForKids', 'privacyStatus', 'viewCount', 'likeCount',
       'commentCount', 'favoriteCount', 'paidProductPlacement', 'transcript',
       'transcript_is_generated', 'duration_seconds', 'publish_month'],
      dtype='object')

In [42]:
# clean the data and save it
# For each search-results CSV: add derived columns, drop unneeded
# columns/rows, then write "cleaned_<original name>" as both CSV and XLSX
# into the cache directory.
for csv_search_file in csv_search_files:
    df = pd.read_csv(csv_search_file)
    df = add_cols(df)
    df = drop_cols_rows(df)
    name_f = "cleaned_" + os.path.basename(csv_search_file)
    # The CSV is written first, so it keeps the timezone information.
    df.to_csv(os.path.join(CACHE_DIR, name_f), index=False)
    print(f"Data cleaned and saved to {os.path.join(CACHE_DIR, name_f)}")
    # remove timezone information
    # NOTE(review): presumably needed because the Excel export cannot store
    # timezone-aware datetimes — confirm.
    df["video_published"] = df["video_published"].dt.tz_localize(None)
    # save to excel
    df.to_excel(os.path.join(CACHE_DIR, name_f.replace(".csv", ".xlsx")), index=False)
    print(f"Data cleaned and saved to {os.path.join(CACHE_DIR, name_f.replace('.csv', '.xlsx'))}")

Data cleaned and saved to cache\cleaned_ytb_search_results_apple_airtag.csv
Data cleaned and saved to cache\cleaned_ytb_search_results_apple_airtag.xlsx
Data cleaned and saved to cache\cleaned_ytb_search_results_cosrx_snail_mucin_serum.csv
Data cleaned and saved to cache\cleaned_ytb_search_results_cosrx_snail_mucin_serum.xlsx
Data cleaned and saved to cache\cleaned_ytb_search_results_liquid_death.csv
Data cleaned and saved to cache\cleaned_ytb_search_results_liquid_death.xlsx
Data cleaned and saved to cache\cleaned_ytb_search_results_meta_quest_3.csv
Data cleaned and saved to cache\cleaned_ytb_search_results_meta_quest_3.xlsx
Data cleaned and saved to cache\cleaned_ytb_search_results_samsung_galaxy_watch_7.csv
Data cleaned and saved to cache\cleaned_ytb_search_results_samsung_galaxy_watch_7.xlsx
Data cleaned and saved to cache\cleaned_ytb_search_results_samsung_oled_tv.csv
Data cleaned and saved to cache\cleaned_ytb_search_results_samsung_oled_tv.xlsx
Data cleaned and saved to cache\cl