In [1]:
import pandas as pd
import numpy as np
import pickle
import re
from bs4 import BeautifulSoup

## XBRL元データ抽出

In [2]:
def make_original_xbrl(edinet_path, catcher_path):
    """Load the two parsed-XBRL pickles and return them as a single DataFrame.

    Args:
        edinet_path (str): Path to the pickled, already-parsed XBRL data obtained from EDINET.
        catcher_path (str): Path to the second pickled, already-parsed XBRL data set
            (presumably the one taken from 有報キャッチャー, judging by the parameter
            name -- TODO confirm).

    Returns:
        pandas.DataFrame: Both data sets stacked row-wise (original row labels are kept),
            restricted to the rows whose ``stk_html`` is not 0.
    """
    # NOTE: pickle.load can execute arbitrary code, so only point this at trusted files.
    loaded_frames = []
    for pickle_path in (edinet_path, catcher_path):
        with open(pickle_path, mode='rb') as file:
            loaded_frames.append(pickle.load(file))

    # Keep only the documents that actually carry HTML
    stacked = pd.concat(loaded_frames, axis=0)
    return stacked.query('not stk_html == 0')

## 注釈テーブルを整形する

In [3]:
def annotation_original_to_df(html):
    """Build the one-row annotation table for a single company.

    The annotation texts are taken from every sibling element that follows the
    last ``<div>`` of the document (the ``<p>`` tags and the like).

    Args:
        html (str): HTML that contains the annotation text.

    Returns:
        pandas.DataFrame: A single row holding one non-empty annotation text per column.
    """
    parsed = BeautifulSoup(html, 'html.parser')
    last_div = parsed.findAll('div')[-1]

    cleaned_texts = []
    for node in last_div.findNextSiblings():
        # Circled digits -> plain digits, then drop trailing blanks and
        # full-width / non-breaking spaces
        text = Translate(node.text).roundtonum().value
        text = text.rstrip(' ').rstrip('　')
        text = text.replace('\u3000', '').replace('\xa0', '')
        if text:  # skip elements that end up empty
            cleaned_texts.append(text)

    # One column per annotation, all in a single row
    return pd.DataFrame(cleaned_texts).T

In [4]:
def merge_annotations(df_stk):
    """Build the annotation table for every filing in ``df_stk``.

    Args:
        df_stk (pandas.DataFrame): Table with the columns ``edinet_code``,
            ``firm_name``, ``filling_ymd`` and ``stk_html``.

    Returns:
        pandas.DataFrame: One row per filing. Columns are labelled '0', '1', ...;
            the first three hold edinet_code, firm_name and filling_ymd, the
            remaining ones hold the annotation texts.
    """
    df_list = []
    for code, name, ymd, html in zip(df_stk['edinet_code'], df_stk['firm_name'], df_stk['filling_ymd'], df_stk['stk_html']):
        annotation_df = annotation_original_to_df(html)
        firm_df = pd.concat([pd.DataFrame([code, name, ymd]).T, annotation_df], axis=1)
        # Relabel positionally so frames with different widths can be stacked.
        # Plain assignment is used because set_axis(..., inplace=True) was
        # removed in pandas 2.0.
        firm_df.columns = [str(i) for i in range(len(firm_df.columns))]
        df_list.append(firm_df)

    df = pd.concat(df_list, axis=0)

    return df

In [5]:
# Load the two parsed-XBRL pickles and combine them into df_stk.
# NOTE(review): both paths are absolute and machine-specific, so this cell only
# runs on a machine where they exist.
edinet_path = 'C:/Users/koeci/Google ドライブ/MBA/ワークショップ/intermediate/tsutsumi_stockholders/xbrl_parsed_0924.pickle'
catcher_path = 'C:/Users/koeci/Google ドライブ/MBA/ワークショップ/intermediate/tsutsumi_stockholders/xbrl_parsed_1015.pickle'
df_stk = make_original_xbrl(edinet_path, catcher_path)

