In [3]:
import pandas as pd  # Importa pandas.
import sqlite3  # Importa sqlite3.
from itertools import combinations  # Importa 'combinations'.
import csv  # Importa csv
import tqdm  # Importa tqdm

## Funções

A função **contar_por_grupo** consulta um banco de dados para contar as ocorrências de **"Admissao"** agrupadas por variáveis específicas de uma tabela, retornando o resultado em um DataFrame do Pandas.

In [1]:
def contar_por_grupo(conexao, tabela, variaveis):
    # Esta função conta ocorrências únicas de combinações de variáveis.
    try:
        colunas = ", ".join(variaveis)  # Formata variáveis para SQL.

        sql = f"""
            SELECT {colunas}, COUNT(*) as admissoes
            FROM {tabela}
            WHERE tipomovimentação LIKE '%Admissao%'
            GROUP BY {colunas}
        """  # Constrói a consulta SQL de contagem.
        return pd.read_sql(sql, conexao)  # Executa a consulta e retorna DataFrame.
    except TypeError as t:
        print(t)
        print("As variáveis devem ser uma lista ou tupla de strings.")
    except pd.io.sql.DatabaseError as pdr:
        print(pdr)
        print("Falha ao executar a consulta SQL. Verifique a conexão com o banco de dados e a sintaxe da query.")
    except AttributeError as a:
        print(a)
        print("O objeto 'conexao' não é um objeto de conexão de banco de dados válido para pandas.")

Essa função consulta um banco de dados para **contar as demissões** em uma tabela específica, agrupando os resultados pelas variáveis (colunas) fornecidas.

In [2]:
def contar_por_grupo_demissoes(conexao, tabela, variaveis):
    # Esta função conta ocorrências únicas de combinações de variáveis.
    try:
        colunas = ", ".join(variaveis)  # Formata variáveis para SQL.

        sql = f"""
            SELECT {colunas}, COUNT(*) as demissoes
            FROM {tabela}
            WHERE tipomovimentação NOT LIKE '%Admissao%'
            GROUP BY {colunas}
        """  # Constrói a consulta SQL de contagem.

        return pd.read_sql(sql, conexao)  # Executa a consulta e retorna DataFrame.
    except TypeError as t:
        print(t)
        print("A variável 'variaveis' deve ser uma lista ou tupla de strings.")

    except ValueError as v:
        print(v)
        print("A consulta SQL está malformada ou a tabela/coluna não existe no banco de dados.")

A função **`contagem_total_admissoes`** executa uma consulta SQL para contar o número total de admissões em uma tabela específica do banco de dados e, em seguida, retorna esse valor em um DataFrame do Pandas formatado com colunas vazias para padronização.

In [None]:
def contagem_total_admissoes(conexao,tabela):
    # Esta função conta o número total de registros na tabela.
  try:
    contagem = pd.read_sql(f"SELECT count(*) FROM {tabela} WHERE tipomovimentação LIKE '%Admissao%';", conexao) # Conta total de admissões.

    contagem = contagem.rename(columns={"count(*)": 'admissoes'}) # Renomeia coluna.

    valor = contagem.iloc[0, 0] # Extrai o valor da contagem.

    dados = pd.DataFrame([{
        'uf': "",
        'sexo': "",
        'raçacor': "",
        'faixa_idade': "",
        'graudeinstrução': "",
        "admissoes": valor
    }]) # Cria DataFrame padronizado.

    return dados # Retorna o DataFrame.
  except:
    print(f"Erro ao executar a contagem total de admissões")

A função **`contagem_total_demissoes`** consulta um banco de dados para obter o número total de demissões em uma tabela específica e retorna esse total em um DataFrame do Pandas, preenchendo as demais colunas com valores vazios para padronização.

In [None]:
def contagem_total_demissoes(conexao,tabela):
    # Esta função conta o número total de registros na tabela.
  try:
    contagem = pd.read_sql(f"SELECT count(*) FROM {tabela} WHERE tipomovimentação NOT LIKE '%Admissao%';", conexao) # Conta total de demissões.

    contagem = contagem.rename(columns={"count(*)": 'demissoes'}) # Renomeia coluna.

    valor = contagem.iloc[0, 0] # Extrai o valor da contagem.

    dados = pd.DataFrame([{
        'uf': "",
        'sexo': "",
        'raçacor': "",
        'faixa_idade': "",
        'graudeinstrução': "",
        "demissoes": valor
    }]) # Cria DataFrame padronizado.
    return dados # Retorna o DataFrame.
  except:
    print(f"Erro ao executar a contagem total de demissões")

