**YouTube Data Extraction**

*   "###" - Comment
*   "### abc ###" - Comment with Actions
*   "#" - Redacted Code
*   "#" following code - In Code Comment



**1. Set Up**

In [None]:
### Import relevant libraries
import os
import re
from datetime import datetime, timedelta
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import numpy as np
import psutil
import time
from tqdm import tqdm
from googleapiclient.errors import HttpError
from google.colab import files
!pip install youtube-transcript-api
from youtube_transcript_api import YouTubeTranscriptApi


In [None]:
### API KEYS - Set aside some API keys for the Channel and Video datasets
###            and the remaining API keys for the list used by the Comment dataset.
### This area is for storing API keys for quick access during development.
### Please move the API keys to a secure location when this file is not in active use.
### 1.
### 2.
### 3.
### 4.
### 5.
### 6.
### 7.
### 8.
### 9.
### 10.
### 11.
### 12.

In [None]:
### Report the total physical memory of this runtime (bytes / 2**30)
ram_gb = psutil.virtual_memory().total / 2**30
print(f'Total RAM: {ram_gb:.2f} GB')

In [None]:
### Initialise YouTube API v3
### Insert API Key below ###
### (api_key is read later by build(...) as the developerKey;
###  keep the real key out of any shared copy of this notebook)
api_key = ''

### Install the google-api-python-client package
!pip install google-api-python-client

### Verify the installation by importing the package
from googleapiclient.discovery import build

### If you don't see any errors, the installation was successful
print("google-api-python-client installed and imported successfully!")

In [None]:
### Import the client factory ("build") from the Google API client library
from googleapiclient.discovery import build

### Build the "youtube" client object for the YouTube Data API v3,
### authenticated with api_key. Later cells call its methods.
youtube = build('youtube','v3',developerKey=api_key)

**2. Getting Videos for Channels**

In [None]:
### Getting the list of channel ID's

### List of Channel URL's ###
channel_urls = ['']
channel_ids = []

### Resolve each custom URL to a channel ID through a channel search
for custom_url in channel_urls:
    request = youtube.search().list(part='snippet', q=custom_url, type='channel')
    response = request.execute()

    ### Decide on the returned items themselves: pageInfo.totalResults is only
    ### an approximation, so it can be positive while "items" is empty, which
    ### would make the items[0] lookup below fail.
    if not response.get('items'):
        print(f"Channel not found: {custom_url}")
        continue

    ### The first search hit is taken as the channel for this URL
    channel_id = response['items'][0]['snippet']['channelId']
    channel_ids.append(channel_id)

    ### Get detailed statistics using the channel ID
    request = youtube.channels().list(part='contentDetails,statistics', id=channel_id)
    response = request.execute()

    print(f"Channel: {custom_url}")
    print(response)
    print()


### Output the list of channel IDs
print("Channel IDs:", channel_ids)

In [None]:
### Getting list of videos on a channel

### Loop for "Channel IDs"

### Initialise
### Insert Start and End Dates as desired (format: YYYY-MM-DD) ###
### Both strings are parsed with '%Y-%m-%d' by get_videos_in_date_range.
start_date = '2020-01-01'
end_date = '2020-12-31'

def get_videos_in_date_range(youtube, channel_id, start_date, end_date):
    """Return the IDs of a channel's videos published within a date range.

    Args:
        youtube: YouTube Data API v3 client; only ``search().list(...)``
            and ``.execute()`` on the resulting request are used.
        channel_id: ID of the channel to search.
        start_date: First day of the range, as 'YYYY-MM-DD'.
        end_date: Last day of the range, as 'YYYY-MM-DD'. The whole day is
            included: the upper bound sent to the API is 23:59:59 of it.

    Returns:
        A list of video ID strings in the order the API returned them,
        gathered across all result pages.

    Raises:
        ValueError: if either date string does not match 'YYYY-MM-DD'.
    """
    published_after = datetime.strptime(start_date, '%Y-%m-%d').isoformat() + 'Z'

    # strptime yields midnight at the START of end_date; using that as the
    # upper bound would drop every video published on the end date itself,
    # so move the bound to the last second of that day.
    end_of_day = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1, seconds=-1)
    published_before = end_of_day.isoformat() + 'Z'

    video_ids = []
    next_page_token = None  # None requests the first page
    while True:
        response = youtube.search().list(
            part='id',
            channelId=channel_id,
            publishedAfter=published_after,
            publishedBefore=published_before,
            maxResults=50,
            pageToken=next_page_token,
            type='video'
        ).execute()

        # type='video' restricts results to videos, so each id has a videoId
        video_ids.extend(item['id']['videoId'] for item in response.get('items', []))

        # No nextPageToken in the response means this was the last page
        next_page_token = response.get('nextPageToken')
        if next_page_token is None:
            break

    return video_ids