In [None]:
# Build the raw annotation table for every filing in df_stk.
# NOTE(review): merge_annotations needs the Translate class, which is defined
# further down in this notebook; on a fresh kernel that cell has to run first.
annotations_df = merge_annotations(df_stk=df_stk)
path = 'C:/Users/koeci/Google ドライブ/MBA/ワークショップ/output/tsutsumi_stockholders/OriginalAnnotation.csv'
# The CSV export is disabled, so this cell does not write any file.
# annotations_df.to_csv(path, encoding='cp932', header=False, index=False)

### 注釈テーブルの最終版を作成

In [6]:
def make_annotation_comp(annotation):
    """Return a cleaned copy of the annotation table.

    Every cell has its leading/trailing whitespace and all '?' characters
    removed; missing cells (NaN or None) become NaN. The input is not modified.

    Args:
        annotation (pandas.DataFrame): Annotation table whose non-missing cells are strings.

    Returns:
        pandas.DataFrame: The cleaned copy.
    """
    def _clean_cell(cell):
        # NaN is the only value that is not equal to itself
        if not (cell == cell) or cell is None:
            return np.nan
        return cell.strip().replace('?', '')

    cleaned = annotation.copy()
    for column_name in annotation.columns:
        cleaned[column_name] = cleaned[column_name].apply(_clean_cell)

    return cleaned

In [194]:
# Read Annotation.csv (presumably the manually corrected annotation table --
# TODO confirm), clean every cell with make_annotation_comp and save the
# result as Annotation_comp.csv.
path = '../../output/tsutsumi_stockholders/Annotation.csv'
annotation = pd.read_csv(path, encoding='cp932')
annotation = make_annotation_comp(annotation)
save_path = '../../output/tsutsumi_stockholders/Annotation_comp.csv'
annotation.to_csv(save_path, encoding='cp932', index=False, header=True)

## 株主テーブルの修正前テーブルを作成

In [8]:
def make_stockholders_origin(html):
    """Build the pre-correction stockholder table from one filing's HTML.

    Args:
        html (str): HTML markup containing the stockholder table(s). Anything
            else ``pandas.read_html`` accepts (URL, path, file-like object) is
            passed through unchanged.

    Returns:
        pandas.DataFrame: The HTML table(s) as one frame (tables spread over
            several pages are stacked row-wise). Columns are labelled 3, 4, ...
            because three columns (edinet_code, firm_name, filling_ymd) are
            inserted on the left afterwards.
    """
    from io import StringIO  # local import so the block needs no new file-level import

    # Passing literal HTML text to read_html is deprecated since pandas 2.1;
    # a StringIO buffer is accepted by old and new versions alike.
    if isinstance(html, str) and html.lstrip().startswith('<'):
        source = StringIO(html)
    else:
        source = html

    df_list = pd.read_html(source, header=0)
    if len(df_list) >= 2:
        df = pd.concat(df_list)
    else:
        df = df_list[0]

    # When the parsed header is blank ('Unnamed: 0'), the real header sits in
    # the row labelled 0; drop it, since the columns are renumbered below anyway.
    # NOTE(review): after a multi-table concat every table contributes a row
    # labelled 0, so all of those rows are dropped -- confirm this is intended.
    cols = df.columns
    if len(cols) >= 1 and cols[0] == 'Unnamed: 0':
        df = df.drop(index=0)

    # Plain assignment instead of set_axis(..., inplace=True), which was
    # removed in pandas 2.0.
    df.columns = list(range(3, 3 + len(df.columns)))

    return df

