In [None]:
# Kerry Zhang
# 7/16/2023
# Objective: Scrape earnings call transcripts from roic.ai
# Notes: Earnings transcripts are divided by person with div class "p-3 rounded-lg false".

import os
import copy
import time
import random
import pandas as pd
import numpy as np

from textblob import TextBlob
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

# --- Notebook configuration -------------------------------------------------

# CSV of candidate firm-quarters read by the loading cell.
INPUT_FILE = "firms_full.csv"
# CSV recording which firm-quarters have already been attempted.
LOG_FILE = "scrape_log.csv"
# Directory that receives one CSV per scraped transcript.
OUTPUT_FOLDER = r"D:\finance_tools\transcripts"

# Browser options shared by every Chrome session the scraper starts.
chrome_options = Options()
# Avoid throttling: switch off the Blink "AutomationControlled" feature.
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
# Run Chrome without a visible window.
chrome_options.add_argument("--headless")

In [None]:
# Load tickers
# Build TICKER_DATA: one row per firm-quarter that still needs scraping.
firms_raw = pd.read_csv(INPUT_FILE)

eligible = (
    firms_raw["fqtr"].notna()          # Require known fiscal quarter
    & (firms_raw["costat"] == "A")     # Require active
    & (firms_raw["mkvaltq"] > 1000)    # Require known market value > $1B (NaN compares False)
)

# Clean
# .copy() gives TICKER_DATA its own data, so the column assignment below does
# not write into a slice of firms_raw (which raises SettingWithCopyWarning).
TICKER_DATA = firms_raw.loc[eligible, ["tic", "fyearq", "fqtr"]].copy()
TICKER_DATA["fqtr"] = TICKER_DATA["fqtr"].astype("int")

print("Unique Firms: {}".format(TICKER_DATA["tic"].nunique()))
print("Total Firm-Quarters: {}".format(TICKER_DATA.shape[0]))

# Remove already scraped
LOG_KEYS = ["tic", "fyearq", "fqtr"]
if os.path.exists(LOG_FILE):
    # usecols skips the unnamed index column that DataFrame.to_csv writes by
    # default, so log_df always has exactly the four log columns.
    log_df = pd.read_csv(LOG_FILE, usecols=LOG_KEYS + ["captured"])

    # Left-join the log; rows with no match get NaN in "captured", and those
    # are the firm-quarters that have not been attempted yet.
    TICKER_DATA = pd.merge(TICKER_DATA, log_df, how="left", on=LOG_KEYS)
    TICKER_DATA = TICKER_DATA[TICKER_DATA["captured"].isna()]

    print("\n")
    print("REMAINING: ")
    print("Unique Firms: {}".format(TICKER_DATA["tic"].nunique()))
    print("Total Firm-Quarters: {}".format(TICKER_DATA.shape[0]))

In [None]:
class text_to_disappear_and_return_new_text(object):
    """Wait condition: succeed once a placeholder text has left an element.

    Instances are callables meant to be passed to ``WebDriverWait.until``.

    Args:
        locator: ``(by, value)`` tuple unpacked into ``driver.find_element``.
        initial_text: placeholder text that must no longer appear in the
            element's text for the condition to succeed.
    """

    def __init__(self, locator, initial_text):
        self.locator = locator
        self.initial_text = initial_text

    def __call__(self, driver):
        """Return the element's text once the placeholder is gone, else False.

        An empty text is returned as ``""``, which is falsy, so a caller that
        polls on truthiness keeps waiting until real content appears.
        """
        try:
            element_text = driver.find_element(*self.locator).text
        except Exception:
            # Element missing or not readable yet: keep waiting. Catching
            # Exception rather than using a bare except lets KeyboardInterrupt
            # and SystemExit stop the run.
            return False

        if self.initial_text in element_text:
            # Placeholder still displayed: keep waiting.
            return False
        return element_text

