# Mexican federal budget pre-processing pipeline

## Instructions

To you run the notebook:

1. choose a unique `ITERATION_LABEL` for each pipeline run
2. specify and describe your input files (`INPUT_FILES`)
3. make sure your column mapping (`COLUMN_ALIASES`) is correct
3. run the whole notebook by clicking on __Kernel > Restart & Run All__

## Settings

Choose a unique iteration label for each pipeline run.

In [1]:
ITERATION_LABEL = 'iteration-12-new-data'

Put your input files inside the `pipeline.in` folder and describe them here.

In [2]:
INPUT_FILES = {
#     2010: {'name': 'Cuenta_Publica_2010.csv', 'encoding': 'windows-1252'},
#     2011: {'name': 'Cuenta_Publica_2011.csv', 'encoding': 'windows-1252'},
#     2012: {'name': 'Cuenta_Publica_2012.csv', 'encoding': 'windows-1252'},
#     2013: {'name': 'Cuenta_Publica_2013.csv', 'encoding': 'windows-1252'},
#     2014: {'name': 'Cuenta_Publica_2014.csv', 'encoding': 'windows-1252'},
#     2015: {'name': 'Cuenta_Publica_2015.csv', 'encoding': 'windows-1252'},
#     2016: {'name': '2016_2T_Gasto_OS.csv', 'encoding': 'windows-1252'} # cp850 for the original 2016 file
    2008: {'name': 'Cuenta_Publica_2008.csv', 'encoding': 'windows-1252'},
    2009: {'name': 'Cuenta_Publica_2009.csv', 'encoding': 'windows-1252'},
    2010: {'name': 'Cuenta_Publica_2010.csv', 'encoding': 'windows-1252'},
    2011: {'name': 'Cuenta_Publica_2011.csv', 'encoding': 'windows-1252'},
    2012: {'name': 'Cuenta_Publica_2012.csv', 'encoding': 'windows-1252'},
    2013: {'name': 'CP_2013.csv', 'encoding': 'windows-1252'},
    2014: {'name': 'CP_2014.csv', 'encoding': 'windows-1252'},
    2015: {'name': 'CP_2015.csv', 'encoding': 'windows-1252'},
}


If your input files don't all have the same column names, define your mapping here. 

In [3]:
COLUMN_ALIASES = {
    'Actividad Institucional': ['AI'],
    'Adefas': ['ADEFAS'],
    'Aprobado': [
        'PEF_2016',
        'Importe Presupuesto de Egresos de la Federación',
        'Importe Presupuesto de Egresos de la Federación (PEF)'
    ],
    'Ciclo': None,
    'Clave de cartera': ['CLAVE_CARTERA'],
    'Descripción de Fuente de Financiamiento': ['FUENTE_FINAN_DESCRIPCION'],
    'Descripción de Función': ['FUNCIONL_DESCRIPCION'],
    'Descripción de Grupo Funcional': [
        'Descripción de Finalidad',
        'GRUPO_FUN_DESCRIPCION',
        'Descripción de Grupo Funcional'
    ],
    'Descripción de Objeto del Gasto': ['CONCEPTO_DESCRIPCION'],
    'Descripción de Programa Presupuestario': ['PROGR_PRES_DESCRIPCION'],
    'Descripción de Ramo': ['RAMO_DESCRIPCION'],
    'Descripción de Reasignacion': ['REASIGNACION_DESCRIPCION'],
    'Descripción de Subfunción': ['SUBFUNCIONL_DESCRIPCION', 'Descripción de subfunción'],
    'Descripción de Tipo de Gasto': ['TIPO_GASTO_DESCRIPCION'],
    'Descripción de Unidad Responsable': ['UNIDAD_DESCRIPCION'],
    'Descripción de la Actividad Institucional': [
        'ACTIVIDAD_INST_DESCRIPCION',
        'Descripción de Actividad Institucional'
    ],
    'Descripción de Entidad Federativa': ['Descripción de la entidad federativa', 'ENTIDAD_FED_DESCRIPCION'],
    'Descripción de la modalidad del programa presupuestario': [
        'MODALIDAD_DESCRIPCION',
        'Descripción del Identificador del Programa Presupuestario',
        'Descripción del Identificador de Programa Presupuestario'
    ],
    'Devengado': None,
    'Ejercicio': None,
    'Ejercido': None,
    'Entidad Federativa': ['EF'],
    'Fuente de Financiamiento': ['FF', 'Fuente de Finaciamiento'],
    'Función': ['FN'],
    'Grupo Funcional': [
        'Finalidad', 'GF', 'Grupo Funcional'
    ],
    'Modalidad del Programa presupuestario': [
        'MOD',
        'Identificador de Programa Presupuestario',
        'Identificador del Programa Presupuestario'
    ],
    'Modificado': None,
    'Objeto del Gasto': ['CONCEPTO'],
    'Pagado': None,
    'Programa Presupuestario': ['PP'],
    'Ramo': ['RAMO'],
    'Reasignacion': ['RA'],
    'Subfunción': ['SF'],
    'Tipo de Gasto': ['TG'],
    'Unidad Responsable': ['UNIDAD'],
    'Capitulo': None,
    'Concepto': None,
    'Partida Genérica': None,
    'Partida Específica': None,
    'Descripción de Capitulo': None,
    'Descripción de Concepto': None,
    'Descripción de Partida Genérica': None,
    'Descripción de Partida Específica': ['Descripcion de Partida Específica'],    
}

