The FHFA PUDB files are formatted as space-padded, fixed-width columns, and the coding/data dictionaries are provided as PDF files.

I downloaded the PUDB archives and data dictionaries using [downloader.ipynb](downloader.ipynb), extracted the (somewhat noisy) tables using `tabula`, and use them to automate CSV generation from the PUDB txt files.

In [None]:
import csv
import numpy as np
import pandas as pd
import re
import struct
import zipfile

In [None]:
def split_codes(val):
    """Yield ``(code, label)`` pairs parsed from a ``"<code> = <label>"`` listing.

    ``val`` is a string holding zero or more entries such as
    ``"1 = Yes 2 = No"``.  A code is a run of digits and dots; its label runs
    up to the next ``"<code> = "`` or the end of the string.  Entries that do
    not split into exactly one code and one label on ``" = "`` are skipped.
    Both parts are stripped of surrounding whitespace.
    """
    entry_pattern = '[0-9.]+ = .*?(?=[0-9.]+ = |$)'
    for entry in re.findall(entry_pattern, val):
        parts = re.split(' = ', entry)
        if len(parts) != 2:
            # A label that itself contains " = " is treated as noise.
            continue
        key, label = parts
        if not re.match(r'^[\d.]+$', key):
            continue
        yield (key.strip(), label.strip())
            

In [None]:
def split_recs(row):
    """Expand one data-dictionary row into one row per field.

    ``row`` is a mapping-like record (anything with ``.copy()`` and item
    access, e.g. a ``pandas.Series``) with string ``'Field #'`` and
    ``'Field Name'`` entries.

    * If ``'Field #'`` looks like ``"10-12"`` and ``'Field Name'`` looks like
      ``"Some Name 1-3"``, one copy of the row is yielded per position, with
      ``'Field #'`` set to the integer field number and ``'Field Name'`` set
      to ``"Some Name <n>"``.  The two ranges are walked in lockstep, so the
      shorter one decides how many rows are produced.
    * Otherwise a single copy is yielded with ``'Field #'`` converted to
      ``int`` (which raises ``ValueError`` if it is not a plain integer).

    The input row is never modified.
    """
    # Raw strings: ``\d`` inside a plain literal is an invalid escape sequence.
    fld_nums = re.match(r'(\d+)-(\d+)$', row['Field #'])
    fld_names = re.match(r'(.*) (\d+)-(\d+)$', row['Field Name'])
    if fld_nums and fld_names:
        num_lo = int(fld_nums.group(1))
        num_hi = int(fld_nums.group(2))
        name_pfx = fld_names.group(1)
        name_lo = int(fld_names.group(2))
        name_hi = int(fld_names.group(3))
        for num_idx, name_idx in zip(
                range(num_lo, num_hi + 1),
                range(name_lo, name_hi + 1)):
            ret = row.copy()
            ret['Field Name'] = name_pfx + " " + str(name_idx)
            ret['Field #'] = num_idx
            yield ret
    else:
        ret = row.copy()
        ret['Field #'] = int(ret['Field #'])
        yield ret

In [None]:
def get_mf_metadata(filename):
    """Load the multifamily data dictionary CSV as a frame indexed by field number.

    The CSV at ``filename`` is read as ISO-8859-1 and must have ``'Values'``
    and ``'Field #'`` columns.  Each ``'Values'`` cell is replaced by a dict
    built from ``split_codes`` (empty for missing cells).  Rows whose
    ``'Field #'`` is missing or does not consist solely of digits and dashes
    are dropped.  The remaining ``'Field #'`` values are converted to ``int``
    and become the sorted index of the returned DataFrame.
    """
    mf_metadata = pd.read_csv(filename, encoding='iso8859-1')
    mf_metadata['Values'] = mf_metadata['Values'].fillna('').apply(
        lambda val: dict(split_codes(val)))
    # na=False drops missing field numbers directly instead of relying on NaN
    # surviving a boolean ``&``.
    is_field_row = mf_metadata['Field #'].str.match(r'[\d-]+$', na=False)
    # Copy so the column assignment below acts on an independent frame rather
    # than on a slice of the original (avoids SettingWithCopyWarning).
    mf_metadata = mf_metadata[is_field_row].copy()
    mf_metadata['Field #'] = mf_metadata['Field #'].astype(int)
    mf_metadata = mf_metadata.set_index('Field #').sort_index()
    return mf_metadata

In [None]:
def get_sf_metadata(filename):
    """Load the single-family data dictionary CSV as a frame indexed by field number.

    The CSV at ``filename`` is read as ISO-8859-1 and must have ``'Values'``
    and ``'Field #'`` columns.  Each ``'Values'`` cell is replaced by a dict
    built from ``split_codes`` (empty for missing cells).  Rows whose
    ``'Field #'`` is missing or does not consist solely of digits and dashes
    are dropped.  Every remaining row is passed through ``split_recs``, so a
    row describing a range of fields becomes one row per field.  The result
    is indexed by integer ``'Field #'`` and sorted.
    """
    sf_metadata = pd.read_csv(filename, encoding='iso8859-1')
    sf_metadata['Values'] = sf_metadata['Values'].fillna('').apply(
        lambda val: dict(split_codes(val)))
    # na=False drops missing field numbers directly instead of relying on NaN
    # surviving a boolean ``&``.
    is_field_row = sf_metadata['Field #'].str.match(r'[\d-]+$', na=False)
    # Plain loop instead of DataFrame.apply: the rows are only being collected
    # into a list, and apply is not meant for functions with side effects.
    out = []
    for _, row in sf_metadata[is_field_row].iterrows():
        out.extend(split_recs(row))
    sf_metadata = pd.DataFrame(out).reset_index(drop=True)
    sf_metadata['Field #'] = sf_metadata['Field #'].astype(int)
    sf_metadata = sf_metadata.set_index('Field #').sort_index()
    return sf_metadata

