In [0]:
from pyspark.sql.types import IntegerType, StringType, DecimalType, LongType
from pyspark.sql.functions import col, regexp_replace, when, upper, to_date, lit

In [0]:
def get_folders(category, layer):
    try:
        existing_folders = [folder.name.rstrip("/") for folder in dbutils.fs.ls(f"/{layer}/SIAPE/{category}")]
        return existing_folders
    except Exception as e:
        # Caso nao haja nenhuma pasta
        return []


# Remove colunas
def drop_columns(df, cols):
    return df.drop(*cols)


# Renomeia colunas
def rename_columns(df, names):
    for old, new in names.items():
        df = df.withColumnRenamed(old, new)
    return df


def replace_string(df, cols, str1, str2):
    for col in cols:
        df = df.withColumn(col, regexp_replace(df[col], str1, str2))
    return df


# Alteracao do tipo de dado
def cast_data(df, cols, to_type):
    for column in cols:
        df = df.withColumn(column, col(column).cast(to_type))
    return df


def strings_to_upper (df, cols):
    for column in cols:
        df = df.withColumn(column, upper(col(column)))
    return df


def transform_invalid_values_to_null(df, values_to_clean):
    for col, invalid_values in values_to_clean.items():        
        for invalid_value in invalid_values:    
            df = df.withColumn(col, when(df[col] == invalid_value, None).otherwise(df[col]))    
    return df


def cast_to_date(df, cols):
    for column in cols:        
        df = df.withColumn(column, to_date(df[column], "dd/MM/yyyy"))
    return df


def get_columns_by_schema(df, data_type):
    return [field.name for field in df.schema.fields if field.dataType == data_type]


def save_delta(df, category, subfolder):
    delta_path = f"/silver/SIAPE/{category}/{subfolder}"

    # Escreve o df particionado por 'ano' e 'mes'
    df.write.format("delta").partitionBy("ano_mes").mode("overwrite").save(delta_path)

   

In [0]:
categories = ["Servidores", "Aposentados", "Pensionistas"]

cols_to_rename_remuneracao = {   
    "Id_SERVIDOR_PORTAL": "identificador_servidor",
    "CPF": "numero_cpf",
    "NOME": "nome_servidor",
    "REMUNERAÇÃO BÁSICA BRUTA (R$)": "valor_remuneracao_basica_bruta",
    "ABATE-TETO (R$)": "valor_deducao_abate_teto",
    "GRATIFICAÇÃO NATALINA (R$)": "valor_gratificacao_natalina",
    "ABATE-TETO DA GRATIFICAÇÃO NATALINA (R$)": "valor_deducao_abate_teto_gratificacao_natalina",
    "FÉRIAS (R$)": "valor_adicional_ferias",
    "OUTRAS REMUNERAÇÕES EVENTUAIS (R$)": "valor_remuneracao_eventual",
    "IRRF (R$)": "valor_deducao_irrf",
    "PSS/RPGS (R$)": "valor_deducao_contribuicao_previdenciaria",
    "DEMAIS DEDUÇÕES (R$)": "valor_deducao_adiantamento",
    "PENSÃO MILITAR (R$)": "valor_deducao_pensao_militar",
    "FUNDO DE SAÚDE (R$)": "valor_deducao_fundo_saude",
    "TAXA DE OCUPAÇÃO IMÓVEL FUNCIONAL (R$)": "valor_deducao_taxa_ocupacao_imovel_funcional",
    "REMUNERAÇÃO APÓS DEDUÇÕES OBRIGATÓRIAS (R$)": "valor_remuneracao_apos_deducao_obrigatoria",
    "VERBAS INDENIZATÓRIAS REGISTRADAS EM SISTEMAS DE PESSOAL - CIVIL (R$)(*)": "valor_verba_indenizatoria_rh_civil",
    "VERBAS INDENIZATÓRIAS REGISTRADAS EM SISTEMAS DE PESSOAL - MILITAR (R$)(*)": "valor_verba_indenizatoria_rh_militar",
    "VERBAS INDENIZATÓRIAS PROGRAMA DESLIGAMENTO VOLUNTÁRIO – MP 792/2017 (R$)": "valor_verba_indenizatoria_desligamento_voluntario",
    "TOTAL DE VERBAS INDENIZATÓRIAS (R$)(*)": "valor_total_verba_indenizatoria"
}

