In [None]:
#the code is to replicate the paper "Annual report readability, current earnings,and earnings persistence" by Feng Li, 2008
#using the data from 2016 to 2021
#collect my sample as follows: 
#(1) start with the intersection of CRSP-COMPUSTAT firm-years.
#(2) match GVKEY (from COMPUSTAT) and PERMNO (from CRSP) with the Central Index Key (CIK) used by SEC online Edgar system. Firms without matching CIK are dropped. 
#(3) download the 10-K filings from Edgar for every remaining firm-year. Those firm-years that do not have electronic 10-K filings on Edgar are then excluded.
#(4) For each 10-K file, all the heading items, paragraphs that have fewer than one line, and tables are deleted and those 10-K filings that have less than 3,000 words or 100 lines of remaining text are dropped. The calculation of the annual report readability is based on the remaining text. Details of these steps are presented in Appendix A. It is important to delete the tables and financial statements in this step— since the readability indices are designed for text rather than for numbers or tables. 
#(5) Finally, firm-years that have operating earnings (scaled by book value of assets) greater than 1 or less than 1 are deleted from the sample. This yields a sample of * firm-years with annual report filing dates between 2016 and 2021. Since most of the firms have a December fiscal year end, my sample mainly covers the fiscal years 2015–2020.

In [2]:
import pandas as pd
import os
from bs4 import BeautifulSoup
import re
import numpy as np
from nltk.tokenize import sent_tokenize, word_tokenize

In [None]:
# 3. Clean the 10-K filings. and extract the item 7 of the 10-K filings
# 4. Calculate the readability indices.
# 5. Merge the fog&length with the bogdate

In [None]:
####################
# 3. Clean the 10-K filings. and extract the item 7 of the 10-K filings
####################

In [None]:
def clean_10k_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()

        # Remove SEC header and other unwanted tags
        content = re.sub(r'<SEC-HEADER>.*?</SEC-HEADER>', '', content, flags=re.DOTALL)
        content = re.sub(r'<TABLE>.*?</TABLE>', '', content, flags=re.DOTALL)
        content = re.sub(r'<S>.*?</S>', '', content, flags=re.DOTALL)
        content = re.sub(r'<C>.*?</C>', '', content, flags=re.DOTALL)
        content = re.sub(r'<.*?>', '', content)  
        content = re.sub(r'&.*?;', '', content)  


        # Remove paragraphs with more than 50% non-alphabetic characters 
        # Remove paragraphs with fewer than 1 line
        paragraphs = content.split('\n')  # Split content into paragraphs
        

        cleaned_paragraphs = [
            paragraph for paragraph in paragraphs
            if len(paragraph) > 0 and len(re.findall(r'[^a-zA-Z]', paragraph)) / len(paragraph) <= 0.5  and len(paragraph.splitlines()) > 0]

        content = '\n\n'.join(cleaned_paragraphs)  # Reassemble the cleaned paragraphs

        word_count = len(content.split())
        line_count = len(re.split(r'(?<=\.)\s+', content))

        if word_count < 3000 or line_count < 100:
            return None

        return content

    except Exception as e:
        print(f"Error processing file {file_path}: {e}")
        return None

In [4]:
def process_10k_files(base_directory, year_range, quarters):
    for year in year_range:
        for quarter in quarters:
            quarter_directory = os.path.join(base_directory, str(year), quarter)
            
            if not os.path.exists(quarter_directory):
                os.makedirs(quarter_directory)

            for file_name in os.listdir(quarter_directory):
                if file_name.endswith('.txt'): 
                    file_path = os.path.join(quarter_directory, file_name)
                    cleaned_text = clean_10k_file(file_path)

                    if cleaned_text:
                        output_file_path = os.path.join(base_directory, 'filings', str(year), quarter, f'cleaned_{file_name}')
                        
                        os.makedirs(os.path.dirname(output_file_path), exist_ok=True)

                        try:
                            with open(output_file_path, 'w', encoding='utf-8') as output_file:
                                output_file.write(cleaned_text)
                            print(f"Processed and saved: {output_file_path}")
                        except Exception as e:
                            print(f"Error writing cleaned file {output_file_path}: {e}")

In [None]:
#process_10k_files('C:/Users/chenw/Desktop/ws/filings', range(2016, 2023), ['QTR1', 'QTR2', 'QTR3', 'QTR4'])

