# Extract from repeating and non-repeating groups into multiple output streams

Sometimes you have no choice but to extract from three different forms at the same time to get
the same information. This somewhat more complicated example does that, using data structures that are, to some extent, replaceable with your own.

We're writing to two files: employees_detailedYYYY.csv and diversionsYYYY.csv.

The example illustrates how to capture repeating groups and non-repeating entries from form parts. It uses the StreamExtractor class to keep everything straight.


In [1]:
# Registry of output streams. Each top-level key is a stream key; each entry
# gives the CSV filename stem and the column headers written to that file.
output_streams = {
    "employees": {
        # will output to employees_detailedYYYY.csv where year is specified below
        "filename": "employees_detailed",
        "headers": [
            "year",
            "ein",
            "object_id",
            "name",
            "business_name1",
            "business_name2",
            "title",
            "org_comp",
            "related_comp",
            "other_cmp",
            "form",
            "source",
            "IndvdlTrstOrDrctrInd",
            "InstttnlTrstInd",
            "OffcrInd",
            "KyEmplyInd",
            "HghstCmpnstdEmplyInd",
            "FrmrOfcrDrctrTrstInd",
        ],
    },
    "diversions": {
        # will output to diversionsYYYY.csv where year is specified below
        "filename": "diversions",
        "headers": [
            "year",
            "ein",
            "object_id",
            "taxpayer_name",
            "diversion_ind",
        ],
    },
}

In [2]:

# The format we're using is sketched below.
# Every 'stream_key' used must be defined as a key in output_streams above.
# data_capture_dict = {
#     '<Form name>': {
#         'groups': {
#             '<group name>': {
#                 'stream_key': 'employees',  # 'stream_key' specifies where the output goes--must exist as a key in output_streams
#                 'ein': {'header':'ein'},
#                 'object_id': {'header':'object_id'}
#             }
#         },
#         'parts': {
#             '<part_name>': {
#                 'stream_key': 'employees',  # 'stream_key' specifies where the output goes--must exist as a key in output_streams
#                 'ein': {'header':'ein'},
#                 'object_id': {'header':'object_id'}
#             }
#         }
#     }
# }


