In [1]:
# pip install google-api-python-client 

In [None]:
# Install if needed
# !pip install google-api-python-client pandas

# Imports
import json
import os
import time

import pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError


# API key: taken from the YOUTUBE_API_KEY environment variable so a real key
# never has to be typed into (and saved with) the notebook. When the variable
# is unset, the original "API_KEY" placeholder is used, exactly as before.
API_KEY = os.environ.get("YOUTUBE_API_KEY", "API_KEY")

# Build the YouTube Data API v3 client shared by every function below.
youtube = build("youtube", "v3", developerKey=API_KEY)

# CSV file the collected video rows are written to (one file per channel).
csv_file = "yt_data_lisa_j.csv"

# JSON file holding the last successfully processed video ID, used to resume.
progress_file = "progress_lisa_j.json"

In [None]:
# Functions to Get Channel Uploads Playlist ID and Video IDs

def get_uploads_playlist_id(channel_id):
    """Return the uploads playlist ID for a channel.

    Calls ``channels().list`` with ``part="contentDetails"`` on the
    module-level ``youtube`` client and reads
    ``contentDetails.relatedPlaylists.uploads`` from the first returned item.

    Args:
        channel_id: Channel ID passed as the ``id`` filter of the request.

    Returns:
        The uploads playlist ID, or ``None`` when the response is empty, has
        no ``items`` key, has an empty ``items`` list, or when the request
        raises ``HttpError`` (the error is printed).
    """
    try:
        response = youtube.channels().list(
            part="contentDetails",
            id=channel_id
        ).execute()
    except HttpError as e:
        print(f"Error retrieving uploads playlist ID: {e}")
        return None

    # Use .get() instead of ['items']: a response without an 'items' key
    # would otherwise raise a KeyError that nothing here catches.
    items = response.get('items') if response else None
    if not items:
        return None

    return items[0]['contentDetails']['relatedPlaylists']['uploads']

def get_video_ids_from_playlist(playlist_id, max_results=50):
    """Collect the video IDs of every item in a playlist.

    Pages through ``playlistItems().list`` on the module-level ``youtube``
    client, following ``nextPageToken`` until no further page is reported or
    a page comes back without items.

    Args:
        playlist_id: Playlist ID passed as ``playlistId``.
        max_results: Page size sent as ``maxResults`` on each request. This is
            NOT a cap on the total number of IDs returned; all pages are read.
            (The API reference documents 0-50 as the accepted range.)

    Returns:
        A list of video IDs in the order the API returned them. An empty list
        is returned when the playlist has no items or when any request raises
        ``HttpError`` (the error is printed and IDs gathered so far are
        discarded, so callers never receive a silently truncated list).
    """
    video_ids = []
    page_token = None

    try:
        while True:
            response = youtube.playlistItems().list(
                part="contentDetails",
                playlistId=playlist_id,
                maxResults=max_results,
                pageToken=page_token
            ).execute()

            # Use .get() instead of ['items']: a page without an 'items' key
            # would otherwise raise a KeyError that the HttpError handler
            # below does not catch.
            items = response.get('items') if response else None
            if not items:
                break

            video_ids.extend(item['contentDetails']['videoId'] for item in items)

            page_token = response.get('nextPageToken')
            if not page_token:
                break
    except HttpError as e:
        print(f"Error retrieving video IDs: {e}")
        return []

    return video_ids


In [4]:
def get_video_data(video_id):
    """Fetch one video's metadata and flatten it into a single dict.

    Requests the ``snippet``, ``contentDetails``, ``statistics`` and ``status``
    parts for ``video_id`` from the module-level ``youtube`` client. Comments
    are not fetched.

    Args:
        video_id: ID of the video to look up.

    Returns:
        A dict with the keys ``video_id``, ``title``, ``description``,
        ``published_at``, ``duration``, ``definition``, ``dimension``,
        ``caption``, ``licensedContent``, ``view_count``, ``like_count``,
        ``dislike_count``, ``comment_count`` and ``privacy_status`` (in that
        order). Missing statistics default to ``0``; every other missing field
        defaults to ``'N/A'``. Values are passed through as returned by the
        API, without type conversion. ``None`` is returned when the response
        holds no items or when any exception occurs (it is printed).
    """
    try:
        video_response = youtube.videos().list(
            part="snippet,contentDetails,statistics,status",
            id=video_id
        ).execute()

        # Nothing came back for this ID.
        if not (video_response and video_response['items']):
            return None

        video = video_response['items'][0]
        snip = video.get('snippet', {})
        details = video.get('contentDetails', {})
        stats = video.get('statistics', {})
        status_info = video.get('status', {})

        # Build the row section by section; insertion order fixes the
        # column order of any DataFrame created from it.
        row = {"video_id": video['id']}

        for column, api_key in (
            ("title", 'title'),
            ("description", 'description'),
            ("published_at", 'publishedAt'),
        ):
            row[column] = snip.get(api_key, 'N/A')

        for column, api_key in (
            ("duration", 'duration'),
            ("definition", 'definition'),
            ("dimension", 'dimension'),
            ("caption", 'caption'),
            ("licensedContent", 'licensedContent'),
        ):
            row[column] = details.get(api_key, 'N/A')

        for column, api_key in (
            ("view_count", 'viewCount'),
            ("like_count", 'likeCount'),
            ("dislike_count", 'dislikeCount'),
            ("comment_count", 'commentCount'),
        ):
            row[column] = stats.get(api_key, 0)

        row["privacy_status"] = status_info.get('privacyStatus', 'N/A')
        return row

    except HttpError as e:
        print(f"Error retrieving data for {video_id}: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error retrieving data for {video_id}: {e}")
        return None

