# API Source

https://rapidapi.com/ytjar/api/yt-api

# Install packages

In [None]:
!pip install requests

# Import the libraries

In [1]:
import time
import requests
import pandas as pd
from pprint import pprint

# Check to see if Data.csv already exists or else create a new dataframe

In [3]:
# Load previously collected rows, or start with an empty frame on first run.
try:
    df = pd.read_csv('Data.csv')
except (FileNotFoundError, pd.errors.EmptyDataError):
    # Only a missing or empty file means "no data yet". Any other failure
    # (corrupt file, permission error) is left to propagate so that a later
    # save does not silently overwrite existing data with a fresh frame.
    df = pd.DataFrame(columns=["Type", "Channel Name", "Title", "Channel ID", "Views", "Publication Date", "Duration"])

# Dataframe first 5 rows (if any)

In [11]:
df.head()

Unnamed: 0,Type,Channel Name,Title,Channel ID,Views,Publication Date,Duration
0,video,Brian Weissman,Unlock Consistent Power in Disc Golf: Discover...,Unknown,5763,2025-02-12,436
1,video,Brian Weissman,Fix Your Pesky Back Foot Disc Golf Throw,Unknown,36925,2025-02-12,923
2,video,Brian Weissman,X-Step Mastery Disc Golf Form Evolution,Unknown,695,2025-02-24,330
3,video,Brian Weissman,INSANE Disc Golf Drills To Level Up Your Game,Unknown,597,2025-02-23,733
4,video,Brian Weissman,What's your favorite disc for long bombs?,Unknown,900,2025-02-22,960


# API call

In [None]:
# using geo and type_ to extract various combinations of location and types
def get_api_data(geo='US', type_='now'):
    """Fetch the trending list from the yt-api RapidAPI endpoint.

    Args:
        geo: Value sent as the ``geo`` query parameter (default ``'US'``).
        type_: Value sent as the ``type`` query parameter (default ``'now'``).

    Returns:
        The value stored under the ``'data'`` key of the JSON response.

    Raises:
        requests.RequestException: On network failure, timeout, or a
            non-2xx HTTP status.
        KeyError: If the JSON response has no ``'data'`` key.
    """
    import os  # local import so this cell does not depend on the import cell

    url = "https://yt-api.p.rapidapi.com/trending"
    querystring = {"geo": geo, "type": type_}

    # SECURITY: an API key committed in a notebook is effectively public.
    # Prefer setting RAPIDAPI_KEY in the environment; the literal is kept
    # only as a fallback so existing runs keep working, and should be rotated.
    api_key = os.environ.get(
        "RAPIDAPI_KEY",
        "70a934f756mshb7067f375e69142p15cd59jsnff61e7bc025c",
    )
    headers = {
        "x-rapidapi-key": api_key,
        "x-rapidapi-host": "yt-api.p.rapidapi.com",
    }

    # A timeout prevents an unattended run from hanging forever on a stalled
    # connection; raise_for_status turns quota/auth errors into a clear
    # HTTPError instead of an opaque KeyError on 'data'.
    response = requests.get(url, headers=headers, params=querystring, timeout=30)
    response.raise_for_status()
    return response.json()['data']

# Function to extract the necessary data points

In [None]:
def _video_row(item, fallback_channel=''):
    """Build one output row from a single video dict.

    ``type`` and ``title`` are required (KeyError if absent); every other
    field falls back to a default so one missing optional key does not
    discard the row.
    """
    return [
        item['type'],
        item.get('channelTitle', fallback_channel),
        item['title'],
        item.get('channelId', ''),
        # Some entries only carry the human-readable count text.
        item.get('viewCount', item.get('viewCountText', '')),
        item.get('publishDate', ''),
        item.get('lengthText', ''),
    ]


