In [1]:
# Imports
import requests
import cloudscraper
import subprocess
import logging
import pandas as pd
import numpy as np
import re
import random
import time
import os
from bs4 import BeautifulSoup
from stem import Signal
from stem.control import Controller

# Deprecated imports
# import selenium
# from selenium import webdriver
# from selenium.webdriver.common.by import By
# from selenium.webdriver.support.ui import WebDriverWait
# from selenium.webdriver.support import expected_conditions as EC

## Helper Functions

In [2]:
def get_question_links(tag, num_pages=10):
    '''
    Gets links to questions that belong to a certain tag, most-voted first.

    Parameters:
    tag (str): A tag for which to get questions.
    num_pages (int): Number of pages of the tag to scrape (50 questions are requested per page). Defaults to 10.

    Returns:
    question_links (list[str]): The href of every question found, exactly as it appears in the listing page.
    '''
    question_links = []
    for page in range(1, num_pages + 1):
        url = f'https://math.stackexchange.com/questions/tagged/{tag}?tab=votes&page={page}&pagesize=50'
        response = requests.get(url)
        if not response.ok:
            # Best effort: an error page simply yields no links, but make that visible.
            # print() is used rather than logging so the root logger is not touched
            # before the notebook configures it.
            print(f'HTTP {response.status_code} for {url}')
        soup = BeautifulSoup(response.text, 'html.parser')
        for question in soup.select('.s-post-summary'):
            anchor = question.select_one('.s-link')
            # select_one returns None when the summary has no matching anchor; skip it
            # instead of crashing the whole crawl.
            href = anchor.get('href') if anchor is not None else None
            if href:
                question_links.append(href)
        time.sleep(random.uniform(0,2)) # Random to mimic human behavior and hopefully not get banned...

    return question_links

In [3]:
def parse_title(title_str):
    '''
    Separates title into text portions and LaTeX portions.

    Both inline math ("$...$") and display math ("$$...$$") are recognised.

    Parameters:
    title_str (str): The raw title.

    Returns:
    text_content (str): The title with every LaTeX span removed and outer whitespace stripped.
    latex_content (str): The contents of the LaTeX spans (delimiters dropped), joined by single spaces.
    '''
    # "$$...$$" must be tried before "$...$"; otherwise "$$x$$" is read as two
    # empty inline spans and the formula itself is left behind in the text part.
    latex_pattern = r'\$\$(.*?)\$\$|\$(.*?)\$'
    latex_parts = []
    for match in re.finditer(latex_pattern, title_str):
        display_math, inline_math = match.groups()
        latex_parts.append(display_math if display_math is not None else inline_math)
    text_content = re.sub(latex_pattern, '', title_str).strip()
    latex_content = ' '.join(latex_parts)
    return text_content, latex_content

In [4]:
def parse_body(content_soup):
    '''
    Splits a question body into its plain-text part and its LaTeX part. Handles more sophisticated LaTeX markup than bare "$" delimiters.

    Note: the LaTeX elements are extracted from content_soup, so the object passed in is modified.

    Parameters:
    content_soup (soup): The soup object obtained from the contents of the question.

    Returns:
    text_content (str): The text portion of the body.
    latex_content (str): The LaTeX portion of the body.
    '''
    # Collect both kinds of math markup before touching the tree.
    math_nodes = list(content_soup.find_all('span', class_='math-container'))
    math_nodes += content_soup.find_all('script', type='math/tex')

    # Detach the math so that only prose remains in the soup.
    for node in math_nodes:
        node.extract()

    remaining = content_soup.get_text(separator=" ", strip=True)
    text_content = remaining.replace('\n', ' ')

    fragments = [node.get_text(separator=" ", strip=True) for node in math_nodes]
    latex_content = ' '.join(fragments).replace('$', '')
    return text_content, latex_content