The following hierarchical categories will have IDs prefixed with the parent categories:

In [4]:
HIERARCHIES = {
    'functional': [
        'Grupo Funcional', 
        'Función', 
        'Subfunción', 
        'Actividad Institucional'
    ],
    'administrative': [
        'Ramo', 
        'Unidad Responsable'
    ],
    'activities': [
        'Modalidad del Programa presupuestario', 
        'Programa Presupuestario'
    ],
}

The following columns are unsused and removed at the end of the pipeline:

In [5]:
REMOVE_OUTPUT_COLUMNS = [
    'Reasignacion',
    'Objeto del Gasto',
    'Descripción de Reasignacion',
    'Descripción de Objeto del Gasto'
]

In [6]:
REMOVE_INPUT_COLUMNS = {
    2016: [
        'Adefas',
        'Partida Específica',
        'Partida Genérica',
        'Descripción de Partida Genérica',
        'Descripcion de Partida Específica',
        'Ejercicio',
        'Devengado',
        'Ejercido',
    ]
}

That's it. Now just run the notebook from beginning to end.

## Imports

In [7]:
from sys import stdout
from pandas import read_csv, concat, DataFrame, ExcelWriter, ExcelFile, Series
from numpy import nan, isnan
from os.path import join, isdir
from os import mkdir
from json import dumps, loads
from pprint import pprint

## Configuration

In [8]:
BASENAME = 'mexican_federal_budget'
INPUT_FOLDER = 'pipeline.in.v2'
OUTPUT_FOLDER = 'pipeline.out'
ITERATION_FOLDER = join(OUTPUT_FOLDER, ITERATION_LABEL)
MERGED_FILE = join(ITERATION_FOLDER, BASENAME + '.merged.csv')
CATALOGS_FOLDER = 'objeto_del_gasto.catalog'
CATALOGS_FILE = 'objeto_del_gasto.catalog.xlsx'

In [9]:
if isdir(ITERATION_FOLDER):
    raise ValueError('Please enter a unique iteration label')
    
mkdir(ITERATION_FOLDER)

## Encoding inspection

Detect the file encodings of the input files using the `cChardet` utility library. __Warning:__ it's not always accurate. This is meant only as an indication only. In the end, encodings will be taken from `INPUT_FILES`.

In [10]:
def detect_encodings():
    """Detect CSV file encoding with the cChardet library"""

    try:
        import cchardet as chardet
    except ImportError:
        cChardet = 'https://github.com/PyYoshi/cChardet'
        print('Encoding inspection skipped: install %s', cChardet)
        return

    results = {}
    results_file = join(OUTPUT_FOLDER, ITERATION_LABEL, 'encodings.detected.json')
    
    for year, file in sorted(INPUT_FILES.items()):
        datafile = join(INPUT_FOLDER, file['name'])
        
        with open(datafile, 'rb') as f:
            text = f.read()
            
        result = chardet.detect(text)
        results.update({year: result})
        print(year, 'Inspected', file['name'], result)
    
    with open(results_file, 'w+') as json:
        json.write(dumps(results, indent=4))
        print('\nSaved encoding detection report to', results_file)
        