A função **`tabela_contagem`** calcula o número de admissões para todas as combinações possíveis de variáveis especificadas e as concatena em um único DataFrame do Pandas. Ela inclui uma contagem total de admissões no início e adiciona uma coluna de mês ao DataFrame final.

In [None]:
def tabela_contagem(n,tabela,variaveis,mes,conexao) -> pd.DataFrame:
    todas_combinacoes = []  # Inicializa lista de combinações.

    for r in range(1, len(variaveis) + 1):  # Itera sobre tamanhos de combinação.
        todas_combinacoes.extend(combinations(variaveis, r))  # Gera e adiciona combinações.

    todas_combinacoes = [list(comb) for comb in todas_combinacoes]  # Converte tuplas em listas.
    df_start = contagem_total_admissoes(conexao,tabela)  # Contagem total de admissões.

    for comb in todas_combinacoes:
      df = contar_por_grupo(conexao,tabela,comb)  # Conta por grupo.
      df_start = pd.concat([df_start,df],ignore_index=True)  # Concatena DataFrames.

    df_start.insert(0,'mes',mes)  # Insere coluna 'mes'.
    return df_start  # Retorna o DataFrame final.

A função **`tabela_contagem_demissoes`** calcula o número de demissões para todas as combinações possíveis de variáveis especificadas, concatenando-as em um único DataFrame do Pandas. Ela inclui uma contagem total de demissões no início e adiciona uma coluna de mês ao DataFrame resultante.

In [None]:
def tabela_contagem_demissoes(n,tabela,variaveis,mes,conexao):
    todas_combinacoes = []  # Inicializa lista de combinações.

    for r in range(1, len(variaveis) + 1):  # Itera sobre tamanhos de combinação.
        todas_combinacoes.extend(combinations(variaveis, r))  # Gera e adiciona combinações.

    todas_combinacoes = [list(comb) for comb in todas_combinacoes]  # Converte tuplas em listas.
    df_start = contagem_total_demissoes(conexao,tabela)  # Contagem total de demissões.

    for comb in todas_combinacoes:
      df = contar_por_grupo_demissoes(conexao,tabela,comb)  # Conta por grupo de demissões.
      df_start = pd.concat([df_start,df],ignore_index=True)  # Concatena DataFrames.

    df_start.insert(0,'mes',mes)  # Insere coluna 'mes'.
    return df_start  # Retorna o DataFrame final.

## Executando planilha de admissões

In [None]:
chaves = ['competênciamov','cbo2002ocupação','uf', 'sexo', 'raçacor', 'faixa_idade', 'graudeinstrução']  # Define lista de chaves.

In [None]:
# dic_dfs = {}  # Inicializa dicionário de DataFrames.

# dic_mov = {
#     "cagedmov202401": "janeiro",
#     "cagedmov202402": "fevereiro",
#     "cagedmov202403": "março",
#     "cagedmov202404": "abril",
#     "cagedmov202405": "maio",
#     "cagedmov202406": "junho",
#     "cagedmov202407": "julho",
#     "cagedmov202408": "agosto",
#     "cagedmov202409": "setembro",
#     "cagedmov202410": "outubro",
#     "cagedmov202411": "novembro",
#     "cagedmov202412": "dezembro"
# }

In [None]:
# for chave, valor in dic_mov.items():
#     dic_dfs[valor] = tabela_contagem("",chave,chaves,valor).fillna("") # Popula dic_dfs com contagens de admissões.
#     print(f"Mes de {valor} executado") # Imprime progresso.

# df_final = dic_dfs['janeiro'] # Inicializa df_final com dados de janeiro.
# for chave, valor in dic_dfs.items():
#     if chave != 'janeiro':
#         df_final = pd.concat([df_final,valor],ignore_index=True) # Concatena os demais meses.

# df_final # Exibe o DataFrame final.

In [None]:
ufs = ['','CE','BA','PE'] # Lista de UFs.

In [None]:
# tabela_filtrada = df_final.query("uf in @ufs") # Filtra o DataFrame por UFs.

A função **`Tabela_contagem_admissoes`** automatiza a coleta e agregação de dados de admissões. Ela percorre um dicionário de arquivos, calcula o número de admissões para diversas combinações de variáveis em cada um, e então, une esses resultados em um único DataFrame. Por fim, o DataFrame consolidado é filtrado por Unidades da Federação (UFs) e retornado. Se ocorrer qualquer problema durante esse processo, uma mensagem de erro será exibida.