cols_to_int_remuneracao = ['ano_mes', 'identificador_servidor']


cols_to_drop_cadastro = {
    "Servidores" : ["DATA_NOMEACAO_CARGOFUNCAO", "DIPLOMA_INGRESSO_CARGOFUNCAO", "REFERENCIA_CARGO"],
    "Aposentados" : ["DATA_NOMEACAO_CARGOFUNCAO", "DIPLOMA_INGRESSO_CARGOFUNCAO"],
    "Pensionistas" : ["DATA_NOMEACAO_CARGOFUNCAO_INSTITUIDOR_PENSAO", "DIPLOMA_INGRESSO_CARGOFUNCAO_INSTITUIDOR_PENSAO"]
}


cols_to_rename_cadastro_servidor = {
    "Id_SERVIDOR_PORTAL" : "identificador_pessoa",
    "NOME" : "nome_pessoa",
    "CPF" : "numero_cpf",
    "MATRICULA" : "numero_matricula",
    "DESCRICAO_CARGO" : "nome_cargo",
    "CLASSE_CARGO" : "codigo_classe_cargo",
    "PADRAO_CARGO" : "codigo_padrao_cargo",
    "NIVEL_CARGO" : "codigo_nivel_cargo",
    "SIGLA_FUNCAO" : "codigo_sigla_funcao_comissionada",
    "NIVEL_FUNCAO" : "codigo_nivel_funcao_comissionada",
    "FUNCAO" : "nome_funcao_comissionada",
    "CODIGO_ATIVIDADE" : "codigo_atividade_comissionada",
    "ATIVIDADE" : "nome_atividade_comissionada",
    "OPCAO_PARCIAL" : "codigo_opcao_parcial",
    "COD_UORG_LOTACAO" : "codigo_unidade_organizacional_lotacao",
    "UORG_LOTACAO" : "nome_unidade_organizacional_lotacao",
    "COD_ORG_LOTACAO" : "codigo_orgao_lotacao",
    "ORG_LOTACAO" : "nome_orgao_lotacao",
    "COD_ORGSUP_LOTACAO" : "codigo_orgao_superior_lotacao",
    "ORGSUP_LOTACAO" : "nome_orgao_superior_lotacao",
    "COD_UORG_EXERCICIO" : "codigo_unidade_organizacional_exercicio",
    "UORG_EXERCICIO" : "nome_unidade_organizacional_exercicio",
    "COD_ORG_EXERCICIO" : "codigo_orgao_exercicio",
    "ORG_EXERCICIO" : "nome_orgao_exercicio",
    "COD_ORGSUP_EXERCICIO" : "codigo_orgao_superior_exercicio",
    "ORGSUP_EXERCICIO" : "nome_orgao_superior_exercicio",
    "COD_TIPO_VINCULO" : "codigo_tipo_vinculo",
    "TIPO_VINCULO" : "nome_tipo_vinculo",
    "SITUACAO_VINCULO" : "nome_situacao_vinculo",
    "DATA_INICIO_AFASTAMENTO" : "data_inicio_afastamento",
    "DATA_TERMINO_AFASTAMENTO" : "data_termino_afastamento",
    "REGIME_JURIDICO" : "nome_regime_juridico",
    "JORNADA_DE_TRABALHO" : "nome_jornada_trabalho",
    "DATA_INGRESSO_CARGOFUNCAO" : "data_ingresso_cargo_funcao",
    "DATA_INGRESSO_ORGAO" : "data_ingresso_orgao",
    "DOCUMENTO_INGRESSO_SERVICOPUBLICO" : "nome_documento_ingresso_servico_publico",
    "DATA_DIPLOMA_INGRESSO_SERVICOPUBLICO" : "data_diploma_ingresso_servico_publico",
    "DIPLOMA_INGRESSO_ORGAO" : "codigo_diploma_ingresso_orgao",
    "DIPLOMA_INGRESSO_SERVICOPUBLICO" : "codigo_diploma_ingresso_servico_publico",
    "UF_EXERCICIO" : "sigla_unidade_federacao_exercicio"
}