In [5]:
def remove_newlines_outside_dollar(text):
    '''
    Replaces every newline outside of LaTeX with a space. LaTeX spans
    ("$...$" or "$$...$$", possibly spanning several lines) are kept untouched.

    Parameters:
    text (str): Raw text.

    Returns:
    cleaned (str): Cleaned text.
    '''
    # The capturing group makes re.split keep the LaTeX spans: the result alternates
    # [prose, latex, prose, latex, ...], so odd indices are always LaTeX.
    # DOTALL lets a span cross line breaks; "$$...$$" is tried first so display math
    # is not cut into two empty "$$" pieces.
    parts = re.split(r'(\$\$.*?\$\$|\$.*?\$)', text, flags=re.DOTALL)
    cleaned_parts = [
        part if index % 2 else part.replace('\n', ' ')
        for index, part in enumerate(parts)
    ]
    cleaned = ''.join(cleaned_parts)
    return cleaned

In [6]:
def get_question_content(scraper, question_id):
    '''
    Fetches one question page and returns its title, body, and tags, with title and body additionally split into text and LaTeX portions.

    Parameters:
    scraper (cloudscraper.scraper): A cloudscraper scraper object.
    question_id (int): ID for math stack exchange question.

    Returns:
    data_dict (dict): A dictionary containing all the data for the question.
    '''
    page = scrape_url(scraper, f'https://math.stackexchange.com/questions/{question_id}')
    soup = BeautifulSoup(page.content, 'html.parser')

    # Title
    title = soup.select_one('.question-hyperlink').get_text(separator=" ", strip=True)
    title_text, title_latex = parse_title(title)

    # Body: the raw text must be captured first, because parse_body extracts the
    # LaTeX elements out of the soup it is given.
    body = soup.select_one('.js-post-body')
    body_raw = remove_newlines_outside_dollar(body.get_text(separator=" ", strip=True))
    body_text, body_latex = parse_body(body)

    # Tags, de-duplicated (order is not preserved by the set).
    tag_items = soup.find_all('li', class_='d-inline mr4 js-post-tag-list-item')
    unique_tags = list({item.get_text() for item in tag_items})

    return {
        'question_id': question_id,
        'title_raw': title,
        'title_text': title_text,
        'title_latex': title_latex,
        'body_raw': body_raw,
        'body_text': body_text,
        'body_latex': body_latex,
        'tags': unique_tags,
    }

In [7]:
def get_question_content_raw(scraper, question_id):
    '''
    Fetches one question page and returns its title, body, and tags as plain text (no text/LaTeX separation).

    Parameters:
    scraper (cloudscraper.scraper): A cloudscraper scraper object.
    question_id (int): ID for math stack exchange question.

    Returns:
    data_dict (dict): A dictionary containing all the data for the question.
    '''
    page = scrape_url(scraper, f'https://math.stackexchange.com/questions/{question_id}')
    soup = BeautifulSoup(page.content, 'html.parser')

    title = soup.select_one('.question-hyperlink').get_text(separator=" ", strip=True)
    body = soup.select_one('.js-post-body').get_text(separator=" ", strip=True)

    # Tags, de-duplicated (order is not preserved by the set).
    tag_items = soup.find_all('li', class_='d-inline mr4 js-post-tag-list-item')
    unique_tags = list({item.get_text() for item in tag_items})

    return {
        'question_id': question_id,
        'title': title,
        'body': body,
        'tags': unique_tags,
    }

In [8]:
def load_counter(filename='aux_files/counter.txt'):
    '''
    Reads the saved progress counter.

    Parameters:
    filename (str): Path of the counter file. Defaults to 'aux_files/counter.txt'.

    Returns:
    counter (int): The integer stored in the file, or 0 if the file does not exist.
    '''
    if not os.path.exists(filename):
        return 0
    with open(filename, 'r') as counter_file:
        stored_value = counter_file.read()
    return int(stored_value)

In [9]:
def save_counter(counter, filename='aux_files/counter.txt'):
    '''
    Persists the progress counter so that scraping can resume after a restart.

    The value is written to a temporary sibling file and then moved over the real
    file, so an interruption mid-write cannot leave an empty or partial counter
    file behind (which load_counter would fail to parse).

    Parameters:
    counter (int): The value to store.
    filename (str): Path of the counter file. Defaults to 'aux_files/counter.txt'.
    '''
    tmp_filename = f'{filename}.tmp'
    with open(tmp_filename, 'w') as f:
        f.write(str(counter))
    # os.replace overwrites the destination in a single rename step.
    os.replace(tmp_filename, filename)

