In [20]:
import os
import re
import sys
import warnings
from pathlib import Path

import flatdict
import pandas as pd
import xmltodict

In [21]:
# f4data.py
class Form4Data:
    """
    Container for one formatted Form-4 table.

    Attributes
    ----------
    df : pandas DataFrame
        The table, either restricted to the standard column layout
        (see ``from_txt``) or loaded as-is from a .csv file (see ``from_csv``).

    The ``*_col_name`` class attributes list the standard column names of each
    section; ``from_txt`` concatenates them to build the layout of a table.
    """

    issuer_col_name = ["issuerCik", "issuerName", "issuerTradingSymbol"]

    reporting_col_name = [
        "reportingOwnerId.rptOwnerCik",
        "reportingOwnerId.rptOwnerName",
        "reportingOwnerAddress.rptOwnerStreet1",
        "reportingOwnerAddress.rptOwnerStreet2",
        "reportingOwnerAddress.rptOwnerCity",
        "reportingOwnerAddress.rptOwnerState",
        "reportingOwnerAddress.rptOwnerZipCode",
        "reportingOwnerAddress.rptOwnerStateDescription",
        "reportingOwnerRelationship.isDirector",
        "reportingOwnerRelationship.isOfficer",
        "reportingOwnerRelationship.isTenPercentOwner",
        "reportingOwnerRelationship.isOther",
        "reportingOwnerRelationship.officerTitle",
        "reportingOwnerRelationship.otherText"
        ]

    nonderivative_col_name = [
        "securityTitle.value",
        "transactionDate.value",
        "deemedExecutionDate.value",
        "transactionCoding.transactionCode",
        "transactionTimeliness.value",
        "transactionAmounts.transactionShares.value",
        "transactionAmounts.transactionAcquiredDisposedCode.value",
        "transactionAmounts.transactionPricePerShare.value",
        "postTransactionAmounts.sharesOwnedFollowingTransaction.value",
        "ownershipNature.directOrIndirectOwnership.value",
        "ownershipNature.natureOfOwnership.value",
        "footnote"
        ]

    derivative_col_name = [
        "securityTitle.value",
        "conversionOrExercisePrice.value",
        "transactionDate.value",
        "deemedExecutionDate.value",
        "transactionCoding.transactionCode",
        "transactionTimeliness.value",
        "transactionAmounts.transactionAcquiredDisposedCode.value",
        "transactionAmounts.transactionShares.value",
        "exerciseDate.value",
        "expirationDate.value",
        "underlyingSecurity.underlyingSecurityTitle.value",
        "underlyingSecurity.underlyingSecurityShares.value",
        "transactionAmounts.transactionPricePerShare.value",
        "postTransactionAmounts.sharesOwnedFollowingTransaction.value",
        "ownershipNature.directOrIndirectOwnership.value",
        "ownershipNature.natureOfOwnership.value",
        "footnote"
        ]

    footnotes_col_name = ["footnote_"]

    def __init__(self, df):
        self.df = df

    @classmethod
    def from_txt(cls, table_name, orig_df):
        """
        Build a Form4Data whose DataFrame has exactly the standard columns of
        the requested table, in the standard order. Columns of ``orig_df`` that
        are not part of the layout are dropped; layout columns missing from
        ``orig_df`` are present but empty.

        table_name: string, one of "nonDerivative", "derivative", "footnotes"
        orig_df:    pandas DataFrame, full table containing all the data columns
        return:     Form4Data
        raises:     ValueError if table_name is not one of the known names
        """
        if table_name == "nonDerivative":
            column_list = cls.issuer_col_name + cls.reporting_col_name + cls.nonderivative_col_name
        elif table_name == "derivative":
            column_list = cls.issuer_col_name + cls.reporting_col_name + cls.derivative_col_name
        elif table_name == "footnotes":
            column_list = cls.issuer_col_name + cls.reporting_col_name + cls.footnotes_col_name
        else:
            raise ValueError("Unknown table name!")

        # Concatenating onto an empty frame that carries the standard columns
        # guarantees every standard column exists before selecting them.
        empty_df = pd.DataFrame(columns=column_list)
        df = pd.concat([empty_df, orig_df])[column_list]

        cls._check_col_name(empty_df, orig_df)

        return cls(df)

    @staticmethod
    def _check_col_name(empty_df, orig_df):
        """
        Warn about columns of ``orig_df`` that are neither standard columns nor
        known, deliberately ignored extras.

        empty_df: pandas DataFrame, empty frame whose columns are the standard names
        orig_df:  pandas DataFrame, full table containing all the data columns
        """
        # Known extras that are intentionally not stored (matched as lower-case
        # substrings of the column name):
        #   footnote            - raw footnote reference columns
        #   equityswap          - transactionCoding.equitySwapInvolved (field not used by form 4)
        #   formtype            - transactionCoding.transactionFormType (4 for these forms)
        #   transactiontimeliness / deemedexecutiondate
        #                       - empty duplicates; populated data lives in the ".value" column
        known_tokens = (
            "footnote",
            "equityswap",
            "formtype",
            "transactiontimeliness",
            "deemedexecutiondate",
        )

        extra_cols = set(orig_df.columns.values) - set(empty_df.columns.values)
        # Sorted so the warnings come out in a deterministic order.
        for coln in sorted(extra_cols, key=str):
            lowered = str(coln).lower()
            if not any(token in lowered for token in known_tokens):
                # The original `assert("...")` asserted a non-empty string and
                # therefore never reported anything; emit a real warning.
                warnings.warn("unmatched column name: " + str(coln), stacklevel=2)

        return

    @classmethod
    def from_csv(cls, input_path, filename):
        """
        Load a table from a .csv file; no column standardisation is applied.

        input_path:  Path obj, input directory
        filename:    string, full filename of .csv file
        return:      Form4Data
        """
        input_file_loc = input_path / filename
        df = pd.read_csv(input_file_loc)

        return cls(df)

    def check_10b5(self, text):
        """
        Check whether "10b5" is mentioned in the footnote text. The match is
        case-insensitive so that spellings such as "10B5-1" are found as well.

        text:   string (any non-string value, e.g. NaN, yields False)
        return: boolean
        """
        return "10b5" in text.lower() if isinstance(text, str) else False

    def add_has_10b5(self):
        """
        Add (or refresh) the boolean column "has_10b5" on ``self.df``, computed
        from the "footnote" column with ``check_10b5``.

        ``assign`` is used so that calling this method repeatedly overwrites the
        column instead of appending a second column with the same name.
        """
        has_10b5 = self.df['footnote'].apply(self.check_10b5)
        self.df = self.df.assign(has_10b5=has_10b5)

        return
        

