<!-- # 纽约联储主席讲话数据爬取

@author : zhangwubin / 01208663

@date: Oct. 21, 2024 -->

In [1]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementException
import json
import pandas as pd
from datetime import datetime
from typing import List, Optional
import os

In [2]:
# Scraping window. `start_date` is the fallback lower bound returned by
# get_most_recent_speech_date() when no saved speeches are available.
start_date = datetime(year=2006, month=1, day=1)
# Captured once, when this cell runs; not referenced by the other visible cells.
end_date = datetime.now()

In [3]:
# Function to get the most recent speech date from existing files
def get_most_recent_speech_date():
    """Return the date of the newest speech already saved on disk.

    Reads the combined speeches JSON file and returns the latest ``date``
    found among its entries. The entries are scanned in full rather than
    trusting their order, because the file is written in index-page order
    and the last element is therefore not guaranteed to be the newest.

    Returns:
        datetime: the most recent parseable speech date, or the module-level
        ``start_date`` when the file is missing, empty, or contains no
        parseable dates.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    all_speeches_file = '../data/fed_speeches/newyork_fed_speeches/newyork_fed_speeches_all.json'

    if not os.path.exists(all_speeches_file):
        return start_date

    with open(all_speeches_file, 'r', encoding='utf-8') as f:
        speeches = json.load(f)

    # Accept both full ("October 12, 2023") and abbreviated ("Oct 12, 2023")
    # month names; periods are removed first so "Oct. 12, 2023" also parses.
    known_formats = ('%B %d, %Y', '%b %d, %Y')
    parsed_dates = []
    for speech in speeches or []:
        raw_date = speech.get('date') if isinstance(speech, dict) else None
        if not isinstance(raw_date, str):
            continue
        cleaned = raw_date.replace('.', '').strip()
        for fmt in known_formats:
            try:
                parsed_dates.append(datetime.strptime(cleaned, fmt))
                break
            except ValueError:
                continue

    return max(parsed_dates) if parsed_dates else start_date

In [7]:
# Start a Chrome browser session. The module-level `driver` created here is the
# one used by the scraping functions in this notebook.
# NOTE(review): automatic ChromeDriver download/setup presumably relies on
# Selenium Manager (Selenium 4.6+) — confirm the installed Selenium version.
driver = webdriver.Chrome()

In [None]:
# Open the New York Fed speeches index page and print the page title as a
# quick check that the page loaded. `URL` is reused by get_speech_infos().
URL = "https://www.newyorkfed.org/newsevents/speeches/index"
driver.get(URL)
print(driver.title)

In [None]:
# Helper function to parse the date
def parse_date(date_string):
    """Parse a human-readable date string into a ``datetime``.

    pandas' flexible parser is tried first. If it fails, the text is retried
    with explicit month-name formats after removing periods, so values such
    as "Oct. 12, 2023" are still accepted.

    Args:
        date_string: the date text to parse, e.g. "Oct. 12, 2023".

    Returns:
        datetime | None: the parsed date, or ``None`` (after printing a
        message) when the input is missing, blank, or cannot be parsed.
    """
    text = date_string.strip() if isinstance(date_string, str) else ""
    if not text:
        # Blank input would otherwise become pandas NaT, which is not None and
        # compares False against every datetime.
        print(f"Error parsing date: {date_string}")
        return None

    try:
        parsed = pd.to_datetime(text)
    except (ValueError, OverflowError):
        parsed = None
    if parsed is not None and not pd.isna(parsed):
        return parsed.to_pydatetime()

    # Fallback: explicit formats, tolerant of a period after the month name.
    normalized = text.replace(".", "")
    for fmt in ("%b %d, %Y", "%B %d, %Y"):
        try:
            return datetime.strptime(normalized, fmt)
        except ValueError:
            continue

    print(f"Error parsing date: {date_string}")
    return None
    
# Notebook sanity check: parse a date with an abbreviated month and a period.
parse_date("Oct. 12, 2023")

## 搜集所有的讲话数据信息

In [11]:
def get_speech_infos(last_names: Optional[List[str]] = None):
    """Collect the URL, date and title of each speech listed on the index page.

    Loads ``URL`` in the shared ``driver``, waits for the ``newsTable``
    element, and walks its rows. Rows marked ``yrHead`` and rows with fewer
    than two cells are skipped.

    Args:
        last_names: optional list of names to keep. When given (and
            non-empty), a speech is kept only if the text before the first
            ':' in its link title is one of these names. When omitted or
            empty, every speech is kept.

    Returns:
        list[dict]: one ``{"url", "date", "title"}`` dict per speech, in page
        order. If an unexpected error interrupts the scan, the entries
        collected so far are returned.
    """
    speech_urls = []
    # Build the lookup once; None means "no speaker filter".
    wanted_names = set(last_names) if last_names else None

    try:
        driver.get(URL)
        # Wait for the table to be present
        table = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, "newsTable"))
        )

        for row in table.find_elements(By.TAG_NAME, "tr"):
            # get_attribute may return None when the row has no class; without
            # this guard the membership test would raise and abort the scan.
            row_classes = row.get_attribute("class") or ""
            if "yrHead" in row_classes:
                continue

            columns = row.find_elements(By.TAG_NAME, "td")
            if len(columns) < 2:
                continue

            try:
                date_div = columns[0].find_element(By.TAG_NAME, "div")
                link_elem = row.find_element(By.TAG_NAME, "a")
            except NoSuchElementException:
                print(f"No speech link found in row: {row.text}")
                continue

            # Extract date and remove any extra text
            date = date_div.text.strip().split("==")[0].strip()
            href = link_elem.get_attribute("href")
            title = link_elem.text.strip()

            # Check if the speech is by one of the specified speakers
            if wanted_names is not None:
                speaker_last_name = title.split(':')[0].strip()
                if speaker_last_name not in wanted_names:
                    continue

            speech_urls.append({"url": href, "date": date, "title": title})

        print(f"Collected {len(speech_urls)} speech links.")

    except Exception as e:
        # Best-effort boundary: report the failure and keep whatever was collected.
        print(f"An error occurred while collecting speech links: {str(e)}")

    return speech_urls

In [None]:
# Determine the resume date, then collect the speech index entries.
# NOTE(review): the original note here said Williams and Dudley were New York
# Fed presidents "from 2020 to 2024"; that range looks wrong — verify.
most_recent_date = get_most_recent_speech_date()
# get_most_recent_speech_date() already returns a datetime, so no extra
# parse_date() call is needed here.
print("most_recent_date: {}".format(most_recent_date))
# Optional speaker filter that could be passed as get_speech_infos(last_names=...):
# last_names_to_include = ["Williams", "Dudley", "Geithner", "Stewart"]
speech_infos = get_speech_infos()
speech_infos

In [None]:
def extract_speech_content(url):
    """Load one speech page and extract its title, date, speaker and text.

    Uses the shared ``driver``. The date is taken from the first
    ``ts-contact-info`` block (preferring a line containing "Posted"), and the
    speaker from the text before the first comma of the second block. Either
    value falls back to "Unknown" when its block is absent.

    Args:
        url: address of the speech page.

    Returns:
        dict | None: ``{"title", "date", "speaker", "url", "content"}`` on
        success; ``None`` if the page could not be loaded or parsed (the
        error is printed).
    """
    try:
        driver.get(url)

        # Wait for the content to load before querying any page elements.
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, "container_12"))
        )
        title = driver.find_element(By.CLASS_NAME, "ts-article-title").text.strip()

        contact_info_elements = driver.find_elements(
            By.XPATH, "//body/div/div/div[@class='ts-contact-info']"
        )

        date = 'Unknown'
        speaker = 'Unknown'

        if contact_info_elements:
            # Date: prefer the "Posted" line, otherwise the first line.
            date_lines = contact_info_elements[0].text.strip().split('\n')
            posted_lines = [line for line in date_lines if 'Posted' in line]
            date = posted_lines[0] if posted_lines else date_lines[0]

        if len(contact_info_elements) >= 2:
            # Speaker: only read the second block when it exists; indexing it
            # unconditionally raised IndexError on pages with a single block.
            speaker_title = contact_info_elements[1].text
            speaker = speaker_title.split(',')[0].strip() or 'Unknown'

        content_elem = driver.find_element(By.CLASS_NAME, "ts-article-text")
        paragraphs = content_elem.find_elements(By.TAG_NAME, "p")
        content = "\n\n".join([p.text for p in paragraphs if p.text.strip()])

        return {
            "title": title,
            "date": date,
            "speaker": speaker,
            "url": url,
            "content": content.strip()
        }
    except TimeoutException as e:
        print(f"Timeout error extracting content from {url}: {str(e)}")
    except WebDriverException as e:
        print(f"WebDriver error extracting content from {url}: {str(e)}")
    except Exception as e:
        print(f"Unexpected error extracting content from {url}: {str(e)}")

    # Every failure path ends here; callers treat None as "skip this speech".
    return None

# Notebook sanity check: extract a single known speech page and display the result.
test_url = "https://www.newyorkfed.org/newsevents/speeches/2006/gei060405"
extract_speech_content(test_url)

In [9]:
def save_speeches(speeches, year=None, folder_name="newyork_fed_speeches"):
    """Write speeches to a JSON file, creating the output folder if needed.

    Args:
        speeches: JSON-serializable list of speech records.
        year: when not ``None``, the data is written to
            ``newyork_fed_speeches_<year>.json``; otherwise to
            ``newyork_fed_speeches_all.json``.
        folder_name: directory to write into. Defaults to
            ``"newyork_fed_speeches"`` relative to the working directory.

    The target file is overwritten if it already exists.
    """
    os.makedirs(folder_name, exist_ok=True)

    if year is not None:
        filename = f"newyork_fed_speeches_{year}.json"
    else:
        filename = "newyork_fed_speeches_all.json"
    file_path = os.path.join(folder_name, filename)

    # Explicit encoding so the file is written the same way it is read back.
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(speeches, f, indent=2)

    if year is not None:
        print(f"Saved {len(speeches)} speeches for {year} to {file_path}")
    else:
        print(f"Saved {len(speeches)} speeches to {file_path}")

In [None]:
# Main scraping process
def scrape_speeches(start_date: datetime, speech_infos: list):
    """Download every listed speech dated on or after ``start_date``.

    Entries are processed in the order given. Processing stops at the first
    entry older than ``start_date``, so the list is expected to be ordered
    newest first. Entries whose date cannot be parsed, or whose page cannot
    be extracted, are skipped.

    Side effects: each year's speeches are written via ``save_speeches`` when
    the year changes and once more at the end, and the combined list is
    written at the end.
    NOTE(review): save_speeches overwrites its target files, so a run appears
    to replace earlier results rather than merge with them — confirm intended.

    Args:
        start_date: oldest date (inclusive) to scrape.
        speech_infos: dicts with at least ``"url"`` and ``"date"`` keys.

    Returns:
        tuple[list, dict]: all scraped speeches, and the same speeches
        grouped by year.
    """
    speeches = []
    speeches_by_year = {}
    current_year = None

    for speech in speech_infos:
        speech_date = parse_date(speech['date'])

        # Treat pandas NaT like None: it is not a usable date.
        if speech_date is None or pd.isna(speech_date):
            # Report the current entry's URL (the list itself has no 'url' key).
            print(f"Skipping speech due to invalid date: {speech['url']}")
            continue

        if speech_date < start_date:
            print("Reached speeches older than our start date, stop here")
            break

        full_speech_data = extract_speech_content(speech['url'])
        if not full_speech_data:
            continue

        full_speech_data['date'] = speech['date']  # Use the date from the index page
        speeches.append(full_speech_data)

        year = speech_date.year
        if year != current_year:
            # Flush the year just finished before moving on to the next one.
            if current_year is not None:
                save_speeches(speeches_by_year[current_year], current_year)
            current_year = year
        # setdefault keeps earlier entries if a year shows up again later.
        speeches_by_year.setdefault(year, []).append(full_speech_data)
        print(f"Scraped speech: {speech['date']} - {full_speech_data['title']}")

    # Save the last year's speeches if any are left
    if current_year is not None and speeches_by_year[current_year]:
        save_speeches(speeches_by_year[current_year], current_year)

    # Save all speeches
    save_speeches(speeches)

    return speeches, speeches_by_year

# Run the scrape, resuming from the most recent saved speech date.
speeches, speeches_by_year = scrape_speeches(most_recent_date, speech_infos)

In [12]:
# Close the browser and end the WebDriver session; `driver` cannot be used
# for further scraping after this cell runs.
driver.quit()