cols_to_rename_cadastro_aposentado = {
    "Id_SERVIDOR_PORTAL" : "identificador_pessoa",
    "NOME" : "nome_pessoa",
    "CPF" : "numero_cpf",
    "MATRICULA" : "numero_matricula",
    "COD_TIPO_APOSENTADORIA" : "codigo_tipo_aposentadoria",
    "TIPO_APOSENTADORIA" : "nome_tipo_aposentadoria",
    "DATA_APOSENTADORIA" : "data_aposentadoria",
    "DESCRICAO_CARGO" : "nome_cargo",
    "COD_UORG_LOTACAO" : "codigo_unidade_organizacional_lotacao",
    "UORG_LOTACAO" : "nome_unidade_organizacional_lotacao",
    "COD_ORG_LOTACAO" : "codigo_orgao_lotacao",
    "ORG_LOTACAO" : "nome_orgao_lotacao",
    "COD_ORGSUP_LOTACAO" : "codigo_orgao_superior_lotacao",
    "ORGSUP_LOTACAO" : "nome_orgao_superior_lotacao",
    "COD_TIPO_VINCULO" : "codigo_tipo_vinculo",
    "TIPO_VINCULO" : "nome_tipo_vinculo",
    "SITUACAO_VINCULO" : "nome_situacao_vinculo",
    "REGIME_JURIDICO" : "nome_regime_juridico",
    "JORNADA_DE_TRABALHO" : "nome_jornada_trabalho",
    "DATA_INGRESSO_CARGOFUNCAO" : "data_ingresso_cargo_funcao",
    "DATA_INGRESSO_ORGAO" : "data_ingresso_orgao",
    "DOCUMENTO_INGRESSO_SERVICOPUBLICO" : "nome_documento_ingresso_servico_publico",
    "DATA_DIPLOMA_INGRESSO_SERVICOPUBLICO" : "data_diploma_ingresso_servico_publico",
    "DIPLOMA_INGRESSO_ORGAO" : "codigo_diploma_ingresso_orgao",
    "DIPLOMA_INGRESSO_SERVICOPUBLICO" : "codigo_diploma_ingresso_servico_publico"
}