In [22]:
# proc_form4.py

def proc_form4txt(input_path, output_path, filename, output_filename):
    """
    Pre-process the raw Form-4 text line by line so that the embedded xml can
    later be read consistently with flatdict:
    1. Add an empty transaction / footnote element before the closing tag of each
       table, so that every table holds more than one element and is read as a list
    2. Rename "Holding" elements to "Transaction", so that no separate form is needed
    3. Wrap the content of each <footnote> in a nested <footnote_> element (the
       id attribute becomes part of the text) so the notes can be separated later

    input_path:      Path obj, input directory
    output_path:     Path obj, output directory
    filename:        string, full filename of Form-4.txt for pre-processing
    output_filename: string, full filename of file written: Form-4.txt.mod
    """

    input_file_loc = input_path / filename
    output_file_loc = output_path / output_filename

    # Context managers close both files even if an error occurs half-way.
    with open(input_file_loc, 'r') as input_file:
        lines = input_file.readlines()

    with open(output_file_loc, 'w') as output_file:
        for line in lines:
            # add empty xml elements so that flatdict can process all as a list
            if r'</nonDerivativeTable>' in line and r'<nonDerivativeTable></nonDerivativeTable>' not in line:
                output_file.write(r'<nonDerivativeTransaction></nonDerivativeTransaction>' + "\n")
            if r'</derivativeTable>' in line and r'<derivativeTable></derivativeTable>' not in line:
                output_file.write(r'<derivativeTransaction></derivativeTransaction>' + "\n")
            if r'</footnotes>' in line and r'<footnotes></footnotes>' not in line:
                output_file.write(line.replace(r'</footnotes>', r'<footnote><footnote_>  </footnote_></footnote></footnotes>'))
                continue

            # "Holding" and "Transaction" are slight variation of same table
            if 'nonDerivativeHolding' in line:
                output_file.write(line.replace('nonDerivativeHolding', 'nonDerivativeTransaction'))
                continue
            if 'derivativeHolding' in line:
                output_file.write(line.replace('derivativeHolding', 'derivativeTransaction'))
                continue

            # add additional nesting in footnote, so that flatdict can separate the notes
            if r'<footnote ' in line:
                # Only rewrite the opening tag. A blanket replace of ' id' would
                # also hit footnote text such as " identified" and corrupt the xml.
                opened = re.sub(r'<footnote\s+id', '<footnote><footnote_>id', line)
                output_file.write(opened.replace(r'</footnote>', r'</footnote_></footnote>'))
                continue
            if r'</footnote>' in line:
                output_file.write(line.replace(r'</footnote>', r'</footnote_></footnote>'))
                continue

            output_file.write(line)

    return


