In [None]:
from googleapiclient.discovery import build
import pandas as pd
import os
import boto3
import io
from dotenv import load_dotenv

Retrieving the API keys and AWS credentials from the .env file

In [None]:
# Load the variables defined in the local .env file into the process environment.
# Without this call os.getenv() only sees variables exported by the shell/kernel.
load_dotenv()


def _require_env(name):
    """Return the stripped value of environment variable `name`.

    Raises:
        RuntimeError: if the variable is unset or blank, naming the missing
            variable instead of failing later with an opaque AttributeError.
    """
    value = os.getenv(name)
    if value is None or not value.strip():
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value.strip()


# Retrieve the API key and channel ID (both are mandatory for every request below)
api_key = _require_env("YOUTUBE_API_KEY")
channel_id = _require_env("CHANNEL_ID")

# AWS settings are passed through to boto3 unchanged (they may be None)
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
aws_region = os.getenv("AWS_REGION")

# Initialize the S3 client
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=aws_region)

# Initialize YouTube API client
youtube = build('youtube', 'v3', developerKey=api_key)

# Sanity check that credentials were picked up. Only report presence: printing the
# key itself would store a secret in the notebook output.
print("AWS access key loaded:", bool(aws_access_key_id))

Fetch Channel Details

In [None]:


def channel_details(youtube, channel_id):
    """Fetch snippet, contentDetails and statistics for a channel.

    Follows `nextPageToken` until it is exhausted and merges the `items` of
    every page, so the result carries all items rather than only the ones
    from the final page.

    Args:
        youtube: client object exposing `channels().list(...).execute()`.
        channel_id: value passed as the `id` filter of `channels().list`.

    Returns:
        dict: the last page's response, with `items` replaced by the list of
        items collected from all pages (an empty list when no page had any).
    """
    nextPageToken = None
    all_items = []
    while True:
        response_1 = youtube.channels().list(
            id=channel_id,
            part="snippet, contentDetails, statistics",
            pageToken=nextPageToken
        ).execute()

        # A page may omit `items` entirely, hence .get()
        all_items.extend(response_1.get('items', []))

        nextPageToken = response_1.get('nextPageToken')
        if not nextPageToken:
            break

    # Expose the merged items through the same key callers already read
    response_1['items'] = all_items
    return response_1
# Store the response under its own name. Rebinding `channel_details` to the result
# would shadow the function, so running this cell a second time would fail with
# "'dict' object is not callable".
channel_response = channel_details(youtube, channel_id)

# Fail with a clear message instead of a KeyError / empty upload when the
# API returned nothing for this channel ID.
channel_items = channel_response.get('items', [])
if not channel_items:
    raise ValueError(f"No channel data returned for channel_id={channel_id!r}")

channel_details_df = pd.DataFrame(channel_items)
bucket_name = 'andrew-huberman-podcast-analytics'
s3_key = 'channeldetails/channeldetails.csv'

# Convert DataFrame to CSV in memory
csv_buffer = io.StringIO()
channel_details_df.to_csv(csv_buffer, index=False)

# Upload the CSV text to S3
s3_client.put_object(
    Bucket=bucket_name,
    Key=s3_key,
    Body=csv_buffer.getvalue()
)


Fetch the video IDs from the channel

In [None]:
def get_videoids(youtube, channel_id):
    """Collect the IDs of all videos returned by a channel search.

    Issues `search().list` requests of up to 50 results each and keeps
    following `nextPageToken` until the response no longer carries one.

    Args:
        youtube: client object exposing `search().list(...).execute()`.
        channel_id: value passed as the `channelId` filter.

    Returns:
        list: the `id.videoId` of every item, in the order received.
    """
    collected = []
    page_token = None

    while True:
        request = youtube.search().list(
            channelId=channel_id,
            type='video',
            part="id,snippet",
            maxResults=50,
            pageToken=page_token,
        )
        page = request.execute()

        # Remember where the next page starts before consuming this one
        page_token = page.get('nextPageToken')
        collected.extend(entry['id']['videoId'] for entry in page['items'])

        if not page_token:
            return collected

# Destination of the video-ID listing in S3
bucket_name = 'andrew-huberman-podcast-analytics'
s3_key = 'videoids/videoids.csv'

# Collect every video ID found for the channel
videoids = get_videoids(youtube, channel_id)

# One-column frame holding the IDs
videoids_df = pd.DataFrame(videoids, columns=['video_id'])