In [None]:
# Functions to Save and Load Progress

def save_progress(video_id):
    """Record ``video_id`` as the most recently processed video.

    Overwrites the file named by the module-level ``progress_file`` with a
    JSON object of the form ``{"last_processed_video_id": video_id}``.

    Args:
        video_id: ID to store as the resume checkpoint.
    """
    checkpoint = {"last_processed_video_id": video_id}
    with open(progress_file, "w") as handle:
        json.dump(checkpoint, handle)

def load_progress():
    """Return the last processed video ID stored in the progress file.

    Reads the JSON file named by the module-level ``progress_file`` and
    returns the value stored under ``"last_processed_video_id"``.

    Returns:
        The stored video ID, or ``None`` when the file does not exist, is
        empty or not valid JSON (a message is printed), does not contain a
        JSON object, or lacks the key.
    """
    try:
        with open(progress_file, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        return None
    except json.JSONDecodeError as e:
        # save_progress() truncates the file before writing, so an interrupted
        # run can leave it empty or half-written; treat that as "no progress"
        # instead of crashing on the next start.
        print(f"Ignoring unreadable progress file {progress_file}: {e}")
        return None

    # Valid JSON that is not an object (e.g. a list) has no .get().
    if not isinstance(data, dict):
        return None
    return data.get("last_processed_video_id")


In [None]:
# Main Execution Logic

# Replace with the specific channel ID
CHANNEL_ID = "UCY8LkGSO_34lHxujnvATGAw"  # lisa j

uploads_playlist_id = get_uploads_playlist_id(CHANNEL_ID)

if uploads_playlist_id:
    video_ids = get_video_ids_from_playlist(uploads_playlist_id)

    # Load rows written by earlier runs so new rows are appended to them.
    # An existing but empty CSV makes read_csv raise EmptyDataError; treat it
    # like a missing file instead of aborting the run.
    try:
        df_videos = pd.read_csv(csv_file)
    except (FileNotFoundError, pd.errors.EmptyDataError):
        df_videos = pd.DataFrame()

    last_processed_video_id = load_progress()

    if last_processed_video_id:
        try:
            # Resume right after the checkpointed video.
            start_index = video_ids.index(last_processed_video_id) + 1
            print(f"Resuming from video ID: {last_processed_video_id}")
            video_ids_to_process = video_ids[start_index:]
        except ValueError:
            print(f"Video ID {last_processed_video_id} not found. Starting from the beginning.")
            video_ids_to_process = video_ids
    else:
        video_ids_to_process = video_ids

    # Never re-fetch a video whose row is already in the CSV. Without this,
    # starting from the beginning while an old CSV exists would append every
    # previously saved video a second time.
    if "video_id" in df_videos.columns:
        saved_video_ids = set(df_videos["video_id"].astype(str))
        video_ids_to_process = [
            vid for vid in video_ids_to_process if vid not in saved_video_ids
        ]

    for video_id in video_ids_to_process:
        video_data = get_video_data(video_id)
        if video_data:
            df_videos = pd.concat([df_videos, pd.DataFrame([video_data])], ignore_index=True)
            # Write the CSV BEFORE moving the checkpoint: if the run dies in
            # between, the video is fetched again on resume rather than being
            # marked done with no row on disk.
            df_videos.to_csv(csv_file, index=False)
            save_progress(video_id)
            time.sleep(1)  # Avoid quota issues.
        else:
            # Nothing changed, so the CSV is not rewritten for skipped videos.
            print(f"Skipping {video_id} due to errors.")
    print("Finished processing.")

else:
    print("Could not retrieve uploads playlist ID.")

Finished processing.