# Capture specification, keyed by form name. Under each form, 'parts' holds
# non-repeating sections and 'groups' holds repeating groups. Every entry names
# the output stream it feeds via 'stream_key' (must exist as a key in
# output_streams) and maps source fields to that stream's column headers.
# A 'default' supplies a numeric value when the field is missing.
data_capture_dict = {
    "IRS990": {
        "parts": {
            "part_vi": {
                "stream_key": "diversions",
                "ein": {"header": "ein"},
                "object_id": {"header": "object_id"},
                "MtrlDvrsnOrMssInd": {"header": "diversion_ind"},
            },
        },
        # Everything from here on captures salaries wherever they appear in
        # the 990, 990PF and 990EZ.
        "groups": {
            "Frm990PrtVIISctnA": {
                "stream_key": "employees",
                "ein": {"header": "ein"},
                "object_id": {"header": "object_id"},
                "PrsnNm": {"header": "name"},
                "BsnssNmLn1Txt": {"header": "business_name1"},
                "BsnssNmLn2Txt": {"header": "business_name2"},
                "TtlTxt": {"header": "title"},
                "RprtblCmpFrmOrgAmt": {"header": "org_comp", "default": 0},
                "RprtblCmpFrmRltdOrgAmt": {"header": "related_comp", "default": 0},
                "OthrCmpnstnAmt": {"header": "other_cmp", "default": 0},
                # Role indicator flags are passed through under their own names.
                "IndvdlTrstOrDrctrInd": {"header": "IndvdlTrstOrDrctrInd"},
                "InstttnlTrstInd": {"header": "InstttnlTrstInd"},
                "OffcrInd": {"header": "OffcrInd"},
                "KyEmplyInd": {"header": "KyEmplyInd"},
                "HghstCmpnstdEmplyInd": {"header": "HghstCmpnstdEmplyInd"},
                "FrmrOfcrDrctrTrstInd": {"header": "FrmrOfcrDrctrTrstInd"},
            },
        },
    },
    "IRS990EZ": {
        "groups": {
            "EZOffcrDrctrTrstEmpl": {
                "stream_key": "employees",
                "ein": {"header": "ein"},
                "object_id": {"header": "object_id"},
                "PrsnNm": {"header": "name"},
                "BsnssNmLn1": {"header": "business_name1"},
                "BsnssNmLn2": {"header": "business_name2"},
                "TtlTxt": {"header": "title"},
                "CmpnstnAmt": {"header": "org_comp", "default": 0},
                # For EZ and PF filers, other compensation includes benefits
                # and other allowances, so 'other_cmp' is built from two fields.
                # NOTE(review): presumably StreamExtractor combines the listed
                # fields into the one column -- confirm how.
                "composite": {
                    "other_cmp": {
                        "EmplyBnftPrgrmAmt": {"default": 0},
                        "ExpnsAccntOthrAllwncAmt": {"default": 0},
                    },
                },
            },
            "EZCmpnstnHghstPdEmpl": {
                "stream_key": "employees",
                "ein": {"header": "ein"},
                "object_id": {"header": "object_id"},
                "PrsnNm": {"header": "name"},
                "TtlTxt": {"header": "title"},
                "CmpnstnAmt": {"header": "org_comp", "default": 0},
                "composite": {
                    "other_cmp": {
                        "EmplyBnftsAmt": {"default": 0},
                        "ExpnsAccntAmt": {"default": 0},
                    },
                },
            },
        },
    },
    "IRS990PF": {
        "groups": {
            "PFOffcrDrTrstKyEmpl": {
                "stream_key": "employees",
                "ein": {"header": "ein"},
                "object_id": {"header": "object_id"},
                "OffcrDrTrstKyEmpl_PrsnNm": {"header": "name"},
                "OffcrDrTrstKyEmpl_BsnssNmLn1": {"header": "business_name1"},
                "OffcrDrTrstKyEmpl_BsnssNmLn2": {"header": "business_name2"},
                "OffcrDrTrstKyEmpl_TtlTxt": {"header": "title"},
                "OffcrDrTrstKyEmpl_CmpnstnAmt": {"header": "org_comp", "default": 0},
                "composite": {
                    "other_cmp": {
                        "OffcrDrTrstKyEmpl_EmplyBnftPrgrmAmt": {"default": 0},
                        "OffcrDrTrstKyEmpl_ExpnsAccntOthrAllwncAmt": {"default": 0},
                    },
                },
            },
            "PFCmpnstnHghstPdEmpl": {
                "stream_key": "employees",
                "ein": {"header": "ein"},
                "object_id": {"header": "object_id"},
                "CmpnstnHghstPdEmpl_PrsnNm": {"header": "name"},
                "CmpnstnHghstPdEmpl_TtlTxt": {"header": "title"},
                "CmpnstnHghstPdEmpl_CmpnstnAmt": {"header": "org_comp", "default": 0},
                "composite": {
                    "other_cmp": {
                        "CmpnstnHghstPdEmpl_EmplyBnftsAmt": {"default": 0},
                        "CmpnstnHghstPdEmpl_ExpnsAccntAmt": {"default": 0},
                    },
                },
            },
        },
    },
    "IRS990ScheduleJ": {
        "groups": {
            "SkdJRltdOrgOffcrTrstKyEmpl": {
                "stream_key": "employees",
                "ein": {"header": "ein"},
                "object_id": {"header": "object_id"},
                "PrsnNm": {"header": "name"},
                "BsnssNmLn1Txt": {"header": "business_name1"},
                "BsnssNmLn2Txt": {"header": "business_name2"},
                "TtlTxt": {"header": "title"},
                "TtlCmpnstnFlngOrgAmt": {"header": "org_comp", "default": 0},
                "TtlCmpnstnRltdOrgsAmt": {"header": "related_comp", "default": 0},
            },
        },
    },
}

In [3]:
from stream_extractor import StreamExtractor
import unicodecsv as csv

In [4]:
# Filing year for this run. THIS MUST AGREE WITH OUR OTHER DATA.
YEAR = 2017

# Build the extractor from the stream definitions, the capture spec and the year.
extractor = StreamExtractor(
    output_streams,
    data_capture_dict,
    YEAR,
)

Initializing output stream employees_detailed2017.csv
Initializing output stream diversions2017.csv


In [5]:
# read the whole file of orgs with efilings from part 1 here, it's not very long
# Pull every row of the part 1 output (orgs with e-filings) into memory;
# the file is not very long, so holding all of it at once is fine.
file_rows = []
# Opened in binary mode -- presumably because `csv` here is unicodecsv; confirm.
with open('pdxefilers.csv', 'rb') as infile:
    file_rows.extend(csv.DictReader(infile))

In [6]:
# Run each filing through the extractor exactly once. The original called
# run_filing twice per filing (once with and once without taxpayer_name),
# which processed every filing a second time.
# enumerate(..., start=1) yields a 1-based count directly, so the loop variable
# no longer needs to be incremented by hand before the progress check.
for filing_count, row in enumerate(file_rows, start=1):
    this_object_id = row['OBJECT_ID']
    # don't need to pass taxpayer name in, but makes the output more readable
    extractor.run_filing(this_object_id, taxpayer_name=row['TAXPAYER_NAME'])
    # Report progress every 100 filings.
    if filing_count % 100 == 0:
        print("Processed %s filings" % filing_count)

Processed 100 filings
Processed 200 filings
Processed 300 filings
Processed 400 filings
Processed 500 filings
Processed 600 filings
Processed 700 filings
Processed 800 filings
Processed 900 filings
Processed 1000 filings
Processed 1100 filings
Processed 1200 filings
Processed 1300 filings
Processed 1400 filings
Processed 1500 filings
Processed 1600 filings
Processed 1700 filings
Processed 1800 filings


The output should be in employees_detailed2017.csv and diversions2017.csv