def form4xml_to_flatdict(filepath, filename):
    """
    Read the pre-processed form4.txt.mod, extract the xml part and convert the
    content of its <ownershipDocument> element to a flatdict object.

    filepath: Path obj, file directory
    filename: string, full filename of file written: Form-4.txt.mod
    return:   flatdict obj, keys are '.'-joined paths below ownershipDocument
    raises:   ValueError if the file contains no xml ownershipDocument section
    """

    with open(filepath / filename) as f:
        data = f.read()

    # Extract the span from the xml declaration up to the last
    # "ownershipDocument>" (greedy match across lines).
    matcher = re.compile(r'<\?xml.*ownershipDocument>', flags=re.MULTILINE|re.DOTALL)
    matches = matcher.search(data)
    if matches is None:
        # Fail with a message naming the file rather than an opaque
        # AttributeError on `None.group`.
        raise ValueError("No xml ownershipDocument found in " + str(filepath / filename))
    xml = matches.group(0)

    # load entire xml to dict object
    xmldict = xmltodict.parse(xml)

    # use flatdict tool to flatten levels of dictionary for easy indexing
    return flatdict.FlatDict(xmldict["ownershipDocument"], delimiter='.')


def flatdict_to_df(table_d):
    """
    Turn one section of the flattened Form-4 dictionary into a DataFrame.

    A section holding a single item arrives as a mapping and becomes a one-row
    DataFrame whose columns are the mapping's keys. A section holding several
    items arrives as a list; every entry is flattened with flatdict and becomes
    one row.

    table_d: list of entries, or a single mapping (anything exposing .items())
    return:  pandas DataFrame with column names
    """

    if not isinstance(table_d, list):
        # (key, value) pairs -> two rows after transposing: keys, then values
        two_rows = pd.DataFrame(table_d.items()).T
        header = two_rows.iloc[0]
        values_only = two_rows.drop([0]).reset_index(drop=True)

        return values_only.rename(header, axis=1)

    flattened = [flatdict.FlatDict(entry, delimiter='.') for entry in table_d]

    return pd.DataFrame(flattened)