In [None]:
def Tabela_contagem_admissoes(conexao,dic_arquivos):
    dic_dfs = {}  # Inicializa dicionário de DataFrames.
    for chave, valor in tqdm.tqdm(dic_arquivos.items()):
      dic_dfs[valor] = tabela_contagem("",chave,chaves,valor,conexao).fillna("") # Popula dic_dfs com contagens de admissões.

    df_final = dic_dfs['janeiro'] # Inicializa df_final com dados de janeiro.
    for chave, valor in dic_dfs.items():
      if chave != 'janeiro':
        df_final = pd.concat([df_final,valor],ignore_index=True) # Concatena os demais meses.

    tabela_filtrada = df_final.query("uf in @ufs") # Filtra o DataFrame por UFs.
    return tabela_filtrada


## Executando planilha de demissões

In [None]:
# dic_dfs2 = {} # Inicializa um dicionário vazio para armazenar DataFrames.

In [None]:
# for chave, valor in dic_mov.items():
#     dic_dfs2[valor] = tabela_contagem_demissoes("",chave,chaves,valor).fillna("") # Popula dic_dfs2 com contagens de demissões.
#     print(f"Mes de {valor} executado") # Imprime progresso.

# df_final2 = dic_dfs2['janeiro'] # Inicializa df_final2 com dados de janeiro.
# for chave, df in dic_dfs2.items(): # Itera sobre o dicionário original 'dic_dfs'.
#     if chave != 'janeiro':
#         df_final2 = pd.concat([df_final2,df],ignore_index=True) # Concatena os DataFrames.

# df_final2 # Exibe o DataFrame final.

In [None]:
# tabela_filtrada2 = df_final2.query("uf in @ufs") # Filtra o DataFrame por UFs.

Esta função, **`Tabela_contagem_demissoes`**, processa um dicionário de arquivos para calcular e consolidar contagens de demissões. Para cada item no dicionário de entrada, ela gera um DataFrame com contagens de demissões agrupadas por diferentes variáveis e as concatena em um DataFrame final. Caso ocorra algum erro durante o processo, uma mensagem de erro é exibida.

In [None]:
def Tabela_contagem_demissoes(conexao,dic_arquivos):
  dic_dfs2 = {} # Inicializa um dicionário vazio para armazenar DataFrames.
  global chaves
  try:
    for chave, valor in tqdm.tqdm(dic_arquivos.items()):
      dic_dfs2[valor] = tabela_contagem_demissoes("",chave,chaves,valor,conexao).fillna("") # Popula dic_dfs2 com contagens de demissões.

    df_final2 = dic_dfs2['janeiro']
    for chave, df in dic_dfs2.items(): # Itera sobre o dicionário original 'dic_dfs'.
      if chave != 'janeiro':
        df_final2 = pd.concat([df_final2,df],ignore_index = True)
    return df_final2
  except:
    print(f"Erro ao executar a tabela de demissões")

In [None]:
dic = {
    "cagedexc202401": "janeiro",
    "cagedexc202402": "fevereiro"
}

In [None]:
con = sqlite3.connect("exc.db")

In [None]:
teste = ['mes','competênciamov','cbo2002ocupação','uf', 'sexo', 'raçacor', 'faixa_idade', 'graudeinstrução',
       'demissoes']

In [None]:
Tabela_contagem_demissoes(con,dic).reindex(columns = teste)

100%|██████████| 2/2 [00:05<00:00,  2.59s/it]


Unnamed: 0,mes,competênciamov,cbo2002ocupação,uf,sexo,raçacor,faixa_idade,graudeinstrução,demissoes
0,janeiro,,,,,,,,4109
1,janeiro,202001.0,,,,,,,24
2,janeiro,202002.0,,,,,,,21
3,janeiro,202003.0,,,,,,,17
4,janeiro,202004.0,,,,,,,20
...,...,...,...,...,...,...,...,...,...
413826,fevereiro,202401.0,919105.0,MT,Masculino,Parda,25 a 39,MEDIO COMPL,1
413827,fevereiro,202401.0,991305.0,MS,Masculino,Parda,25 a 39,MEDIO COMPL,1
413828,fevereiro,202401.0,991310.0,SP,Masculino,Branca,40 a 59,MEDIO COMPL,1
413829,fevereiro,202401.0,991315.0,SC,Masculino,Não informada,40 a 59,MEDIO COMPL,1