In [192]:
def find_mda(content):
    
    beginning_patterns = [
    # (1)/(3) 
    r'(?i)^(?!.*\bsee\b)(?!.*\brefer\b)\s*7\.\s*management\s*[\'\u2019]?\s*(?:s?\s*)?discussion', 
    # (2)/(4)
    r'(?i)^(?!.*\bsee\b)(?!.*\brefer\b)\s*item\s+7',
    r'(?i)^(?!.*\bsee\b)(?!.*\brefer\b)\s*i\s*t\s*e\s*m\s+7']

    ending_patterns =[
    r'^\s*Financial\s+Statements', 
    r'(?i)^(?!.*\bsee\b)(?!.*\brefer\b)\s*item\s+8', 
    r'^\s*Supplementary\s+Data',  
    r'^\s*SUMMARY\s+OF\s+SELECTED\s+FINANCIAL\s+DATA',
    r'(?i)^(?!.*\bsee\b)(?!.*\brefer\b)\s*i\s*t\s*e\s*m\s+8']

    lines = content.splitlines()
    lines = [line.strip() for line in lines if line.strip() != '']

    # Convert the cleaned lines into a DataFrame
    lines_df = pd.DataFrame(lines, columns=['text'])

    mda_sections = []

    i = 0
    while i < len(lines_df):
        mda_start = None
        
        for j in range(i, len(lines_df)):
            for pattern in beginning_patterns:
                if re.search(pattern, lines_df['text'][j], re.IGNORECASE):
                    mda_start = j
                    i = j + 1  
                    break
            if mda_start is not None:
                break

        if mda_start is None:
            break
        
        mda_end = None
        # Find the closest mda_end after mda_start
        for j in range(mda_start, len(lines_df)):
            for pattern in ending_patterns:
                if re.search(pattern, lines_df['text'][j], re.IGNORECASE):
                    mda_end = j
                    break
            if mda_end is not None:
                break
        

        if mda_end is not None:
            mda_section_lines = lines_df.iloc[mda_start:mda_end + 1]  
            mda_sections.append({
                'mda_start': mda_start,
                'mda_end': mda_end,
                'mda_section_lines': mda_section_lines['text'].tolist()  
            })
            i = mda_end + 1  

    if not mda_sections:
        print("No MD&A sections found.")
    else:
        print(f"Found {len(mda_sections)} MD&A sections.")

    return mda_sections

In [None]:
def process_mda_files(base_directory, year_range, quarters):
    for year in year_range:
        for quarter in quarters:
            quarter_directory = os.path.join(base_directory, str(year), quarter)
            
            if not os.path.exists(quarter_directory):
                os.makedirs(quarter_directory)

            for file_name in os.listdir(quarter_directory):
                if file_name.endswith('.txt'): 
                    file_path = os.path.join(quarter_directory, file_name)

                    # Read the content from the file
                    with open(file_path, 'r', encoding='utf-8') as file:
                        content = file.read()

                    # Get MD&A sections
                    mda_sections = find_mda(content)

                    if mda_sections:
                        for section in mda_sections:
                            combined_content = '\n'.join(section['mda_section_lines'])
                            output_file_path = os.path.join(base_directory, 'mda', str(year), quarter, f'mda_{file_name}')
                            
                            os.makedirs(os.path.dirname(output_file_path), exist_ok=True)

                            try:
                                # Write the cleaned MD&A section to the output file
                                with open(output_file_path, 'w', encoding='utf-8') as output_file:
                                    output_file.write(combined_content)
                                print(f"Processed and saved: {output_file_path}")
                            except Exception as e:
                                print(f"Error writing MD&A file {output_file_path}: {e}")

In [None]:
base_directory = 'C:/Users/chenw/Desktop/ws'
process_mda_files(base_directory, range(2016, 2024), [f'QTR{i}' for i in range(1, 5)])