def form4df_to_csv(filepath, full_dict, issuer_df, reportingOwner_df):
    """
    This function takes read-in information and saves it to the .csv database.

    For each section present in full_dict (nonDerivative table, derivative
    table, footnotes) it builds a table prefixed with the issuer and reporting
    owner columns, standardises the columns through Form4Data.from_txt and
    writes it with save_df_to_csv to "nonDerivative.csv", "derivative.csv" and
    "footnotes.csv" respectively.

    filepath:          Path obj, directory for file
    full_dict:         flatdict, contains full flatdict read from xml 
    issuer_df:         pandas DataFrame, contains issuer info
    reportingOwner_df: pandas DataFrame, contains reporting owner info
    raises:            Exception if footnotes exist but neither transaction
                       table was written, so no transaction date is available
    """

    # Track which transaction tables were written; the footnotes step below
    # takes its transaction date from one of them.
    exist_nonDer = False
    exist_der = False

    
    # work on footnotes: build the id -> text lookup used by both tables below
    if "footnotes.footnote" in full_dict.keys():
        footnotes_df  = flatdict_to_df(full_dict["footnotes.footnote"])
        footnotes_dict = footnotes_to_dict(footnotes_df["footnote_"])
    
    # work on nonDerivativeTable
    if "nonDerivativeTable.nonDerivativeTransaction" in full_dict.keys():
        nonDerivativeTable_df = flatdict_to_df(full_dict["nonDerivativeTable.nonDerivativeTransaction"])
        
        # check if there is footnote information to add
        # NOTE(review): footnotes_dict is only bound when the footnotes section
        # exists; a table with footnote columns but no footnotes section would
        # hit a NameError here - confirm that cannot occur in valid filings.
        col_has_footnote = [col for col in nonDerivativeTable_df.columns if 'footnote' in col]
        if col_has_footnote:
            nonDerivativeTable_df = get_footnote_info(nonDerivativeTable_df, footnotes_dict, col_has_footnote)
        
        # add information about issuer and owner to the tables
        nonDerivative_cDF = concat_abtoC(issuer_df, reportingOwner_df, nonDerivativeTable_df)
        # remove the last row that was added for flatdict reading
        f4_nonDerivative  = Form4Data.from_txt("nonDerivative", nonDerivative_cDF.iloc[:-1])
        save_df_to_csv(filepath, "nonDerivative.csv", f4_nonDerivative.df)
        exist_nonDer = True
        
    # work on derivativeTable (same steps as the nonDerivative table above)
    if "derivativeTable.derivativeTransaction" in full_dict.keys():
        derivativeTable_df= flatdict_to_df(full_dict["derivativeTable.derivativeTransaction"])
        
        # check if there is footnote information to add
        col_has_footnote = [col for col in derivativeTable_df.columns if 'footnote' in col]
        if col_has_footnote:
            derivativeTable_df = get_footnote_info(derivativeTable_df, footnotes_dict, col_has_footnote)
                
        derivative_cDF    = concat_abtoC(issuer_df, reportingOwner_df, derivativeTable_df)
        # drop the last row, as for the nonDerivative table
        f4_derivative     = Form4Data.from_txt("derivative", derivative_cDF.iloc[:-1])
        save_df_to_csv(filepath, "derivative.csv", f4_derivative.df)
        exist_der = True
    
    # work on footnotes: write the footnotes table itself
    if "footnotes.footnote" in full_dict.keys():
        footnotes_cDF = concat_abtoC(issuer_df, reportingOwner_df, footnotes_df)
        # drop the last row, as for the transaction tables
        f4_footnotes  = Form4Data.from_txt("footnotes", footnotes_cDF.iloc[:-1])

        # add transaction date to footnotes table: every footnote row gets the
        # date of the FIRST row of the nonDerivative table, or of the
        # derivative table when no nonDerivative table was written
        # NOTE(review): .iloc[0] presumably assumes the chosen table has at
        # least one row left after the drop above - confirm.
        row = len(f4_footnotes.df.index)
        if exist_nonDer:
            transactionDate = [f4_nonDerivative.df["transactionDate.value"].iloc[0]]*row
        elif exist_der:
            transactionDate = [f4_derivative.df["transactionDate.value"].iloc[0]]*row
        else:
            raise Exception("Try to find transaction date for footnotes. Empty entries for nonDerivative and derivative tables.")
        
        # footnotes_withdate is the same object as f4_footnotes.df, so the new
        # column is added to that frame in place
        footnotes_withdate = f4_footnotes.df
        footnotes_withdate["transactionDate"] = transactionDate
        save_df_to_csv(filepath, "footnotes.csv", footnotes_withdate)
    
    return


def concat_abtoC(a, b, c):
    """
    Put the rows of a and b in front of every row of c.

    a and b are stacked on themselves once per row of c, then a, b and c are
    joined side by side (axis=1) on a fresh 0..n-1 index, so the original index
    of c is not kept.

    a:      pandas DataFrame, one row only
    b:      pandas DataFrame, one row only
    c:      pandas DataFrame, may have multiple rows
    return: pandas DataFrame, columns of a, then b, then c
    """

    row_count = len(c.index)

    # repeat each single-row frame so it lines up with the rows of c
    repeated = [
        pd.concat([frame] * row_count, ignore_index=True).reset_index(drop=True)
        for frame in (a, b)
    ]
    left_part = pd.concat(repeated, axis=1).reset_index(drop=True)
    right_part = c.reset_index(drop=True)

    return pd.concat([left_part, right_part], axis=1)


def save_df_to_csv(filepath, filename, df):
    """
    Write a pandas DataFrame to filepath / filename as .csv, without the index.

    A new file is created with a header line. If the file already exists, the
    rows are appended to it and no header line is written.

    filepath: Path obj, directory for file
    filename: string, full filename for csv output
    df:       pandas DataFrame
    """

    target = filepath / filename
    already_there = os.path.isfile(target)

    # append without header to an existing file, otherwise start a new one
    df.to_csv(
        target,
        index=False,
        mode='a' if already_there else 'w',
        header=not already_there,
    )

    return
                                           
                                           