In [9]:
def save_stockholders_original(df_stk):
    """Build the pre-correction stockholder table for every filing.

    Args:
        df_stk (pandas.DataFrame): Table with the columns ``edinet_code``,
            ``firm_name``, ``filling_ymd`` and ``stk_html``.

    Returns:
        pandas.DataFrame: All stockholder tables stacked row-wise. The first
            column ('index') is the row label each row had inside its own
            table, then come the positionally labelled table columns
            ('0', '1', ...), and finally one column per annotation number
            split out of the stockholder name.
    """
    df_list = []
    for code, name, ymd, html in zip(df_stk['edinet_code'], df_stk['firm_name'], df_stk['filling_ymd'], df_stk['stk_html']):
        stockholders = make_stockholders_origin(html)  # stockholder table of one filing
        stockholders.insert(0, 'edinet_code', code)
        stockholders.insert(1, 'firm_name', name)
        stockholders.insert(2, 'filling_ymd', ymd)
        # Switch to positional labels so concat does not trip over duplicate
        # column names. Plain assignment is used because
        # set_axis(..., inplace=True) was removed in pandas 2.0.
        stockholders.columns = [str(i) for i in range(len(stockholders.columns))]
        df_list.append(stockholders)

    df = pd.concat(df_list, axis=0)

    # Working columns: the annotation numbers split out of the stockholder name.
    # '注' is turned into '※' first so a single marker has to be handled.
    stockholders_names = df.iloc[:, 3].apply(lambda x: str(x).replace('注', '※'))

    a_num_list = []
    for s_name in stockholders_names:
        name_split = str(s_name).split('※')
        if len(name_split) >= 2:
            a_num = name_split[1].replace('(', '').replace('（', '').replace(')', '').replace('）', '').replace('※', '')
            # Normalise circled digits, full-width characters and the separators
            a_num = Translate(a_num).roundtonum().zentohan().value.replace('、', ',').replace('.', ',')
        else:
            a_num = np.nan
        a_num_list.append(a_num)

    # One column per annotation number; names without a marker yield the string 'nan'
    a_num_df = pd.DataFrame([str(a).split(',') for a in a_num_list])

    # Append the annotation-number columns to the stockholder table.
    # reset_index() keeps the old row labels as a leading 'index' column.
    df = df.reset_index()
    first_label = len(df.columns)
    a_num_df.columns = [str(i) for i in range(first_label, first_label + len(a_num_df.columns))]
    df = pd.concat([df, a_num_df], axis=1)

    return df

In [238]:
# Build the pre-correction stockholder table for every filing and export it to
# Excel (the next notebook section works on a manually corrected version).
df = save_stockholders_original(df_stk=df_stk)
save_path = 'C:/Users/koeci/Google ドライブ/MBA/ワークショップ/output/tsutsumi_stockholders/stockholders_before_fix.xlsx'
# The `encoding` argument of to_excel never had any effect; it was deprecated
# in pandas 1.5 and removed in 2.0, so it is no longer passed.
df.to_excel(save_path, index=False)

## 手作業で注釈テーブル・株主テーブルを修正したあとの操作

### 手作業で整形した株主テーブルをさらに整形する

In [11]:
def fix_more_stockholders(stockholders):
    """Post-process the manually corrected stockholder table.

    For ``stock`` and ``stock_rates`` the value in front of the first '(' goes
    to ``<col>_main`` and the value after it to ``<col>_sub`` (NaN when there is
    no bracket). Blanks are removed from the ``a_num_*`` columns and '?' is
    removed from ``firm_name`` and ``loc``. The column index is printed along
    the way.

    Args:
        stockholders (pandas.DataFrame): Stockholder table after the visual/manual fixes.

    Returns:
        pandas.DataFrame: A processed copy; the input is left untouched.
    """
    def _normalize(raw):
        # Full-width -> half-width, and angle brackets treated like round ones
        return Translate(str(raw)).zentohan().value.replace('<', '(').replace('>', ')')

    def _scrub(text, bracket):
        # Drop the given bracket plus blanks, thousands separators, '?' and '%'
        for unwanted in (bracket, ' ', ',', '?', '%'):
            text = text.replace(unwanted, '')
        return text

    df = stockholders.copy()

    # Split the bracketed part off stock and stock_rates
    for col in ['stock', 'stock_rates']:
        mains = []
        subs = []
        for value in df[col].apply(_normalize):
            pieces = value.split('(')
            mains.append(_scrub(pieces[0], '('))          # value outside the brackets
            if len(pieces) >= 2:
                subs.append(_scrub(pieces[1], ')'))       # value inside the brackets
            else:
                subs.append(np.nan)

        df[col + '_main'] = mains
        df[col + '_sub'] = subs

    # Tidy the annotation-number columns
    print(df.columns)
    for col in [c for c in df.columns if 'a_num_' in c]:
        df[col] = df[col].apply(lambda x: str(x).replace(' ', '').replace('　', ''))

    # Tidy the remaining text columns; missing values stay NaN
    for col in ['firm_name', 'loc']:
        df[col] = df[col].apply(lambda x: np.nan if not (x == x) or x is None else x.replace('?', ''))

    return df

