## Corporate insider trading data ##
This notebook contains a few functions to parse forms 3, 4 and 5 (see the [SEC FAQ](https://www.sec.gov/fast-answers/answersform345htm.html) for details about these forms). The function *extract_insider_data()* takes the response obtained for a form and returns a dataframe in which each row is a transaction reported in the form (see the outline below). The content of that response serves as input for *extract_insider_data()*. The input is cleaned and then passed through three different functions that extract the data from the form.

* extract_insider_data(link_request)
  * clean_text_xml(file) # remove text and preserve xml
  * -extract_identity(page) # get data about firm and filer
  * -extract_derivative(page) # gather data about table 2
  * -extract_nonderivative(page) # gather data about table 1
  * -get_identity(file) # gather identification data from forms without xml
  * return - dataframe

Caveats:
Some filings are purely text. In this case, the function extracts only the identification part of the form.  
Function extract_insider_data() is a parser: it does not download anything itself. You need to feed it the response obtained by requesting a SEC filing link (e.g. with `requests.get`). There are many Python and R packages to get a direct link to the filings.

In [14]:
# required
import re

import pandas as pd
import requests
from bs4 import BeautifulSoup as bs

In [15]:
def extract_insider_data(link_request):
    """Parse the response for a form 3/4/5 filing into a dataframe.

    Args:
        link_request: Response object for the filing. Its ``content``
            attribute is read here; for filings without an XML section the
            object itself is handed to ``get_identity``.

    Returns:
        pandas.DataFrame: For XML filings, the identification columns merged
        with the rows of the derivative and non-derivative tables, plus a
        ``footnotes`` column. For filings without XML, whatever
        ``get_identity`` returns.
    """
    # NOTE(review): assumes ``content`` is bytes (as with a requests
    # response). str() of bytes renders line breaks as the two characters
    # backslash + "n", which is the separator clean_text_xml() splits on.
    link_content = str(link_request.content)
    text = clean_text_xml(link_content)

    # No XML section found: only the identification header can be parsed.
    if not text:
        return get_identity(link_request)

    page = bs(text, "xml")
    data_info = extract_identity(page)
    data_derivatives = extract_derivative(page)
    data_nonderivatives = extract_nonderivative(page)

    # Attach the identification columns to each table, then stack both
    # tables with an outer merge on their shared columns.
    data_1 = pd.merge(data_info, data_derivatives, on=["execCIK"])
    data_2 = pd.merge(data_info, data_nonderivatives, on=["execCIK"])
    data = pd.merge(data_1, data_2, how="outer")

    # find() returns None when the filing has no <footnotes> element.
    footnotes_tag = page.find("footnotes")
    data["footnotes"] = footnotes_tag.text if footnotes_tag is not None else ""
    return data

In [16]:
def clean_text_xml(file):
    """Keep only the lines found between ``<XML>`` and ``</XML>`` markers.

    Args:
        file (str): Filing text whose lines are separated by the two-character
            sequence backslash + "n" (this is what ``str()`` of a bytes
            payload produces, which is how the caller builds the argument).

    Returns:
        str: Every line located strictly between an ``<XML>`` line and the
        following ``</XML>`` line, each followed by a single space. The
        marker lines themselves are dropped. Empty string when no ``<XML>``
        marker is present.
    """
    inside_xml = False
    kept = []
    for line in file.split("\\n"):
        if not inside_xml:
            # Skip everything until an opening marker shows up.
            inside_xml = "<XML>" in line
        elif "</XML>" in line:
            # Closing marker: stop collecting until the next <XML>.
            inside_xml = False
        else:
            kept.append(line + " ")
    return "".join(kept)

In [17]:
def extract_identity(page):
    """Collect the issuer / reporting-owner identification fields of a form.

    Args:
        page: Parsed document exposing ``find(tag_name)``, which returns an
            object with a ``text`` attribute or ``None`` when the tag is
            absent (e.g. a BeautifulSoup document).

    Returns:
        pandas.DataFrame: A single-row frame with one column per field listed
        below. A field whose tag is missing from ``page`` is set to ``""``.
    """
    # (output column, XML tag) pairs, in output column order.
    fields = (
        ("CIK", "issuerCik"),
        ("Firm", "issuerName"),
        ("Ticker", "issuerTradingSymbol"),
        ("execCIK", "rptOwnerCik"),
        ("personName", "rptOwnerName"),
        ("type", "documentType"),
        ("reported_data", "periodOfReport"),
        ("isDirector", "isDirector"),
        ("isOfficer", "isOfficer"),
        ("isTenOwner", "isTenPercentOwner"),
        ("isOther", "isOther"),
        ("execTitle", "officerTitle"),
        ("otherText", "otherText"),
        ("notSubjectToSection16", "notSubjectToSection16"),
    )
    execData = {}
    for column, tag_name in fields:
        tag = page.find(tag_name)
        # Only the first occurrence of each tag is used.
        execData[column] = tag.text if tag is not None else ""
    return pd.DataFrame([execData])

In [18]:
def extract_derivative(page):
    """Extract the derivative securities table (table 2) of a form.

    Args:
        page: Parsed document exposing ``find(tag_name)``; the element found
            for ``derivativeTable`` must expose ``find_all(tag_name)``
            returning objects with a ``text`` attribute (e.g. a BeautifulSoup
            document).

    Returns:
        pandas.DataFrame: One column per field listed below, plus ``table``
        (always 2) and ``execCIK`` (text of ``rptOwnerCik``, or ``""`` when
        that tag is missing). Missing values are filled with ``""``. When the
        form has no ``derivativeTable`` the frame has no rows and only the
        ``execCIK`` column.
    """
    fields = (
        "securityTitle",
        "conversionOrExercisePrice",
        "transactionDate",
        "transactionCode",
        "equitySwapInvolved",
        "transactionShares",
        "transactionPricePerShare",
        "transactionAcquiredDisposedCode",
        # This column used to be dropped silently: the name was passed
        # positionally to pd.Series (i.e. as ``index``), which raised and was
        # swallowed by a bare ``except``.
        "exerciseDate",
        "expirationDate",
        "underlyingSecurityTitle",
        "underlyingSecurityShares",
        "sharesOwnedFollowingTransaction",
        "directOrIndirectOwnership",
    )

    # Same fallback as extract_identity(): a missing owner CIK becomes "".
    owner = page.find("rptOwnerCik")
    owner_cik = owner.text if owner is not None else ""

    derivative_table = page.find("derivativeTable")
    if derivative_table is not None:
        # NOTE(review): each column is gathered over the whole table and the
        # columns are aligned by position only, so a field that is absent
        # from some entries shifts that column relative to the others.
        columns = [
            pd.Series([tag.text for tag in derivative_table.find_all(field)], name=field)
            for field in fields
        ]
        dt = pd.concat(columns, axis=1)
        dt["table"] = 2
        dt["execCIK"] = owner_cik
    else:
        # Assigning a scalar to an empty frame yields a zero-row column.
        dt = pd.DataFrame()
        dt["execCIK"] = owner_cik
    dt = dt.fillna("")
    return dt

In [19]:
def extract_nonderivative(page):
    """Extract the non-derivative securities table (table 1) of a form.

    Args:
        page: Parsed document exposing ``find(tag_name)``; the element found
            for ``nonDerivativeTable`` must expose ``find_all(tag_name)``
            returning objects with a ``text`` attribute (e.g. a BeautifulSoup
            document).

    Returns:
        pandas.DataFrame: One column per field listed below, plus ``table``
        (always 1) and ``execCIK`` (text of ``rptOwnerCik``, or ``""`` when
        that tag is missing). Missing values are filled with ``""``. When the
        form has no ``nonDerivativeTable`` the frame has no rows and only the
        ``execCIK`` column.
    """
    fields = (
        "securityTitle",
        "conversionOrExercisePrice",
        "transactionDate",
        "transactionFormType",
        "transactionCode",
        "equitySwapInvolved",
        "transactionShares",
        "transactionPricePerShare",
        "transactionAcquiredDisposedCode",
        "sharesOwnedFollowingTransaction",
        "directOrIndirectOwnership",
    )

    # Same fallback as extract_identity(): a missing owner CIK becomes "".
    owner = page.find("rptOwnerCik")
    owner_cik = owner.text if owner is not None else ""

    nonderivative_table = page.find("nonDerivativeTable")
    if nonderivative_table is not None:
        # NOTE(review): each column is gathered over the whole table and the
        # columns are aligned by position only, so a field that is absent
        # from some entries shifts that column relative to the others.
        columns = [
            pd.Series([tag.text for tag in nonderivative_table.find_all(field)], name=field)
            for field in fields
        ]
        dt = pd.concat(columns, axis=1)
        dt["table"] = 1
        dt["execCIK"] = owner_cik
    else:
        # Assigning a scalar to an empty frame yields a zero-row column.
        dt = pd.DataFrame()
        dt["execCIK"] = owner_cik
    dt = dt.fillna("")
    return dt

In [20]:
def get_identity(link_request):
    """Extract identification data from the plain-text header of a filing.

    The text is scanned line by line. Lines following a reporting-owner
    marker fill the owner fields; lines following an issuer / subject-company
    marker fill the firm fields.

    Args:
        link_request: Object whose ``text`` attribute holds the filing text
            with lines separated by ``"\\n"``.

    Returns:
        pandas.DataFrame: A single-row frame containing only the fields that
        were found, among ``type``, ``reported_data`` (formatted
        ``YYYY-MM-DD``), ``execCIK``, ``personName``, ``Firm``, ``CIK`` and
        the relationship flags ``isDirector`` / ``isOfficer`` /
        ``isTenOwner`` / ``isOther`` (set to 1 when present). When a field
        occurs several times, the last occurrence wins.
    """
    submission = re.compile(r"(CONFORMED SUBMISSION TYPE:)(.+)")
    date = re.compile(r"(CONFORMED PERIOD OF REPORT:)\s+(\d\d\d\d)(\d\d)(\d\d)")
    reporting = re.compile(r"(REPORTING-OWNER:|<REPORTING-OWNER>)")
    cik = re.compile(r"(CENTRAL INDEX KEY:)(.+)")
    relationship = re.compile(r"(RELATIONSHIP:|RELATIONSHIP)(.+)")
    company = re.compile(r"(SUBJECT COMPANY:|ISSUER:).+")
    name = re.compile(r"(COMPANY CONFORMED NAME:)(.+)")

    # Relationship label -> flag column; any other label is filed as isOther.
    role_columns = {
        "DIRECTOR": "isDirector",
        "OFFICER": "isOfficer",
        "OWNER": "isTenOwner",
    }

    execData = {}
    # 0: no section seen yet, 1: reporting-owner section, 2: issuer section.
    section = 0
    for line in link_request.text.split("\n"):
        found = submission.search(line)
        if found is not None:
            execData["type"] = found.group(2).strip()
            continue

        found = date.search(line)
        if found is not None:
            execData["reported_data"] = "-".join(found.group(2, 3, 4))
            continue

        if reporting.search(line) is not None:
            section = 1
            continue

        if section == 1:
            found = cik.search(line)
            if found is not None:
                execData["execCIK"] = found.group(2).strip()
                continue
            found = relationship.search(line)
            if found is not None:
                # Drop the angle brackets left over from tag-style headers.
                rel = found.group(2).strip().replace("<", "").replace(">", "")
                execData[role_columns.get(rel, "isOther")] = 1
                continue
            found = name.search(line)
            if found is not None:
                execData["personName"] = found.group(2).strip()
                continue
            if company.search(line) is not None:
                section = 2
            continue

        if company.search(line) is not None:
            section = 2
            continue

        if section == 2:
            found = name.search(line)
            if found is not None:
                execData["Firm"] = found.group(2).strip()
                continue
            found = cik.search(line)
            if found is not None:
                execData["CIK"] = found.group(2).strip()

    return pd.DataFrame([execData])
        