In [None]:
!pip install beautifulsoup4
!pip install lxml

In [103]:
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
from random import uniform
from requests import get
from http import HTTPStatus
from lxml import etree
from lxml import html
from time import sleep
import os

In [120]:
class ProgressBars:
    
    def __init__(self):
        self.progress_bars = {}
    
    def add_bar(self, bar_name, prog_max, prog=0, num_bars=50):
        self.progress_bars[bar_name] = {
            'prog': prog,
            'prog_max': prog_max,
            'num_bars': num_bars
        }
    
    def del_bar(self, bar_name):
        del self.progress_bars[bar_name]
    
    def update_progress(self, bar_name, prog=1):
        self.progress_bars[bar_name]['prog'] += prog
    
    def write_to_screen(self):
        output = ''
        for key in self.progress_bars.keys():
            bar = self.progress_bars[key]
            
            num_bars = int((bar['prog']/bar['prog_max'])*bar['num_bars'])
            num_spaces = bar['num_bars']-num_bars
            percent_complete = np.round(bar['prog']/bar['prog_max']*100, 2)
            
            bar = key+': <'+('|'*num_bars)+(' '*num_spaces)+'> ('+str(percent_complete)+'%)'
            if len(self.progress_bars.keys()) > 1:
                bar += '\n'
            output += bar
        if len(self.progress_bars.keys()) > 1:
            clear_output()
        print(output, end='\r')

In [183]:
class Scraper:
    def __init__(self, max_talks=float('inf')):
        """
        max_talks can be used to test the scraper, set it to an arbitrary number to test collecting a small
        sample size.
        The initializer will automatically collect all the talk urls, so it can take a while to initialize if
        you do not set max_talks to a smaller number.
        """
        self.RAND_LONG = lambda: uniform(10, 15)
        self.RAND_MED = lambda: uniform(5,8)
        self.RAND_SHORT = lambda: uniform(0.5,2)
        self.HOMEPAGE = "https://www.ted.com/talks/"
        self.BASE = "https://www.ted.com"
        self.SAVE_PATH = "ted_talk_data.csv"
        self.pages = self.init_page_urls()
        self.talks = self.collect_talk_urls(max_talks)
    
    def make_soup(self, url: str) -> BeautifulSoup:
        #takes a url and returns the BeautifulSoup object corresponding to it
        r = self.get_(url)
        return BeautifulSoup(r.text)
    
    def make_etree(self, url: str) -> etree._Element:
        #similar to makesoup, takes url and returns an lxml element tree
        r = self.get_(url)
        return etree.HTML(r.text)
    
    def get_(self, url: str) -> str:
        #used to handle http requests and timeout errors
        #detects if we are getting a transcript and will correct the headers so that correct
        #http response is sent out.
        sleep(self.RAND_SHORT())
        headers = {
            "user-agent" :"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
            "accept-language": "en-US,en;q=0.9"
        }
        if "/talks/" in url:
            headers.update({"user-agent" :"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36"})
        r = get(url, headers=headers)
        if r.status_code != 200:
            if r.status_code == 429 or r.status_code == 502:
                #print("Error 429 recieved, sleeping for RAND_LONG, then trying again")
                sleep(self.RAND_LONG())
                return self.get_(url)
            print(f"error collecting url: {url} \nStatus code recieved: {HTTPStatus(r.status_code)}")
            return ""
        return r
    
    def get_page_count(self) -> int:
        #returns the total amount of pages (each page contains many ted talk links)
        dom = self.make_etree(self.HOMEPAGE)
        return int(dom.xpath("/html/body/div[1]/div[2]/div/div[2]/div[2]/div[2]/div/a[5]")[0].text)
    
    def init_page_urls(self) -> dict:
        #returns the pages dictionary, where each key is a (str) page url
        #and the value is a string which represents that the page has been unscraped
        #this value should always either be "unscraped", "scraped", "in_progress", or "error".b
        page_count = self.get_page_count()
        pages = {f"https://www.ted.com/talks?page={i}": "unscraped" for i in range(1, page_count + 1)}
        return pages
    
    def collect_talk_urls(self, max_talks=float('inf')):
        #goes to each page and collects each talk url, assigns them to a list
        assert len(self.pages.keys()) != 0, "pages was never filled out, cannot collect talk urls"
        self.talks = []
        errors = 0
        prog = ProgressBars()
        prog.add_bar("Collecting Talk URLs", len(self.pages.keys()), prog=0, num_bars=50)
        for i, page in enumerate(self.pages.keys()):
            self.pages[page] = "in_progress"
            prog.update_progress("Collecting Talk URLs")
            prog.write_to_screen()
            try:
                soup = self.make_soup(page)
                thumb_images = soup.find_all(class_="thumb thumb--video thumb--crop-top")
                for thumb_image in thumb_images:
                    href = thumb_image.parent.get('href')
                    self.talks.append(self.BASE+href+"/transcript")
                    if len(self.talks) >= max_talks:
                        return self.talks
                self.pages[page] = "scraped"
            except Exception as e:
                self.pages[page] = "error"
                errors += 1
                print(f"error in collecting page {page}\n{e}")
        print()
        return self.talks
    
    def collect_talk_data(self, link: str, raw_input=None):
        #takes a link (in str format) that points to a specific ted talk.
        #will then go to that ted talk and collect relevant information
        #returns a dictionary where the key is the name of the data and the value is the data itself
        if link:
            raw = self.get_(link).text
        else:
            raw = raw_input
            
        soup = BeautifulSoup(raw)
        
        def get_topics() -> dict:
            #returns the set representation of the topics, they are transformed into lower case.
            topics_raw = soup.find_all(class_="inline-block py-1 text-tui-sm capitalize underline")
            topics = set([topic.text.lower() for topic in topics_raw])
            return {"topics" : topics}
        def get_summary() -> dict:
            #returns the summary provided by TED.
            summary = soup.find_all(class_="text-sm mb-6")[0].text
            return {"summary": summary}
        def get_transcript() -> dict:
            #returns the full transcript
            start = raw.find('"transcript":') + len('"transcript":') + 1
            end = raw.find('"embedUrl":') - 2
            transcript = raw[start:end]
            transcript = transcript.replace("&apos;", "'")
            transcript = transcript.replace("&quot;", '"')
            return {"transcript": transcript}
        def get_title() -> dict:
            #returns the title of the talk
            title = soup.find_all(class_="text-textPrimary-onLight font-light text-tui-2xl leading-tui-sm tracking-tui-tight md:text-tui-3xl md:tracking-tui-tightest lg:text-tui-4xl mr-5")[0].text
            return {"title": title}
        def get_misc() -> dict:
            #returns authors, and event in a dictionary
            block = soup.find_all(class_="text-sm w-full truncate text-gray-900")[0].text
            separator = block.find("•")
            author, event = block[:separator-1], block[separator+2:]
            return {"author": author, "event": event}
        out = {"link": link}
        [out.update(d) for d in [get_topics(), get_summary(), get_transcript(), get_misc(), get_title()]]
        return out
    
    def collect_raw(self, **kwargs):
        errs = 0
        columns = ['link', 'raw']
        dtypes = {'link': str, 'raw': str}
        df = pd.DataFrame(columns=columns)
        df = df.astype(dtypes)
        prog = ProgressBars()
        prog.add_bar("Scraping Raw Data   ", len(self.talks), prog=0, num_bars=50)
        for i, link in enumerate(self.talks):
            raw_dat = self.get_(link).text
            prog.update_progress("Scraping Raw Data   ")
            prog.write_to_screen()
            df = pd.concat([df, pd.DataFrame([{"link":link, "raw":raw_dat}])], ignore_index=True)
        return df
    
    def main(self, **kwargs):
        #collects data on all talks, has to be run manually
        #automatically does not add data for talks with no transcript
        """
        kwargs:
            save=True will set function to save a csv file of the collected data.
                    automatically saves into ./'SAVE_PATH', which is be default set to "ted_talk_data.csv"
                    **to change save path, change Scraper.SAVE_PATH to your desired path
            overwrite=True will delete the old save file and replace it with the new one
                            if an old save file exists in the same directory.
        """
        no_transcripts = 0
        columns = ['title', 'topics', 'transcript', 'author', 'event', 'link']
        dtypes = {'title': str, 'topics': object, 'transcript': str, 'author': str, 'event': str, 'link': str}
        df = pd.DataFrame(columns=columns)
        df = df.astype(dtypes)
        prog = ProgressBars()
        prog.add_bar("Scraping Data       ", len(self.talks), prog=0, num_bars=50)
        for i, talk in enumerate(self.talks):
            dat = self.collect_talk_data(talk)
            prog.update_progress("Scraping Data       ")
            prog.write_to_screen()  
            if (not dat['transcript']) or (not dat['topics']):
                no_transcripts += 1
                continue
            df = pd.concat([df, pd.DataFrame([dat])], ignore_index=True)
        print(f"\n\nFailed to collect data on {no_transcripts} talks due to a lack of transcript/topics.")
        if "save" in kwargs and kwargs["save"] == True:
            if "overwrite" in kwargs and kwargs["overwrite"] == True:
                #delete old file if it exists
                if os.path.exists(self.SAVE_PATH):
                    os.remove(self.SAVE_PATH)
            df.to_csv(self.SAVE_PATH, index_label=False)
        return df