def get_data(data):
    """Flatten the API payload into a list of row lists.

    Each element of ``data`` is handled one of two ways:

    * it has a ``'channelTitle'`` key: it is treated as a single video;
    * otherwise: it is treated as a container whose ``'data'`` list holds
      the videos, and the container's ``'title'`` is used as the channel
      name for children that have no ``'channelTitle'`` of their own.

    Both cases go through the same field extraction, so optional fields are
    defaulted identically. An item missing ``'type'`` or ``'title'`` is
    reported with ``print`` and skipped.

    Args:
        data: Iterable of dicts as returned by the API.

    Returns:
        A list of ``[type, channel name, title, channel id, views,
        publish date, duration]`` lists.
    """
    videos = []

    for d in data:
        if 'channelTitle' in d:
            items, fallback_channel = [d], ''
        else:
            # A container without 'data' or 'title' simply contributes
            # nothing / an empty channel name instead of raising.
            items = d.get('data') or []
            fallback_channel = d.get('title', '')

        for item in items:
            try:
                videos.append(_video_row(item, fallback_channel))
            except KeyError as e:
                print(f"KeyError: {e} in {item}")

    return videos

# Make the function call and convert the list of lists to pandas dataframe

In [None]:
def get_fresh_data(data):
    """Turn a raw API payload into a DataFrame with the dataset's columns.

    The payload is flattened by ``get_data`` and the resulting rows are
    wrapped in a DataFrame.
    """
    column_names = [
        "Type",
        "Channel Name",
        "Title",
        "Channel ID",
        "Views",
        "Publication Date",
        "Duration",
    ]
    rows = get_data(data)
    return pd.DataFrame(rows, columns=column_names)

# Clean columns

In [None]:
import pandas as pd

def clean_columns(df):
    """Normalise the dataset columns of ``df`` in place and return it.

    * ``Views``: text such as ``'1.2M views'``, ``'1,234 views'`` or
      ``'1 view'`` and numeric values become an ``int``; missing or
      unparseable values become ``0``.
    * ``Publication Date``: parsed with ``pd.to_datetime`` and formatted as
      ``YYYY-MM-DD``.
    * ``Duration``: ``MM:SS`` / ``HH:MM:SS`` text and numeric values become
      whole seconds; missing, empty and ``'SHORTS'`` become ``0``; any other
      label is kept unchanged.
    * ``Channel Name`` / ``Channel ID``: missing or empty values become
      ``'Unknown'``.

    Args:
        df: DataFrame containing the columns listed above. It is modified
            in place.

    Returns:
        The same ``df`` object.
    """
    suffix_multipliers = (('B', 1_000_000_000), ('M', 1_000_000), ('K', 1_000))

    def clean_views(view):
        if pd.isna(view):
            return 0
        # Strip the unit word (plural first, then singular) and thousands
        # separators so '1,234 views' and '1 view' parse as numbers.
        text = (
            str(view)
            .replace('views', '')
            .replace('view', '')
            .replace(',', '')
            .strip()
        )
        multiplier = 1
        for suffix, factor in suffix_multipliers:
            if suffix in text:
                text = text.replace(suffix, '')
                multiplier = factor
                break
        try:
            # float() also accepts values that were stored as floats
            # (e.g. '5763.0'); round() avoids truncating products that land
            # just below the intended integer due to float representation.
            return int(round(float(text) * multiplier))
        except (ValueError, OverflowError):
            return 0

    df['Views'] = df['Views'].apply(clean_views)

    # Convert 'Publication Date' to a consistent format
    df['Publication Date'] = pd.to_datetime(df['Publication Date']).dt.strftime('%Y-%m-%d')

    def clean_duration(duration):
        if pd.isna(duration):
            return 0
        text = str(duration).replace('video', '').strip()
        if text in ('', 'SHORTS'):
            return 0
        parts = text.split(':')
        try:
            if len(parts) in (2, 3):  # MM:SS or HH:MM:SS
                total_seconds = 0
                for part in parts:
                    total_seconds = total_seconds * 60 + int(part)
                return total_seconds
            # Already expressed in seconds (e.g. a row cleaned on an
            # earlier run): keep it numeric instead of turning it into text.
            return int(float(text))
        except (ValueError, OverflowError):
            # Unrecognised label: keep the text rather than guess a number.
            return text

    df['Duration'] = df['Duration'].apply(clean_duration)

    # Fill missing cells. Empty strings are treated like NaN so a row looks
    # the same before and after a CSV round trip (where '' is read as NaN).
    df['Channel Name'] = df['Channel Name'].fillna('Unknown').replace('', 'Unknown')
    df['Channel ID'] = df['Channel ID'].fillna('Unknown').replace('', 'Unknown')
    df['Views'] = df['Views'].fillna(0)
    df['Duration'] = df['Duration'].fillna(0)

    return df