In [41]:
# データ読込
# Load the stockholder table, post-process it with fix_more_stockholders, keep
# only the first row of every (edinet_code, stockholder) pair and save the
# result as Stockholders_comp.csv.
stockholder_path = '../../output/tsutsumi_stockholders/Stockholders.csv' # read the stockholder table
stockholders = pd.read_csv(stockholder_path, encoding='cp932', header=0)
new_stockholders = fix_more_stockholders(stockholders=stockholders)
new_stockholders.drop_duplicates(subset=['edinet_code', 'stockholder'], keep='first', inplace=True)
save_path = '../../output/tsutsumi_stockholders/Stockholders_comp.csv'
new_stockholders.to_csv(save_path, encoding='cp932', header=True, index=False)

Index(['edinet_code', 'firm_name', 'filling_ymd', 'stockholder', 'a_num_1',
       'a_num_2', 'a_num_3', 'a_num_4', 'a_num_5', 'loc', 'stock',
       'stock_rates', 'stock_main', 'stock_sub', 'stock_rates_main',
       'stock_rates_sub'],
      dtype='object')


### 株主テーブルの最終チェック

In [42]:
# Frequency of every distinct stock_main value; stored in `check` (nothing is displayed).
check = new_stockholders['stock_main'].value_counts()

## 最終アウトプットを作成する