In [23]:
def footnotes_to_dict(footnotes_df):
    """
    This function converts the footnote column to a dictionary.

    Every entry except the last one is expected to look like
    'id="F1">footnote text'; it is stored as {"F1": "F1: footnote text"}.
    The last entry of the Series is skipped.

    footnotes_df:  pandas Series of footnote strings
    return:        dict obj, footnote id -> "<id>: <text>"
    """

    footnote_dict = {}

    for _, value in footnotes_df.iloc[:-1].items():
        # Split on the FIRST '>' only: the footnote text itself may contain '>'
        # (e.g. "price > $10"), which made the two-name unpacking fail before.
        s_tmp, s_text = value.split('>', 1)
        s_id = s_tmp.split('"')[1]

        footnote_dict[s_id] = s_id + ': ' + s_text

    return footnote_dict
    
    
def get_footnote_info(df, footnotes_dict, col_has_footnote):
    """
    This function
    1. Reads the row index and footnote id(s) from every footnote column.
    2. Creates a 'footnote' column holding, per row, the texts of all footnotes
       referenced by that row, joined with a single space.

    df:               pandas DataFrame, target dataframe to append footnote column
    footnotes_dict:   dictionary obj, contains id -> string details
    col_has_footnote: list of column names with footnote
    return:           pandas DataFrame, df plus a 'footnote' column (NaN for
                      rows without any footnote)
    raises:           KeyError if a referenced id is missing from footnotes_dict
    """

    # row index -> footnote texts, in order of appearance (column by column)
    notes_by_row = {}

    for col in col_has_footnote:
        # iterate over values inside footnote column
        for idx, value in df[col].items():
            if isinstance(value, list):
                # more than one footnote in this tag
                for ref in value:
                    notes_by_row.setdefault(idx, []).append(footnotes_dict[ref["@id"]])
            elif "F" in str(value):
                notes_by_row.setdefault(idx, []).append(footnotes_dict[value])

    # One joined string per row. The previous transform(...).drop_duplicates()
    # removed duplicates by VALUE across rows, so a row whose footnote text
    # equalled that of an earlier row silently lost its footnote.
    joined = {idx: ' '.join(texts) for idx, texts in notes_by_row.items()}
    footnote_col = pd.Series(joined, name='footnote', dtype=object).reindex(df.index)

    return pd.concat([df, footnote_col], axis=1)
    

In [24]:
# edgar_form4.py
def form4_to_csv(input_path, output_path, filename):
    """
    Main entry point for one filing: pre-process the Form-4 text file, read its
    xml content and save it to the .csv database, once per reporting owner.

    input_path:   Path obj, directory for input files
    output_path:  Path obj, directory for output files
    filename:     string, full filename of Form-4.txt file
    """

    # the pre-processed copy is written next to the csv output as <filename>.mod
    mod_filename = filename + '.mod'
    proc_form4txt(input_path, output_path, filename, mod_filename)

    # flattened view of the whole xml document
    full_dict = form4xml_to_flatdict(output_path, mod_filename)

    # issuer and reportingOwner first; hopefully these fields are populated
    issuer_df = flatdict_to_df(full_dict["issuer"])

    # single reporting owner: the section can be converted directly
    if not isinstance(full_dict["reportingOwner"], list):
        reportingOwner_df = flatdict_to_df(full_dict["reportingOwner"])
        form4df_to_csv(output_path, full_dict, issuer_df, reportingOwner_df)
        return

    # several reporting owners: flatten each one and write the tables per owner
    for owner in full_dict["reportingOwner"]:
        owner_flat = flatdict.FlatDict(owner, delimiter='.')
        reportingOwner_df = flatdict_to_df(owner_flat)
        form4df_to_csv(output_path, full_dict, issuer_df, reportingOwner_df)

    return


def run_form4(input_path, output_path, list_order):
    """
    This function calls the main function form4_to_csv for a batch of files.

    input_path:   string, directory for input files
    output_path:  string, directory for output files
    list_order:   boolean. If False, every "*.txt" file found in input_path is
                  processed (in sorted filename order). If True, the filenames
                  are read, one per line, from the file "list_txt" in
                  input_path and processed in that order.
    """

    if not list_order:
        # os.listdir returns entries in arbitrary order; sort so the rows
        # appended to the csv files come out in a reproducible order.
        for filename in sorted(os.listdir(input_path)):
            if filename.endswith(".txt"):
                print(filename)
                form4_to_csv(Path(input_path), Path(output_path), filename)
    else:
        fileloc = Path(input_path) / "list_txt"
        # the context manager closes the list file even if a filing fails
        with open(fileloc, 'r') as input_file:
            lines = input_file.readlines()

        for line in lines:
            filename = line.strip()
            if not filename:
                # blank lines (e.g. a trailing newline) are not filenames
                continue
            print(filename)
            form4_to_csv(Path(input_path), Path(output_path), filename)

    return


