In [0]:
from unidecode import unidecode
import pandas as pd
import numpy as np
import re
import os
from datetime import datetime

In [0]:
def format_as_character(x):
    """
    Formats string list in a standarized format: removing multiple spaces and non alphanumeric characters, standarizing words to not have accents and making all letters lowercase
    
    Parameters:
    x: string list to format
    
    Returns:
    String list formatted
    """
    # remove accents and make lowercase
    x = [unidecode(str(x)).lower() for x in x]
    # remove leading and tailing white spaces
    x = pd.Series(x).str.strip()
    # replace multiple spaces with one
    x = pd.Series([re.sub(r"\s+", " ", x) for x in x])
    # remove non alphanumeric characters
    x = pd.Series([re.sub(r"[^a-zA-Z0-9_ ]", " ", x) for x in x])
    # replace yet again multiple spaces with one
    x = pd.Series([re.sub(r"\s+", " ", x) for x in x])
    # remove leading and tailing white spaces yet again
    x = x.str.strip()
    # replace spaces with underscores
    x = pd.Series([re.sub(" ", "_", x) for x in x])
    # replace string nans with numpy nans
    x = [x if x != 'nan' else np.NaN for x in list(x)]

    return x


In [0]:
def list_files(path, full_path = False):
  
    """

    Returns list of files in the input path. 

    Parameters:
    path: path for which to get list of files.
    full_path: flag indicating whether to return the full path.

    Returns:
    List of files on path. 

    """
  
    if full_path:
        file_names = [os.path.join(path, f) for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
    else:
        file_names = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]

    # make sure to remove temporary files
    file_names = [x for x in file_names if not x.startswith('~$')]

    return file_names

In [0]:
def get_output_file_name(config, file_name, file_dir = None, is_model = False):
  
    """

    Generates the full path for an output file. It creates file_dir if it does not exist in the config dictionary, and adds a time stamp before the file name. If the output is a model, the timestamp includes hours minutes and seconds, and is restricted to the day otherwise.

    Parameters:
    config: config dictionary as defined in get_config.
    file_name: base file name.
    file_dir: name of the folder in the outputs folder in which to save the output.
    is_model: indicates whether the output is from modelling

    Returns:
    File name of the form {file_dir}/{time_stamp}_{file_name}

    """
    
    if is_model:
        assert file_dir is None
        full_path = config['paths']['models_folder']
        if not os.path.isdir(full_path):
          os.makedirs(full_path) 
    else:
        full_path = config['paths']['output']

    if file_dir is not None:
        full_path = os.path.join(full_path, file_dir)
        if not os.path.isdir(full_path):
            os.makedirs(full_path)

    if is_model:
        full_path = os.path.join(full_path, '{}_{}'.format(datetime.today().strftime('%Y%m%d_%H%M%S'), file_name))
    else:
        full_path = os.path.join(full_path, '{}_{}'.format(datetime.today().strftime('%Y%m%d'), file_name))

    return full_path   

In [0]:
def save_dict_to_excel(dict_to_save, config, output_file = None, output_folder = None, full_file_name = None, index = False, is_model = False):
  
    """

    Saves a dictionary of data frames into one excel with several sheets. The keys of the dictionary define the names of the sheets. The output path is obtained throgh get_output_file_name

    Parameters:
    dict_to_save: dictionary of data frames to be saved.
    output_file: base file name.
    output_folder: name of the folder in the outputs folder in which to save the output.
    config: config dictionary as defined in get_config.
    index: indicates whether the to include the index in the excel sheets.

    Returns:
    Nothing

    """  
  
    if full_file_name is None:
      full_file_name = get_output_file_name(config, output_file, output_folder, is_model)
    writer = pd.ExcelWriter(full_file_name, engine='xlsxwriter')
    for i in dict_to_save.keys():
        dict_to_save[i].to_excel(writer, sheet_name = i, index = index)
    writer.save()
    return None

In [0]:
def get_columns_dict(config, table_name):

    """

    Extracts information on a specific dataset from the config dictionary.

    Parameters:
    config: config dictionary as defined in get_config.
    table_name: table name for which to extract the information.

    Returns:
    Dictionary with the columns to drop, to keep, to rename, the id columns, the NA replacements to perform and the column formats as saved in the config dictionary.

    """
  
    columns_dict = {
        'to_drop': get_table_info(config, table_name, 'cols_to_drop'),
        'to_keep': get_table_info(config, table_name, 'cols_to_keep'),
        'new_names': get_table_info(config, table_name, 'renaming_cols'),
        'id_cols': get_table_info(config, table_name, 'id_cols'),
        'na_replacements': get_table_info(config, table_name, 'fill_nas'),
        'formats': get_table_formats(config, table_name)
    }

    return columns_dict