In [135]:
def make_output(nayose, security_table, annotation, stockholders):
    """Assemble the final stockholder table.

    Steps: merge the stockholder rows with the code list and the annotation
    table, look up the annotation text for every annotation number, fill in
    missing security codes, clean the text columns and convert the numeric
    columns.

    Args:
        nayose (pandas.DataFrame): Code list with ``edinet_code`` and ``security_code``.
        security_table (dict): ``edinet_code`` -> ``security_code``, used for the
            rows that have no security code after the merge.
        annotation (pandas.DataFrame): Annotation texts indexed by ``edinet_code``
            with the columns ``a_<number>``.
        stockholders (pandas.DataFrame): Stockholder rows with ``edinet_code``,
            ``firm_name``, ``filling_ymd``, ``stockholder``, ``loc``, ``stock``,
            ``stock_rates``, ``stock_main``, ``stock_sub``, ``stock_rates_main``,
            ``stock_rates_sub`` and ``a_num_1`` .. ``a_num_<k>``.

    Returns:
        pandas.DataFrame: The merged table plus ``a_text_1`` .. ``a_text_<k>`` and
            ``stockholder_rem``. Rows that had a security code come first, the
            rows whose code was filled in from ``security_table`` follow.
            Missing values in the text columns stay NaN.

    Raises:
        KeyError: If a row without security code has an ``edinet_code`` that is
            not in ``security_table``.
        ValueError: If a ``stock_main`` / ``stock_rates_main`` value cannot be
            converted to int / float.
    """
    def _is_missing(value):
        # None or a float NaN (NaN is the only float not equal to itself)
        return value is None or (isinstance(value, float) and value != value)

    def _is_annotation_number(value):
        # Any real, non-NaN number counts (int columns appear when no value is missing)
        if isinstance(value, bool):
            return False
        return isinstance(value, (int, float, np.integer, np.floating)) and value == value

    def _without_question_marks(value):
        # Keep missing values missing instead of turning them into 'None' / 'nan'
        return np.nan if _is_missing(value) else str(value).replace('?', '')

    def _strip_annotation_marker(name):
        # Everything from the first annotation marker / opening bracket on is cut off
        if _is_missing(name):
            return np.nan
        marked = str(name).replace('注', '※').replace('(', '※').replace('（', '※').replace('＊', '※')
        return marked.split('※')[0]

    # Put everything into one table
    df_comp = pd.merge(left=stockholders, right=nayose, on='edinet_code', how='left')
    df_comp = pd.merge(left=df_comp, right=annotation, on='edinet_code', how='left')

    # Map every annotation number to its annotation text
    a_num_count = len([c for c in df_comp.columns if 'a_num_' in c])
    a_text_cols = ['a_text_' + str(i + 1) for i in range(a_num_count)]
    for i in range(a_num_count):
        value_list = []
        colname = 'a_num_' + str(i + 1)
        for edinet_code, a_num in zip(df_comp['edinet_code'], df_comp[colname]):
            annot_text = None
            if _is_annotation_number(a_num):
                annotation_colname = 'a_' + str(int(a_num))
                try:
                    annot_text = annotation.loc[edinet_code, annotation_colname]
                except KeyError:
                    # Unknown firm or annotation number: report it and leave the text empty
                    print(f'{edinet_code} was not matched')

            value_list.append(annot_text)

        df_comp[a_text_cols[i]] = value_list

    # Fill in missing security codes: split on missing / present, fill, re-stack
    code_missing = df_comp['security_code'].isna()
    df_comp_nona = df_comp[~code_missing].copy()
    df_comp_na = df_comp[code_missing].copy()
    df_comp_na['security_code'] = [security_table[code] for code in df_comp_na['edinet_code']]
    df_comp = pd.concat([df_comp_nona, df_comp_na], axis=0)

    # Final clean-up of the individual columns.
    # Series.replace works on whole cell values: cells that are exactly '-' or '―' become ''.
    df_comp['stock_sub'] = df_comp['stock_sub'].replace('-', '').replace('―', '')
    df_comp['stock_rates_sub'] = df_comp['stock_rates_sub'].replace('-', '').replace('―', '')
    text_cols = ['edinet_code', 'firm_name', 'filling_ymd', 'stockholder', 'loc', 'stock', 'stock_rates']
    for col in text_cols + a_text_cols:
        df_comp[col] = df_comp[col].apply(_without_question_marks)

    # Stockholder name without the trailing annotation marker
    df_comp['stockholder_rem'] = [_strip_annotation_marker(name) for name in df_comp['stockholder']]

    for col in a_text_cols:
        df_comp[col] = df_comp[col].apply(lambda x: np.nan if _is_missing(x) else x.rstrip())

    df_comp['stock_main'] = df_comp['stock_main'].apply(int)
    df_comp['stock_rates_main'] = df_comp['stock_rates_main'].apply(float)

    return df_comp

In [136]:
# Input files for the final output table
nayose_path = '../../data/EDINET/code_list/edinet_security_code.csv'
security_na_path = '../../intermediate/tsutsumi_stockholders/securities_na.csv'
annotation_path = '../../output/tsutsumi_stockholders/Annotation_comp.csv'
stockholder_path = '../../output/tsutsumi_stockholders/Stockholders_comp.csv'

# security_code is read as str in the code list; securities_na is read with inferred dtypes
nayose = pd.read_csv(nayose_path, encoding='cp932', dtype={'security_code': str})
security_na = pd.read_csv(security_na_path, encoding='cp932', header=0)
# edinet_code -> security_code lookup for rows that get no code from the merge
security_table = {e: s for e, s in zip(security_na['edinet_code'], security_na['security_code'])}
annotation = pd.read_csv(annotation_path, encoding='cp932')
# Keep edinet_code plus every column whose name contains 'a_', then index by edinet_code
# (make_output looks annotation texts up with annotation.loc[edinet_code, 'a_<n>'])
a_cols = ['edinet_code'] + [col for col in annotation.columns if 'a_' in col]
annotation = annotation[a_cols]
annotation.set_index(keys='edinet_code', inplace=True)
stockholders = pd.read_csv(stockholder_path, encoding='cp932')