Next we can load the data using the transformed metadata.

In [None]:
def _fhfa_records(lines, record, value_maps):
    """Yield one tuple of stripped field strings per fixed-width byte line.

    ``record`` is a compiled ``struct.Struct`` describing one line.  When
    ``value_maps`` is not ``None`` it is a sequence of dicts, one per field in
    record order, used to translate raw codes (unknown codes pass through).
    """
    for lineno, line in enumerate(lines, 1):
        # Remove only the line terminator first: a blank leading or trailing
        # field is legitimate padding and must not be stripped away.
        raw = line.rstrip(b'\r\n')
        if len(raw) != record.size:
            # Fall back to a full strip so lines carrying stray surrounding
            # whitespace are still accepted.
            raw = line.strip()
        try:
            chunks = record.unpack(raw)
        except struct.error as e:
            raise ValueError(str(e) + " line length is {}".format(len(raw))) from e
        match = tuple(chunk.strip().decode('utf-8') for chunk in chunks)

        if not match:
            print("Failed to match line ", lineno)
        elif value_maps is not None:
            yield tuple(codes.get(raw_val, raw_val)
                        for codes, raw_val in zip(value_maps, match))
        else:
            yield match


def fhfa_reader(path, metadata, decode, zip=None):
    """Iterate over the records of a fixed-width FHFA PUDB text file.

    Parameters
    ----------
    path : str
        File to read; a member name inside the archive when ``zip`` is given,
        otherwise a filesystem path.
    metadata : pandas.DataFrame
        One row per field, in record order, with a ``'Field Width'`` column
        and (when ``decode`` is true) a ``'Values'`` column of code->label
        dicts.  Consecutive fields are separated by exactly one pad byte.
    decode : bool
        If true, replace each raw value by its label from the field's
        ``'Values'`` dict when one exists.
    zip : str, optional
        Path of a zip archive containing ``path``.

    Yields
    ------
    tuple of str
        The whitespace-stripped, UTF-8 decoded field values of one line.

    Raises
    ------
    ValueError
        If a line does not have the length implied by the field widths.
    """
    line_fmt = 'x'.join('{}s'.format(int(width)) for width in metadata['Field Width'])
    record = struct.Struct(line_fmt)
    # Look the code tables up once, positionally, so they line up with the
    # format string regardless of how the metadata index is numbered.
    value_maps = list(metadata['Values']) if decode else None
    if zip:
        # Context managers close both the archive and the member stream.
        with zipfile.ZipFile(zip) as archive, archive.open(path) as f:
            yield from _fhfa_records(f, record, value_maps)
    else:
        with open(path, 'rb') as f:
            yield from _fhfa_records(f, record, value_maps)

In [None]:
def fhfa_to_csv(basename, metadata, decode=False, zip=None):
    """Convert the fixed-width file ``basename + '.txt'`` to ``basename + '.csv'``.

    The header row is taken from ``metadata['Field Name']``; the data rows
    come from ``fhfa_reader``, to which ``metadata``, ``decode`` and ``zip``
    are passed through unchanged.
    """
    # newline='' lets the csv writer control line endings itself; without it
    # extra carriage returns can appear on platforms that translate '\n'.
    with open(basename + '.csv', 'w', newline='') as out:
        cout = csv.writer(out)
        cout.writerow(tuple(metadata['Field Name'].values))
        cout.writerows(fhfa_reader(basename + '.txt', metadata, decode, zip))

In [None]:
def do_year(year):
    """Convert the four PUDB census-tract loan files for ``year`` to CSV.

    Both data dictionaries for the year are loaded first; then the Freddie
    Mac and Fannie Mae multifamily files and the Freddie Mac and Fannie Mae
    single-family files are converted in that order, printing a progress
    line before each.
    """
    print("Processing data for", year)
    mf_metadata = get_mf_metadata('{}_Multifamily_Census_Tract_File.csv'.format(year))
    sf_metadata = get_sf_metadata('{}_Single_Family_Census_Tract_File.csv'.format(year))

    # (progress label, output basename template, dictionary, archive template)
    jobs = (
        ("  Freddie Mac multifamily", 'fhlmc_mf{}c_loans', mf_metadata, '{}_MFCensusTract{}.zip'),
        ("  Fannie Mae multifamily", 'fnma_mf{}c_loans', mf_metadata, '{}_MFCensusTract{}.zip'),
        ("  Freddie Mac single family", 'fhlmc_sf{}c_loans', sf_metadata, '{}_SFCensusTractFRE{}.zip'),
        ("  Fannie Mae single family", 'fnma_sf{}c_loans', sf_metadata, '{}_SFCensusTractFNM{}.zip'),
    )
    for label, base_tmpl, metadata, archive_tmpl in jobs:
        print(label)
        fhfa_to_csv(base_tmpl.format(year), metadata, zip=archive_tmpl.format(year, year))

In [None]:
%%time
# Convert every year from 2008 through 2016 (the range end is exclusive).
# %%time reports the total for the whole cell; %time reports each year's run.
for year in range(2008, 2017):
    %time do_year(year)