In [None]:
import csv
import glob
import shutil
import tarfile
import time
from google.cloud import storage, bigquery
from xml.etree import ElementTree as ET

In [None]:
# Columns that we want to extract from the XML files
columns = [
    "ein",
    "return_timestamp",
    "tax_period_end_date",
    "business_name",
    "address",
    "city",
    "state",
    "zip",
    "phone",
    "business_officer_name",
    "business_officer_title",
    "website",
    "formation_year",
    "mission",
    "voting_members_count",
    "total_employee_count",
    "total_volunteer_count",
    "revenue_from_grants",
    "revenue_from_program_services",
    "revenue_from_investment",
    "revenue_from_other",
    "total_revenue",
    "expenses_for_grants",
    "expenses_for_members",
    "expenses_for_employees",
    "expenses_for_professional_fundraising",
    "expenses_for_other",
    "total_expenses",
    "principal_office_salary",
    "states_where_form_990_filed",
    "states_where_form_990_filed_count",
    "description",
    "prog_srvc_accom_acty_2_grp_desc",
    "prog_srvc_accom_acty_3_grp_desc",
    "prog_srvc_accom_acty_other_grp_desc",
    "mission_desc",
    "other_expenses_group_desc",
]

In [None]:
# Data extraction function
def process_xml(file):
    relevant_data = {}
    root = ET.parse(file).getroot()

    # Remove namespace from all tags in the root
    for elem in root.iter():
        if "}" in elem.tag:
            elem.tag = elem.tag.split("}", 1)[1]

    # ReturnHeader and sub elements
    return_header = root.find("ReturnHeader")
    filer = return_header.find("Filer")
    business_name_element = filer.find("BusinessName")
    us_address = filer.find("USAddress")
    business_officer = return_header.find("BusinessOfficerGrp")

    # ReturnHeader columns
    ein = filer.find("EIN").text
    return_timestamp = return_header.find("ReturnTs").text
    tax_period_end_date = return_header.find("TaxPeriodEndDt").text
    business_name = business_name_element.find("BusinessNameLine1Txt").text
    if us_address is not None:
        address = us_address.find("AddressLine1Txt").text
        city = us_address.find("CityNm").text
        state = us_address.find("StateAbbreviationCd").text
        zip_code = us_address.find("ZIPCd").text
    else:
        address = None
        city = None
        state = None
        zip_code = None
    phone = filer.find("PhoneNum").text if filer.find("PhoneNum") is not None else None
    business_officer_name = business_officer.find("PersonNm").text
    business_officer_title = business_officer.find("PersonTitleTxt").text

    # ReturnHeader column assignment
    relevant_data["ein"] = ein
    relevant_data["return_timestamp"] = return_timestamp
    relevant_data["tax_period_end_date"] = tax_period_end_date
    relevant_data["business_name"] = business_name
    relevant_data["address"] = address
    relevant_data["city"] = city
    relevant_data["state"] = state
    relevant_data["zip"] = zip_code
    relevant_data["phone"] = phone
    relevant_data["business_officer_name"] = business_officer_name
    relevant_data["business_officer_title"] = business_officer_title

    # ReturnData sub elements
    return_data = root.find("ReturnData")
    irs990 = return_data.find("IRS990")
    # Not all files have irs 990 data, assign the rest of the columns to None and return
    if irs990 is None:
        relevant_data["website"] = None
        relevant_data["formation_year"] = None
        relevant_data["mission"] = None
        relevant_data["voting_members_count"] = None
        relevant_data["total_employee_count"] = None
        relevant_data["total_volunteer_count"] = None
        relevant_data["revenue_from_grants"] = None
        relevant_data["revenue_from_program_services"] = None
        relevant_data["revenue_from_investment"] = None
        relevant_data["revenue_from_other"] = None
        relevant_data["total_revenue"] = None
        relevant_data["expenses_for_grants"] = None
        relevant_data["expenses_for_members"] = None
        relevant_data["expenses_for_employees"] = None
        relevant_data["expenses_for_professional_fundraising"] = None
        relevant_data["expenses_for_other"] = None
        relevant_data["total_expenses"] = None
        relevant_data["principal_office_salary"] = None
        relevant_data["states_where_form_990_filed"] = None
        relevant_data["states_where_form_990_filed_count"] = None
        relevant_data["description"] = None
        relevant_data["prog_srvc_accom_acty_2_grp_desc"] = None
        relevant_data["prog_srvc_accom_acty_3_grp_desc"] = None
        relevant_data["prog_srvc_accom_acty_other_grp_desc"] = None
        relevant_data["mission_desc"] = None
        relevant_data["other_expenses_group_desc"] = None
        # output = json.dumps(relevant_data)
        return relevant_data

    form990_part_vii_section_a_grp = irs990.find("Form990PartVIISectionAGrp")
    prog_srvc_accom_acty_2_grp = irs990.find("ProgSrvcAccomActy2Grp")
    prog_srvc_accom_acty_3_grp = irs990.find("ProgSrvcAccomActy3Grp")
    prog_srvc_accom_acty_other_grp = irs990.find("ProgSrvcAccomActyOtherGrp")
    other_expenses_grp = irs990.find("OtherExpensesGrp")

    # ReturnData columns
    website = irs990.find("WebsiteAddressTxt").text if irs990.find("WebsiteAddressTxt") is not None else None
    formation_year = irs990.find("FormationYr").text if irs990.find("FormationYr") is not None else None
    mission = irs990.find("ActivityOrMissionDesc").text
    voting_members_count = irs990.find("VotingMembersGoverningBodyCnt").text
    total_employee_count = irs990.find("TotalEmployeeCnt").text
    total_volunteer_count = (
        irs990.find("TotalVolunteersCnt").text if irs990.find("TotalVolunteersCnt") is not None else None
    )
    revenue_from_grants = irs990.find("CYContributionsGrantsAmt").text
    revenue_from_program_services = irs990.find("CYProgramServiceRevenueAmt").text
    revenue_from_investment = irs990.find("CYInvestmentIncomeAmt").text
    revenue_from_other = irs990.find("CYOtherRevenueAmt").text
    total_revenue = irs990.find("CYTotalRevenueAmt").text
    expenses_for_grants = irs990.find("CYGrantsAndSimilarPaidAmt").text
    expenses_for_members = irs990.find("CYBenefitsPaidToMembersAmt").text
    expenses_for_employees = irs990.find("CYSalariesCompEmpBnftPaidAmt").text
    expenses_for_professional_fundraising = irs990.find("CYTotalProfFndrsngExpnsAmt").text
    expenses_for_other = irs990.find("CYOtherExpensesAmt").text
    total_expenses = irs990.find("CYTotalExpensesAmt").text
    principal_office_salary = (
        form990_part_vii_section_a_grp.find("ReportableCompFromOrgAmt").text
        if form990_part_vii_section_a_grp.find("ReportableCompFromOrgAmt") is not None
        else None
    )
    states_where_form_990_filed_list = irs990.findall("StatesWhereCopyOfReturnIsFldCd")
    states_where_form_990_filed_count = len(states_where_form_990_filed_list)
    states_where_form_990_filed = ",".join(sorted([state.text for state in states_where_form_990_filed_list]))
    description = irs990.find("Desc").text
    if prog_srvc_accom_acty_2_grp is not None:
        prog_srvc_accom_acty_2_grp_desc_element = prog_srvc_accom_acty_2_grp.find("Desc")
        if prog_srvc_accom_acty_2_grp_desc_element is not None:
            prog_srvc_accom_acty_2_grp_desc = prog_srvc_accom_acty_2_grp_desc_element.text
        else:
            prog_srvc_accom_acty_2_grp_desc = None
    else:
        prog_srvc_accom_acty_2_grp_desc = None

    if prog_srvc_accom_acty_3_grp is not None:
        prog_srvc_accom_acty_3_grp_element = prog_srvc_accom_acty_3_grp.find("Desc")
        if prog_srvc_accom_acty_3_grp_element is not None:
            prog_srvc_accom_acty_3_grp_desc = prog_srvc_accom_acty_3_grp_element.text
        else:
            prog_srvc_accom_acty_3_grp_desc = None
    else:
        prog_srvc_accom_acty_3_grp_desc = None

    if prog_srvc_accom_acty_other_grp is not None:
        prog_srvc_accom_acty_other_grp_element = prog_srvc_accom_acty_other_grp.find("Desc")
        if prog_srvc_accom_acty_other_grp_element is not None:
            prog_srvc_accom_acty_other_grp_desc = prog_srvc_accom_acty_other_grp_element.text
        else:
            prog_srvc_accom_acty_other_grp_desc = None
    else:
        prog_srvc_accom_acty_other_grp_desc = None

    mission_desc = irs990.find("MissionDesc").text if irs990.find("MissionDesc") is not None else None

    if other_expenses_grp is not None:
        other_expenses_grp_element = other_expenses_grp.find("Desc")
        if other_expenses_grp_element is not None:
            other_expenses_group_desc = other_expenses_grp_element.text
        else:
            other_expenses_group_desc = None
    else:
        other_expenses_group_desc = None

    # ReturnData column assignment
    relevant_data["website"] = website
    relevant_data["formation_year"] = formation_year
    relevant_data["mission"] = mission
    relevant_data["voting_members_count"] = voting_members_count
    relevant_data["total_employee_count"] = total_employee_count
    relevant_data["total_volunteer_count"] = total_volunteer_count
    relevant_data["revenue_from_grants"] = revenue_from_grants
    relevant_data["revenue_from_program_services"] = revenue_from_program_services
    relevant_data["revenue_from_investment"] = revenue_from_investment
    relevant_data["revenue_from_other"] = revenue_from_other
    relevant_data["total_revenue"] = total_revenue
    relevant_data["expenses_for_grants"] = expenses_for_grants
    relevant_data["expenses_for_members"] = expenses_for_members
    relevant_data["expenses_for_employees"] = expenses_for_employees
    relevant_data["expenses_for_professional_fundraising"] = expenses_for_professional_fundraising
    relevant_data["expenses_for_other"] = expenses_for_other
    relevant_data["total_expenses"] = total_expenses
    relevant_data["principal_office_salary"] = principal_office_salary
    relevant_data["states_where_form_990_filed"] = states_where_form_990_filed
    relevant_data["states_where_form_990_filed_count"] = states_where_form_990_filed_count
    relevant_data["description"] = description
    relevant_data["prog_srvc_accom_acty_2_grp_desc"] = prog_srvc_accom_acty_2_grp_desc
    relevant_data["prog_srvc_accom_acty_3_grp_desc"] = prog_srvc_accom_acty_3_grp_desc
    relevant_data["prog_srvc_accom_acty_other_grp_desc"] = prog_srvc_accom_acty_other_grp_desc
    relevant_data["mission_desc"] = mission_desc
    relevant_data["other_expenses_group_desc"] = other_expenses_group_desc

    # output = json.dumps(relevant_data)
    return relevant_data

