In [1]:
import re
import csv
import json
import bisect
import logging
import traceback
import numpy as np
import pandas as pd

import orjson

#import dask.dataframe as dd
#from dask.distributed import Client

In [2]:
logging.basicConfig(
    filename='../logs/app.log',
    filemode='w',
    format='%(asctime)s - %(levelname)s - %(message)s')

logger = logging.getLogger('data_analysis')

### Load

In [3]:
path = '../data/order_table_202208191534.csv'
df = pd.read_csv(path, sep=';',
    parse_dates=['created_at_irpf', 'created_at_loan', 'time_stamp'],
    dtype={'bank_code_pl': str,
            'branch_number_pl': str,
            'loan_id': str})

In [4]:
presumed_income_dict = json.load(open('../config/presumed_income_dict.json', "r"))

In [5]:
path = '../data/Dicionario_Grafia_Banco_SRF-v14.sav'
bank_df = pd.read_spss(path)
bank_df = bank_df.rename(columns={
    'BankName': 'bank',
    'Codigo_Banco': 'bank_code'})

In [6]:
path = '../data/bank_branch.parquet'
branch_df = pd.read_parquet(path)
branch_df = branch_df.rename(columns={
    'Bank': 'bank_code',
    'Branch': 'branch'})

In [7]:
df.head()

Unnamed: 0,person_id,loan_id,irpf_id,time_stamp,created_at_irpf,created_at_loan,safra_created,product_code,state,bank_code_pl,branch_number_pl,rev,value,ordem
0,2422669,12819402.0,32892967,2022-03-31 12:22:41.446,2022-03-31,2022-03-31,202203,PERSONAL,3.0,237.0,4.0,10,"{""rev"":10,""objType"":""IrpfPersonInfoT"",""personI...",1
1,14420823,,32887005,2022-03-28 16:10:33.509,2022-03-28,NaT,202203,,,,,8,"{""rev"":8,""objType"":""IrpfPersonInfoT"",""personId...",1
2,3653088,12833729.0,32931346,2022-04-17 11:23:49.224,2022-04-17,2022-04-17,202204,PERSONAL,3.0,341.0,,6,"{""rev"":6,""objType"":""IrpfPersonInfoT"",""personId...",1
3,14927146,12853159.0,33011755,2022-05-16 09:50:04.143,2022-05-16,2022-05-16,202205,PERSONAL,3.0,33.0,,4,"{""rev"":4,""objType"":""IrpfPersonInfoT"",""personId...",1
4,15333793,12849754.0,32995607,2022-05-12 09:48:12.840,2022-05-12,2022-05-12,202205,PERSONAL_GERU,28.0,260.0,1.0,10,"{""rev"":10,""objType"":""IrpfPersonInfoT"",""personI...",1


### Transformation

In [8]:
### general json and dict functions

def get_json_value(df: pd.DataFrame, col: str):
    '''Takes a pandas dataframe and a string column-name.
    Extracts json object from specified column in dataframe.
    Returns original dataframe joined with normalized json as columns.'''

    try:
        df = df.copy()
    except Exception as e:
        logger.debug(str(e))
        raise(e)

    try:
        data = pd.json_normalize(
            df[col].apply(
                orjson.loads), max_level=0)
    except KeyError as e:
        logger.debug(str(e))
        return df
    else:
        col_lst = data.columns.difference(df.columns)
        return df.join(data[col_lst])

def extract_value_dict(data: dict, key: str, default=np.nan):
    '''Function receives dictionary with key string
    and returns value. If default is provided, returns
    default value when key does not exist, otherwise returns nan.'''

    try:
        status = data.get(key, default)
    except AttributeError as e:
        logger.debug(str(e))
        raise(e)

    return status

def map_normalize_dict(df: pd.DataFrame, col: str, map:dict):
    '''Receives Pandas dataframe, column name and
    dictionary containing new column names as keys and dict
    keys as values. Normalizes dict column in dataframe and returns
    original dataframe with new columns.'''

    df = df.copy()

    for new_col_name, dict_key in map.items():
        df.loc[:, new_col_name] = df[col].apply(
            lambda x: extract_value_dict(
                data=x, key=dict_key))
                
    return df.fillna(value=np.nan)

In [9]:
### general array fuctions

def find_le(ls, x):
    '''Receives ordered list a and element x.
    Returns rightmost value less than or equal to x.'''

    i = bisect.bisect_right(ls, x)
    if i:
        return ls[i-1]
    raise ValueError

In [10]:
### text functions

def apply_regex_series(series: pd.Series, regex: re.Pattern, handle_nan=True):
    '''Receives Pandas Series and regex and returns a numpy array containing
    1 for every match and 0 for no match. Use handle_nan parameter if you want
    to return 0 when value is nan, otherwise nan is passed to regex.'''

    if handle_nan:
        return (np.where(
            series.str.contains(regex) & series.notna(), 1, 0))
    else:
        return (np.where(
            series.str.contains(regex), 1, 0))

In [11]:
### irpf functions

