In [None]:
# 株探（Kabutan）のプレミアム会員限定のデータを収集してくる

In [None]:
import csv
import datetime
import io
import re
import time
from pathlib import Path

import requests
import xlrd
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from selenium import webdriver
from selenium.webdriver.common.by import By

import stock

In [None]:
def save_code_list_to_csv(code_list_xls_path: Path, output_csv_path: Path):
    """Convert the first sheet of an ``.xls`` workbook into a UTF-8 CSV file.

    Every row of the first sheet is written out with its first column
    dropped. The parent directory of ``output_csv_path`` is created if needed.

    Args:
        code_list_xls_path: Path of the ``.xls`` workbook to read.
        output_csv_path: Path of the CSV file to (over)write.
    """
    def _cell_to_text(value) -> str:
        # Whole-number floats (e.g. 1301.0) are written without the ".0".
        # Presumably these are numeric cells that xlrd reports as floats.
        # Anything else is kept verbatim: the previous blanket
        # str.replace(".0", "") also mangled values such as 10.05 -> "105".
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return str(value)

    workbook = xlrd.open_workbook(code_list_xls_path)
    try:
        sheet = workbook.sheets()[0]
        rows = [
            [_cell_to_text(value) for value in sheet.row_values(row_idx)[1:]]
            for row_idx in range(sheet.nrows)
        ]
    finally:
        # Release the workbook even when reading a row fails.
        workbook.release_resources()

    output_csv_path.parent.mkdir(exist_ok=True, parents=True)
    # newline="" is what the csv module requires for files it writes to;
    # without it every row is followed by an empty line on Windows.
    with open(output_csv_path, "w", encoding="utf-8", newline="") as f:
        csv.writer(f).writerows(rows)

In [None]:
# Stock-code list: the source spreadsheet and the CSV file converted from it.
code_list_xls = Path("..") / "data" / "data_j.xls"
code_list_csv = code_list_xls.with_suffix(".csv")
# One-off conversion; uncomment to regenerate the CSV from the spreadsheet.
# save_code_list_to_csv(code_list_xls, code_list_csv)

In [None]:
class Stock(BaseModel):
    """One row of the stock-code list CSV; every column is kept as text."""

    # The field order below is the column order used by to_csv_row and
    # from_csv_row. NOTE(review): the "33"/"17" suffixes presumably refer to
    # two industry classification schemes -- confirm against the source list.
    code: str
    name: str
    market_division: str
    industry_code33: str
    industry_division33: str
    industry_code17: str
    industry_division17: str
    scale_code: str
    scale_division: str

    def to_csv_row(self):
        """Return this stock as one CSV line without a trailing newline.

        Fields go through ``csv.writer`` so that a value containing a comma,
        quote or newline is quoted and can be read back by ``csv.reader``.
        Values without such characters produce the same plain
        comma-separated text as before.
        """
        buffer = io.StringIO()
        csv.writer(buffer, lineterminator="").writerow([
            self.code, self.name, self.market_division,
            self.industry_code33, self.industry_division33,
            self.industry_code17, self.industry_division17,
            self.scale_code, self.scale_division,
        ])
        return buffer.getvalue()
    
    @classmethod
    def from_csv_row(cls, row: list[str]):
        """Build a ``Stock`` from the first nine items of an already-split CSV row.

        Raises:
            IndexError: If ``row`` has fewer than nine items.
        """
        return cls(
            code=row[0], name=row[1], market_division=row[2],
            industry_code33=row[3], industry_division33=row[4],
            industry_code17=row[5], industry_division17=row[6],
            scale_code=row[7], scale_division=row[8]
        )
    
