In [None]:
import pandas as pd
import numpy as np

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
def convert_str_to_int(x):
    try:
        return(int(float(x)))
    except:
        return(np.nan)

# Data

## Consultas 

In [None]:
consulta_train = pd.read_parquet('/opt/airflow/datasets/absenteeism/consulta_train.parquet')
consultamarcacao_train = pd.read_parquet('/opt/airflow/datasets/absenteeism/consultamarcacao_train.parquet')

## cod_secretariado
cod_sec = {'0': 'SECR. UNICO', '1': 'UNIDADE 1', '2':'UNIDADE 2', '3':'UNIDADE 3 / ESPINHO', '99': 'MULTIBANCO',
           '5': 'CRN', '4':'CUIDADOS CONTINUADOS', '98':'MEDSOFT GESTÃO ATEND'}

consultamarcacao_train['Secretariado']=consultamarcacao_train.COD_SECRETARIADO.astype('str').map(cod_sec)
consulta_train['EstadoMarcação']='Realizada'
consulta_train['ResponsávelDesmarcação']=None
consulta_train.HoraId = consulta_train.HoraId.astype('float')
consultamarcacao_train['HoraId'] = consultamarcacao_train['HoraConsulta'].str[:2].astype('float')

In [None]:
col_keep = ['NUM_SEQUENCIAL', 'EspecialidadeSKey', 'EstadoMarcação', 'DataMarcaçãoConsulta', 'DataConsulta',
       'HoraId', 'ResponsávelDesmarcação', 'TipoConsulta', 'TipoVaga', 'TipoAgenda',
           'Secretariado', 'UnidadeSaudeProvenienciaSKey']

In [None]:
data=pd.concat([consultamarcacao_train[col_keep], consulta_train[col_keep]]).reset_index(drop=True)

In [None]:
data.TipoVaga = np.where(data.TipoVaga=='Primeira', 'Primeira','Subsequente')
data.TipoAgenda = np.where(data.TipoAgenda.isin(['Especialidade', 'Sem Agendamento']), data.TipoAgenda, 'Medico')
data.UnidadeSaudeProvenienciaSKey=data.UnidadeSaudeProvenienciaSKey.astype('float')

In [None]:
data[data.UnidadeSaudeProvenienciaSKey.isna()].groupby('EstadoMarcação')['NUM_SEQUENCIAL'].count()

In [None]:
data.UnidadeSaudeProvenienciaSKey.isna().sum()

In [None]:
assert data.HoraId.nunique()<= 24 , 'Duplicated data ...'
assert data.TipoConsulta.nunique() == 4 , 'Duplicated data ...'
assert data.TipoVaga.nunique() == 2 , 'Duplicated data ...'
assert data.TipoAgenda.nunique() == 3 , 'Duplicated data ...'
assert data[data.Secretariado!='nan'].Secretariado.nunique() == 4 , 'Duplicated data ...'

In [None]:
data = data.drop_duplicates(keep='first').reset_index(drop=True)

In [None]:
data[['NUM_SEQUENCIAL','EspecialidadeSKey']] = data[['NUM_SEQUENCIAL','EspecialidadeSKey']].astype('float').astype('int')

In [None]:
especialidade = pd.read_csv('/opt/airflow/datasets/absenteeism/Especialidade.txt', sep=',', encoding="ISO-8859-1", on_bad_lines= "skip")
especialidade.columns = ['EspecialidadeSKey', 'GrupoEspecialidadeSKey', 'CodEspecialidade',
       'Código Especialidade', 'Especialidade', 'CodGrupoEspecialidade',
       'GrupoEspecialidade', 'Especialidade Módulo', 'CodECLIN', 'Estado']
especialidade.EspecialidadeSKey=especialidade.EspecialidadeSKey.astype('float').astype('int')

In [None]:
data = data.merge(especialidade[['EspecialidadeSKey','Especialidade','GrupoEspecialidade']], 
                                       on='EspecialidadeSKey', 
                                       how='inner')\
                                .drop(columns=['EspecialidadeSKey'])

In [None]:
del especialidade

In [None]:
# U - urgencia diferida, C - Centro Saúde, O - Outros, H - hospital
unidadesaudeprov = pd.read_csv('/opt/airflow/datasets/absenteeism/unisau.txt', sep='|', encoding="ISO-8859-1")
unidadesaudeprov.UnidadeSaudeProvenienciaSKey = unidadesaudeprov.UnidadeSaudeProvenienciaSKey.astype('float').astype('int')
data = data.merge(unidadesaudeprov[['UnidadeSaudeProvenienciaSKey', 'Unidade Proveniência', 'Tipo Unidade Proveniência']], on = 'UnidadeSaudeProvenienciaSKey', how='left').drop(columns=['UnidadeSaudeProvenienciaSKey'])

In [None]:
del unidadesaudeprov