In [None]:
def scrape_transcript(url):
    """Load ``url`` in headless Chrome and return the rendered transcript text.

    Starts a new Chrome session using the module-level ``chrome_options``,
    waits up to 10 seconds for the element with class ``space-y-6`` to stop
    showing the "Please wait for a while ..." placeholder, and returns that
    element's text.

    Args:
        url: address of the transcript page.

    Returns:
        str: text of the transcript container once it has loaded.

    Raises:
        selenium.common.exceptions.TimeoutException: raised by
            ``WebDriverWait.until`` if the placeholder is still present after
            the timeout.
    """
    # Pass options by keyword: the first positional parameter of
    # webdriver.Chrome is not `options` in every Selenium release.
    driver = webdriver.Chrome(options=chrome_options)

    try:
        driver.get(url)

        wait = WebDriverWait(driver, 10)
        locator = (By.CLASS_NAME, "space-y-6")
        initial_text = "Please wait for a while ..."

        # The condition returns the element's text (a plain str), so it can be
        # handed back after the browser is closed without copying.
        # TODO (WIP): detect and handle throttling by the site.
        return wait.until(text_to_disappear_and_return_new_text(locator, initial_text))
    finally:
        # Always close the browser, including when loading or waiting fails,
        # so failed pages do not leave Chrome processes behind.
        driver.quit()

In [None]:
def clean_text(earnings_transcript: str) -> list:
    """Split raw transcript text into its non-trivial lines.

    Args:
        earnings_transcript: full page text, one paragraph or label per line.

    Returns:
        list: the lines of the text, in order, with mis-decoded punctuation
        repaired and every line of length 0 or 1 removed.
    """
    # WIP: repair UTF-8 punctuation that was decoded as Latin-1.
    mojibake = {
        "â\x80\x99": "'",  # right single quotation mark
        "â\x80\x98": "'",  # left single quotation mark
        "â\x80\x93": "-",  # en dash (was wrongly mapped to an apostrophe)
    }
    for garbled, plain in mojibake.items():
        earnings_transcript = earnings_transcript.replace(garbled, plain)

    # Remove speaker abbreviations: drop empty and single-character lines.
    return [line for line in earnings_transcript.splitlines() if len(line) > 1]

In [None]:
def split_text(lst: list, year: int, quarter: int):
    """Group cleaned transcript lines into one row per speaker turn.

    The first two entries of ``lst`` are treated as header lines (the
    "... · Earnings Call Transcript" title and the call date) and skipped.
    Each following line is either a speaker name, which starts a new turn, or
    a paragraph belonging to the current turn.

    Args:
        lst: cleaned transcript lines, e.g. the output of ``clean_text``.
        year: fiscal year of the call (currently unused; the filing-period
            check against the header is not implemented).
        quarter: fiscal quarter of the call (currently unused).

    Returns:
        pandas.DataFrame with columns ``position`` (1-based turn number),
        ``name``, ``speech`` (paragraphs joined with a trailing space each),
        ``tot_len`` (total words), ``avg_len`` (mean words per paragraph, NaN
        for a turn without paragraphs) and ``sentiment`` (TextBlob polarity).
        Text before the first speaker name is discarded. Input with fewer
        than three lines yields an empty frame.
    """
    columns = ["position", "name", "speech", "tot_len", "avg_len", "sentiment"]

    def is_name(segment: str) -> bool:
        # Heuristic: the literal "Operator", or exactly two title-cased words.
        words_in_segment = segment.split()
        return segment == "Operator" or (
            len(words_in_segment) == 2
            and words_in_segment[0].istitle()
            and words_in_segment[1].istitle()
        )

    def get_sentiment(text):
        return TextBlob(text).sentiment.polarity

    def make_row(position, caller, speech, paragraph_len_list):
        # np.mean([]) warns and yields NaN; produce the NaN directly instead.
        avg_len = np.mean(paragraph_len_list) if paragraph_len_list else np.nan
        return [position,
                caller,
                speech,
                np.sum(paragraph_len_list),
                avg_len,
                get_sentiment(speech)]

    # State of the turn currently being accumulated. Turn 0 is the preamble
    # before the first speaker name.
    position = 0
    caller = ""
    speech = ""
    paragraph_len_list = []

    data = []
    for segment in lst[2:]:
        if is_name(segment):
            # A new speaker starts: store the finished turn and reset.
            data.append(make_row(position, caller, speech, paragraph_len_list))
            position += 1
            caller = segment
            speech = ""
            paragraph_len_list = []
        else:
            speech += segment + " "
            paragraph_len_list.append(len(segment.split()))

    # Store the last turn too; no further name line follows it to trigger the
    # append inside the loop, so it would otherwise be lost.
    data.append(make_row(position, caller, speech, paragraph_len_list))

    # data[0] is always the preamble (turn 0), which is not a speaker turn.
    return pd.DataFrame(data[1:], columns=columns)