### Get videos in the date range
# video_ids = get_videos_in_date_range(youtube, channel_id, start_date, end_date)

### Output the video IDs
# video_ids

### Loop through each Channel ID and get videos in the date range
### all_video_ids maps each channel_id to the list of video IDs returned
### by get_videos_in_date_range for that channel
all_video_ids = {}
for channel_id in channel_ids:
    video_ids = get_videos_in_date_range(youtube, channel_id, start_date, end_date)
    all_video_ids[channel_id] = video_ids

### Output the video IDs for each channel
for channel_id, video_ids in all_video_ids.items():
    print(f"Channel ID: {channel_id} - Video IDs: {video_ids}")

In [None]:
### Checking number of videos in date range for each channel

### all_video_ids is a dictionary keyed by channel_id; each value is the list of video_ids for that channel

for channel_id, video_ids in all_video_ids.items():
    print(f"Channel ID: {channel_id} - Number of Video IDs: {len(video_ids)}")

In [None]:
### Count the number of videos
# video_count = len(video_ids)
# print(f"Total number of videos: {video_count}")

In [None]:
### Function to get video details
def get_video_details(video_id):
    """Fetch the snippet, statistics and contentDetails of one video.

    Uses the module-level ``youtube`` client. ``contentDetails`` is
    requested because the video loop in this notebook reads
    ``items[0]['contentDetails']['duration']``; without that part the
    lookup fails and the video ends up in the error row.

    Args:
        video_id: ID of the video to look up.

    Returns:
        The raw response of ``videos().list(...).execute()``.
    """
    request = youtube.videos().list(
        part='snippet,statistics,contentDetails',
        id=video_id
    )
    return request.execute()

In [None]:
### Function to get video transcript
def get_video_transcript(video_id):
    """Return the transcript of a video as one space-joined string.

    Any exception raised while fetching or joining is not propagated;
    the function returns "Error retrieving transcript: <message>" instead,
    so callers always receive a string.
    """
    try:
        segments = YouTubeTranscriptApi.get_transcript(video_id)
        pieces = [segment['text'] for segment in segments]
        return " ".join(pieces)
    except Exception as exc:
        message = str(exc)
        return f"Error retrieving transcript: {message}"

**3. Getting Video Data**

In [None]:
### Initialise YouTube API v3
### Insert API Key ###
### (this re-assigns api_key; the next cell rebuilds the client from it)
api_key = ''

### Install the google-api-python-client package
!pip install google-api-python-client

### Verify the installation by importing the package
from googleapiclient.discovery import build

### If you don't see any errors, the installation was successful
print("google-api-python-client installed and imported successfully!")

In [None]:
### Build the "youtube" client object for the YouTube Data API v3, using api_key
youtube = build('youtube','v3',developerKey=api_key)

In [None]:
### List to store video details (one dict per video, error rows included)
video_data = []