In [None]:
# Input parameters including the year we want to process
project_id = "INSERT PROJECT ID"
bucket_name = "INSERT BUCKET NAME"
prefix = "irs-form-990"
sample = ["sample"]
years = ["2017", "2018", "2019", "2020", "2021", "2022"]
csv_file_name = "irs-990.csv"

In [None]:
# Initialize a storage client to download the compressed XML file for that year
storage_client = storage.Client(project=project_id)
bucket = storage_client.get_bucket(bucket_name)


Processing one year of data takes ~3 hours

In [None]:
# Process each file, saving the output in a CSV file.
with open(csv_file_name, "w", newline="", encoding="utf-8") as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=columns)
    writer.writeheader()
    for year in years:
        compressed_file = bucket.blob(f"{prefix}/compressed/xml-{year}.tar.gz")
        # Download the compressed file
        compressed_file.download_to_filename(f"xml-{year}.tar.gz")
        # Extract the XML files
        with tarfile.open(f"xml-{year}.tar.gz", "r:gz") as tar:
            tar.extractall(year)
        file_list = glob.glob(f"{year}/*.xml")
        i = 0
        for file in file_list:
            row = process_xml(file)
            writer.writerow(row)
            if i % 100000 == 0:
                print(year, i, time.strftime("%H:%M:%S", time.localtime()))
            i += 1
        shutil.rmtree(year)