In [0]:
def get_table_formats(config, table_name):
  
    """

    Extracts column formats for a specific dataset from the config dictionary.

    Parameters:
    config: config dictionary as defined in get_config.
    table_name: table name for which to extract the information.

    Returns:
    Dictionary with the different table formats to assign.

    """
    
    
    table_formats = get_table_info(config, table_name, 'column_formats')

    if table_formats is not None:
        formats_declared = list(table_formats.keys())

        columns_declared = []
        for i in formats_declared:
            if i == 'dummify':
                pass
            else:
                columns_declared.extend(table_formats[i])

        assert len(columns_declared) == len(
            set(columns_declared)), 'There are columns duplicated in the format section for {}'.format(table_name)

        allowed_formats = ['numeric', 'character', 'dummify', 'forced_numeric', 'id']
        assert all([i in allowed_formats for i in
                    formats_declared]), 'The get_table_formats only accepts numeric, forced numeric, character, id, and dummy formats'

    return table_formats

In [0]:
def get_table_info(config, table_name, info_label):
  
    """

    Extracts a specific piece of information (e.g., columns to drop) from the params specified for the data set in the config dictionary

    Parameters:
    config: config dictionary as defined in get_config.
    table_name: table name for which to extract the information.
    info_label: information to extract.

    Returns:
    Information for one action (e.g., drop columns) specified for the inputted data set in the config dictionary

    """  
  
    assert table_name in config['data_params'].keys(), 'The table specified is not in the configuration file'
    if info_label in config['data_params'][table_name].keys():
        return config['data_params'][table_name][info_label]
    else:
        return None


In [0]:
def clean_df(df, config, table_name, columns_dict=None, check_column_existance=True, verbose=False):
  
    """

    Performs several cleaning actions on the inputted data set, and returns the cleaned version. This actions are:

    - Using format_as_character to format the column names.
    - Rename the columns as specified in the config file.
    - Drop columns as specified in the config file.
    - Replaces NAs as specified in the config file.
    - Keeps columns as specified in the config file.
    - Formatting columns in different formats:
        - character (using format_as_character)
        - numeric
        - forced numeric
        - id (replacing NAs with -1)

    Parameters:
    df: raw data frame.
    config: config dictionary as defined in get_config.
    table_name: data source name, used for getting the parameters of the data set from the confing file..
    columns_dict: used in case the user wants to override information in the config file.
    check_column_existance: flag indicaing whether to tests that the columns that should be treated exist (throws an error if they don't).
    verbose: flag indicating wether to print some statements.

    Returns:
    Cleaned dataset, with the parameters specified in config.

    """
  
    # TODO: add option to omit cleaning the df (to get the completely raw data)

    # format column names
    df.columns = format_as_character(df.columns)

    # create dictionary with columns, unless specified in arguments
    if config is not None:
        columns_dict = get_columns_dict(config, table_name)

    # if there is a replacement dictionary for names, apply it to the data frame
    if columns_dict['new_names'] is not None:
        # in this case tests are not necessary
        df.rename(mapper=columns_dict['new_names'], axis='columns', inplace=True)

    if columns_dict['to_drop'] is not None:
        if check_column_existance:
            # test that the columns to drop are in the data frame
            test_columns_are_in_df(df, columns_dict['to_drop'])
        else:
            columns_dict['to_drop'] = keep_existing_cols(df, columns_dict['to_drop'])

        # drop the columns specified in the json
        df.drop(columns=columns_dict['to_drop'], inplace=True)

    if columns_dict['na_replacements'] is not None:
        # test that the columns defined in the previous dictionary exist
        test_columns_are_in_df(df, list(columns_dict['na_replacements'].keys()))

        # replace nas accordingly to previous dictionary
        df.fillna(value=columns_dict['na_replacements'], inplace=True)

    if columns_dict['to_keep'] is not None:
        if len(columns_dict['to_keep']) > 0:
            # make sure the columns specified in the dictionary are in the data frame - this test has to be run always
            test_columns_are_in_df(df, columns_dict['to_keep'])
            df = df[columns_dict['to_keep']]

    if columns_dict['formats'] is not None:
        if 'character' in columns_dict['formats'].keys():
            for char_col in columns_dict['formats']['character']:
                if char_col not in list(df.columns):
                    print('Column {} has not been found in an instance of {}'.format(char_col, table_name))
                else:
                    if verbose:
                        print('Formatting column {} from {} as character'.format(char_col, table_name))
                    df.loc[:, char_col] = format_as_character(df[char_col])
        if 'numeric' in columns_dict['formats'].keys():
            for num_col in columns_dict['formats']['numeric']:
                if num_col not in list(df.columns):
                    print('Column {} has not been found in an instance of {}'.format(num_col, table_name))
                else:
                    if verbose:
                        print('Formatting column {} from {} as numeric'.format(num_col, table_name))
                    df.loc[:, num_col] = df[num_col].astype(float)
        if 'dummify' in columns_dict['formats'].keys():
            new_df = pd.get_dummies(df, dummy_na=True, columns=columns_dict['formats']['dummify'])
            assert new_df.shape[0] == df.shape[0]
            assert new_df.shape[1] > df.shape[1]
            df = new_df
            if verbose:
                print('Columns {} have been dummified'.format(', '.join(columns_dict['formats']['dummify'])))
        if 'forced_numeric' in columns_dict['formats'].keys():
            for num_col in columns_dict['formats']['forced_numeric']:
                if num_col not in list(df.columns):
                    print('Column {} has not been found in an instance of {}'.format(num_col, table_name))
                else:
                    if verbose:
                        print('Formatting forcebly column {} from {} as numeric'.format(num_col, table_name))
                    df.loc[:, num_col] = pd.to_numeric(df[num_col], errors='coerce')
        if 'id' in columns_dict['formats'].keys():
            for id_col in columns_dict['formats']['id']:
                if id_col not in list(df.columns):
                    print('Column {} has not been found in an instance of {}'.format(num_col, table_name))
                else:
                    if verbose:
                        print('Formatting forcebly column {} from {} as id'.format(num_col, table_name))
                    df.loc[:, id_col] = pd.to_numeric(df[id_col], errors = 'coerce').fillna(-1).astype(int).astype(str)