# detect_encodings()

## Load files

In [11]:
def read_columns(file, encoding):
    """Return clean CSV file headers"""
    
    with open(file, encoding=encoding) as csv:
        header = csv.readline()
        return header.replace('\n', '').split(',')

In [12]:
def force_strings(columns):
    """Return string enforcement for each column of a CSV file"""
    
    for column in columns:
        yield column, str

In [13]:
def load_csv_files():
    """Load raw data (CSV) files"""
    
    batch = {}
    
    for year, file in sorted(INPUT_FILES.items()):
        filepath = join(INPUT_FOLDER, file['name'])
        column_names = read_columns(filepath, file['encoding'])
        column_types = dict(force_strings(column_names))
        
        batch[year] = read_csv(filepath, encoding=file['encoding'], dtype=column_types)
        print('Loaded', file['name'], 'with encoding', file['encoding'])
    
    print()
    stdout.flush()

    for year in sorted(INPUT_FILES.keys()):
        if year in REMOVE_INPUT_COLUMNS:
            for column in REMOVE_INPUT_COLUMNS[year]:
                try:
                    del batch[year][column]
                    print(year, 'deleted', column)
                except KeyError:
                    print(year, column, 'not found in', file['name'])

        stdout.flush()

    return batch

## Clean the data

In [14]:
def strip_cell_padding(batch):
    for year in sorted(batch.keys()):
        for column in batch[year].columns:
            batch[year].rename(columns={column: column.strip()}, inplace=True)
            batch[year][column] = batch[year][column].apply(lambda x: x.strip() if x is not nan else x)
        print(year, 'stripped cell paddings')
        stdout.flush()

In [15]:
def delete_empty_columns(batch):
    for year in batch.keys():
        for column in batch[year].columns:
            if 'Unnamed:' in column:
                try:
                    del batch[year][column]
                    print(year, column, 'deleted')
                    stdout.flush()
                except KeyError:
                    pass  

In [16]:
def count_missing_values(batch):
    collector = {}
    table = []

    for column in get_union_of_columns(batch):
        row = {'Column': column}
        collector.update({column: []})
        
        for year in batch.keys():
            if column in batch[year].columns:
                is_empty = batch[year][column].isnull()
                empty_lines = batch[year].where(is_empty).dropna(how='all')
                collector[column].extend(empty_lines.to_dict(orient='records'))
                nb_empty_cells = len(empty_lines)
            else:
                nb_empty_cells = nan
                
            row.update({year: nb_empty_cells})
            if nb_empty_cells not in (nan, 0):
                print(year, 'found', nb_empty_cells, 'missing values in', column)

        table.append(row)
        
    ordered_columns = ['Column']
    ordered_columns.extend(sorted(batch.keys()))
    empty_values_overview_table = DataFrame(table).reindex_axis(ordered_columns, axis=1)
    
    return empty_values_overview_table, collector

In [17]:
def count_duplicates(batch):
    for year, df in sorted(batch.items()):
        nb_duplicate_lines = df.duplicated().apply(lambda x: 1 if x is True else 0).sum()
        print(year, 'found', nb_duplicate_lines, 'duplicate lines')

## Alias column names

In [18]:
def get_union_of_columns(batch):
    union = set()
    for year in batch.keys():
        union = union | set(batch[year].columns)
    return union

In [19]:
from yaml import load

def load_aliases(file):
    with open(file) as yaml:
        aliases = load(yaml.read())
        return aliases

In [20]:
def map_columns_to_aliases(batch, list_of_aliases):
    for year in sorted(batch.keys()):
        for column in sorted(batch[year].columns):
            if not column in list_of_aliases:
                for reference, aliases in list_of_aliases.items():
                    if aliases:
                        if column in aliases:
                            batch[year].rename(columns={column: reference}, inplace=True)
                            print(year, column, 'replaced with', reference)
                            stdout.flush()
                            break  
                else:
                    print(year, 'NO ALIAS REGISTERED FOR', column)
                    stdout.flush()