# Get the API response

In [None]:
data = get_api_data()

# First data point overview

In [None]:
data[0]

# Process the data

In [None]:
df_new = get_fresh_data(data)


# Concatenate the two dataframes

In [None]:
df = pd.concat([df, df_new], ignore_index=True)

# Remove duplicates

In [None]:
# Report the row count before and after dropping fully identical rows.
# NOTE(review): when the cells are run top to bottom, this happens before
# clean_columns() is applied, so freshly fetched rows still hold raw values
# while rows loaded from Data.csv hold cleaned ones; such pairs will not be
# recognised as duplicates here. Consider de-duplicating after cleaning.
print(f'Original df size: {len(df)}')
df = df.drop_duplicates()
print(f'New df size: {len(df)}')

# Data overview

In [None]:
df

# Clean dataframe

In [None]:
clean_df = clean_columns(df)

In [None]:
clean_df

# Save the df to Data.csv

In [None]:
clean_df.to_csv('Data.csv', index=False)

# Periodic run

##### The API's monthly limit is 300 requests, and trending videos tend to stay on YouTube's trending list for at least 24 hours, so to stay safely within the limit we make the API calls once every 24 hours (for cloud/server runs): 9 calls per run × 30 days = 270 requests per month.

In [None]:
def periodic_run(df):
    """Fetch every geo/type combination, merge into ``df`` and save the result.

    For each combination the trending list is requested and converted to a
    DataFrame. A combination whose request or parsing fails is reported and
    skipped so the remaining combinations are still collected. All frames
    are then concatenated with ``df``, cleaned once, de-duplicated and
    written to ``Data.csv``.

    Args:
        df: Previously collected rows. Not modified.

    Returns:
        The cleaned, de-duplicated DataFrame that was written to
        ``Data.csv``.
    """
    # looping over different combinations of geo and types to extract worldwide data
    geo = ['US', 'GB', 'AU']  # 'IN', 'JP', 'SA', 'RU'
    types = ['music', 'games', 'movies']  # 'now'

    # 3 * 3 = 9 API calls
    # 9 * 30 days = 270 calls per month (limit is 300)

    frames = [df]
    for g in geo:
        for typ in types:
            print(f'Geo: {g} - type: {typ}\n')
            try:
                data = get_api_data(g, typ)
                df_new = get_fresh_data(data)
            except (requests.RequestException, KeyError, TypeError, ValueError) as exc:
                # One bad response must not throw away the calls that
                # succeeded; each call counts against the monthly quota.
                print(f'Skipping geo={g} type={typ}: {exc!r}')
                continue
            if not df_new.empty:
                frames.append(df_new)

    # Concatenate and clean once instead of re-cleaning the whole frame on
    # every iteration.
    combined = pd.concat(frames, ignore_index=True)

    # De-duplicate after cleaning so a raw row and its already-normalised
    # counterpart from an earlier run compare equal.
    clean_df = clean_columns(combined).drop_duplicates().reset_index(drop=True)

    # save the updated dataframe
    clean_df.to_csv('Data.csv', index=False)

    print(f'Data.csv updated and saved - df size: {clean_df.shape}')
    return clean_df

In [None]:
# run this cell to call the API every 24 hours
# Start from the current frame so `clean_df` is always a DataFrame, even if
# the loop is interrupted before the first successful run.
clean_df = df

SLEEP_SECONDS = 24 * 60 * 60

while True:
    # A failed run is reported and tried again at the next scheduled time.
    # Retrying immediately in a tight loop would burn through the monthly
    # request quota. Only `Exception` is caught so that interrupting the
    # kernel (KeyboardInterrupt) still stops the loop.
    try:
        df = periodic_run(df)
        clean_df = df
    except Exception as exc:
        print(f'Periodic run failed: {exc!r}')

    # sleep for 24 hours
    print('Sleeping for 24 hours')
    time.sleep(SLEEP_SECONDS)

In [None]:
clean_df.tail()