cols_to_rename_cadastro_pensionista = {
    "Id_SERVIDOR_PORTAL" : "identificador_pessoa",
    "NOME" : "nome_pessoa",
    "CPF" : "numero_cpf",
    "MATRICULA" : "numero_matricula",
    "CPF_REPRESENTANTE_LEGAL" : "numero_cpf_representante_legal",
    "NOME_REPRESENTANTE_LEGAL" : "nome_representante_legal",
    "CPF_INSTITUIDOR_PENSAO" : "numero_cpf_instituidor_pensao",
    "NOME_INSTITUIDOR_PENSAO" : "nome_instituidor_pensao",
    "COD_TIPO_PENSAO" : "codigo_tipo_pensao",
    "TIPO_PENSAO" : "nome_tipo_pensao",
    "DATA_INICIO_PENSAO" : "data_inicio_pensao",
    "DESCRICAO_CARGO_INSTITUIDOR_PENSAO" : "nome_cargo_instituidor_pensao",
    "COD_UORG_LOTACAO_INSTITUIDOR_PENSAO" : "codigo_unidade_organizacional_lotacao_instituidor_pensao",
    "UORG_LOTACAO_INSTITUIDOR_PENSAO" : "nome_unidade_organizacional_lotacao_instituidor_pensao",
    "COD_ORG_LOTACAO_INSTITUIDOR_PENSAO" : "codigo_orgao_lotacao_instituidor_pensao",
    "ORG_LOTACAO_INSTITUIDOR_PENSAO" : "nome_orgao_lotacao_instituidor_pensao",
    "COD_ORGSUP_LOTACAO_INSTITUIDOR_PENSAO" : "codigo_orgao_superior_lotacao_instituidor_pensao",
    "ORGSUP_LOTACAO_INSTITUIDOR_PENSAO" : "nome_orgao_superior_lotacao_instituidor_pensao",
    "COD_TIPO_VINCULO" : "codigo_tipo_vinculo",
    "TIPO_VINCULO" : "nome_tipo_vinculo",
    "SITUACAO_VINCULO" : "nome_situacao_vinculo",
    "REGIME_JURIDICO_INSTITUIDOR_PENSAO" : "nome_regime_juridico_instituidor_pensao",
    "JORNADA_DE_TRABALHO_INSTITUIDOR_PENSAO" : "nome_jornada_trabalho_instituidor_pensao",
    "DATA_INGRESSO_CARGOFUNCAO_INSTITUIDOR_PENSAO" : "data_ingresso_cargo_funcao_instituidor_pensao",
    "DATA_INGRESSO_ORGAO_INSTITUIDOR_PENSAO" : "data_ingresso_orgao_instituidor_pensao",
    "DOCUMENTO_INGRESSO_SERVICOPUBLICO_INSTITUIDOR_PENSAO" : "nome_documento_ingresso_servico_publico_instituidor_pensao",
    "DATA_DIPLOMA_INGRESSO_SERVICOPUBLICO_INSTITUIDOR_PENSAO" : "data_diploma_ingresso_servico_publico_instituidor_pensao",
    "DIPLOMA_INGRESSO_ORGAO_INSTITUIDOR_PENSAO" : "codigo_diploma_ingresso_orgao_instituidor_pensao",
    "DIPLOMA_INGRESSO_SERVICOPUBLICO_INSTITUIDOR_PENSAO" : "codigo_diploma_ingresso_servico_publico_instituidor_pensao"
}


cols_to_clean_cadastro_servidor = {
    "DESCRICAO_CARGO": ["Sem informaç", "Inválido"],
    "PADRAO_CARGO": ["-1", "-3"],
    "NIVEL_CARGO" : ["-1", "-3"],
    "SIGLA_FUNCAO" : ["-1", "-11"],
    "NIVEL_FUNCAO" : ["-1", "-11"],
    "FUNCAO" : ["Sem informação"],
    "CODIGO_ATIVIDADE" : ["-1", "-11"],
    "ATIVIDADE" : ["Sem informaç"],
    "COD_UORG_LOTACAO" : ["-1", "-11", "-3", "-99"],
    "UORG_LOTACAO" : ["Sem informação", "Inválido"],
    "COD_ORG_LOTACAO": ["-20"],
    "COD_ORGSUP_LOTACAO" : ["-1", "-20"],
    "ORGSUP_LOTACAO" : ["Sem informação"],
    "COD_UORG_EXERCICIO" : ["-1", "-11", "-3", "-99"],
    "UORG_EXERCICIO" : ["Sem informação", "Inválido"],
    "COD_ORG_EXERCICIO": ["-20"],
    "COD_ORGSUP_EXERCICIO" : ["-1", "-20"],
    "ORGSUP_EXERCICIO" : ["Sem informação"],
    "REGIME_JURIDICO" : ["Sem informaç"],
    "JORNADA_DE_TRABALHO" : ["Sem informaç", "Inválido"],
    "DOCUMENTO_INGRESSO_SERVICOPUBLICO" : [".", "-"],
    "DIPLOMA_INGRESSO_ORGAO" : ["Sem informaç", "Inválido"],
    "DIPLOMA_INGRESSO_SERVICOPUBLICO" : ["Sem informaç", "Inválido"],
    "UF_EXERCICIO" : ["-1"]
}