# Build the combined table and keep the output columns in their final order
output = make_output(nayose=nayose, security_table=security_table, annotation=annotation, stockholders=stockholders)
output = output[[
    'edinet_code', 'security_code', 'firm_name', 'filling_ymd', 'stockholder', 'stockholder_rem', 'loc', 'stock',
    'stock_rates', 'stock_main', 'stock_sub', 'stock_rates_main',
    'stock_rates_sub', 'a_num_1', 'a_num_2', 'a_num_3', 'a_num_4', 'a_num_5', 'a_text_1', 'a_text_2',
    'a_text_3', 'a_text_4', 'a_text_5'
]]

save_path = '../../output/tsutsumi_stockholders/stockholders_data_v2.csv'
output.to_csv(save_path, encoding='cp932', header=True, index=False)

## テスト

In [75]:
# Duplicate check on (edinet_code, stockholder):
# 1) number of non-null edinet_code values,
# 2) the same count after dropping duplicated pairs (equal numbers mean no duplicates),
# 3) the edinet codes of any rows flagged as duplicates.
print(output.count()['edinet_code'])
print(output.drop_duplicates(subset=['edinet_code', 'stockholder'])['edinet_code'].count())
print(output[output.duplicated(subset=['edinet_code', 'stockholder']) == True]['edinet_code'].drop_duplicates())

28464
28464
Series([], Name: edinet_code, dtype: object)


In [76]:
# Sum of stock_rates_main per firm.
# NOTE(review): this result is overwritten by the next line before it is used or displayed.
check = output.groupby('edinet_code')['stock_rates_main'].sum()
# Frequency of every distinct stock_main value
check = output['stock_main'].value_counts()

In [108]:
# Reload the saved CSV with explicit dtypes for the two numeric main columns
# and display the resulting column dtypes.
output = pd.read_csv('../../output/tsutsumi_stockholders/stockholders_data_v2.csv', encoding='cp932', dtype={'stock_main': 'int', 'stock_rates_main': 'float'})
output.dtypes

edinet_code          object
security_code         int64
firm_name            object
filling_ymd          object
stockholder          object
loc                  object
stock                object
stock_rates          object
stock_main            int32
stock_sub           float64
stock_rates_main    float64
stock_rates_sub     float64
a_num_1             float64
a_num_2             float64
a_num_3             float64
a_num_4             float64
a_num_5             float64
a_text_1             object
a_text_2             object
a_text_3             object
a_text_4             object
a_text_5             object
dtype: object

In [196]:
# Save the number of non-null values per column, first for the output table,
# then for the annotation table (`count` is reused for both).
count = output.count()
count.to_csv('../../output/tsutsumi_stockholders/sample_size.csv', encoding='cp932')
count = annotation.count()
count.to_csv('../../output/tsutsumi_stockholders/annotation_sample_size.csv', encoding='cp932')

## 修正クラス

