In [None]:
%load_ext lab_black

In [None]:
import selenium
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
from time import sleep
from datetime import datetime
import requests
import numpy as np
import time
import re
from tqdm import tqdm
import pickle
import tempfile
from collections import defaultdict
import os

import selenium
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException

import logging

# logging.basicConfig opens the log file immediately and raises FileNotFoundError
# when its parent directory is missing (e.g. on a fresh checkout), so create it first.
os.makedirs("../logs", exist_ok=True)
logging.basicConfig(level=logging.INFO, filename="../logs/scrape.log")

In [None]:
# Load the company table (the first CSV column becomes the index) and parse the
# "most_recent_filling" column into datetimes.
companies = pd.read_csv(
    "../data/intermediate/companies_filling_minimal.csv", index_col=0
)
companies["most_recent_filling"] = pd.to_datetime(companies["most_recent_filling"])
# Keep only the companies whose most recent filing year is after 2008.
companies_more_2008 = companies[companies["most_recent_filling"].dt.year > 2008]

# NOTE(review): `ciks` is rebound from sp500.csv a few cells below, so in a
# top-to-bottom run this list only feeds the count displayed here.
ciks = list(companies_more_2008["CIK"])
len(ciks)

In [None]:
# CIK column restricted to companies whose most recent filing year is exactly 2021;
# the cell displays how many there are.
ciks_2021 = companies.loc[companies["most_recent_filling"].dt.year == 2021, "CIK"]
len(ciks_2021)

In [None]:
# Rebinds `ciks` to the "CIK" column of sp500.csv (a pandas Series), replacing the
# list built from `companies_more_2008` above. This is the value passed to run().
sp500 = pd.read_csv("../data/processed/sp500.csv")
ciks = sp500["CIK"]
len(ciks)

In [None]:
# HTTP headers passed to every requests.get call in this notebook; the User-Agent
# carries a contact name and e-mail address.
headers = {"User-Agent": "Anselme F.E. Borgeaud (aborgeaud@gmail.com)"}

In [None]:
def get_date(soup) -> datetime:
    """Extract the "fiscal year ended <date>" date from a parsed report.

    Looks at every ``<p>`` and ``<b>`` element whose text contains
    "fiscal year ended" and parses the date following the word "ended" in the
    first such element.

    Args:
        soup: parsed document exposing ``findAll`` (e.g. a BeautifulSoup object).

    Returns:
        The parsed ``datetime``, or ``None`` when no matching element exists or
        the text after "ended" is not in ``"%B %d, %Y"`` form
        (e.g. "December 31, 2020").
    """
    candidates = soup.findAll("p") + soup.findAll("b")
    texts = [
        tag.get_text(strip=True).replace("\xa0", " ")
        for tag in candidates
        if "fiscal year ended" in tag.text
    ]
    if not texts:
        return None

    # Non-greedy match: stop at the FIRST four-digit group after "ended". A greedy
    # ".*" would run on to the last four digits in the element (file numbers,
    # other years, ...) and the resulting string would never parse as a date.
    match = re.search(r"ended (.*?[0-9]{4})", texts[0])
    if match is None:
        return None
    try:
        return datetime.strptime(match.group(1), "%B %d, %Y")
    except ValueError:
        # Text after "ended" is not a "<Month> <day>, <year>" date.
        return None


def find_value(td_elem, max_steps: int = 4) -> float:
    """Return the first numeric value found in the cells to the right of ``td_elem``.

    Starting from the next ``<td>`` sibling, advances at most ``max_steps``
    siblings until a cell whose text contains a digit is found, then parses that
    text as a float after removing thousands separators (",").

    Args:
        td_elem: the label cell; must expose ``find_next_sibling``.
        max_steps: how many additional siblings may be skipped after the first
            one. The original code had this bound (4) but never incremented its
            counter, so the limit was never enforced.

    Returns:
        The parsed float, or ``None`` when there is no sibling, no numeric cell
        within reach, or the cell text is not a plain number.
    """

    def is_number(s):
        return re.search(r"[0-9]", s) is not None

    elem = td_elem.find_next_sibling("td")
    steps = 0
    while elem is not None and not is_number(elem.text) and steps < max_steps:
        elem = elem.find_next_sibling("td")
        steps += 1

    if elem is None or not is_number(elem.text):
        return None
    try:
        return float(elem.text.replace(",", ""))
    except ValueError:
        # Contains digits but is not a plain number (e.g. "(1,234" or "note 3").
        return None