cols_to_clean_cadastro_aposentado = {
    "COD_TIPO_APOSENTADORIA" : ["-1"],
    "TIPO_APOSENTADORIA" : ["Sem informação"],
    "COD_UORG_LOTACAO" : ["-3", "-11"],
    "UORG_LOTACAO" : ["Inválido"],
    "COD_ORG_LOTACAO" : ["-20"],
    "COD_ORGSUP_LOTACAO" : ["-20", "-1"],
    "ORGSUP_LOTACAO" : ["Sem informação"],
    "REGIME_JURIDICO" : ["Sem informaç"],
    "JORNADA_DE_TRABALHO" : ["Inválido", "Sem informaç"]
}

cols_to_clean_cadastro_pensionista = {
    "NOME_INSTITUIDOR_PENSAO" : ["Sem informação"],
    "COD_TIPO_PENSAO" : ["-1", "-3"],
    "TIPO_PENSAO" : ["Sem informação", "Inválido"],
    "DESCRICAO_CARGO_INSTITUIDOR_PENSAO" : ["Inválido"],
    "COD_UORG_LOTACAO_INSTITUIDOR_PENSAO" : ["-11", "-3"],
    "UORG_LOTACAO_INSTITUIDOR_PENSAO" : ["Inválido"],
    "COD_ORG_LOTACAO_INSTITUIDOR_PENSAO" : ["-20"],
    "COD_ORGSUP_LOTACAO_INSTITUIDOR_PENSAO" : ["-1", "-20"],
    "ORGSUP_LOTACAO_INSTITUIDOR_PENSAO" : ["Sem informação"],
    "REGIME_JURIDICO_INSTITUIDOR_PENSAO" : ["Inválido", "Sem informaç"],
    "JORNADA_DE_TRABALHO_INSTITUIDOR_PENSAO" : ["Inválido", "Sem informaç"]
}


cols_to_long_cadastro = {
    "Servidores" : ["codigo_unidade_organizacional_lotacao", "codigo_unidade_organizacional_exercicio"],
    "Aposentados" : ["codigo_unidade_organizacional_lotacao"],
    "Pensionistas" : ["codigo_unidade_organizacional_lotacao_instituidor_pensao"]
}

cols_to_int_cadastro = {
    "Servidores" : ["identificador_pessoa", "codigo_nivel_cargo", "codigo_nivel_funcao_comissionada", "codigo_atividade_comissionada", "codigo_orgao_lotacao", "codigo_orgao_superior_lotacao", "codigo_orgao_exercicio", "codigo_orgao_superior_exercicio", "codigo_tipo_vinculo", "ano_mes"],
    "Aposentados" : ["identificador_pessoa","codigo_tipo_aposentadoria","codigo_orgao_lotacao","codigo_orgao_superior_lotacao","codigo_tipo_vinculo", "ano_mes"],
    "Pensionistas" : ["identificador_pessoa", "codigo_tipo_pensao", "codigo_orgao_lotacao_instituidor_pensao", "codigo_orgao_superior_lotacao_instituidor_pensao", "codigo_tipo_vinculo", "ano_mes"]
}






In [0]:
data_to_process = {}
# Obtem apenas as pastas cujos dados nao foram processados
for category in categories:
    data_to_process[category] = list(
        set(get_folders(category, "bronze")) - set(get_folders(category, "silver"))
    )


