In [2]:
pip install yt_dlp

Collecting yt_dlp
  Downloading yt_dlp-2025.2.19-py3-none-any.whl.metadata (171 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m171.9/171.9 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading yt_dlp-2025.2.19-py3-none-any.whl (3.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m57.8 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[?25hInstalling collected packages: yt_dlp
Successfully installed yt_dlp-2025.2.19
Note: you may need to restart the kernel to use updated packages.


# Importing Necessary library and Defining main class

In [3]:
import os
import pandas as pd
import requests
import time
from yt_dlp import YoutubeDL

class YouTubeDataScraper:
    def __init__(self, save_dir='thumbnails'):
        self.save_dir = save_dir
        self.create_directories()

    def create_directories(self):
        if not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)

    def download_thumbnail(self, url, video_id):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                file_path = os.path.join(self.save_dir, f"{video_id}.jpg")
                with open(file_path, 'wb') as f:
                    f.write(response.content)
                return file_path
        except Exception as e:
            print(f"Error downloading thumbnail: {e}")
        return None

    def get_video_data(self, video_url):
        try:
            ydl_opts = {
                'quiet': True,
                'no_warnings': True,
                'extract_flat': False,
                'skip_download': True,
            }
            with YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(video_url, download=False)
                video_id = info['id']
                thumbnail_url = info.get('thumbnail', '')
                thumbnail_path = self.download_thumbnail(thumbnail_url, video_id)
                video_data = {
                    'video_id': video_id,
                    'title': info.get('title', ''),
                    'description': info.get('description', ''),
                    'view_count': info.get('view_count', 0),
                    'like_count': info.get('like_count', 0),
                    'dislike_count': info.get('dislike_count', 0),
                    'thumbnail_url': thumbnail_url,
                    'local_thumbnail_path': thumbnail_path,
                    'duration': info.get('duration', 0),
                    'upload_date': info.get('upload_date', ''),
                    'channel_id': info.get('channel_id', ''),
                    'channel_name': info.get('channel', ''),
                    'video_url': video_url
                }
                print(f"Successfully processed video: {info['title']}")
                return video_data
        except Exception as e:
            print(f"Error processing video {video_url}: {e}")
            return None

    def get_channel_videos(self, channel_url,number_of_videos=100):
        """
        Fetch video URLs from a channel's uploads playlist.
        """
        try:
            if '/@' in channel_url:
                # Convert channel URL to uploads playlist URL
                channel_username = channel_url.split('/')[-1]
                playlist_url = f"https://www.youtube.com/{channel_username}/videos"
            else:
                playlist_url = channel_url
            
            ydl_opts = {
                'quiet': True,
                'no_warnings': True,
                'extract_flat': True,
                'playlistend': number_of_videos,
            }
    
            with YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(playlist_url, download=False)
                if 'entries' in info:
                    video_urls = [entry['url'] for entry in info['entries'] if 'url' in entry]
                    print(f"Found {len(video_urls)} videos on the channel")
                    return video_urls
                else:
                    print("No videos found on the channel")
                    return []
        except Exception as e:
            print(f"Error fetching channel videos: {e}")
            return []


    def process_videos(self, video_urls):
        video_data_list = []
        for url in video_urls:
            print(f"\nProcessing video: {url}")
            video_data = self.get_video_data(url)
            if video_data:
                video_data_list.append(video_data)
                print("Data collected successfully")
            else:
                print("Failed to collect data")
            time.sleep(1)
        if video_data_list:
            df = pd.DataFrame(video_data_list)
            print(f"\nCollected data for {len(video_data_list)} videos")
            return df
        else:
            print("No data collected")
            return pd.DataFrame()

    def save_data(self, df, csv_path='youtube_data.csv'):
        if not df.empty:
            df.to_csv(csv_path, index=False, encoding='utf-8-sig')
            print(f"Data saved to {csv_path}")
        else:
            print("No data to save")


# Defining Some extra function

In [4]:
from yt_dlp import YoutubeDL

def get_playlist_video_urls(playlist_id):
    """
    Get all video URLs from a YouTube playlist
    
    Args:
        playlist_id (str): YouTube playlist ID or URL
        
    Returns:
        list: List of video URLs in the playlist
    """
    # If full URL is provided, extract playlist ID
    if 'youtube.com' in playlist_id:
        if 'playlist?list=' in playlist_id:
            playlist_id = playlist_id.split('playlist?list=')[1]
        else:
            print("Invalid playlist URL")
            return []

    # Create YouTube playlist URL
    playlist_url = f'https://www.youtube.com/playlist?list={playlist_id}'
    
    # Configure yt-dlp options
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': True,  # Don't download videos
        'force_generic_extractor': False
    }

    try:
        # Create YouTube downloader object
        with YoutubeDL(ydl_opts) as ydl:
            # Extract playlist information
            playlist_info = ydl.extract_info(playlist_url, download=False)
            
            if not playlist_info:
                print("Could not fetch playlist information")
                return []

            # Extract video URLs
            video_urls = []
            for entry in playlist_info['entries']:
                if entry:
                    video_url = f"https://www.youtube.com/watch?v={entry['id']}"
                    video_urls.append(video_url)
            
            print(f"Found {len(video_urls)} videos in playlist")
            return video_urls

    except Exception as e:
        print(f"Error fetching playlist: {e}")
        return []


# Defining a general class for Fetching Data

**Note**: Here the class has one function named **fetch_videos**(source_type,url,no_of_videos) , here, **no_of_videos** is optional, by default it is set to 100. This is applicable for when we select type as channel. The function will return a data frame. We may concat or save it as per our wish. 


In [13]:
class YouTubeVideoFetcher:
    def __init__(self, save_dir='thumbnails'):
        self.scraper = YouTubeDataScraper(save_dir=save_dir)

    def fetch_videos(self, source_type, url,no_of_videos=100):
        """
        Fetches video data from a playlist, single video, or channel.

        Parameters:
        - source_type (str): "playlist", "video", or "channel"
        - url (str): The URL or ID of the source

        Returns:
        - pandas.DataFrame: Data collected from the videos
        """
        video_urls = []

        if source_type == "playlist":
            video_urls = get_playlist_video_urls(url)
        elif source_type == "video":
            video_urls = [url]
        elif source_type == "channel":
            video_urls = self.scraper.get_channel_videos(url)
        else:
            print("Invalid source type. Choose from 'playlist', 'video', or 'channel'.")
            return None

        if video_urls:
            df = self.scraper.process_videos(video_urls)
            
            if not df.empty:
                df = df.applymap(lambda x: x if not isinstance(x, str) else x.encode('utf-8').decode('utf-8'))
                return df  # Returns DataFrame
            else:
                print("No data was collected.")
                return None
        else:
            print("No videos found.")
            return None


In [17]:
fetcher = YouTubeVideoFetcher()

df=fetcher.fetch_videos("playlist", "https://www.youtube.com/playlist?list=PLNfsjhqtvHwXTv95LVTQUmj6IJxVIPYNX")


Found 7 videos in playlist

Processing video: https://www.youtube.com/watch?v=qb1X7yOjqoo
Successfully processed video: ভারতীয় ক্রিকেটে স্পন্সর করলে কি দেউলিয়া হতে হয়? India Cricket
Data collected successfully

Processing video: https://www.youtube.com/watch?v=lxzHRgbrxlo
Successfully processed video: বাংলাদেশের বিশ্বকাপ শুরু ধরমশালায়, ভিসা জটিলতায় সমর্থকরা
Data collected successfully

Processing video: https://www.youtube.com/watch?v=1PhXbzaQvVs
Successfully processed video: ইংল্যান্ড দলের হতাশা, পাকিস্তানকে বরণ, বিশ্বকাপ নিয়ে ভারতে যত উন্মাদনা Bangladesh Trending
Data collected successfully

Processing video: https://www.youtube.com/watch?v=jyk2EaCBONI
Successfully processed video: ভারতে আইসিসি ক্রিকেট বিশ্বকাপের যে পাঁচটি বিষয় হয়তো আপনার অজানা
Data collected successfully

Processing video: https://www.youtube.com/watch?v=5_A2lFKkJiQ
Successfully processed video: বিশ্বকাপে বাংলাদেশ দল নিয়ে ভক্তদের প্রত্যাশা কেমন?
Data collected successfully

Processing video: https://www.youtube.com

  df = df.applymap(lambda x: x if not isinstance(x, str) else x.encode('utf-8').decode('utf-8'))


## Here is some functions to make it easier.

In [None]:

df_combined = pd.concat([df1, df2], ignore_index=True).reset_index(drop=True)
df_combined.to_csv(csv_path, index=False, encoding='utf-8-sig')

In [34]:
import zipfile
import os

def zip_directory(directory_name, zip_name):
    # Create a Zip file
    with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # Walk through directory
        for root, _, files in os.walk(directory_name):
            for file in files:
                file_path = os.path.join(root, file)
                # Write file to zip
                zipf.write(file_path, os.path.relpath(file_path, directory_name))
    print(f"{zip_name} created successfully")

zip_directory("/kaggle/working/thumbnails", "/kaggle/working/thumbnails.zip")


/kaggle/working/thumbnails.zip created successfully