In [25]:
# Process a single sample filing from ./test_jup into ./test_jup/scr.
# NOTE: the csv files are appended to, so re-running this cell adds the same
# rows again; delete the files in output_path first for a clean run.
input_path = Path("./test_jup")
output_path = Path("./test_jup/scr")
output_path.mkdir(parents=True, exist_ok=True)

# Exactly one filing is active; the others are kept for quick switching.
# (The first assignment used to be live but was overwritten immediately.)
# filename ="912728_4_0000912728-20-000168.txt"
filename ="1109354_3_0001179110-20-009902.txt"
# filename ="1505512_4_0001209191-20-061962.txt"
# filename ="1412408_1_0000899243-20-001780.txt"
form4_to_csv(input_path, output_path, filename)


In [26]:
# Batch run over the sample directory (same paths as the single-file cell).
input_path = Path("./test_jup")
output_path = Path("./test_jup/scr")
# exist_ok=True: re-running the cell does not fail when the directory exists
output_path.mkdir(exist_ok=True)

# Disabled batch run; list_order=True reads the filenames from "list_txt".
# run_form4(input_path, output_path, True)

In [27]:
# nd = pd.read_csv("./test_jup/scr/nonDerivative.csv")
# d = pd.read_csv("./test_jup/scr/derivative.csv")
# f = pd.read_csv("./test_jup/scr/footnotes.csv")
# display(nd)


In [28]:
# Load the written nonDerivative table and flag rows whose footnote mentions 10b5.
output_path = Path("./test_jup/scr")
filename = "nonDerivative.csv"
# pd.read_csv("./test_jup/scr/nonDerivative.csv")
a = Form4Data.from_csv(output_path, filename)
display(a.df)
# adds the boolean column "has_10b5" to a.df
a.add_has_10b5()
# display(a.df)
# show only the rows where has_10b5 is True
display(a.df.loc[a.df["has_10b5"]])

Unnamed: 0,issuerCik,issuerName,issuerTradingSymbol,reportingOwnerId.rptOwnerCik,reportingOwnerId.rptOwnerName,reportingOwnerAddress.rptOwnerStreet1,reportingOwnerAddress.rptOwnerStreet2,reportingOwnerAddress.rptOwnerCity,reportingOwnerAddress.rptOwnerState,reportingOwnerAddress.rptOwnerZipCode,...,deemedExecutionDate.value,transactionCoding.transactionCode,transactionTimeliness.value,transactionAmounts.transactionShares.value,transactionAmounts.transactionAcquiredDisposedCode.value,transactionAmounts.transactionPricePerShare.value,postTransactionAmounts.sharesOwnedFollowingTransaction.value,ownershipNature.directOrIndirectOwnership.value,ownershipNature.natureOfOwnership.value,footnote
0,1109354,BRUKER CORP,BRKR,1675108,Friend Cynthia M,BRUKER CORPORATION,40 MANNING ROAD,BILLERICA,MA,1821,...,,M,,800,A,26.17,12657,D,,
1,1109354,BRUKER CORP,BRKR,1675108,Friend Cynthia M,BRUKER CORPORATION,40 MANNING ROAD,BILLERICA,MA,1821,...,,S,,800,D,37.5813,11857,D,,F2: The price reported in Column 4 is a weight...


Unnamed: 0,issuerCik,issuerName,issuerTradingSymbol,reportingOwnerId.rptOwnerCik,reportingOwnerId.rptOwnerName,reportingOwnerAddress.rptOwnerStreet1,reportingOwnerAddress.rptOwnerStreet2,reportingOwnerAddress.rptOwnerCity,reportingOwnerAddress.rptOwnerState,reportingOwnerAddress.rptOwnerZipCode,...,transactionCoding.transactionCode,transactionTimeliness.value,transactionAmounts.transactionShares.value,transactionAmounts.transactionAcquiredDisposedCode.value,transactionAmounts.transactionPricePerShare.value,postTransactionAmounts.sharesOwnedFollowingTransaction.value,ownershipNature.directOrIndirectOwnership.value,ownershipNature.natureOfOwnership.value,footnote,has_10b5
1,1109354,BRUKER CORP,BRKR,1675108,Friend Cynthia M,BRUKER CORPORATION,40 MANNING ROAD,BILLERICA,MA,1821,...,S,,800,D,37.5813,11857,D,,F2: The price reported in Column 4 is a weight...,True