Found 1 MD&A sections.
Processed and saved: C:/Users/chenw/Desktop/ws\mda\2016\QTR1\mda_cleaned_20160104_10-K_edgar_data_718924_0001437749-16-023002.txt
Found 1 MD&A sections.
Processed and saved: C:/Users/chenw/Desktop/ws\mda\2016\QTR1\mda_cleaned_20160107_10-K_edgar_data_810136_0001140361-16-047150.txt
Found 2 MD&A sections.
Processed and saved: C:/Users/chenw/Desktop/ws\mda\2016\QTR1\mda_cleaned_20160108_10-K_edgar_data_315374_0001144204-16-074937.txt
Processed and saved: C:/Users/chenw/Desktop/ws\mda\2016\QTR1\mda_cleaned_20160108_10-K_edgar_data_315374_0001144204-16-074937.txt
Found 2 MD&A sections.
Processed and saved: C:/Users/chenw/Desktop/ws\mda\2016\QTR1\mda_cleaned_20160108_10-K_edgar_data_886128_0000886128-16-000032.txt
Processed and saved: C:/Users/chenw/Desktop/ws\mda\2016\QTR1\mda_cleaned_20160108_10-K_edgar_data_886128_0000886128-16-000032.txt
Found 2 MD&A sections.
Processed and saved: C:/Users/chenw/Desktop/ws\mda\2016\QTR1\mda_cleaned_20160111_10-K_edgar_data_1342423

In [None]:
####################
# 4. Calculate the readability indices.
####################

In [None]:
from readability import Readability
import nltk
nltk.download('punkt_tab')
#fog index calculation reffered to "https://pypi.org/project/py-readability-metrics/#gunning-fog"

In [None]:
#1)Fog  = 0.4 * (words per sentence + percentage of complex words)                        
#2)Length = log(num words in the document)

In [10]:
firm_year = pd.read_csv('firm_year.csv' )
firm_year['filing_date'] = pd.to_datetime(firm_year['filing_date'], errors='coerce').dt.strftime('%Y/%m/%d')

In [None]:
def calculation(df, base_directory, prefix, suffix):
    df[f'{suffix}_fog_index'] = 0.0
    df[f'{suffix}_length'] = 0.0
    df[f'{suffix}_words'] = 0
    
    for index, row in df.iterrows():
        cik = str(row['cik'])
        filing_date = row['filing_date']
        year = str(row['year'])
        quarter = 'QTR' + str((int(filing_date.split('/')[1]) - 1) // 3 + 1)  
        files_directory = os.path.join(base_directory, year, quarter)
        filename_start = f"{prefix}_{filing_date.replace('/', '')}_10-K_edgar_data_{cik}_"
        
        for file in os.listdir(files_directory):
            if file.startswith(filename_start) and file.endswith('.txt'):
                file_path = os.path.join(files_directory, file)
                
                with open(file_path, 'r', encoding='utf-8') as file_content:
                    content = file_content.read()
                    r = Readability(content)
                    fog_index = r.gunning_fog().score if len(content.split()) > 100 else np.nan
                    num_words = len(content.split())  
                    length = np.log(num_words) if num_words > 0 else 0 

                    df.at[index, f'{suffix}_fog_index'] = fog_index
                    df.at[index, f'{suffix}_length'] = length
                    df.at[index, f'{suffix}_words'] = num_words
                break  

full = 'D:/ws/10_k'
mda = 'D:/ws/mda'

calculation(firm_year, full, 'cleaned', 'full')
calculation(firm_year, mda, 'mda_cleaned', 'mda')


In [19]:
firm_year.to_csv('firm_year_cleaned.csv', index=False)

In [None]:
################
# 5. Merge the fog&length with the bogdate by gvkey and filedate
# PS: bogindex is downloaded from the website "https://sites.google.com/iu.edu/professorbrianpmiller/bog-data" 
# and is described and validated in Bonsall, Leone, Miller and Rennekamp (2017)
#################

In [11]:
firm_year = pd.read_csv('firm_year_cleaned.csv')
firm_year['gvkey'] = firm_year['gvkey'].astype(str)
firm_year['cik'] = firm_year['cik'].astype(str)
firm_year['filing_date'] = pd.to_datetime(firm_year['filing_date'], errors='coerce').dt.strftime('%Y/%m/%d')

bog = pd.read_csv('bogdata.csv')
bog['gvkey'] = bog['gvkey'].astype(str)
bog['cik'] = bog['cik'].astype(str)
bog['filedate'] = pd.to_datetime(bog['filedate'], errors='coerce').dt.strftime('%Y/%m/%d')

merge = pd.merge(firm_year, bog, how='left', left_on=['gvkey', 'filing_date','cik'], right_on=['gvkey', 'filedate','cik'])
merge = merge[merge['year'].between(2016, 2021)]

In [13]:
merge.to_csv("merge.csv", index= False)