In [10]:
def scrape_url(scraper, url, retry_delay=5, max_retries=None):
    '''
    Fetches a URL, retrying for as long as the "Just a moment..." interstitial
    (presumably the anti-bot challenge page) is returned instead of the real page.

    Parameters:
    scraper (cloudscraper.scraper): A cloudscraper scraper object (anything with a compatible .get(url)).
    url (str): The URL to fetch.
    retry_delay (float): Seconds to wait before each retry. Defaults to 5.
    max_retries (int | None): Maximum number of retries; None (the default) retries indefinitely.

    Returns:
    response: The first response whose text does not contain "Just a moment...".

    Raises:
    RuntimeError: Only if max_retries is given and every retry still returns the interstitial.
    '''
    response = scraper.get(url)
    if 'Just a moment...' not in response.text:
        return response

    logging.warning('Encountered "Just a moment..." page.')
    start_time = time.time()
    attempts = 0
    while 'Just a moment...' in response.text:
        if max_retries is not None and attempts >= max_retries:
            raise RuntimeError(f'Still blocked after {attempts} retries: {url}')
        time.sleep(retry_delay)
        # Retry with the same request as the first attempt. The previous code passed an
        # undefined `headers` name here, so the first retry always raised NameError.
        response = scraper.get(url)
        attempts += 1
    logging.info(f'Waited {round(time.time() - start_time, 1)} seconds.')
    return response

## Main Code

In [11]:
# The 30 tags to scrape (the top 30 tags on Math Stack Exchange, per the author).
# Each entry is the tag slug that get_question_links interpolates into the
# /questions/tagged/{tag} listing URL.
tags = [
    'real-analysis',
    'calculus',
    'linear-algebra',
    'probability',
    'abstract-algebra',
    'integration',
    'sequences-and-series',
    'combinatorics',
    'general-topology',
    'matrices',
    'functional-analysis',
    'complex-analysis',
    'geometry',
    'group-theory',
    'algebra-precalculus',
    'probability-theory',
    'ordinary-differential-equations',
    'limits',
    'analysis',
    'number-theory',
    'measure-theory',
    'statistics',
    'multivariable-calculus',
    'functions',
    'derivatives',
    'differential-geometry',
    'discrete-mathematics',
    'trigonometry',
    'algebraic-geometry',
    'elementary-set-theory'
]

In [None]:
# Collect up to 10,000 question links per tag (200 pages of 50 questions each).
# Expect a run time on the order of two hours: 30 tags x 200 pages with a sleep
# averaging about 1 second between requests.
all_links = []
for tag in tags:
    curr_links = get_question_links(tag, num_pages=200)
    all_links += curr_links
    # Rewrite the complete file after every tag so that an interrupted run still
    # leaves everything gathered so far on disk.
    with open('aux_files/links.txt', 'w') as f:
        for link in all_links:
            print(link, file=f)
    print(f'Gotten {len(curr_links)} questions for tag: {tag}')

In [12]:
# Get all question IDs from file.
# The result is sorted and de-duplicated; its order matters because the scraping
# loop resumes by position (question_ids[counter:]).
with open('aux_files/links.txt') as f:
    all_links = f.read().splitlines()

id_matches = [re.search(r'/questions/(\d+)', link) for link in all_links]
# Blank or malformed lines have no ID; skip them instead of crashing on None.group().
num_skipped = sum(match is None for match in id_matches)
if num_skipped:
    print(f'Skipped {num_skipped} lines without a question ID.')
question_ids = sorted({int(match.group(1)) for match in id_matches if match is not None})

In [13]:
# Configure logging: records at INFO level and above are written to
# aux_files/scraping.log, formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(filename='aux_files/scraping.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

In [14]:
# Reset log
# with open('aux_files/scraping.log', 'w'):
#     pass