### Loop through all channels and their video IDs to get details
for channel, video_ids in tqdm(all_video_ids.items(), desc="Channels"):
    for video_id in tqdm(video_ids, desc=f"Videos in {channel}", leave=False):
        try:
            video_details = get_video_details(video_id)
            snippet = video_details['items'][0]['snippet']
            statistics = video_details['items'][0]['statistics']
            ### contentDetails is only returned when it was requested in "part".
            ### Read it defensively so a missing part costs the duration only,
            ### instead of sending the whole video to the error row below.
            content_details = video_details['items'][0].get('contentDetails', {})
            transcript = get_video_transcript(video_id)
            video_data.append({
                'channel_id': snippet['channelId'],
                'video_id': video_id,
                'title': snippet['title'],
                'description': snippet['description'],
                'published_at': snippet['publishedAt'],
                'duration': content_details.get('duration', 'N/A'),
                'likes': statistics.get('likeCount', 'N/A'),
                'dislikes': statistics.get('dislikeCount', 'N/A'),
                'views': statistics.get('viewCount', 'N/A'),
                'comment_count': statistics.get('commentCount', 'N/A'),
                'transcript': transcript
            })
        except Exception as e:
            ### Best effort: keep a placeholder row for the video and record
            ### the error text in the description and transcript fields
            video_data.append({
                'channel_id': 'N/A',
                'video_id': video_id,
                'title': 'N/A',
                'description': f"Error retrieving description: {str(e)}",
                'published_at': 'N/A',
                'duration': 'N/A',
                'likes': 'N/A',
                'dislikes': 'N/A',
                'views': 'N/A',
                'comment_count': 'N/A',
                'transcript': f"Error retrieving transcript: {str(e)}"
            })

In [None]:
### Create a DataFrame holding every field collected per video.
### The column list must name all keys of the video_data rows: pandas keeps
### only the listed columns, so any key left out here is silently dropped.
df = pd.DataFrame(video_data, columns=[
    'channel_id', 'video_id', 'title', 'description', 'published_at',
    'duration', 'likes', 'dislikes', 'views', 'comment_count', 'transcript',
])

### Convert the pandas DataFrame to a Dask DataFrame
ddf = dd.from_pandas(df, npartitions=1)

### Display the top 5 rows of the Dask DataFrame
print(ddf.head(5))

In [None]:
### Check the final rows of the video dataset
print(ddf.tail(5))

In [None]:
### Define the CSV file path
csv_path = '/mnt/data/video_details.csv'

### Remove the file if it already exists
if os.path.exists(csv_path):
    os.remove(csv_path)

### Save the Dask DataFrame to a CSV file
### (single_file=True asks Dask for one CSV at csv_path rather than one file per partition)
ddf.to_csv(csv_path, single_file=True)

### Download the CSV file (files comes from google.colab)
files.download(csv_path)

**3. Scraping Comments for Videos**

In [None]:
### List of API keys from different projects
### Insert Remaining API Keys for Comment Dataset
api_keys = ['']
### Position in api_keys of the key currently in use; advanced by rotate_api_key()
current_api_key_index = 0

### Function to initialize the YouTube API client
def initialize_youtube(api_key):
    """Build and return a YouTube Data API v3 client that uses *api_key*."""
    client = build('youtube', 'v3', developerKey=api_key)
    return client

### Start with the key at current_api_key_index; rotate_api_key() re-binds this global later
youtube = initialize_youtube(api_keys[current_api_key_index])

def rotate_api_key():
    """Switch the global ``youtube`` client to the next key in ``api_keys``.

    The index wraps around to 0 after the last key. Both
    ``current_api_key_index`` and ``youtube`` are re-bound at module level.
    """
    global current_api_key_index, youtube
    key_count = len(api_keys)
    current_api_key_index = (current_api_key_index + 1) % key_count
    next_key = api_keys[current_api_key_index]
    youtube = initialize_youtube(next_key)