In [None]:
# Upload the output file to GCS
storage_client = storage.Client(project=project_id)
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(f"{prefix}/csv/{csv_file_name}")
blob.upload_from_filename(csv_file_name)

In [None]:
# BigQuery inputs and target table schema
bigquery_dataset = "original"
bigquery_table = f"irs_990"
table_schema_bq = [
    bigquery.SchemaField("ein", "STRING"),
    bigquery.SchemaField("return_timestamp", "STRING"),
    bigquery.SchemaField("tax_period_end_date", "DATE"),
    bigquery.SchemaField("business_name", "STRING"),
    bigquery.SchemaField("address", "STRING"),
    bigquery.SchemaField("city", "STRING"),
    bigquery.SchemaField("state", "STRING"),
    bigquery.SchemaField("zip", "STRING"),
    bigquery.SchemaField("phone", "STRING"),
    bigquery.SchemaField("business_officer_name", "STRING"),
    bigquery.SchemaField("business_officer_title", "STRING"),
    bigquery.SchemaField("website", "STRING"),
    bigquery.SchemaField("formation_year", "INTEGER"),
    bigquery.SchemaField("mission", "STRING"),
    bigquery.SchemaField("voting_members_count", "INTEGER"),
    bigquery.SchemaField("total_employee_count", "INTEGER"),
    bigquery.SchemaField("total_volunteer_count", "INTEGER"),
    bigquery.SchemaField("revenue_from_grants", "INTEGER"),
    bigquery.SchemaField("revenue_from_program_services", "INTEGER"),
    bigquery.SchemaField("revenue_from_investment", "INTEGER"),
    bigquery.SchemaField("revenue_from_other", "INTEGER"),
    bigquery.SchemaField("total_revenue", "INTEGER"),
    bigquery.SchemaField("expenses_for_grants", "INTEGER"),
    bigquery.SchemaField("expenses_for_members", "INTEGER"),
    bigquery.SchemaField("expenses_for_employees", "INTEGER"),
    bigquery.SchemaField("expenses_for_professional_fundraising", "INTEGER"),
    bigquery.SchemaField("expenses_for_other", "INTEGER"),
    bigquery.SchemaField("total_expenses", "INTEGER"),
    bigquery.SchemaField("principal_office_salary", "INTEGER"),
    bigquery.SchemaField("states_where_form_990_filed", "STRING"),
    bigquery.SchemaField("states_where_form_990_filed_count", "INTEGER"),
    bigquery.SchemaField("description", "STRING"),
    bigquery.SchemaField("prog_srvc_accom_acty_2_grp_desc", "STRING"),
    bigquery.SchemaField("prog_srvc_accom_acty_3_grp_desc", "STRING"),
    bigquery.SchemaField("prog_srvc_accom_acty_other_grp_desc", "STRING"),
    bigquery.SchemaField("mission_desc", "STRING"),
    bigquery.SchemaField("other_expenses_group_desc", "STRING"),
    ]

In [None]:
# Create the target table
bigquery_client = bigquery.Client(project=project_id)
table_ref = bigquery_client.dataset(bigquery_dataset).table(bigquery_table)
table_def = bigquery.Table(table_ref, schema=table_schema_bq)
table = bigquery_client.create_table(table_def, exists_ok=True)

In [None]:
# Make sure the table is empty
query = f"TRUNCATE TABLE `{project_id}.{bigquery_dataset}.{bigquery_table}`"
bigquery_client.query(query)

In [None]:
# Load the csv file to BigQuery
job_config = bigquery.LoadJobConfig(
    schema=table_schema_bq,
    skip_leading_rows=1,
    source_format=bigquery.SourceFormat.CSV,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

In [None]:
uri = f"gs://{bucket_name}/{prefix}/csv/{csv_file_name}"

In [None]:
load_job = bigquery_client.load_table_from_uri(uri, table_ref, job_config=job_config)
load_job.result()