In [15]:
# Get question data for each link.
#
# `counter` is the position in question_ids of the next question to process; it is
# what gets persisted and what the loop resumes from. It therefore has to advance
# for failed questions too. Previously it only counted successes, so after any
# error a restart resumed too early and re-scraped questions that were already saved.
counter = load_counter()
curr_data = []
batch_start = counter  # position at which the current (unsaved) batch began
scraper = cloudscraper.create_scraper()
for question_id in question_ids[counter:]:
    try:
        next_data = get_question_content_raw(scraper, question_id)
    except Exception as e:
        # Best effort: log the failure and move on; the ID stays in the log for a later retry.
        logging.error(f'Error scraping question {question_id}: {e}')
    else:
        curr_data.append(next_data)
        logging.info(f'Successfully scraped question: {question_id} with tags: {next_data["tags"]}')
    counter += 1
    time.sleep(random.uniform(0,2))  # Throttle after every request, failed ones included.
    if counter % 100 == 0: # Save data incrementally due to running this process on my local machine (sometimes have to restart).
        if curr_data:  # Nothing to write if every question in the batch failed.
            question_data = pd.DataFrame(curr_data)
            question_data.to_csv(f'raw_data/{batch_start + 1}-{counter}.csv', index=False)
        curr_data = []
        batch_start = counter
        save_counter(counter)
        logging.info(f'Saved {counter} questions.')
        print(f'Saved {counter} questions.')
# Flush whatever was processed after the last full batch.
if counter > batch_start:
    if curr_data:
        question_data = pd.DataFrame(curr_data)
        question_data.to_csv(f'raw_data/{batch_start + 1}-{counter}.csv', index=False)
    save_counter(counter)
    logging.info(f'Scraping completed. Total of {counter} questions found.')

Saved 164600 questions.
Saved 164700 questions.
Saved 164800 questions.
Saved 164900 questions.
Saved 165000 questions.
Saved 165100 questions.
Saved 165200 questions.
Saved 165300 questions.
Saved 165400 questions.
Saved 165500 questions.
Saved 165600 questions.
Saved 165700 questions.
Saved 165800 questions.
Saved 165900 questions.
Saved 166000 questions.
Saved 166100 questions.
Saved 166200 questions.
Saved 166300 questions.
Saved 166400 questions.
Saved 166500 questions.
Saved 166600 questions.
Saved 166700 questions.
Saved 166800 questions.
Saved 166900 questions.
Saved 167000 questions.
Saved 167100 questions.
Saved 167200 questions.
Saved 167300 questions.
Saved 167400 questions.
Saved 167500 questions.
Saved 167600 questions.
Saved 167700 questions.
Saved 167800 questions.
Saved 167900 questions.
Saved 168000 questions.
Saved 168100 questions.
Saved 168200 questions.
Saved 168300 questions.
Saved 168400 questions.
Saved 168500 questions.
Saved 168600 questions.
Saved 168700 que

## Deprecated

At one point I attempted to use Selenium, but it was too complicated and too slow. These functions don't work properly, but I left them in just in case I decide to revisit them.

In [113]:
def parse_body_selenium(content_element):
    '''
    (Deprecated) Splits a question body, given as a Selenium element, into raw, text, and LaTeX portions.

    NOTE: relies on the names `By` and `selenium`, whose imports are commented out
    in the imports cell, so this function cannot run as the notebook stands.

    Parameters:
    content_element: The Selenium element for the question body.

    Returns:
    raw_content (str): All pieces in document order, with LaTeX wrapped in "$", joined by spaces.
    text_content (str): The non-LaTeX pieces, joined by spaces.
    latex_content (str): The LaTeX pieces, joined by spaces.
    '''
    latex = []
    text = []
    raw = []
    # Walk the child nodes of the first child element of the body.
    child_nodes = content_element.find_elements(By.XPATH, './*')[0].get_property('childNodes')
    for node in child_nodes:
        # dict entries carry their content under 'data' (presumably text nodes - TODO confirm).
        if type(node) == dict:
            curr_text = node['data'].strip()
            text.append(curr_text.replace('\n', ''))
            raw.append(curr_text.replace('\n', ''))
        elif type(node) == selenium.webdriver.remote.webelement.WebElement:
            if node.tag_name == 'span':
                # A span is searched for <script> children, whose text is taken as LaTeX.
                inner_child_nodes = node.get_property('childNodes')
                for inner_node in inner_child_nodes:
                    # NOTE(review): assumes every child of the span is a WebElement; a dict
                    # entary like the ones handled above has no .tag_name and would raise here.
                    if inner_node.tag_name == 'script':
                        curr_text = inner_node.get_attribute('textContent').strip()
                        latex.append(curr_text)
                        raw.append('$' + curr_text + '$')
            else:
                # Any other element contributes its text content as plain text.
                curr_text = node.get_attribute('textContent').strip()
                text.append(curr_text.replace('\n', ''))
                raw.append(curr_text.replace('\n', ''))
    raw_content = ' '.join(raw)
    text_content = ' ' .join(text)
    latex_content = ' '.join(latex)
    return raw_content, text_content, latex_content