In [None]:
define_grupo_esp = ['Pediatria', 'Cirurgia', 'Cirurgia Pediatrica', 'Cirurgia Vascular', 'Ortopedia',  'Dermatologia',
                   'Oftalmologia', 'Otorrino', 'Peneumologia', 'Neurologia']

In [None]:
data.DataConsulta=pd.to_datetime(data.DataConsulta)
data.DataMarcaçãoConsulta=pd.to_datetime(data.DataMarcaçãoConsulta)

### Foco da classificação
Consultas médicas, primeiras, vindos dos centros de saúde

In [None]:
label_data = data[data.EstadoMarcação.isin(['Realizada','Não efetivada e Não Desmarcada']) 
                  & (data.TipoConsulta=='Consulta Médica') 
                  & (data.GrupoEspecialidade.isin(define_grupo_esp))].drop_duplicates( \
        subset=['NUM_SEQUENCIAL', 'GrupoEspecialidade', 'DataConsulta'], keep='first').reset_index(drop=True)

In [None]:
def get_time_info(historica_general_data):
    historica_general_data.loc[:,'Diff_marcacao_consulta'] = (historica_general_data.loc[:,'DataConsulta']-historica_general_data.loc[:,'DataMarcaçãoConsulta']).dt.days
    historica_general_data.loc[:, 'Diff_marcacao_consulta'] = np.where(historica_general_data.loc[:, 'Diff_marcacao_consulta']<0,0,historica_general_data.loc[:,'Diff_marcacao_consulta'])
    historica_general_data.loc[:,'dia_consulta'] = historica_general_data.loc[:,'DataConsulta'].dt.day_name()
    historica_general_data.loc[:,'month_consulta'] = historica_general_data.loc[:, 'DataConsulta'].dt.month
    historica_general_data['Diff_marcacao_consulta'] = (historica_general_data.DataConsulta-historica_general_data.DataMarcaçãoConsulta).dt.days
    historica_general_data.Diff_marcacao_consulta = np.where(historica_general_data.Diff_marcacao_consulta<0,0,historica_general_data.Diff_marcacao_consulta)
    
    return(historica_general_data)

In [None]:
def get_historic_info(historica_data, current_date, extra=''):
    instituicao = 1 if historica_data[historica_data.ResponsávelDesmarcação=='Instituição'].shape[0]>=1 else 0
    n_esp = historica_data.Especialidade.nunique()
    median_diff_dias_marcacao_consulta = historica_data[~historica_data.DataMarcaçãoConsulta.isna()].Diff_marcacao_consulta.median()
    max_diff_dias_marcacao_consulta = historica_data[~historica_data.DataMarcaçãoConsulta.isna()].Diff_marcacao_consulta.max()
    min_diff_dias_marcacao_consulta = historica_data[~historica_data.DataMarcaçãoConsulta.isna()].Diff_marcacao_consulta.min()
    primeira_interecao = (current_date-historica_data.DataConsulta.min()).days
    last_interecao = (current_date-historica_data.DataConsulta.max()).days
    last_miss = 1 if (historica_data[historica_data.DataConsulta==historica_data.DataConsulta.max()].EstadoMarcação=='Não efetivada e Não Desmarcada').sum()>=1 else 0
    last_realizada = 1 if (historica_data[historica_data.DataConsulta==historica_data.DataConsulta.max()].EstadoMarcação=='Realizada').sum()>=1 else 0

    define_grupo_esp = pd.pivot_table(historica_data.groupby(['EstadoMarcação'], as_index=False)['TipoVaga'].count(),
                   values='TipoVaga', 
                   columns=['EstadoMarcação'], aggfunc=np.sum)
    define_grupo_esp = pd.DataFrame(define_grupo_esp.to_records()).fillna(0)
    for i in ['Realizada', 'Não efetivada e Não Desmarcada']:
        if i not in define_grupo_esp.columns.tolist():
            define_grupo_esp[i] = 0
    define_grupo_esp.loc[:,'Não efetivada e Não Desmarcada']=np.where((define_grupo_esp['Não efetivada e Não Desmarcada']==0)
                                                                       & (define_grupo_esp.Realizada==0),0.00001,define_grupo_esp['Não efetivada e Não Desmarcada'])
    define_grupo_esp['assiduidade'] = define_grupo_esp.Realizada/(define_grupo_esp.Realizada+define_grupo_esp['Não efetivada e Não Desmarcada'])
    
    return({f'{extra}assiduidade':define_grupo_esp.assiduidade.item(), f'{extra}instituicao': instituicao, 
            f'{extra}num_especialidade': n_esp, f'{extra}median_diff_dias_marcacao_consulta':median_diff_dias_marcacao_consulta,
            f'{extra}max_diff_dias_marcacao_consulta':max_diff_dias_marcacao_consulta,
            f'{extra}min_diff_dias_marcacao_consulta': min_diff_dias_marcacao_consulta,
            f'{extra}primeira_interecao_dias': primeira_interecao, f'{extra}last_interecao_dias': last_interecao, 
            f'{extra}last_miss': last_miss, f'{extra}last_realizada': last_realizada})

