In [1]:
import json
import re
import requests

from typing import Any, Dict, List, Tuple

from bs4 import BeautifulSoup as bs

In [2]:
# Origin of the web archive; Page builds snapshot URLs as
# "<BASE_URL>/web/<archive timestamp>/<original URL>".
BASE_URL: str = "https://web.archive.org"
# Social Blade category page the crawl starts from.
START_URL: str = "https://socialblade.com/youtube/top/category/entertainment"

# Crawl-wide registries, filled in while pages are processed.
# Keyed by Social Blade category URL; values are Category objects.
CATEGORIES_DICT: Dict[str, Any] = dict()
# Looked up by Social Blade channel URL; values are Channel objects.
CHANNELS_DICT: Dict[str, Any] = dict()

In [3]:
def short_form_to_num(short_form: str) -> int:
    """Convert an abbreviated count such as ``"1.2M"`` into an integer.

    The last character selects the scale (case-insensitive): ``k`` for
    thousands, ``m`` for millions, ``b`` for billions. Everything before it
    is parsed as a decimal number.

    Args:
        short_form: Abbreviated number, e.g. ``"950K"`` or ``"1.25m"``.
            Surrounding whitespace is ignored.

    Returns:
        The expanded value, rounded to the nearest integer.

    Raises:
        ValueError: If the text is empty, the suffix is not one of
            ``k``/``m``/``b``, or the numeric part cannot be parsed.
    """
    multipliers = {"k": 10 ** 3, "m": 10 ** 6, "b": 10 ** 9}
    text = short_form.strip()
    # Slicing (rather than indexing) yields "" for empty input, so it is
    # rejected with the same ValueError as any other unknown suffix.
    multiplier = multipliers.get(text[-1:].lower())
    if multiplier is None:
        raise ValueError("Can not recognize the short form.")
    num = float(text[:-1].strip())
    # round() instead of int(): a binary float product can land just below
    # the exact value (1.005 * 1000 == 1004.9999999999999), and truncating
    # would then be off by one.
    return round(num * multiplier)

def comma_separated_to_num(string: str) -> int:
    """Parse an integer written with comma group separators.

    Args:
        string: Digits split into groups by commas, e.g. ``"1,234,567"``.
            Whitespace around each group is ignored.

    Returns:
        The integer value of the digits with the separators removed.

    Raises:
        ValueError: If any group is empty (``""``, ``"1,,2"``, ``"1,"``) or
            the joined text is not a valid integer.
    """
    groups = [group.strip() for group in string.split(",")]
    # Joining would silently hide an empty group, so reject it up front.
    if not all(groups):
        raise ValueError(f"Malformed comma separated number: {string!r}")
    # Parsing the joined text once keeps a leading sign attached to the whole
    # number; summing the groups positionally turned "-1,234" into -766.
    return int("".join(groups))

def find_sb_url(url: str) -> Tuple[str, str]:
    """Split a web-archive link into its timestamp and the archived URL.

    Args:
        url: Link containing ``/web/<digits>/<original url>``. Anything
            before the ``/web/`` segment is ignored.

    Returns:
        ``(archive_datetime, original_url)``: the digit run after ``/web/``
        and everything that follows the next slash.

    Raises:
        ValueError: If ``url`` has no ``/web/<digits>/`` segment.
    """
    # Raw string: "\d" inside a plain literal is an invalid escape sequence.
    match = re.search(r"/web/(\d+)/(.*)", url)
    if match is None:
        raise ValueError(f"Not a web archive URL: {url!r}")
    archive_datetime, original_url = match.groups()
    return archive_datetime, original_url