In [111]:
def parse_title_selenium(content_element):
    '''
    (Deprecated) Splits a question title, given as a Selenium element, into raw, text, and LaTeX portions.

    NOTE: relies on the name `selenium`, whose import is commented out in the
    imports cell, so this function cannot run as the notebook stands.

    Parameters:
    content_element: The Selenium element for the question title.

    Returns:
    title (str): All pieces in document order, with LaTeX wrapped in "$", joined by spaces.
    text_content (str): The non-LaTeX pieces, joined by spaces.
    latex_content (str): The LaTeX pieces, joined by spaces.
    '''
    raw = []
    text = []
    latex = []
    child_nodes = content_element.get_property('childNodes')
    for node in child_nodes:
        # dict entries carry their content under 'data' (presumably text nodes - TODO confirm);
        # any "$" characters in them are dropped.
        if type(node) == dict:
            curr_text = node['data'].strip()
            text.append(curr_text.replace('$', ''))
            raw.append(curr_text.replace('$', ''))
        elif type(node) == selenium.webdriver.remote.webelement.WebElement:
            # Only <script> elements are used; their text is taken as LaTeX.
            if node.tag_name == 'script':
                curr_text = node.get_attribute('textContent').strip()
                latex.append(curr_text)
                raw.append('$' + curr_text + '$')
    title = ' '.join(raw)
    text_content = ' ' .join(text)
    latex_content = ' '.join(latex)
    return title, text_content, latex_content

In [None]:
def get_question_content_selenium(driver, question_id):
    '''
    (Deprecated) Gets title, body, and tags for a question using selenium. Also separates all text into text portions and LaTeX portions.

    NOTE: relies on the name `By`, whose import is commented out in the imports
    cell, so this function cannot run as the notebook stands.

    Parameters:
    driver (selenium.webdriver): A selenium webdriver object.
    question_id (int): ID for math stack exchange question.

    Returns:
    data_dict (dict): A dictionary containing all the data for the question.
    '''
    url = f'https://math.stackexchange.com/questions/{question_id}'
    driver.get(url)
    title_element = driver.find_element(By.CSS_SELECTOR, '.question-hyperlink')
    body_element = driver.find_element(By.CSS_SELECTOR, '.js-post-body')

    # Split title and body into raw / text / LaTeX portions
    title, title_text, title_latex = parse_title_selenium(title_element)
    body_raw, body_text, body_latex = parse_body_selenium(body_element)

    # Get the tags
    tags = []
    tag_elements = driver.find_elements(By.CSS_SELECTOR, 'li.d-inline.mr4.js-post-tag-list-item')
    for tag_element in tag_elements:
        tags.append(tag_element.text.strip())

    # Prepare data dictionary (tags are de-duplicated through a set, so their order is not preserved)
    data_dict = {'question_id':question_id, 'title_raw':title, 'title_text': title_text, 
                 'title_latex': title_latex, 'body_raw': body_raw, 'body_text': body_text, 
                 'body_latex': body_latex, 'tags':list(set(tags))}
    return data_dict

I also attempted to use Tor to get around rate limits. However, even with a rotating IP address, the site still detected that I was using Tor and blocked me.

In [2]:
# Pool of browser User-Agent strings; get_tor_session picks one at random
# for each new session.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0"
]

In [3]:
def start_tor():
    '''
    Launches Tor by running the start_tor.bat script through the shell, then pauses for 10 seconds so Tor has time to start.
    '''
    launch_command = ["start_tor.bat"]
    subprocess.Popen(launch_command, shell=True)
    startup_wait_seconds = 10
    time.sleep(startup_wait_seconds)