In [13]:
class Translate:
    """Chainable character converter for a single string.

    Each conversion method rewrites ``self.value`` and returns the instance,
    so calls can be chained: ``Translate(text).zentohan().roundtonum().value``.
    """

    ZENKAKU = ''.join(map(chr, range(0xff01, 0xff01 + 94)))  # full-width characters U+FF01..U+FF5E
    HANKAKU = ''.join(map(chr, range(0x21, 0x21 + 94)))  # half-width characters U+0021..U+007E
    ZENTOHAN = str.maketrans(ZENKAKU, HANKAKU)  # full-width -> half-width
    HANTOZEN = str.maketrans(HANKAKU, ZENKAKU)  # half-width -> full-width
    ROUNDNUM = ''.join(map(chr, range(0x2460, 0x2460 + 9)))  # circled digits 1..9
    NUM = ''.join(map(str, range(1, 10)))  # plain digits 1..9
    ROUNDTONUM = str.maketrans(ROUNDNUM, NUM)  # circled digit -> plain digit
    ADHOC_DICT = {'注': '※', '*': '※'}  # characters normalised by hand
    ADHOC_TRANS = str.maketrans(ADHOC_DICT)  # translation table for them

    def __init__(self, value):
        self.value = value

    def zentohan(self, zentohan=True):
        """Convert full-width to half-width (``True``) or the reverse (``False``).

        Any other argument prints '変換失敗' and returns ``None`` without
        touching ``self.value``.
        """
        if zentohan is not True and zentohan is not False:
            print('変換失敗')
            return None

        table = self.ZENTOHAN if zentohan is True else self.HANTOZEN
        self.value = self.value.translate(table)
        return self

    def roundtonum(self):
        """Replace the circled digits 1-9 with plain digits."""
        converted = self.value.translate(self.ROUNDTONUM)
        self.value = converted
        return self

    def adhoc_trans(self):
        """Apply the hand-made replacements ('注' and '*' become '※')."""
        converted = self.value.translate(self.ADHOC_TRANS)
        self.value = converted
        return self

In [14]:
class Fix:
    """Chainable cleaner for annotation text.

    ``value`` is normally one annotation string; ``fix_annotation_adhoc`` also
    accepts a list of strings. Every method stores its result in ``self.value``
    and returns the instance so that calls can be chained.
    """

    def __init__(self, value):
        self.value = value

    def fix_value(self):
        """Join all lines, normalise the characters and remove every blank.

        Characters are converted full-width -> half-width, circled digit ->
        digit, and '注' / '*' -> '※' (via ``Translate``).
        """
        value_join = ''.join(self.value.splitlines())
        # .value pulls the converted str out of the Translate object
        value_fixed = Translate(value_join).zentohan().roundtonum().adhoc_trans().value
        value_fixed = value_fixed.replace(' ', '').replace('　', '').replace('\xa0', '').replace('\u3000', '')
        self.value = value_fixed

        return self

    def fix_annotation_pre(self):
        """Strip annotation numbering from the start of the text.

        Removes leading forms such as '1.', '1', '※', '※1', '(※)', '(※)1.',
        '(' and ')', plus every '(※n)' and '、n' anywhere in the text
        (at most 10 removals).
        """
        # Longer alternatives come first: regex alternation takes the first
        # branch that matches, so '^※' listed before '^※[0-9]' would leave the
        # digit behind ('※1text' -> '1text').
        pattern_pre = r'^\(※\)[0-9]\.|^\(※\)|^※[0-9]|^※|^[0-9]\.|^[0-9]|\(※[0-9]\)|^\(|^\)|、[0-9]'
        self.value = re.sub(pattern=pattern_pre, repl='', string=self.value, count=10)

        return self

    def fix_annotation_suff(self):
        """Strip annotation numbering from the end of the text.

        Removes a trailing 'n', 'n.', '、' or '※n' (at most 10 removals).
        """
        pattern_suff = r'[0-9]$|[0-9]\.$|、$|※[0-9]$'
        self.value = re.sub(pattern=pattern_suff, repl='', string=self.value, count=10)

        return self

    def fix_annotation_adhoc(self, pattern, repl):
        """Replace every match of ``pattern`` with ``repl``.

        Args:
            pattern (str): Regular expression to search for.
            repl (str): Replacement text.

        A ``str`` value is processed as a whole and stays a ``str`` (iterating
        it would split it into single characters and break the chain); any
        other iterable of strings is processed element by element and becomes
        a list.
        """
        if isinstance(self.value, str):
            self.value = re.sub(pattern=pattern, repl=repl, string=self.value)
        else:
            self.value = [re.sub(pattern=pattern, repl=repl, string=a) for a in self.value]

        return self