def explode_dict_col(df: pd.DataFrame, dict_col='riskInfo'):
    '''Explodes risk_dict where each row is a
    tax report year.'''

    df = df.copy()

    df.loc[:, 'tax_report_data'] = (
        df[dict_col].apply(
            lambda x: x.values()))

    df = df.apply(pd.Series.explode).reset_index(drop=True).copy()

    return df

def get_irpf_status(df: pd.DataFrame, text_col: str):
    '''Receives pandas dataframe and column name and applies
    regex to column to generate new columns representing status
    of irpf application. Returns dataframe with new columns.'''

    df = df.copy()

    regex_not_consulted = re.compile(
        r'(?:^\s*$|\bdata\sde\snascimento\sinformada\b'
        r'.*\bestá\sdive|\bnão\scoletado'
        r'|\bocorreu\suma\sinconsistência\s?[.])'
        , re.IGNORECASE)

    regex_not_declared = re.compile(
        r'(?:\bconsta\sapresentação\sde\sdeclaração\sanual'
        r'\sde\sisento\b|\bapresentação\sda\sdeclaração\s'
        r'como\sisento\b|\bdeclaração\sconsta\scomo\sisento\b'
        r'|\bdeclaração\sconsta\scomo\spedido\sde'
        r'\sregularização\b|\bsua\sdeclaração\snão\sconsta'
        r'\sna\sbase\sde\sdados\b|\bainda\snão\sestá\sna'
        r'\sbase\b)', re.IGNORECASE)

    regex_tax_refund = re.compile(
        r'(?:\bsituação\sda\srestituição[:]\screditada\b'
        r'|\bsomente\sserá\spermitida\spor\smeio\sdo\scódigo\sde\sacesso\b'
        r'|\baguardando\sreagendamento\spelo\scontribuinte[.]?'
        r'|\bdevolvida\sà\sreceita\sfederal[,]?\sem\srazão\sdo\snão\sresgate\b'
        r'|\benviada\spara\scrédito\sno\sbanco\b'
        r'|\breagendada\spara\scrédito\sno\sbanco\b'
        r'|\bdados\sda\sliberação\sde\ssua\srestituição\b'
        r'|\bdeclaração\sestá\sna\sbase\sde\sdados\b'
        r'|\bestá\sna\sbase[,]\sutilize\so\sextrato\b'
        r'|\bdeclaração\sjá\sfoi\sprocessada[.]?$'
        r'|\brestituição[:]\saguardando\sdevolução\spelo\sbanco\b)'
        , re.IGNORECASE)
    
    col_list = ['irpf_extraction_error', 'irpf_not_declared', 'irpf_tax_refund']

    df.loc[:, 'irpf_extraction_error'] = apply_regex_series(
        df[text_col], regex_not_consulted, handle_nan=False)
    df.loc[:, 'irpf_not_declared'] = apply_regex_series(
        df[text_col], regex_not_declared)
    df.loc[:, 'irpf_tax_refund'] = apply_regex_series(
        df[text_col], regex_tax_refund)

    df.loc[:, 'irpf_tax_to_pay'] = df[col_list].apply(
        lambda x: 1 not in x.values, axis=1).astype(int)

    return df

def retrieve_stars(y: int, x: int, star_arr: np.array):
    try:
        if y >= 16:
            stars = 5
        else:
            stars = star_arr[y][x]
    except IndexError:
        return -1
    else:
        return stars

def set_star_number(arr_declarations: np.array, arr_refunds: np.array):
    base_arr = [
        [0],
        [1, 1],
        [1, 1, 1],
        [1, 1, 1, 1],
        [1, 1, 1, 2, 2],
        [1, 1, 2, 2, 3, 3],
        [1, 2, 2, 3, 3, 4, 4],
        [2, 2, 3, 3, 4, 4, 4, 5],
        [2, 3, 3, 4, 4, 4, 5, 5, 5],
        [2, 3, 4, 4, 4, 5, 5, 5, 5, 5],
        [3, 4, 4, 4, 5, 5, 5, 5, 5, 5, 5],
        [3, 4, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5],
        [3, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5],
        [4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5],
        [4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5],
        [4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]]

    max_len = np.array([len(arr) for arr in base_arr]).max()

    default_value = -1

    star_array = [
        np.pad(arr, (0, max_len - len(arr)),
        mode='constant',
        constant_values=default_value) for arr in base_arr]

    #ret_func = np.vectorize(retrieve_stars)
    #return ret_func(arr_declarations, arr_refunds, star_array)
    return np.array([retrieve_stars(y, x, star_array)
                        for y, x in zip(arr_declarations, arr_refunds)])

In [12]:
df['created_at_irpf'] = df['time_stamp']

In [13]:
cols = ['person_id', 'loan_id', 'irpf_id',
        'created_at_irpf', 'product_code',
        'state', 'rev', 'riskInfo', 'bank_code_pl', 'branch_number_pl']

col_key_map = {
    'cpf': 'cpf',
    'full_status_text': 'full_status_text',
    'bank': 'bank',
    'branch': 'branch'}