#                     df.loc[:, id_col] = df[id_col].fillna('-1').astype(float, errors = 'ignore').astype(int).astype(str).fillna('-1')
                    pct_nas = (df[id_col] == '-1').mean()
                    if pct_nas > 0:
                        print('For table {} and column {}, the percentage of NAs is {}'.format(table_name, id_col, str(pct_nas)))

    return df


In [0]:
def keep_existing_cols(df, cols):
  
    """

    Given a data frame and a list of column names, gets the columns that are in the data frame

    Parameters:
    df: data frame where the columns kept exist.
    cols: list of column names.

    Returns:
    List of column names.

    """
    
    return list(set(cols).intersection(set(df.columns)))

In [0]:
def get_files_to_read(config, table_name):
  
    """

    Gives the files to read for reading a specific data source.

    Parameters:
    config: config dictionary as defined in get_config.
    table_name: name of the data source to read.

    Returns:
    List of full path file names.

    """
  
    files_to_read = list_files(config['paths']['raw'][table_name], full_path=True)
    if len(files_to_read) == 1:
        files_to_read = files_to_read[0]
    return files_to_read

In [0]:
def read_and_clean_files(config, table_name, verbose = False):
  
    """

    Reads and cleans all files related to the specified data source

    Parameters:
    config: config dictionary as defined in get_config.
    table_name: name of the data source to read.

    Returns:
    Data Frame consisting of all files appropiately treated and concatenated.

    """
  
    file_names = get_files_to_read(config, table_name)
    output_df = pd.DataFrame({})
    termination = file_names.split('.')[-1]
    if (type(file_names) == str):
        if termination == 'csv':
            output_df = pd.read_csv(file_names, dtype = str)
        else:
            output_df = pd.read_excel(file_names)
        output_df = clean_df(output_df, config, table_name, check_column_existance = False, verbose = verbose)
    else:
        for i in file_names:
            print('Reading {} ...'.format(i))
            if termination == 'csv':
                temp_df = pd.read_csv(i, dtype = str)
            else:
                temp_df = pd.read_excel(i)
            print('Cleaning {} ...'.format(i))
            temp_df = clean_df(temp_df, config, table_name, check_column_existance = False, verbose = verbose)
            output_df = pd.concat([output_df, temp_df], sort = False)

    return output_df

In [0]:
def get_raw_df(config, table_name, verbose = False):
    
    """

    Wrapper for executing the get_raw function for the specified data set. The function name is read from the config dictionary.

    Parameters:
    config: config dictionary as defined in get_config.
    table_name: name of the data source to read.

    Returns:
    Data Frame consisting on the output of the get_raw function for the specified data set.

    """
    
    # make sure the table name exists in the config file
    assert table_name in config['data_params'].keys(), 'Unknown table passed'
    # extract function name
    fun_name = config['data_params'][table_name]['functions']['raw']
    # make sure the function is defined in the global environment
    assert fun_name in globals().keys(), 'The function specified in the config file is not defined'
    # exectute function to retrieve data
    df = globals()[fun_name](config, verbose = verbose)
    return df