In [21]:
def build_overview(batch):
    table = []
    
    for column in get_union_of_columns(batch):
        row = {'Column': column}
        for year in batch.keys():
            row.update({year: column in batch[year].columns})
        table.append(row)
        
    ordered_columns = ['Column']
    ordered_columns.extend(sorted(batch.keys()))
    
    overview = DataFrame(table).reindex_axis(ordered_columns, axis=1)
    print('Column mapping overview: done')
    return overview

## Check expenditure sums

There's a little cleaning to do on the amount columns (zeros represented by a dash). Assume thousands are seperated by a comma.

In [22]:
EXPENDITURE_COLUMNS = [
    'Ejercido', 
    'Devengado', 
    'Aprobado', 
    'Pagado', 
    'Modificado', 
    'Adefas', 
    'Ejercicio'
]
count = 0

def clean_expenditure_columns(batch):
    check_sums = []

    for column in EXPENDITURE_COLUMNS:
        row = {'Column': column}
        
        for year in sorted(batch.keys()):
            try:
                series = batch[year][column]
                
                # I'm assuming a single '-' represents zero
                series = series.apply(lambda x: '0' if x == '-' else x)
                try:
                    series = series.apply(lambda x: x.replace(',', '') if x is not nan else x)    
                except AttributeError:
                    if count < 10:
                        print(year, column)
                batch[year][column] = series.astype(float)
                check_sum = batch[year][column].sum()
                
                print(year, 'cleaned and summed', column, '=', check_sum, 'pesos')
                
            except KeyError:
                check_sum = nan
                
            row.update({year: check_sum})
        
        check_sums.append(row)

    ordered_columns = ['Column']
    ordered_columns.extend(sorted(batch.keys()))
    return DataFrame(check_sums).reindex_axis(ordered_columns, axis=1)    

## Objeto del Gasto Column split

In [23]:
from os.path import join

def generate_catalog(file):
    
    catalog_ = {}
    catalog_file = ExcelFile(file)
    INDEX_COLUMN = 0
    
    for sheet in catalog_file.sheet_names:
        if sheet != 'Concatenated':
            name = sheet.lower().replace(' ', '_')
            output = join('objeto_del_gasto.catalog', name + '.csv')

            df = catalog_file.parse(sheet).dropna()
            index = df.columns[INDEX_COLUMN]

            df[index] =  df[index].astype(str)
            df.set_index(index, inplace=True)
            df = df.groupby(df.index).first()
            df.sort_index(inplace=True)
            
            message = 'Loaded catalog {sheet} into "{name}" ({nb} lines)'
            parameters = dict(sheet=sheet, name=name, nb=len(df))

            print(message.format(**parameters))
            catalog_[name] = df['DESCRIPCION']
    
    print()
    return catalog_

__Note!__ Years are hard coded in the script below.

In [24]:
def split_objeto_del_gasto(batch):
    catalog = generate_catalog(CATALOGS_FILE)
    missing_in_catalog = []
    
    def has_digits(n, N):
        return not isinstance(n, float) and len(n) >= N 
            

    def lookup(n, table, year):
        try:
            return catalog[table].loc[n]
        except KeyError:
            missing_in_catalog.append({'year': year, 'table': table, 'ID': n})
            return nan
        except TypeError:
            # n is nan
            return nan
    
    for year in sorted(batch.keys()):
        if year == 2016:
            print('Skipping', year, 'because the raw CSV already has the required columns')
        
        else:
            objeto = batch[year]['Objeto del Gasto'].astype(str)

            batch[year]['Capitulo'] = objeto.apply(lambda x: x[0] + '000' if x not in (nan, 'nan') else nan)
            batch[year]['Concepto'] = objeto.apply(lambda x: x[:2] + '00' if x not in (nan, 'nan') else nan)
            batch[year]['Descripción de Capitulo'] = batch[year]['Capitulo'].map(lambda x: lookup(x, 'capitulo', year))  
            batch[year]['Descripción de Concepto'] = batch[year]['Concepto'].map(lambda x: lookup(x, 'concepto', year))  
            
            nb_generica_digits = 4 if year in (2008, 2009, 2010) else 3
            
            # Skip the LAST year of the dataset (currently 2016) it has split columns already
            batch[year]['Partida Genérica'] = objeto.apply(lambda x: x[:nb_generica_digits] if has_digits(x, 4) else nan)
            batch[year]['Descripción de Partida Genérica'] = batch[year]['Partida Genérica'].map(lambda x: lookup(x, 'partida_generica', year))  
            
            if year not in (2008, 2009, 2010):
                batch[year]['Partida Específica'] = objeto.apply(lambda x: x if has_digits(x, 5) else nan)
                batch[year]['Descripción de Partida Específica'] = batch[year]['Partida Específica'].map(lambda x: lookup(x, 'partida_específica', year) if has_digits(x, 5) else nan)  
            else:
                batch[year]['Partida Específica'] = nan
                batch[year]['Descripción de Partida Específica'] = nan

            print(year, 'broke down "Objeto del Gasto" column')
        
    return DataFrame(missing_in_catalog).drop_duplicates(['ID', 'table'])