# Serialize to CSV in memory; the row index is written as the first column
csv_buffer_videoids = io.StringIO()
videoids_df.to_csv(csv_buffer_videoids, index=True)

# Upload, replacing any object already stored under this key
s3_client.put_object(
    Body=csv_buffer_videoids.getvalue(),
    Bucket=bucket_name,
    Key=s3_key,
)




Fetch video details using the channel's video IDs

In [None]:
def get_videos(youtube, video_ids):
    """Fetch snippet, contentDetails and statistics for the given video IDs.

    The IDs are requested in batches of 50, each batch joined into one
    comma-separated `id` filter for `videos().list`.

    Args:
        youtube: client object exposing `videos().list(...).execute()`.
        video_ids: sequence of video ID strings.

    Returns:
        list: the `items` of every batch response, concatenated in order.
    """
    collected = []
    for batch in chunk_video_ids(video_ids, 50):
        request = youtube.videos().list(
            part="snippet,contentDetails,statistics",
            id=','.join(batch),
        )
        payload = request.execute()
        # A response without `items` contributes nothing
        collected.extend(payload.get('items', []))
    return collected

# Helper generator: split the video IDs into consecutive slices of `chunk_size`
def chunk_video_ids(video_ids, chunk_size=50):
    """Yield successive slices of `video_ids`, each at most `chunk_size` long."""
    offsets = range(0, len(video_ids), chunk_size)
    yield from (video_ids[start:start + chunk_size] for start in offsets)

  # Example IDs
# Fetch the details of every collected video and report how many came back
videos = get_videos(youtube, videoids)
print(len(videos))

# Destination of the video details in S3
bucket_name = 'andrew-huberman-podcast-analytics'
s3_key = 'videos/videos.csv'

# One row per video resource
videos_df = pd.DataFrame(videos)

# Serialize to CSV in memory, without the row index
csv_buffer_videos = io.StringIO()
videos_df.to_csv(csv_buffer_videos, index=False)

# Upload the CSV text to S3
s3_client.put_object(
    Body=csv_buffer_videos.getvalue(),
    Bucket=bucket_name,
    Key=s3_key,
)



Fetch comments for all videos

In [None]:

def get_all_comments(youtube, video_id):
    """Fetch every comment thread of a video.

    Requests `commentThreads().list` pages of up to 100 threads (with their
    `snippet` and `replies` parts) until no `nextPageToken` is returned.

    Args:
        youtube: client object exposing `commentThreads().list(...).execute()`.
        video_id: value passed as the `videoId` filter.

    Returns:
        list: the `items` of all pages, in the order received.
    """
    threads = []
    token = None
    more_pages = True

    while more_pages:
        page = youtube.commentThreads().list(
            videoId=video_id,
            part="snippet,replies",
            maxResults=100,
            pageToken=token,
        ).execute()

        threads.extend(page['items'])

        # Continue only while the API hands back a token for another page
        token = page.get('nextPageToken')
        more_pages = bool(token)

    return threads

bucket_name = 'andrew-huberman-podcast-analytics'

# Load the list of video IDs. A local copy is used when one exists; otherwise
# the CSV uploaded to S3 under the same key is read, so this cell does not depend
# on a file that the notebook itself never writes to disk.
videoids_key = 'videoids/videoids.csv'
if os.path.exists(videoids_key):
    df = pd.read_csv(videoids_key)
else:
    videoids_object = s3_client.get_object(Bucket=bucket_name, Key=videoids_key)
    df = pd.read_csv(io.BytesIO(videoids_object['Body'].read()))

# Rows of `df` handled in this run (half-open interval). The upper bound is
# clamped to the number of rows so a short listing cannot raise a KeyError.
first_row = 200
last_row = min(280, len(df))

for i in range(first_row, last_row):
    # Single .loc lookup instead of chained indexing
    video_id = df.loc[i, 'video_id']
    comment_details = get_all_comments(youtube, video_id)
    comments_df = pd.DataFrame(comment_details)

    # Convert to parquet format in memory
    parquet_buffer = io.BytesIO()
    comments_df.to_parquet(parquet_buffer)

    # Upload to S3, one object per video
    s3_key = f'comments/{video_id}_{i}.parquet'
    s3_client.put_object(
        Bucket=bucket_name,
        Key=s3_key,
        Body=parquet_buffer.getvalue(),
    )
    print(f'completed file {i} and uploaded to S3')