In [0]:
def get_raw_tenant_history_data(config, verbose = False):
  
    """

    Reads and cleans tenant history data.

    Parameters:
    config: config dictionary as defined in get_config.
    verbose: flag indicating whether to print certain messages.

    Returns:
    Cleaned tenant history data.

    """
    
    raw_tenant_history = read_and_clean_files(config, 'tenant_history')
    date_cols = [x for x in raw_tenant_history.columns if x.startswith('dt')]
    for i in date_cols:
        raw_tenant_history[i] = pd.to_datetime(raw_tenant_history[i])
        raw_tenant_history[i] = pd.to_datetime(raw_tenant_history[i].dt.date)
    return raw_tenant_history

In [0]:
def get_raw_resi_detail_data(config, verbose = False):
  
    """

    Reads and cleans residential detail data.

    Parameters:
    config: config dictionary as defined in get_config.
    verbose: flag indicating whether to print certain messages.

    Returns:
    Cleaned residential detail data.

    """
  
  
    resi_det = read_and_clean_files(config, 'residential_detail')
    date_cols = [x for x in resi_det.columns if x.endswith('date')]
    for i in date_cols:
        resi_det[i] = pd.to_datetime(resi_det[i], errors = 'coerce')
        resi_det[i] = pd.to_datetime(resi_det[i].dt.date)
    return resi_det

In [0]:
def get_raw_unitxref_data(config, verbose = False):
  
  # function not used for the model

    unitxref = read_and_clean_files(config, 'unitxref')

    date_cols = [x for x in unitxref.columns if x.startswith('dt')]
    for i in date_cols:
        unitxref[i] = pd.to_datetime(unitxref[i])
        unitxref[i] = pd.to_datetime(unitxref[i].dt.date)

    return unitxref

In [0]:
def get_raw_prop_data(config, verbose = False):
  
    """

    Reads and cleans property data.

    Parameters:
    config: config dictionary as defined in get_config.
    verbose: flag indicating whether to print certain messages.

    Returns:
    Cleaned property data.

    """
    
    
    prop = read_and_clean_files(config, 'property')

    return prop


In [0]:
def get_raw_unit_data(config, verbose = False):
  
    """

    Reads and cleans unit data.

    Parameters:
    config: config dictionary as defined in get_config.
    verbose: flag indicating whether to print certain messages.

    Returns:
    Cleaned unit data.

    """
  
    unit = read_and_clean_files(config, 'unit')

    return unit

In [0]:
def get_raw_unit_type_data(config, verbose = False):
  
    """

    Reads and cleans unit type data.

    Parameters:
    config: config dictionary as defined in get_config.
    verbose: flag indicating whether to print certain messages.

    Returns:
    Cleaned unit type data.

    """
    
    unit_type = read_and_clean_files(config, 'unit_type')

    return unit_type    

In [0]:

def get_raw_commamendments_data(config, verbose = False):
  
  # function not used for the model
  
    amme = read_and_clean_files(config, 'commamendments')
    date_cols = [x for x in amme.columns if x.startswith('dt')]
    for i in date_cols:
        amme[i] = pd.to_datetime(amme[i])
        amme[i] = pd.to_datetime(amme[i].dt.date)
    return amme 

In [0]:
def get_raw_coords_data(config, verbose = False):
  
    """

    Reads and cleans coordinates data.

    Parameters:
    config: config dictionary as defined in get_config.
    verbose: flag indicating whether to print certain messages.

    Returns:
    Cleaned coordinates data.

    """
  
    coords = read_and_clean_files(config, 'coords')
    return coords    

In [0]:
def get_raw_sr_data(config, verbose = False):
  
    """

    Reads and cleans Service Requests data.

    Parameters:
    config: config dictionary as defined in get_config.
    verbose: flag indicating whether to print certain messages.

    Returns:
    Cleaned Service Requests data.

    """
  
    sr = read_and_clean_files(config, 'service_requests')
    date_cols = [x for x in sr.columns if x.startswith('dt')]
    for i in date_cols:
        sr[i] = pd.to_datetime(sr[i], errors = 'coerce')
        sr[i] = pd.to_datetime(sr[i].dt.date, errors = 'coerce')
    return sr

In [0]:
def get_ind_acc(config):
  
  """

  Gets array with individual account IDs (htenant).

  Parameters:
  config: config dictionary as defined in get_config.

  Returns:
  Array with individual account IDs (htenant).

  """

  resi_detail = get_raw_df(config, 'residential_detail')
  ind_acc = resi_detail.loc[resi_detail.type == 'individual', 'hcode'].values
  return ind_acc