In [None]:
def run(irow, data):
    historic_general_data = data[(data.NUM_SEQUENCIAL==irow.NUM_SEQUENCIAL) 
               & (data.DataConsulta<irow.DataConsulta)]
    historic_same_sp_data = data[(data.NUM_SEQUENCIAL==irow.NUM_SEQUENCIAL) 
               & (data.DataConsulta<irow.DataConsulta)
               & (data.GrupoEspecialidade==irow.GrupoEspecialidade)]

    if len(historic_same_sp_data)>0:
        historic_same_sp_data = get_time_info(historic_same_sp_data)
        historic_sp_info = get_historic_info(historic_same_sp_data, irow.DataConsulta, extra='sp_')
    else:
        for ic in ['Diff_marcacao_consulta', 'dia_consulta', 'month_consulta']:            
            historic_same_sp_data[ic] = None
            historic_sp_info = {'sp_assiduidade': 0, 'sp_instituicao': 0, 'sp_num_especialidade': 0, 'sp_median_diff_dias_marcacao_consulta': 0,
             'sp_max_diff_dias_marcacao_consulta': 0, 'sp_min_diff_dias_marcacao_consulta': 0,
             'sp_primeira_interecao_dias': 0, 'sp_last_interecao_dias': 0, 'sp_last_miss': False, 'sp_last_realizada': False}

    if len(historic_general_data)>0:
        historic_general_data = get_time_info(historic_general_data)
        historic_info = get_historic_info(historic_general_data, irow.DataConsulta)
    else:
        for ic in ['Diff_marcacao_consulta', 'dia_consulta', 'month_consulta']:            
            historic_general_data[ic] = None
        historic_info = {'assiduidade': 0, 'instituicao': 0, 'num_especialidade': 0, 'median_diff_dias_marcacao_consulta': 0,
         'max_diff_dias_marcacao_consulta': 0, 'min_diff_dias_marcacao_consulta': 0,
         'primeira_interecao_dias': 0, 'last_interecao_dias': 0, 'last_miss': False, 'last_realizada': False}

    info_appoitment_label = pd.DataFrame(irow[['NUM_SEQUENCIAL','EstadoMarcação', 'DataMarcaçãoConsulta', 'DataConsulta', 'HoraId', 'TipoAgenda', 'Secretariado',
               'GrupoEspecialidade', 'Especialidade', 'Unidade Proveniência', 'Tipo Unidade Proveniência','TipoVaga']]).T.reset_index(drop=True)
    info_appoitment_label.loc[:,'Diff_marcacao_consulta'] = (info_appoitment_label.loc[:,'DataConsulta']-info_appoitment_label.loc[:,'DataMarcaçãoConsulta']).dt.days
    info_appoitment_label=info_appoitment_label.drop(columns=['DataMarcaçãoConsulta'])
    info_appoitment_label.columns = ['NUM_SEQUENCIAL','Label', 'DataConsulta', 'Hora', 'TipoAgenda', 'Secretariado', 'GrupoEspecialidade', 'Especialidade', 'Unidade Proveniência', 'Tipo Unidade Proveniência', 'TipoVaga', 'Diff_marcacao_consulta']
    info_appoitment_label.loc[:,'dia_consulta'] = info_appoitment_label.loc[:,'DataConsulta'].dt.day_name()
    info_appoitment_label.loc[:,'month_consulta'] = info_appoitment_label.loc[:, 'DataConsulta'].dt.month

    info_appoitment_label = info_appoitment_label.merge(pd.DataFrame(historic_info, index=[0]), left_index=True, right_index=True) \
                                .merge(pd.DataFrame(historic_sp_info, index=[0]), left_index=True, right_index=True)
    return(info_appoitment_label)

In [None]:
import multiprocessing as mp

# data_subset = label_data[label_data.DataConsulta >= '2022-11-30']
data_subset = label_data[:100]


pool = mp.Pool(mp.cpu_count()-1)
info_all = [pool.apply(run, args=(doc_data, data[(data.NUM_SEQUENCIAL==doc_data.NUM_SEQUENCIAL)])) \
             for j, doc_data in data_subset.iterrows()]
                    

~~~ Python
import multiprocessing as mp

pool = mp.Pool(mp.cpu_count()-1)
info_all = [pool.apply(run, args=(doc_data, data[(data.NUM_SEQUENCIAL==doc_data.NUM_SEQUENCIAL)])) \
            for j, doc_data in label_data.iterrows()]
~~~             

In [None]:
# pd.concat(info_all).to_parquet('/opt/airflow/datasets/absenteeism/classificacao_1st_especialidade_unidadeprov_all.parquet', compression='gzip')
pd.concat(info_all).to_parquet('/opt/airflow/datasets/absenteeism/classificacao_1st_especialidade_unidadeprov.parquet', compression='gzip')