def find_value_in_table(soup, key: str) -> float:
    """Locate the first table cell matching ``key`` and return the value beside it.

    Both ``key`` and each ``<td>`` text are normalised the same way (stripped,
    newlines and spaces turned into underscores, lower-cased). The normalised
    key is then used as a regular expression searched inside the cell text.

    Returns whatever ``find_value`` yields for the first matching cell, or
    ``None`` when no cell matches.
    """

    def normalise(raw: str) -> str:
        return raw.strip().replace("\n", "_").replace(" ", "_").lower()

    pattern = normalise(key)
    key_elem = next(
        (
            cell
            for cell in soup.findAll("td")
            if re.search(pattern, normalise(cell.text)) is not None
        ),
        None,
    )
    if key_elem:
        return find_value(key_elem)


def find_value_in_table_txt(text: str, key: str) -> float:
    """Find the first number following ``key`` in a plain-text report.

    ``key`` and ``text`` are lower-cased and their spaces replaced by
    underscores. The normalised key is used as a regular expression; the 200
    characters starting at its first match are scanned (with "," removed) for
    the first run of digits.

    Args:
        text: full text of the report.
        key: label to look for; may contain regular-expression syntax.

    Returns:
        The number as a float, or ``None`` when the key does not occur or no
        digits follow it within the 200-character window. (The original version
        computed this value but never returned it, so callers always got None.)
    """
    key_text = key.strip().lower().replace(" ", "_")
    text_ = text.lower().replace(" ", "_")

    match = re.search(key_text, text_)
    if match is None:
        return None

    start = match.start()
    line = text_[start : start + 200].replace(",", "")
    values = re.findall(r"[0-9]+", line)
    if not values:
        return None
    # values[0] consists solely of ASCII digits, so float() cannot fail here.
    return float(values[0])

In [None]:
def expend_10k_button(driver):
    """Click the first expand/collapse section whose parent element mentions "10-K".

    Iterates over the elements with class ``expandCollapse``, takes each one's
    parent node and clicks the first parent whose text contains "10-K". Does
    nothing when no such section exists.

    Uses ``find_elements(By...)`` / ``find_element(By...)`` because the
    ``find_element(s)_by_*`` helpers were removed in Selenium 4.3.
    """
    for toggle in driver.find_elements(By.CLASS_NAME, "expandCollapse"):
        parent = toggle.find_element(By.XPATH, "..")
        if "10-K" in parent.text:
            parent.click()
            return


def click_view_all_10k(driver):
    """Click the button whose ``data-group`` is "annualOrQuarterlyReports".

    Raises whatever Selenium raises when the button is absent; ``run`` relies on
    that to detect failure.

    Uses ``find_element(By.XPATH, ...)`` because ``find_element_by_xpath`` was
    removed in Selenium 4.3.
    """
    driver.find_element(
        By.XPATH, '//button[@data-group="annualOrQuarterlyReports"]'
    ).click()


def input_search_10k(driver):
    """Type "10-K " into the input whose placeholder is "Search table".

    Raises whatever Selenium raises when the search box is absent; ``run`` uses
    that as the signal to expand the 10-K section first.

    Uses ``find_element(By.XPATH, ...)`` because ``find_element_by_xpath`` was
    removed in Selenium 4.3.
    """
    search_elem = driver.find_element(By.XPATH, '//input[@placeholder="Search table"]')
    # Trailing blank space to avoid matching 10-K/A (amendments).
    search_elem.send_keys("10-K ")


