In [None]:
# Put your API key in a .txt file in this folder
with open("API_KEY.txt", "r") as file:
    API_KEY = file.read().strip()

#pip install --upgrade google-api-python-client

In [None]:
import os
import csv
from googleapiclient.discovery import build

CSV_FILE = "video_data.csv"

youtube = build('youtube', 'v3', developerKey=API_KEY)

def initialize_csv(csv_file):
    if not os.path.exists(csv_file):
        with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=["video_id", "title", "views", "tags", "channel_name", "subscribers", "search_query", "thumbnail_url"])
            writer.writeheader()
        print(f"Created CSV file: {csv_file}")
    else:
        print(f"CSV file already exists: {csv_file}")

def load_existing_video_ids(csv_file):
    if not os.path.exists(csv_file):
        return set()

    with open(csv_file, mode='r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        return {row["video_id"] for row in reader}

def collect_video_data(query, max_results=100, csv_file=CSV_FILE):
    """Collect video data from YouTube API with pagination."""
    initialize_csv(csv_file)

    existing_ids = load_existing_video_ids(csv_file)
    print(f"Loaded {len(existing_ids)} existing video IDs.")

    video_data = []
    count = 0
    next_page_token = None

    while count < max_results:
        search_request = youtube.search().list(q=query, part="id,snippet", maxResults=50, pageToken=next_page_token)
        search_response = search_request.execute()
        for item in search_response['items']:
            # Check if the item is a video (not a playlist or channel)
            if item['id']['kind'] != 'youtube#video':
                continue

            video_id = item['id']['videoId']
            
            # Skip already collected videos
            if video_id in existing_ids:
                continue
            
            # Get the video information
            video_request = youtube.videos().list(part="snippet,statistics", id=video_id)
            video_response = video_request.execute()
            
            video_info = video_response['items'][0]
            title = video_info['snippet']['title']
            views = video_info['statistics'].get('viewCount', 'Unknown')
            tags = video_info['snippet'].get('tags', [])
            channel_name = video_info['snippet']['channelTitle']
            
            # Get the highest quality thumbnail available (maxres -> high -> standard -> default)
            thumbnail_info = video_info['snippet']['thumbnails']
            thumbnail_url = thumbnail_info.get('maxres', thumbnail_info.get('high', thumbnail_info.get('standard', thumbnail_info.get('default')))).get('url')

            # Get channel subscribers
            channel_request = youtube.channels().list(part="statistics", id=video_info['snippet']['channelId'])
            channel_response = channel_request.execute()
            subscribers = channel_response['items'][0]['statistics'].get('subscriberCount', 'Unknown')

            # Append data
            video_data.append({
                "video_id": video_id,
                "title": title,
                "views": views,
                "tags": "|".join(tags),  # Join tags as a string
                "channel_name": channel_name,
                "subscribers": subscribers,
                "search_query": query,
                "thumbnail_url": thumbnail_url
            })

            count += 1
            if count >= max_results:
                break

        # Check for next page
        next_page_token = search_response.get('nextPageToken', None)
        if not next_page_token:
            break

    # Save the data to CSV
    with open(csv_file, mode='a', newline='', encoding='utf-8') as file:
        writer = csv.DictWriter(file, fieldnames=["video_id", "title", "views", "tags", "channel_name", "subscribers", "search_query", "thumbnail_url"])
        writer.writerows(video_data)

    print(f"\nData saved to {csv_file}. New videos collected: {len(video_data)}")
    return video_data

# Parameters to change
query = "2hollis type beat"
_ = collect_video_data(query, max_results=50)  # Change max_results (this uses credits)


Created CSV file: video_data.csv
Loaded 0 existing video IDs.


ModuleNotFoundError: No module named 'isodate'

In [42]:
# Download thumbnails (skips already downloaded ones)
import os
import requests
import pandas as pd

# Folder where thumbnails will be saved
THUMBNAIL_DIR = "thumbnails"

def download_thumbnail(thumbnail_url, video_id):
    """Download the thumbnail and return the local file path."""
    if not thumbnail_url:
        return None

    try:
        if not os.path.exists(THUMBNAIL_DIR):
            os.makedirs(THUMBNAIL_DIR)

        # Request to download the thumbnail given url
        response = requests.get(thumbnail_url)
        response.raise_for_status()

        file_path = os.path.join(THUMBNAIL_DIR, f"{video_id}.jpg")
        with open(file_path, 'wb') as f:
            f.write(response.content)

        return file_path # To reference in csv

    except requests.exceptions.RequestException as e:
        print(f"Error downloading thumbnail for video {video_id}: {e}")
        return None


def update_thumbnails_in_csv(csv_file="video_data.csv"):
    # Read the CSV file into a DataFrame
    df = pd.read_csv(csv_file)

    # Check if the 'thumbnail_path' column exists otherwise create it
    if 'thumbnail_path' not in df.columns:
        df['thumbnail_path'] = None

    # Loop through rows and update the thumbnail if missing
    for index, row in df.iterrows():
        if pd.isna(row['thumbnail_path']):  # If 'thumbnail_path' is missing
            local_thumbnail_path = download_thumbnail(row['thumbnail_url'], row['video_id'])
            if local_thumbnail_path:
                df.at[index, 'thumbnail_path'] = local_thumbnail_path  # Update the path in the DataFrame

    # Save the updated DataFrame back to the CSV
    df.to_csv(csv_file, index=False)
    print(f"CSV updated. {len(df)} rows processed.")

update_thumbnails_in_csv("video_data.csv")


CSV updated. 300 rows processed.


In [43]:
# Remove tutorials and add search volume

import pandas as pd

data = pd.read_csv("video_data.csv")

data_new = data.sort_values(by="views", ascending=False) # Highest views first

strings_to_remove = ["how to", "tutorial", "how"]
data_new = data_new[~data['title'].str.lower().str.contains('|'.join(strings_to_remove), case=False, na=False)]

# Search term volume
search_volume_dict = {
    "2hollis type beat": 11237409,
    "nate sib type beat": 261955
}

# Add the search_volume column based on the search_query column
data_new["search_volume"] = data_new["search_query"].str.lower().map(search_volume_dict).fillna(0)

data_new.to_csv("data_final.csv", index=False, encoding="utf-8")

  data_new = data_new[~data['title'].str.lower().str.contains('|'.join(strings_to_remove), case=False, na=False)]