for category, folders in data_to_process.items():
    for folder in folders:
        # Leitura do csv
        df_cadastro = spark.read.option("header", True).option("sep", ";").csv(f'/bronze/SIAPE/{category}/{folder}/{folder}_Cadastro.csv')
        df_remuneracao = spark.read.option("header", True).option("sep", ";").csv(f'/bronze/SIAPE/{category}/{folder}/{folder}_Remuneracao.csv')

        # --------------- Limpeza e transformação de dados ---------------

        # ---- Remuneracao ----

        # Remove colunas
        cols_to_drop_remuneracao = [col for col in df_remuneracao.columns if '(U$)' in col]
        cols_to_drop_remuneracao.extend(["ANO", "MES"])
        df_remuneracao_v2 = drop_columns(df_remuneracao, cols_to_drop_remuneracao)

        # Renomeia colunas
        df_remuneracao_v3 = rename_columns(df_remuneracao_v2, cols_to_rename_remuneracao)

        # Altera , por . nas casas decimais
        cols_to_replace_remuneracao = [col for col in df_remuneracao_v3.columns if col.startswith("valor_")]
        df_remuneracao_v4 = df_remuneracao_v3.fillna(0.00, subset=cols_to_replace_remuneracao)
        df_remuneracao_v5 = replace_string(df_remuneracao_v4, cols_to_replace_remuneracao, ',', '.')

        # Adiciona coluna anomes para particionamento
        df_remuneracao_v6 = df_remuneracao_v5.withColumn("ano_mes", lit(f'{folder}'))

        # Cast de dados
        df_remuneracao_v7 = cast_data(df_remuneracao_v6, cols_to_replace_remuneracao, DecimalType(20,2))
        
        df_remuneracao_v8 = cast_data(df_remuneracao_v7, cols_to_int_remuneracao, IntegerType())

        # Altera a caixa das colunas do tipo string
        cols_string_type = get_columns_by_schema(df_remuneracao_v8,  StringType()) 
        df_remuneracao_v9 = strings_to_upper(df_remuneracao_v8, cols_string_type)

        save_delta(df_remuneracao_v9, category, 'Remuneracao')         

        # ---- Cadastro ----

        # Remove colunas        
        df_cadastro_v2 = drop_columns(df_cadastro, cols_to_drop_cadastro[category])

        if category == "Servidores":
            # Limpa valores invalidos
            df_cadastro_v3 = transform_invalid_values_to_null(df_cadastro_v2, cols_to_clean_cadastro_servidor)
            
            # Renomeia colunas
            df_cadastro_v4 = rename_columns(df_cadastro_v3, cols_to_rename_cadastro_servidor)

        elif category == "Aposentados":
            # Limpa valores invalidos
            df_cadastro_v3 = transform_invalid_values_to_null(df_cadastro_v2, cols_to_clean_cadastro_aposentado)
            
            # Renomeia colunas
            df_cadastro_v4 = rename_columns(df_cadastro_v3, cols_to_rename_cadastro_aposentado)

        else:
            # Limpa valores invalidos
            df_cadastro_v3 = transform_invalid_values_to_null(df_cadastro_v2, cols_to_clean_cadastro_pensionista)
            
            # Renomeia colunas
            df_cadastro_v4 = rename_columns(df_cadastro_v3, cols_to_rename_cadastro_pensionista)


        # Adiciona coluna anomes para particionamento
        df_cadastro_v5 = df_cadastro_v4.withColumn("ano_mes", lit(f'{folder}'))

        # Cast de dados
        cols_to_date_cadastro = [col for col in df_cadastro_v5.columns if col.startswith("data_")]
        df_cadastro_v6 = cast_to_date(df_cadastro_v5, cols_to_date_cadastro)

        df_cadastro_v7 = cast_data(df_cadastro_v6, cols_to_long_cadastro[category], LongType())

        df_cadastro_v8 = cast_data(df_cadastro_v7, cols_to_int_cadastro[category], IntegerType())

        # Altera a caixa das colunas do tipo string
        cols_to_upper_cadastro = get_columns_by_schema(df_cadastro_v8,  StringType()) 
        df_cadastro_v9 = strings_to_upper(df_cadastro_v8, cols_to_upper_cadastro)

        save_delta(df_cadastro_v9, category, 'Cadastro')


print('Processamento camada Silver finalizado.')
        