def html_url_from_xbrl_viewer(annual_report_elem, driver):
    """Open a report link in its viewer window and return the plain-HTML URL.

    Clicks ``annual_report_elem``, switches to the second window handle, then
    repeatedly (up to 9 attempts, one second apart) clicks the
    ``menu-dropdown-link`` anchor and looks up the ``form-information-html``
    element. The viewer window is always closed and focus returned to the first
    window, even if reading the link fails.

    Args:
        annual_report_elem: link element to click.
        driver: Selenium WebDriver that owns the element.

    Returns:
        The ``href`` of the ``form-information-html`` element, or ``None`` when
        it never appeared.

    Raises:
        IndexError: if the click did not open a second window.
    """
    annual_report_elem.click()
    driver.switch_to.window(driver.window_handles[1])
    try:
        time.sleep(1)
        html_elem = None
        attempts = 1
        while html_elem is None and attempts < 10:
            try:
                driver.find_element(By.XPATH, '//a[@id="menu-dropdown-link"]').click()
                time.sleep(0.2)
                html_elem = driver.find_element(By.ID, "form-information-html")
            except Exception:
                # Viewer not rendered yet: wait and retry. `Exception` (not a
                # bare except) so that Ctrl-C / kernel interrupt still works.
                time.sleep(1)
                attempts += 1
        if html_elem is None:
            return None
        return html_elem.get_attribute("href")
    finally:
        # Always leave the browser on the main window with the viewer closed.
        driver.close()
        driver.switch_to.window(driver.window_handles[0])


def is_xbrl(url):
    """Tell whether ``url`` contains the ``ix?doc=`` marker.

    ``run`` treats such URLs as viewer links that must be resolved to a plain
    HTML document before downloading.
    """
    xbrl_marker = r"ix?doc="
    return xbrl_marker in url

In [None]:
# Line items to extract from each report. Every entry is used as a
# regular-expression pattern by find_value_in_table / find_value_in_table_txt,
# matched against text that was lower-cased with spaces replaced by underscores,
# and also becomes a column name in the table built by run().
entries = [
    "total_current_assets",
    "total_current_liabilities",
    "long-term_debt",
    "total_liabilities",
    "total_equity",
    # ".?.?" allows up to two arbitrary characters between the two words.
    r"total_stockholders.?.?equity",
    # ".*" allows any text between "attributable" and "diluted".
    r"earnings_per_share_attributable.*diluted",
]