In [184]:
scraper = Scraper()
main()

Collecting Talk URLs: <                                                  > (1.22%)

In [165]:
test = scraper.collect_raw()

Scraping Raw Data   : <||||||||||||||||||||||||||||||||||||||||||||||||||> (100.0%)

In [191]:
responses = pd.Series([scraper.collect_talk_data(test.iloc[i]['link']) for i in range(test.shape[0])])

ConnectionError: HTTPSConnectionPool(host='www.ted.com', port=443): Max retries exceeded with url: /talks/ted_ed_how_to_manage_your_emotions/transcript (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x17ebdb880>: Failed to establish a new connection: [Errno 8] nodename nor servname provided, or not known'))

In [190]:
responses

Unnamed: 0,link,raw
0,https://www.ted.com/talks/megan_alubicki_flick...,"<!DOCTYPE html><html><head><link rel=""preconne..."
1,https://www.ted.com/talks/kate_kahle_why_autis...,"<!DOCTYPE html><html><head><link rel=""preconne..."
2,https://www.ted.com/talks/malachy_mchugh_how_s...,"<!DOCTYPE html><html><head><link rel=""preconne..."
3,https://www.ted.com/talks/alona_fyshe_does_ai_...,"<!DOCTYPE html><html><head><link rel=""preconne..."
4,https://www.ted.com/talks/dan_shipper_the_unex...,"<!DOCTYPE html><html><head><link rel=""preconne..."
5,https://www.ted.com/talks/dan_finkel_this_one_...,"<!DOCTYPE html><html><head><link rel=""preconne..."
6,https://www.ted.com/talks/melanie_charles_damb...,"<!DOCTYPE html><html><head><link rel=""preconne..."
7,https://www.ted.com/talks/ted_countdown_how_do...,"<!DOCTYPE html><html><head><link rel=""preconne..."
8,https://www.ted.com/talks/alexandra_horowitz_w...,"<!DOCTYPE html><html><head><link rel=""preconne..."
9,https://www.ted.com/talks/charles_wallace_when...,"<!DOCTYPE html><html><head><link rel=""preconne..."