In [4]:
class Page:
    """Base class for a Social Blade page read through the web archive.

    Subclasses implement ``process`` to extract data from the page.

    Attributes:
        name: Display name given by the caller.
        archive_datetime: Archive snapshot timestamp used in the URL.
        sb_url: Original (non-archived) Social Blade URL.
        url: Full archive URL that is actually fetched.
    """

    # Seconds to wait on the archive before a request is abandoned; without
    # a timeout a stalled connection would block the crawl forever.
    REQUEST_TIMEOUT = 60

    def __init__(self, name: str, url: str, archive_datetime: str):
        """Record the page identity and build its archive URL.

        Args:
            name: Display name of the page.
            url: Original Social Blade URL.
            archive_datetime: Snapshot timestamp (the digits in the archive URL).
        """
        # Keep name and timestamp: they were accepted but previously discarded.
        self.name = name
        self.archive_datetime = archive_datetime
        self.sb_url = url
        self.url = f"{BASE_URL}/web/{archive_datetime}/{url}"

    def process(self) -> List[Any]:
        """Extract data from the page; must be overridden by subclasses.

        Raises:
            NotImplementedError: Always, on the base class.
        """
        raise NotImplementedError("Base Page can not process.")

    def _get_soup(self):
        """Download ``self.url`` and return it parsed with ``html.parser``.

        Raises:
            ValueError: If the response status is not 200.
            requests.exceptions.Timeout: If the server does not answer within
                ``REQUEST_TIMEOUT`` seconds.
        """
        response = requests.get(self.url, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            raise ValueError(f"Error fetching page {self.url}")

        return bs(response.text, 'html.parser')

In [5]:
class Category(Page):
    """A Social Blade category listing and the channels it contains.

    Attributes:
        channels: Channel objects found on this listing, in page order.
        channels_visited: Maps a channel's Social Blade URL to ``True`` once
            it has been added to ``channels`` (guards against duplicates).
    """

    def __init__(self, name: str, url: str, archive_datetime: str):
        """Build the category page pointing at its ``mostsubscribed`` view."""
        super().__init__(name, url, archive_datetime)
        # The listing that gets scraped lives under the "mostsubscribed" path.
        self.url = f"{self.url}/mostsubscribed"
        self.channels = []
        self.channels_visited = {}

    def process(self) -> List[Dict[str, str]]:
        """Fetch the listing, collect its channels and return linked categories.

        Returns:
            One dict per category linked from the page, with the keys
            ``name``, ``archive_datetime`` and ``url``.
        """
        soup = self._get_soup()
        # The page is located by its two floated columns: the narrow left
        # one holds the category links, the wide right one the channel rows.
        left = soup.find("div", {"style": "float: left; width: 300px;"})
        right = soup.find("div", {"style": "float: right; width: 900px;"})
        self._add_channels(right)
        return self._other_categories(left)

    def _add_channels(self, root):
        """Append every channel row under ``root`` to ``self.channels``.

        Channels already downloaded for another category are reused from
        ``CHANNELS_DICT``; a channel listed twice on this page is added once.
        """
        for channel_div in self._get_channel_divs(root):
            children = channel_div.findChildren("div", recursive=False)
            # The third child div of a row carries the link to the channel page.
            link = children[2].find("a")
            archive_date, sb_url = find_sb_url(link["href"])
            if self.channels_visited.get(sb_url, False):
                continue
            channel = CHANNELS_DICT.get(sb_url)
            if channel is None:
                channel = Channel(link.text.strip(), sb_url, archive_date)
                channel.process()
                # Key by the URL value itself; the literal string "sb_url"
                # was used before, so the cache never produced a hit.
                CHANNELS_DICT[sb_url] = channel
            self.channels_visited[sb_url] = True
            self.channels.append(channel)

    def _other_categories(self, root) -> List[Dict[str, str]]:
        """Return the details of every category link found under ``root``."""
        others = []
        for div in self._get_category_divs(root):
            others.append(self._get_category_details(div))
        return others

    @staticmethod
    def _get_channel_divs(root):
        """Return the direct child divs of ``root`` that follow the sort bar.

        The div with id ``sort-by`` and the div right after it are skipped
        (presumably the column header row; confirm against the page). If no
        ``sort-by`` div exists the result is empty.
        """
        i = 0
        children = root.findChildren("div", recursive=False)
        for div in children:
            i += 1
            if div.attrs.get("id") == "sort-by":
                i += 1
                break
        return children[i:]

    @staticmethod
    def _get_category_divs(root):
        """Return the ``<a>`` tags in the div after the channel-type heading.

        Raises:
            IndexError: If the heading is missing or is the last child div.
        """
        i = 0
        children = root.findChildren("div", recursive=False)
        for div in children:
            i += 1
            if div.text.strip() == 'Top 100 by Channeltype':
                break
        # i now indexes the div immediately after the heading.
        return children[i].find_all("a")

    @staticmethod
    def _get_category_details(cat_div):
        """Turn a category link tag into ``process_category`` keyword arguments."""
        archive_date, sb_url = find_sb_url(cat_div["href"])
        return {
            "name": cat_div.text.strip(),
            "archive_datetime": archive_date,
            "url": sb_url
        }

In [6]:
class Channel(Page):
    """A single channel's Social Blade page and the statistics read from it.

    The statistic attributes are only set once ``process`` has run.
    """

    def __init__(self, name: str, url: str, archive_datetime: str):
        """Create the channel page; no network access happens here."""
        super().__init__(name, url, archive_datetime)

    def process(self):
        """Fetch the channel page and populate the statistic attributes."""
        soup = self._get_soup()
        top_info = soup.find("div", {"id": "YouTubeUserTopInfoWrap"})
        self._set_top_info(top_info)
        user_content = soup.find("div", {"id": "socialblade-user-content"})
        contents = user_content.findChildren("div", recursive=False)
        # The first child div of the user content holds grade and ranks.
        self._set_grade_rank(contents[0])

    def _set_top_info(self, root):
        """Read avatar, counters, country, category and creation date.

        ``root`` is the ``YouTubeUserTopInfoWrap`` div. The info blocks are
        addressed by position: uploads, subscribers, views, country,
        category, creation date.
        """
        # Keep the avatar URL; it used to be read and then thrown away.
        self.avatar = root.find("img")["src"]
        info = root.find("div", {"id": "YouTubeUserTopInfoBlockTop"})
        info = info.find("div", {"id": "YouTubeUserTopInfoBlock"})
        info = info.find_all("div", {"class": "YouTubeUserTopInfo"})
        # In each block the second span is read as the value (presumably the
        # first one is the label; confirm against the page).
        uploads = info[0].find_all("span")[1].text.strip()
        subs = info[1].find_all("span")[1].text.strip()
        views = info[2].find_all("span")[1].text.strip()
        country = info[3].find_all("span")[1].find("a")
        archive_date, url = find_sb_url(country["href"])
        # The category link sits in the third span of its block.
        category = info[4].find_all("span")[2].find("a")
        cat_archive_date, cat_url = find_sb_url(category["href"])

        # No trailing commas here: "x = value," stores a 1-tuple, which is
        # how every one of these attributes used to end up wrapped.
        self.uploads = comma_separated_to_num(uploads)
        self.subs = short_form_to_num(subs)
        self.views = comma_separated_to_num(views)
        self.country = country.text.strip()
        self.country_archive_date = archive_date
        self.country_url = url
        self.category = category.text.strip()
        self.category_archive_date = cat_archive_date
        self.category_url = cat_url
        self.created_at = info[5].find_all("span")[1].text.strip()

    def _set_grade_rank(self, root):
        """Read the grade and the five rank values as stripped text.

        ``root`` must have exactly two direct child divs: the grade block
        and the ranks block.
        """
        grade, ranks = root.findChildren("div", recursive=False)
        ranks = ranks.findChildren("div", recursive=False)
        self.grade = grade.findChild("div", recursive=False).text.strip()
        self.sb_rank = ranks[0].find("p").text.strip()
        self.subs_rank = ranks[1].find("p").text.strip()
        self.vid_view_rank = ranks[2].find("p").text.strip()
        self.country_rank = ranks[3].find("p").text.strip()
        self.category_rank = ranks[4].find("p").text.strip()

    def serialize(self):
        """Return the scraped statistics as a plain dict.

        Requires ``process`` to have run; otherwise the attributes are
        missing and an ``AttributeError`` is raised.
        """
        return {
            "uploads": self.uploads,
            "subs": self.subs,
            "views": self.views,
            "country": self.country,
            "country_archive_date": self.country_archive_date,
            "country_url": self.country_url,
            "category": self.category,
            "cat_archive_date": self.category_archive_date,
            "cat_url": self.category_url,
            "created_at": self.created_at,
            "grade": self.grade,
            "sb_rank": self.sb_rank,
            "subs_rank": self.subs_rank,
            "vid_view_rank": self.vid_view_rank,
            "country_rank": self.country_rank,
            "category_rank": self.category_rank
        }

In [7]:
def process_category(name: str, url: str, archive_datetime: str):
    """Crawl a category page and, recursively, every category it links to.

    A category whose URL is already a key of ``CATEGORIES_DICT`` is skipped.
    A category is registered only after its own page has been processed, and
    the running number of registered categories is printed as progress.

    Args:
        name: Display name of the category.
        url: Original Social Blade URL of the category.
        archive_datetime: Archive snapshot timestamp to fetch.
    """
    already_crawled = url in CATEGORIES_DICT
    if not already_crawled:
        page = Category(name, url, archive_datetime)
        linked = page.process()
        CATEGORIES_DICT[url] = page
        print(len(CATEGORIES_DICT))
        # Each entry carries exactly the keyword arguments this function takes.
        for details in linked:
            process_category(**details)

In [None]:
# Start the crawl from the Entertainment category at a fixed archive snapshot.
process_category(
    name="Entertainment",
    url=START_URL,
    archive_datetime="20210318122140",
)