def run(ciks: list):
    """Scrape annual-report pages and selected figures from SEC EDGAR.

    For each CIK the EDGAR browse page is opened in Chrome, the filings table is
    filtered with the search text "10-K ", every "Annual report" link is
    downloaded with ``requests`` (using the module-level ``headers``), and the
    patterns in the module-level ``entries`` are looked up in each report.

    Side effects:
        * Every 30 CIKs a checkpoint is written under ``../data/intermediate``:
          the URLs and scraped figures collected so far (time-stamped files, the
          previous checkpoint of each is deleted) and the page texts collected
          since the previous checkpoint (``report_pages_<n>.pickle``).
        * After the last CIK the final ``scraped_financials.csv``,
          ``report_urls.pickle`` and ``report_pages.pickle`` are written. The
          latter only holds page texts gathered since the last checkpoint.
        * Problems with individual companies or reports are logged and skipped.
        * The browser is always quit, including when an error aborts the run.

    Args:
        ciks: iterable of CIK identifiers.

    Returns:
        None.
    """
    driver = webdriver.Chrome()
    # Give the browser time to start before the first navigation.
    time.sleep(8)

    report_urls = []
    report_pages = defaultdict(list)
    data_dict = defaultdict(list)

    previous_file_report = None
    previous_file_df = None
    try:
        for i, cik in enumerate(tqdm(ciks)):
            if (i > 0) and (i % 30 == 0):
                # Periodic checkpoint so a crash does not lose the whole scrape.
                timestamp = time.time_ns()
                file_report = f"../data/intermediate/report_urls{timestamp}.pickle"
                file_page = f"../data/intermediate/report_pages_{i//30}.pickle"
                file_df = f"../data/intermediate/scraped_financials{timestamp}.csv"

                with open(file_report, "wb") as f:
                    pickle.dump(report_urls, f)
                with open(file_page, "wb") as f:
                    # Plain dict: no default factory stored in the pickle.
                    pickle.dump(dict(report_pages), f)
                pd.DataFrame(data_dict).to_csv(file_df)

                # URLs and figures are cumulative, so the older copy is redundant.
                if previous_file_report:
                    os.remove(previous_file_report)
                    os.remove(previous_file_df)
                previous_file_report = file_report
                previous_file_df = file_df

                # Page texts are large: start a fresh chunk after each checkpoint.
                report_pages = defaultdict(list)

            url = f"https://www.sec.gov/edgar/browse/?CIK={cik}"
            try:
                driver.get(url)
            except Exception:
                logging.info(f"{url} Did not get page")
                continue

            # TODO in while loop to decrease wait time
            time.sleep(4)

            try:
                input_search_10k(driver)
            except Exception:
                logging.info(f"{url} Did not input 10-K")
                # Search box not available yet: expand the 10-K section first.
                try:
                    expend_10k_button(driver)
                    time.sleep(1.5)
                    click_view_all_10k(driver)
                    time.sleep(1.5)
                    input_search_10k(driver)
                except Exception:
                    logging.info(f"{url} Did not expend and input 10-K")
                    continue

            link_elems = driver.find_elements(By.CLASS_NAME, "document-link")
            annual_report_elems = [
                e
                for e in link_elems
                if "Annual report" in e.text
                # get_attribute returns None when the attribute is absent.
                and "right" in (e.get_attribute("data-placement") or "")
            ]
            report_urls.extend([a.get_property("href") for a in annual_report_elems])

            # One entry per link, None on failure, so that positions stay aligned
            # with the rows of the filings table (used for the report date below).
            annual_report_pages = []
            for annual_report_elem in annual_report_elems:
                page_url = None
                try:
                    page_url = annual_report_elem.get_property("href")
                    if is_xbrl(page_url):
                        page_url = html_url_from_xbrl_viewer(annual_report_elem, driver)
                    page = requests.get(page_url, headers=headers, timeout=5)
                except Exception:
                    logging.exception(f"{page_url} Did not fetch report page")
                    page = None
                annual_report_pages.append(page)

            # Touch the key so a company with no downloadable report still gets an
            # (empty) entry.
            cik_texts = report_pages[cik]

            for j, page in enumerate(annual_report_pages):
                if page is None:
                    continue

                is_txt = page.url.endswith(".txt")
                if is_txt:
                    soup = None
                    text = page.text
                else:
                    # Parse once; reused for both the text dump and the tables.
                    soup = BeautifulSoup(page.content, "html.parser")
                    text = "\n".join([p.text for p in soup.findAll("p")])
                cik_texts.append(text)

                try:
                    date_str = driver.find_elements(
                        By.XPATH, '//a[@data-index="reportDate"]'
                    )[j].text
                    date = datetime.fromisoformat(date_str)
                except Exception:
                    logging.info(f"{url} Did not read date")
                    continue

                # Collect all values first so the columns of data_dict can never
                # end up with different lengths if a lookup raises midway.
                tmp_dict = dict()
                for entry in entries:
                    if is_txt:
                        tmp_dict[entry] = find_value_in_table_txt(page.text, entry)
                    else:
                        tmp_dict[entry] = find_value_in_table(soup, entry)

                for entry, value in tmp_dict.items():
                    data_dict[entry].append(value)
                data_dict["CIK"].append(cik)
                data_dict["date_filled"].append(date)
                data_dict["url"].append(page.url)
    finally:
        # Do not leave a Chrome/chromedriver process behind.
        driver.quit()

    pd.DataFrame(data_dict).to_csv("../data/intermediate/scraped_financials.csv")

    with open("../data/intermediate/report_urls.pickle", "wb") as f:
        pickle.dump(report_urls, f)
    with open("../data/intermediate/report_pages.pickle", "wb") as f:
        pickle.dump(dict(report_pages), f)

In [None]:
run(ciks)

In [None]:
# TEST TEST
# Exploratory cells: open one company's EDGAR browse page in a browser bound to
# the global `driver`, which the following cells drive step by step.

url = "https://www.sec.gov/edgar/browse/?CIK=351569"

driver = webdriver.Chrome()
driver.get(url)

In [None]:
expend_10k_button(driver)

In [None]:
click_view_all_10k(driver)

In [None]:
input_search_10k(driver)