def getcomments(video):
    """Collect the top-level comments of one video.

    Pages through ``commentThreads().list`` (100 threads per page) using the
    module-level ``youtube`` client.

    Args:
        video: ID of the video whose comment threads are fetched.

    Returns:
        A list with one row per comment thread:
        ``[authorDisplayName, publishedAt, likeCount, textOriginal,
        videoId, isPublic]``. Rows gathered before a 404, a 400 or a
        "commentsDisabled" 403 are still returned.

    Raises:
        HttpError: for any status other than 400/404, and for a 403 whose
            text names neither 'commentsDisabled' nor 'quotaExceeded'.
    """
    while True:
        # Each attempt starts again at the first page, so it must also start
        # with an empty list; otherwise rows from an attempt interrupted by a
        # quota error would be collected a second time on the retry.
        comments = []
        page_token = None
        try:
            while True:
                params = {'part': "snippet", 'videoId': video, 'maxResults': 100}
                if page_token is not None:
                    params['pageToken'] = page_token
                response = youtube.commentThreads().list(**params).execute()

                for item in response['items']:
                    comment = item['snippet']['topLevelComment']['snippet']
                    public = item['snippet']['isPublic']
                    comments.append([
                        comment['authorDisplayName'],
                        comment['publishedAt'],
                        comment['likeCount'],
                        comment['textOriginal'],
                        comment['videoId'],
                        public
                    ])

                # A response without nextPageToken is the last page
                page_token = response.get('nextPageToken')
                if not page_token:
                    break
            break  # every page was read without an error

        except HttpError as e:
            status = e.resp.status
            if status == 404:
                print(f"Video not found: {video}, skipping...")
                break
            if status == 400:
                print(f"Invalid request for video: {video}, skipping...")
                break
            if status != 403:
                raise

            # A 403 covers several causes; tell them apart by the error text
            error_reason = str(e)
            if 'commentsDisabled' in error_reason:
                print(f"Comments are disabled for video: {video}")
                break
            if 'quotaExceeded' not in error_reason:
                raise

            # NOTE: there is no retry limit. If every key in api_keys is out
            # of quota, this keeps rotating and sleeping until one works.
            print("Quota exceeded, rotating API key and waiting...")
            rotate_api_key()
            time.sleep(60)  # Wait for a minute before retrying

    return comments



### Start from an empty Dask DataFrame that already has the comment columns.
### NOTE(review): getcomments puts comment['publishedAt'] in the second
### position of each row, which is stored here under 'updated_at'.
meta = pd.DataFrame(columns=['author', 'updated_at', 'like_count', 'text', 'video_id', 'public'])
ddf = dd.from_pandas(meta, npartitions=1)

### Fetch the comments video by video and append each batch as its own chunk
for channel_id, video_ids in tqdm(all_video_ids.items(), desc='Channels'):
    for video_id in tqdm(video_ids, desc=f'Videos in Channel {channel_id}', leave=False):
        comments = getcomments(video_id)
        if not comments:
            ### Nothing was fetched for this video, so there is no chunk to add
            continue
        df_chunk = pd.DataFrame(comments, columns=list(meta.columns))
        ddf_chunk = dd.from_pandas(df_chunk, npartitions=1)
        ddf = dd.concat([ddf, ddf_chunk])


In [None]:
### Preview the last rows of the comment dataset
### NOTE(review): Dask documents tail() as looking at the last partition only,
### so this presumably shows rows from the most recently added chunk - confirm
ddf.tail(5)

In [None]:
### Compute the final DataFrame with progress bar
### (compute() evaluates the lazy Dask comment dataset; the result is bound to final_df)
with ProgressBar():
    final_df = ddf.compute()



In [None]:
### Preview the first rows of the computed comment DataFrame
final_df.head(5)

In [None]:
### Replace newline characters inside the comment text with spaces
final_df['text'] = final_df['text'].str.replace('\n', ' ')

### Save the DataFrame to a CSV file (no index column; backslash as escape character)
final_df.to_csv('all_comments.csv', index=False, escapechar='\\')

In [None]:
### Save the DataFrame to a CSV file
### NOTE(review): this writes the same file name with the same options as the
### to_csv call in the cell before it
csv_filename = 'all_comments.csv'
final_df.to_csv(csv_filename, index=False, escapechar='\\')

### Download the CSV file (files comes from google.colab)
files.download(csv_filename)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

**4. Optional Merge**

In [None]:
### Merge Video Dataset and Comment Dataset
### By this point "ddf" has been re-bound to the comment dataset (section 3,
### comment scraping), so merging it with final_df would join the comments
### with themselves. The video dataset is still available as the pandas
### DataFrame "df", so rebuild the Dask frame from it for the left side.
### The result is a Dask DataFrame, as before.
full_data = dd.merge(dd.from_pandas(df, npartitions=1), final_df, on='video_id', how='inner')