def load_stock_list_csv(csv_path: Path) -> list[Stock]:
    """Read the stock-code list CSV and return one ``Stock`` per data row.

    The first row is treated as the header and skipped. Empty rows are
    skipped as well: they show up when the file was written without
    ``newline=""`` on Windows, and would otherwise crash
    ``Stock.from_csv_row`` with an ``IndexError``.

    Args:
        csv_path: Path of the UTF-8 encoded CSV file.

    Returns:
        The parsed stocks, in file order.
    """
    # newline="" lets the csv module handle line endings itself.
    with open(csv_path, "r", encoding="utf-8", newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # drop the header row; tolerate an empty file
        return [Stock.from_csv_row(row) for row in reader if row]

In [None]:
def convert_to_number(val_str):
    """Extract the first number from the text of a table cell.

    Args:
        val_str: Cell text, e.g. ``"1,234"``, ``"-12.5"`` or ``"－"``.

    Returns:
        ``None`` when the cell holds only the "no value" placeholder ``"－"``
        (surrounding whitespace, including non-breaking spaces, is ignored);
        ``0`` when the text contains no digits at all; otherwise the first
        number found, as ``float`` if that number has a decimal point and as
        ``int`` if not. Thousands separators (``,``) are removed first.
    """
    text = val_str.strip()
    if text == "－":
        return None
    text = text.replace(",", "")
    # At most one sign and one decimal point, so that the matched token is
    # always accepted by int()/float(). The previous pattern ("-*", "\.*")
    # could match tokens such as "--5" and then raise ValueError.
    match = re.search(r"-?\d+\.?\d*", text)
    if match is None:
        # NOTE(review): non-numeric text is reported as 0, not None; kept for
        # backward compatibility.
        return 0
    number = match.group(0)
    # Decide int vs. float from the matched token itself, not from a "."
    # that may appear somewhere else in the cell text.
    if "." in number:
        return float(number)
    return int(number)

In [None]:
class FinancialStatement(BaseModel):
    """One scraped financial-statement row (annual, forecast or quarterly)."""

    # Stock code the figures belong to.
    code: str
    # Year and month that identify the accounting period.
    year: int
    month: int
    # Length of the period in months.
    duration: int
    # Announcement date; None when it is not available.
    announce_date: datetime.datetime | None
    # True for forecast figures.
    is_prediction: bool
    # Figures from the statement; None when not available. The spelling
    # "divident" is kept because it is part of the model's interface.
    total_revenue: int | None
    operating_income: int | None
    ordinary_profit: int | None
    net_income: int | None
    eps: float | None
    divident: float | None

    # TODO: a from_html_row() constructor was sketched here but never implemented.

    def to_csv_row(self):
        """Return the statement as one comma-separated line (no newline).

        The values appear in the order given by ``get_csv_header``. The
        announcement date is written as ``yy/mm/dd`` or as an empty string
        when it is ``None``; other ``None`` values are written as ``"None"``.
        """
        if self.announce_date is None:
            announce_text = ""
        else:
            announce_text = self.announce_date.strftime("%y/%m/%d")
        values = [
            self.code, self.year, self.month, self.duration, announce_text,
            self.is_prediction, self.total_revenue, self.operating_income,
            self.ordinary_profit, self.net_income, self.eps, self.divident,
        ]
        return ",".join(str(value) for value in values)
    
    @staticmethod
    def get_csv_header():
        """Return the header line that matches ``to_csv_row``.

        The leading ``code`` column was missing, which left the header one
        column short of the data rows. NOTE(review): ``annoounce_date`` is
        misspelled but left untouched, because already-written files and
        their readers may depend on that column name.
        """
        return "code,year,month,duration,annoounce_date,is_prediction,total_revenue,operating_income,ordinary_profit,net_income,eps,divident"

def results_to_csv(results: list[FinancialStatement], output_path: Path):
    """Write financial statements to a CSV file, header line first.

    The previous version collected the bound method ``res.to_csv_row``
    instead of calling it, and passed already-joined strings to
    ``csv.writer``, which splits a string into one column per character.
    The lines are now written as they are.

    Args:
        results: Statements to write, one line each.
        output_path: File to (over)write, UTF-8 encoded.
    """
    csv_rows = "\n".join(res.to_csv_row() for res in results)

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(FinancialStatement.get_csv_header() + "\n")
        f.write(csv_rows)

In [None]:
def get_annual_results(soup: BeautifulSoup, code: str):
    """Parse the annual results table (``div.fin_year_result_d``) of a page.

    Columns are located by the text of the table's header cells. The old
    code built a list mapping actual position to expected position and then
    used it as if it were the inverse mapping, which only works while the
    page's column order equals the hard-coded one.

    Args:
        soup: Parsed finance page.
        code: Stock code stored on every returned statement.

    Returns:
        One ``FinancialStatement`` per body row whose ``<th>`` contains a
        ``<year>.<month>`` period; an empty list when the results ``div`` is
        absent.

    Raises:
        KeyError: If a required column header is missing from the table.
    """
    year_result_div = soup.find("div", {"class": "fin_year_result_d"})
    if year_result_div is None:
        return []
    table = year_result_div.find("table")
    period_regex = re.compile(r"(\d+)\.(\d+)")

    # Header text -> position among the header cells.
    header_pos = {th.text: pos for pos, th in enumerate(table.find("thead").find_all("th"))}
    period_pos = header_pos["決算期"]

    def value_of(cols, header):
        # The period cell of a body row is a <th>, so it is not part of the
        # <td> list: every column after it sits one position to the left.
        pos = header_pos[header]
        return cols[pos if pos < period_pos else pos - 1]

    results = []
    prev_year = -1
    prev_month = -1
    for row in table.find("tbody").find_all("tr"):
        th = row.find("th")
        if th is None:
            continue
        match = period_regex.search(th.text)
        if match is None:
            continue
        year, month = int(match.group(1)), int(match.group(2))
        # The first period is taken to span 12 months; later ones span the
        # number of months elapsed since the previous row's period end.
        duration = 12
        if prev_year > 0 and prev_month > 0:
            duration = (year - prev_year) * 12 + month - prev_month

        cols = [col.text for col in row.find_all("td")]
        announce_text = value_of(cols, "発表日")

        results.append(FinancialStatement(
            code=code,
            year=year,
            month=month,
            duration=duration,
            announce_date=None if announce_text == "－" else datetime.datetime.strptime(announce_text, "%y/%m/%d"),
            # Forecast rows carry the marker "予" in their period cell.
            is_prediction="予" in th.text,
            total_revenue=convert_to_number(value_of(cols, "売上高")),
            operating_income=convert_to_number(value_of(cols, "営業益")),
            ordinary_profit=convert_to_number(value_of(cols, "経常益")),
            net_income=convert_to_number(value_of(cols, "最終益")),
            eps=convert_to_number(value_of(cols, "修正1株益")),
            divident=convert_to_number(value_of(cols, "修正1株配")),
        ))
        prev_year, prev_month = year, month
    return results

In [None]:
def get_annual_predictions(soup: BeautifulSoup, code: str):
    """Parse the annual forecast table (``div.fin_year_forecast_d``) of a page.

    A row whose second ``<td>`` contains a ``<year>.<month>`` period starts a
    new period; rows without one reuse the period of the row before them.
    Rows marked ``"実"`` in the ``"－\xa0"`` column are skipped.

    Columns are located by the text of the table's header cells instead of
    relying on the page's column order matching a hard-coded list.

    Args:
        soup: Parsed finance page.
        code: Stock code stored on every returned statement.

    Returns:
        One ``FinancialStatement`` (``is_prediction=True``, ``eps=None``) per
        remaining row. An empty list when the forecast ``div`` is absent or
        when any row has fewer than two ``<td>`` cells.

    Raises:
        KeyError: If a required column header is missing from the table.
    """
    year_result_div = soup.find("div", {"class": "fin_year_forecast_d"})
    if year_result_div is None:
        return []
    period_regex = re.compile(r"(\d+)\.(\d+)")
    table = year_result_div.find("table")

    # Header text -> position among the header cells.
    header_pos = {th.text: pos for pos, th in enumerate(table.find("thead").find_all("th"))}
    period_pos = header_pos["決算期"]

    def value_of(cols, header):
        # ``cols`` never contains the period cell, so every column after the
        # period header sits one position to the left.
        pos = header_pos[header]
        return cols[pos if pos < period_pos else pos - 1]

    # Period of the current row group; unknown until the first period row.
    year = None
    month = None
    duration = 12
    prev_year = -1
    prev_month = -1

    results = []
    for row in table.find("tbody").find_all("tr", recursive=False):
        cells = row.find_all("td", recursive=False)
        if len(cells) < 2:
            # Explicit replacement for the former bare ``except``.
            # NOTE(review): this still discards rows parsed so far, exactly
            # as before -- confirm that this is intended.
            return []
        match = period_regex.search(cells[1].text)

        if match is not None:
            year, month = int(match.group(1)), int(match.group(2))
            duration = 12
            if prev_year > 0 and prev_month > 0:
                duration = (year - prev_year) * 12 + month - prev_month
            cols = [col.text for col in cells[2:]]
        else:
            # Continuation row: it has no period cell of its own.
            cols = [col.text for col in cells[1:]]

        if value_of(cols, "－\xa0") == "実":
            continue
        if year is None:
            # A continuation row before any period row cannot be assigned to
            # a period; this used to fail on the unbound ``year``.
            continue

        announce_text = value_of(cols, "修正日")
        results.append(FinancialStatement(
            code=code,
            year=year,
            month=month,
            duration=duration,
            announce_date=None if announce_text == "－" else datetime.datetime.strptime(announce_text, "%y/%m/%d"),
            is_prediction=True,
            total_revenue=convert_to_number(value_of(cols, "売上高")),
            operating_income=convert_to_number(value_of(cols, "営業益")),
            ordinary_profit=convert_to_number(value_of(cols, "経常益")),
            net_income=convert_to_number(value_of(cols, "最終益")),
            eps=None,
            divident=convert_to_number(value_of(cols, "修正配当")),
        ))
        prev_year, prev_month = year, month
    return results

In [None]:
def get_quarter_results(soup: BeautifulSoup, code: str):
    """Parse the quarterly results table (``div.fin_quarter_result_d``) of a page.

    Columns are located by the text of the table's header cells instead of
    relying on the page's column order matching a hard-coded list.

    Args:
        soup: Parsed finance page.
        code: Stock code stored on every returned statement.

    Returns:
        One ``FinancialStatement`` (``divident=None``) per body row whose
        ``<th>`` contains a ``<yy>.<start month>-<end month>`` period; an
        empty list when the quarterly ``div`` is absent.

    Raises:
        KeyError: If a required column header is missing from the table.
    """
    quarter_result_div = soup.find("div", {"class": "fin_quarter_result_d"})
    if quarter_result_div is None:
        return []
    period_regex = re.compile(r"(\d+)\.(\d+)-(\d+)")
    table = quarter_result_div.find("table")

    # Header text -> position among the header cells.
    header_pos = {th.text: pos for pos, th in enumerate(table.find("thead").find_all("th"))}
    period_pos = header_pos["決算期"]

    def value_of(cols, header):
        # The period cell of a body row is a <th>, so it is not part of the
        # <td> list: every column after it sits one position to the left.
        pos = header_pos[header]
        return cols[pos if pos < period_pos else pos - 1]

    results = []
    for row in table.find("tbody").find_all("tr"):
        th = row.find("th")
        if th is None:
            continue
        match = period_regex.search(th.text)
        if match is None:
            continue
        year, start_month, end_month = int(match.group(1)), int(match.group(2)), int(match.group(3))
        # Inclusive month count with wrap-around over the year end. The
        # modulo form also covers a period running from month m to month
        # m - 1 (12 months), for which the old "< 0" check produced 0.
        duration = (end_month - start_month) % 12 + 1

        cols = [col.text for col in row.find_all("td")]
        announce_text = value_of(cols, "発表日")

        results.append(FinancialStatement(
            code=code,
            # The table gives a two-digit year.
            # NOTE(review): for a period that crosses the year end it is not
            # clear whether this year belongs to the start or the end month
            # -- confirm against the site.
            year=2000 + year,
            month=end_month,
            duration=duration,
            announce_date=None if announce_text == "－" else datetime.datetime.strptime(announce_text, "%y/%m/%d"),
            # Forecast rows carry the marker "予" in their period cell.
            is_prediction="予" in th.text,
            total_revenue=convert_to_number(value_of(cols, "売上高")),
            operating_income=convert_to_number(value_of(cols, "営業益")),
            ordinary_profit=convert_to_number(value_of(cols, "経常益")),
            net_income=convert_to_number(value_of(cols, "最終益")),
            eps=convert_to_number(value_of(cols, "修正1株益")),
            divident=None,
        ))
    return results

In [None]:
# Load the stock master list from the CSV file configured above.
stocks = load_stock_list_csv(code_list_csv)
# Browser session used by the scraping loop below.
# NOTE(review): presumably a Selenium WebDriver that is already logged in to
# Kabutan -- confirm in the project's `stock.kabutan` module.
driver = stock.kabutan.generate_logined_driver()

In [None]:
# Scrape the finance page of every stock into ../data/financial/<code>.csv.
# Stocks whose file already exists are skipped, so an interrupted run can be
# resumed by re-running this cell.
output_dir = Path("../data/financial/")
output_dir.mkdir(exist_ok=True, parents=True)

# The loop variable must not be called `stock`: that name is the imported
# project module, and rebinding it breaks any later `stock.kabutan...` call
# (for example when the driver cell is re-run).
for stock_info in stocks:
    output_path = output_dir / "{}.csv".format(stock_info.code)
    if output_path.exists():
        continue

    driver.get(f"https://kabutan.jp/stock/finance?code={stock_info.code}")
    # NOTE(review): no parser is named, so bs4 picks whichever is installed
    # and warns about it; pin one once it is known which parser the tables
    # were validated with.
    soup = BeautifulSoup(driver.page_source)
    results = get_annual_results(soup, stock_info.code)
    results += get_annual_predictions(soup, stock_info.code)
    results += get_quarter_results(soup, stock_info.code)
    csv_rows = "\n".join([res.to_csv_row() for res in results])

    # The file cannot exist at this point (see the guard above), so it is
    # always created fresh and always starts with the header line.
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(FinancialStatement.get_csv_header() + "\n")
        f.write(csv_rows)
    print(f"Save to {output_path}. Number : {len(results)}")
    # Pause between requests to avoid hammering the site.
    time.sleep(1.0)


In [None]:
# Debug aid: display the raw HTML of the page the driver loaded last.
# The output can be very large; clear it before sharing the notebook.
driver.page_source