In [None]:
# Elements with class "document-link" whose text mentions "Annual report".
# find_elements(By...) replaces find_elements_by_class_name (removed in Selenium 4.3).
link_elems = driver.find_elements(By.CLASS_NAME, "document-link")
annual_report_elems = [
    e
    for e in link_elems
    if "Annual report" in e.text
    # get_attribute returns None when the attribute is absent.
    and "right" in (e.get_attribute("data-placement") or "")
]

for a in annual_report_elems:
    print(a.get_property("href"), a.text)

annual_report_elem = annual_report_elems[0]

In [None]:
page = requests.get(
    annual_report_elem.get_property("href"), headers=headers, timeout=5
)
page.status_code

In [None]:
# Name the parser explicitly (as everywhere else in this notebook) so the result
# does not depend on which optional parsers happen to be installed.
soup = BeautifulSoup(page.content, "html.parser")

In [None]:
# Text of the first "reportDate" anchor, parsed as an ISO date.
# find_elements(By.XPATH, ...) replaces find_elements_by_xpath (removed in Selenium 4.3).
date_str = driver.find_elements(By.XPATH, '//a[@data-index="reportDate"]')[0].text

date = datetime.fromisoformat(date_str)
date

In [None]:
annual_report_elem.click()

In [None]:
driver.switch_to.window(driver.window_handles[1])

In [None]:
driver.close()

In [None]:
# find_element(By.XPATH, ...) replaces find_element_by_xpath (removed in Selenium 4.3).
menu_elem = driver.find_element(By.XPATH, '//a[@id="menu-dropdown-link"]')
menu_elem.click()

In [None]:
# find_element(By.ID, ...) replaces find_element_by_id (removed in Selenium 4.3).
html_elem = driver.find_element(By.ID, "form-information-html")
href = html_elem.get_attribute("href")

In [None]:
href

In [None]:
# Same timeout as the other downloads: without one, requests can block forever.
page2 = requests.get(href, headers=headers, timeout=5)

In [None]:
soup2 = BeautifulSoup(page2.content, "html.parser")

In [None]:
print(soup.title)
print(soup2.title)

In [None]:
soup.get_text()[-1000:]

In [None]:
soup2.get_text()[-1000:]

In [None]:
find_value_in_table(soup2, 'total_current_assets')

In [None]:
# Use a separate name: rebinding the module-level `entries` here would silently
# change which fields run() scrapes if run() is executed after this cell.
test_entries = ["total_current_assets", "total_current_liabilities"]

find_value_in_table(soup, test_entries[1])

In [None]:
# Debug aid: print the normalised text of every <td> in `soup` and remember the
# last cell whose text equals the normalised key exactly. Unlike
# find_value_in_table, this compares with == rather than a regex search.
key = entries[0]
key_elem = None
key_text = key.strip().replace("\n", "_").replace(" ", "_").lower()
for e in soup.findAll("td"):
    text = e.text.strip().replace("\n", "_").replace(" ", "_").lower()
    print(text)
    if key_text == text:
        key_elem = e

In [None]:
soup

In [None]:
annual_report_elem.get_property("href")

In [None]:
# TEST txt

url = "https://www.sec.gov/Archives/edgar/data/0000839759/000104746909002862/a2191671z10-k.txt"
# Same timeout as the other downloads: without one, requests can block forever.
page = requests.get(url, headers=headers, timeout=5)

In [None]:
key_text = "total assets"
key_text = key_text.strip().lower().replace(" ", "_")
text = page.text.lower().replace(" ", "_")
# Bind `value` up front: otherwise, when the key is absent or no digits follow it,
# `value` is either undefined or a stale leftover from an earlier run of this cell.
value = None
occurences = [m.start() for m in re.finditer(key_text, text)]
if len(occurences) > 0:
    occurence = occurences[0]
    # Look for the first run of digits within 200 characters of the key.
    line = text[occurence : occurence + 200].replace(",", "")
    values = re.findall(r"[0-9]+", line)
    if len(values) > 0:
        value = values[0]
        try:
            value = float(value)
        except ValueError:
            value = None

In [None]:
value

In [None]:
# Scratch check of `del`: after this cell the name `d` is unbound, so evaluating
# `d` afterwards raises NameError.
d = {0: [0, 1], 1: [0, 1, 2]}
del d

In [None]:
d