## Prefix IDs 
Disambiguating sub-categories may require prefixing their IDs with their parents' IDs.

In [25]:
def prefix_ids(batch):
    for year in batch.keys():       
        for hierarchy, levels in HIERARCHIES.items():
            prefix = batch[year]['Ciclo'].apply(lambda x: '')
            for n, level in enumerate(levels):
                dash = '.' if n > 0 else ''
                prefix = prefix + dash + batch[year][level]  
                batch[year][level] = prefix
                
                print(year, 'prefixed', hierarchy, 'level', n, level)
                stdout.flush()

## Remove unused columns

In [26]:
def remove_unused_columns(batch):
    for year, budget in batch.items():
        for column in REMOVE_OUTPUT_COLUMNS:
            try:
                del budget[column]
                print(year, 'deleted', column)
            except KeyError:
                pass

##  Pipeline

In [27]:
def do_pipeline():

    def echo_section(section):
        print('\n', section, '\n')

    echo_section('Loading files')
    datasets = load_csv_files()
    
    echo_section('Delete empty columns')
    delete_empty_columns(datasets)

    echo_section('Stripping padding from cells')
    strip_cell_padding(datasets)
    
    echo_section('Counting duplicate lines (NOT de-duplicating)')
    count_duplicates(datasets)
    
    echo_section('Mapping column to aliases')
    map_columns_to_aliases(datasets, COLUMN_ALIASES)

    echo_section('Counting missing values')
    missing_values_report, bad_records = count_missing_values(datasets)
    
    echo_section('Building column mapping overview')
    column_mapping_report = build_overview(datasets)
    
    echo_section('Cleaning expenditure columns')
    sums_report = clean_expenditure_columns(datasets)
    
    echo_section('Breaking down Objeto del Gasto column')
    missing_catalog_ids = split_objeto_del_gasto(datasets)
        
    echo_section('Prefixing IDs of certain category hierarchies')
    prefix_ids(datasets)

    echo_section('Removing unused columns')
    remove_unused_columns(datasets)

    echo_section('Saving pipeline configuration')

    reports_file = join(ITERATION_FOLDER, BASENAME + '.reports.xlsx')
    writer = ExcelWriter(reports_file)    
    missing_values_report.to_excel(writer, 'missing values', encoding='utf-8', index=False)
    column_mapping_report.to_excel(writer, 'column mapping', encoding='utf-8', index=False)
    sums_report.to_excel(writer, 'check sums', encoding='utf-8', index=False)
    missing_catalog_ids.to_excel(writer, 'missing_catalog_IDs', encoding='utf-8', index=False)    
    print('Saved 4 reports to', reports_file)    

    aliases_file = join(ITERATION_FOLDER, BASENAME + '.aliases.json')
    inputs_file = join(ITERATION_FOLDER, BASENAME + '.inputs.json')
    levels_file = join(ITERATION_FOLDER, BASENAME + '.levels.json')
    bad_records_file = join(ITERATION_FOLDER, BASENAME + '.missing.json')

    with open(bad_records_file, 'w+') as json:
        json.write(dumps(bad_records, indent=4))
        
    with open(aliases_file, 'w+') as json:
        json.write(dumps(COLUMN_ALIASES, indent=4))
        
    with open(levels_file, 'w+') as json:
        json.write(dumps(HIERARCHIES, indent=4))
        
    with open(inputs_file, 'w+') as json:
        json.write(dumps(INPUT_FILES, indent=4))
    
    print('Saved input configuration to', inputs_file)    
    print('Saved column aliases to', aliases_file) 
    print('Saved bad records (those with empty cells) to', bad_records_file)    
    print('Saved hierarchy levels used for prefixing to', levels_file) 
    
    echo_section('Pipeline run "%s" done' % ITERATION_LABEL)

    return datasets, missing_catalog_ids, column_mapping_report, missing_values_report, sums_report