bank_df = bank_df.fillna('###')

df['bank_code_pl'] = df['bank_code_pl'].str.zfill(3)
df['branch_number_pl'] = df['bank_code_pl'].str[:4].str.zfill(4)

df = (
    df.pipe(get_json_value, 'value')[cols]
    .pipe(explode_dict_col)
    .pipe(map_normalize_dict, 'tax_report_data', col_key_map)
    .pipe(get_irpf_status, 'full_status_text')
    ).rename(columns={'riskInfo': 'year'})

df = df.merge(bank_df, on='bank', how='left')
df = df.merge(branch_df[['bank_code', 'branch', 'branch_code']],
                on=['bank_code', 'branch'], how='left')
df = df.merge(
    branch_df[['bank_code',
                'branch',
                'branch_code']].rename(
                    columns={'bank_code': 'bank_code_pl',
                            'branch': 'branch_number_pl',
                            'branch_code': 'branch_code_pl'}),
    on=['bank_code_pl', 'branch_number_pl'], how='left')

In [14]:
gp_estr = df.groupby(['cpf', 'created_at_irpf']).agg(
                number_declaration=('tax_report_data', 'count'),
                number_tax_refund=('irpf_tax_refund', 'sum')
                ).reset_index()
        
gp_estr['ESTR'] = set_star_number(gp_estr.number_declaration.values, gp_estr.number_tax_refund.values)

In [15]:
branch_codes = ['PERS', 'STIL', 'PRIM', 'OUTR', 'HSBC', 'VANG', 'UNIC', 'ESPA', 'PRIV']

dtypes = {
    'branch_code': pd.CategoricalDtype(categories=branch_codes)
}

gp_branch = pd.get_dummies(df.astype(dtypes),
                columns=['branch_code'],
                prefix='',
                prefix_sep='').groupby(
                    ['cpf', 'created_at_irpf']
                    )[branch_codes].sum().reset_index()

gp_estr['year'] = gp_estr.created_at_irpf.dt.year

In [16]:
gp_estr = gp_estr.merge(
        gp_branch,
        on=['cpf', 'created_at_irpf'], how='left')
del gp_branch

In [17]:
gp_estr = gp_estr.merge(
                df[['cpf', 'created_at_irpf', 'branch_code_pl']].drop_duplicates(keep='first'),
                on=['cpf', 'created_at_irpf'], how='left')

In [27]:
def get_presumed_income(year: int,
                        irpf_dict: dict,
                        branch_pl: str,
                        star_dict: dict,
                        year_list: list):

    year_d = str(find_le(year_list, year))
    
    presumed_income_set = set()

    for key, value in irpf_dict.items():
        presumed_income_set.add(
            star_dict.get(year_d)
            .get(key)
            .get(str(value))
        )

    if int(irpf_dict.get('ESTR')) > 0:
        declared_branch_incm = (star_dict.get(year_d, {})
                            .get(branch_pl, {})
                            .get('1', 0))

        presumed_income_set.add(
            declared_branch_incm
        )

    return max(presumed_income_set)

def calculate_presumed_income(df: pd.DataFrame, income_dict):

    year_list = [int(x) for x in list(income_dict.keys())]
    year_list.sort()

    branch_codes = ['year', 'ESTR', 'PERS', 'STIL',
                'PRIM', 'OUTR', 'HSBC', 'VANG',
                'UNIC', 'ESPA', 'PRIV', 'branch_code_pl']

    return df.loc[:, branch_codes].apply(
            lambda x: get_presumed_income(
                x[0],
                {'ESTR': str(x[1]),
                'PERS': str(x[2]),
                'STIL': str(x[3]),
                'PRIM': str(x[4]),
                'OUTR': str(x[5]),
                'HSBC': str(x[6]),
                'VANG': str(x[7]),
                'UNIC': str(x[8]),
                'ESPA': str(x[9]),
                'PRIV': str(x[10])},
                x[11],
                income_dict,
                year_list
            ), axis=1, raw=False)

In [22]:
gp_estr.loc[23:25]

Unnamed: 0,cpf,created_at_irpf,number_declaration,number_tax_refund,ESTR,year,PERS,STIL,PRIM,OUTR,HSBC,VANG,UNIC,ESPA,PRIV,branch_code_pl
23,10019576790,2022-05-16 14:36:19.415,10,8,5,2022,0,0,0,0,0,0,8,0,0,
24,10020585977,2022-04-08 08:35:04.184,8,0,2,2022,0,0,0,0,0,0,0,0,0,
25,10020601760,2022-04-06 22:35:59.524,6,1,2,2022,0,0,0,0,0,0,0,0,1,ESPA


In [26]:
calculate_presumed_income(gp_estr.loc[23:25], presumed_income_dict)

TypeError: '>' not supported between instances of 'NoneType' and 'int'

In [None]:
### max(ESTR.get(value), max(Agencias IRPF), max (agencia declarada))




# Se ESTR > 0: dict.get('branch_code').get('1')
# else: dict.get('ESTR').get('0')