In [None]:
# Scrape every remaining firm-quarter in TICKER_DATA, write one CSV per
# transcript into OUTPUT_FOLDER, and record each attempt in LOG_FILE.

LOG_COLUMNS = ["tic", "fyearq", "fqtr", "captured"]
LOG_FLUSH_EVERY = 10  # write the log to disk after this many new entries
NO_CONTENT_LINES = ["Nothing to show", "We apologize for the inconvenience, but there is no content to display at this time."]


def save_log(entries):
    """Write the scrape log to LOG_FILE without the positional index column."""
    pd.DataFrame(entries, columns=LOG_COLUMNS).to_csv(LOG_FILE, index=False)


# Read the log from disk here rather than relying on `log_df` from the loading
# cell, which does not exist if the log file was created after that cell ran.
# Selecting LOG_COLUMNS drops the unnamed index column of logs written with
# the default to_csv settings, so every entry has exactly four values.
if os.path.exists(LOG_FILE):
    log = pd.read_csv(LOG_FILE)[LOG_COLUMNS].values.tolist()
else:
    log = []

os.makedirs(OUTPUT_FOLDER, exist_ok=True)

try:
    for _, row in TICKER_DATA.iterrows():

        ticker = row["tic"]
        year = row["fyearq"]
        quarter = row["fqtr"]

        url = f"https://roic.ai/transcripts/{ticker}:US/{year}/{quarter}"
        print(f"{ticker} {year} Q{quarter} - {url}")

        filename = os.path.join(OUTPUT_FOLDER, f"{ticker}_{year}_Q{quarter}.csv")

        if os.path.exists(filename):
            # No request is made, so no log entry and no pause are needed.
            print(f"The file '{filename}' already exists.")
            continue

        # Get text from URL and convert it into a list of lines
        transcript_lines = clean_text(scrape_transcript(url))

        if not transcript_lines or transcript_lines == NO_CONTENT_LINES:
            print("No data...")
            log.append([ticker, year, quarter, False])
        else:
            # Convert to dataframe and save
            transcript_df = split_text(transcript_lines, year, quarter)
            transcript_df.to_csv(filename, encoding='utf-8-sig')
            log.append([ticker, year, quarter, True])

        # Flush by number of log entries. The DataFrame index label is not a
        # running counter after filtering, so it cannot be used for this.
        if len(log) % LOG_FLUSH_EVERY == 0:
            save_log(log)

        # Pause after every request, including "No data" pages.
        time.sleep(random.uniform(3, 7))
finally:
    # Persist progress on normal completion, interruption or error. An empty
    # log is not written, so a header-only file is never created.
    if log:
        save_log(log)

# Take advantage of div separation via xpath (WIP)

In [None]:
if 0:
    # Disabled experiment (WIP): read the transcript through the page's div
    # structure instead of the text of the whole container.

    # Set up Selenium WebDriver
    driver = webdriver.Chrome()

    try:
        # Navigate to the website
        driver.get("https://roic.ai/transcripts/ABT?y=2022&q=4")

        # Wait for an element with class "p-3" (the speaker blocks use
        # "p-3 rounded-lg false") to become clickable and click it, then
        # collect the elements matching the transcript container xpath.
        WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.CLASS_NAME, "p-3"))).click()
        div_elements = driver.find_elements(By.XPATH, '//*[@id="__next"]/div/main/div[3]/div/div[2]/div/div[2]')

        # Scrape the text content of each matching <div> element
        for div_element in div_elements:
            print(div_element.text)
    finally:
        # Close the browser even if loading, waiting or clicking fails.
        driver.quit()