## Run the pipeline

In [28]:
budgets, missing_ids, column_mapping, missing_values, sums = do_pipeline()


 Loading files 

Loaded Cuenta_Publica_2008.csv with encoding windows-1252
Loaded Cuenta_Publica_2009.csv with encoding windows-1252
Loaded Cuenta_Publica_2010.csv with encoding windows-1252
Loaded Cuenta_Publica_2011.csv with encoding windows-1252
Loaded Cuenta_Publica_2012.csv with encoding windows-1252
Loaded CP_2013.csv with encoding windows-1252
Loaded CP_2014.csv with encoding windows-1252
Loaded CP_2015.csv with encoding windows-1252


 Delete empty columns 

2009 Unnamed: 25 deleted
2009 Unnamed: 26 deleted
2009 Unnamed: 27 deleted
2009 Unnamed: 28 deleted
2009 Unnamed: 29 deleted
2009 Unnamed: 30 deleted
2009 Unnamed: 31 deleted
2009 Unnamed: 32 deleted
2009 Unnamed: 33 deleted
2009 Unnamed: 34 deleted
2009 Unnamed: 35 deleted
2009 Unnamed: 36 deleted
2009 Unnamed: 37 deleted
2009 Unnamed: 38 deleted
2009 Unnamed: 39 deleted
2011 Unnamed: 25 deleted
2011 Unnamed: 26 deleted
2011 Unnamed: 27 deleted
2011 Unnamed: 28 deleted
2011 Unnamed: 29 deleted
2011 Unnamed: 30 deleted
201

In [29]:
from gc import collect
collect()

3057

In [None]:
for year, budget in budgets.items():
    filepath = MERGED_FILE.replace('merged', str(year))
    budget.to_csv(filepath, encoding='utf-8', index=False)
    print('Saved', filepath)
    stdout.flush()

Saved pipeline.out/iteration-12-new-data/mexican_federal_budget.2008.csv
Saved pipeline.out/iteration-12-new-data/mexican_federal_budget.2009.csv
Saved pipeline.out/iteration-12-new-data/mexican_federal_budget.2010.csv
Saved pipeline.out/iteration-12-new-data/mexican_federal_budget.2011.csv
Saved pipeline.out/iteration-12-new-data/mexican_federal_budget.2012.csv
Saved pipeline.out/iteration-12-new-data/mexican_federal_budget.2013.csv
Saved pipeline.out/iteration-12-new-data/mexican_federal_budget.2014.csv
Saved pipeline.out/iteration-12-new-data/mexican_federal_budget.2015.csv


In [None]:
merged = concat(list(budgets.values()))
merged.to_csv(MERGED_FILE, encoding='utf-8', index=False)
print('Saved merged dataset to', MERGED_FILE)    

## Quality control

In [None]:
sorted(list(budget.columns))

In [None]:
merged.sample(n=10)

In [None]:
objeto_breakdown = [
    'Ciclo', 
    'Capitulo', 'Concepto', 
    'Partida Específica', 
    'Partida Genérica'
]
merged[objeto_breakdown].sample(n=20)

In [None]:
print('Total: missing', len(missing_ids), 'catalog IDs to breakdown the "Objeto del Gasto" column')
print('Tables:', dict(missing_ids.groupby('table').count()['ID']))
print('Years:', dict(missing_ids.groupby('year').count()['ID']))
try:
    missing_ids.sample(n=20)
except ValueError:
    pass

In [None]:
missing_ids.info()

In [None]:
column_mapping

In [None]:
missing_values

In [None]:
sums

In [None]:
merged.sample(n=20) 

In [None]:
breakdown = [
    'Ciclo', 
    'Capitulo', 
    'Concepto', 
    'Partida Genérica',        
    'Partida Específica', 
    'Descripción de Capitulo',
    'Descripción de Concepto', 
    'Descripción de Partida Genérica',
    'Descripción de Partida Específica'
]

merged[breakdown].sample(n=200)

In [None]:
merged.info()