In [4]:
def stop_tor():
    '''
    Stops Tor by running the stop_tor.bat script through the shell. Does not wait for the script to finish.
    '''
    shutdown_command = ["stop_tor.bat"]
    subprocess.Popen(shutdown_command, shell=True)

In [34]:
def get_tor_session():
    '''
    Creates a cloudscraper session that sends its traffic through the local SOCKS proxy on 127.0.0.1:9080 and uses a randomly chosen User-Agent.

    Returns:
    session (cloudscraper.scraper): The configured session.
    '''
    # A plain requests.Session() was the earlier alternative here.
    session = cloudscraper.create_scraper()
    proxy_url = 'socks5h://127.0.0.1:9080'
    session.proxies = {'http': proxy_url, 'https': proxy_url}
    chosen_agent = random.choice(USER_AGENTS)
    session.headers.update({'User-Agent': chosen_agent})
    return session

In [38]:
def get_current_ip(session):
    '''
    Looks up the public IP address the session is seen with, by querying httpbin.org.

    Parameters:
    session: A requests-style session (anything with a compatible .get(url)).

    Returns:
    ip (str | None): The reported address, or None if the lookup failed (the error is logged).
    '''
    try:
        return session.get("http://httpbin.org/ip").json()["origin"]
    except Exception as e:
        logging.error(f"Error fetching current IP: {e}")
    return None

In [7]:
def renew_tor_ip(controller):
    '''
    Sends the NEWNYM signal through the Tor controller, waits 10 seconds for the new IP address to take effect, and builds a fresh session.

    Parameters:
    controller (stem.control.Controller): A Tor controller (the caller is expected to have authenticated it).

    Returns:
    session (cloudscraper.scraper): A new session created by get_tor_session.
    '''
    controller.signal(Signal.NEWNYM)
    logging.info('Renewing Tor IP...')
    time.sleep(10)  # Give the new IP address time to take effect

    fresh_session = get_tor_session()
    logging.info(f'Switched to new Tor IP: {get_current_ip(fresh_session)}')
    return fresh_session

In [None]:
# Get question data for each link (Tor experiment, limited to the first 300 question IDs).
#
# `counter` is the position in question_ids of the next question to process; it is
# what gets persisted and what the loop resumes from, so it advances for failed
# questions too. Otherwise a restart would resume too early and re-scrape saved questions.
counter = load_counter()
curr_data = []
batch_start = counter  # position at which the current (unsaved) batch began
# start_tor()
scraper = get_tor_session()
# NOTE: the controller is authenticated but not otherwise used in this cell.
with Controller.from_port(port=5096) as controller:
    controller.authenticate()
    for question_id in question_ids[counter:300]:
        try:
            # get_question_content takes (scraper, question_id). The previous call passed
            # (question_id, controller), which made every iteration fail.
            next_data = get_question_content(scraper, question_id)
        except Exception as e:
            # Best effort: log the failure and move on.
            logging.error(f'Error scraping question {question_id}: {e}')
        else:
            curr_data.append(next_data)
            logging.info(f'Successfully scraped question: {question_id} with tags: {next_data["tags"]}')
        counter += 1
        time.sleep(random.uniform(0,1)) # Once again, used to hopefully not get banned.
        if counter % 100 == 0: # Save data incrementally due to running this process on my local machine (sometimes have to restart).
            if curr_data:  # Nothing to write if every question in the batch failed.
                question_data = pd.DataFrame(curr_data)
                question_data.to_csv(f'raw_data/{batch_start + 1}-{counter}.csv', index=False)
            curr_data = []
            batch_start = counter
            save_counter(counter)
            logging.info(f'Saved {counter} questions.')
            print(f'Saved {counter} questions.')
    # Flush whatever was processed after the last full batch.
    if counter > batch_start:
        if curr_data:
            question_data = pd.DataFrame(curr_data)
            question_data.to_csv(f'raw_data/{batch_start + 1}-{counter}.csv', index=False)
        save_counter(counter)
        logging.info(f'Scraping completed. Total of {counter} questions found.')
# stop_tor()