In [None]:
%run "D:/a3-una-engenharia-de-dados/notebooks/utils/logger.ipynb"

In [None]:
from os import listdir
from datetime import datetime
from typing import List

from pandas import DataFrame, read_csv

In [None]:
class PAICBronzeParaSilver:
    def __init__(self) -> None:
        self._logger = Logger()
        self._diretorio_raiz = "D:/a3-una-engenharia-de-dados"

    def executar_etl(self) -> None:
        """
        Executa o processo de etl para cada tabela na listagem de entrada
        """
        arquivos_de_entrada = self._criar_lista_de_arquivos_bronze()
        for arquivo in arquivos_de_entrada:
            self._logger.log_mensagem(mensagem=f"Processando tabela {arquivo}")
            tabela_paic = self._extrair_tabela_paic_bronze(
                nome_da_tabela=arquivo)
            tabela_paic = self._transformar_tabela_paic(tabela_paic=tabela_paic)
            self._carregar_tabela_paic(
                tabela_paic=tabela_paic,
                nome_da_tabela=arquivo)

    def _criar_lista_de_arquivos_bronze(self) -> List[str]:
        """
        Gera a listagem de arquivos bronze
        """
        self._logger.log_mensagem(
            mensagem="Criando lista de arquivos a serem processados")
        lista_arquivos_a_serem_processados = []
        lista_arquivos_bronze = listdir(
            path=f"{self._diretorio_raiz}/dados/bronze")    
        for arquivo in lista_arquivos_bronze:
            lista_arquivos_a_serem_processados.append(
                arquivo.replace(".csv", ""))
        return lista_arquivos_a_serem_processados

    def _extrair_tabela_paic_bronze(self, nome_da_tabela: str) -> DataFrame:
        """
        Lê a tabela localizada no diretório da camada bronze
        """
        self._logger.log_mensagem(
            mensagem=f"Lendo a tabela bronze {nome_da_tabela}")
        caminho = f"{self._diretorio_raiz}/dados/bronze/{nome_da_tabela}.csv"
        tabela_paic = read_csv(filepath_or_buffer=caminho, sep=";")
        return tabela_paic

    def _transformar_tabela_paic(self, tabela_paic: DataFrame) -> DataFrame:
        """
        Performa as transformações necessárias para enriquecer os dados
        vindos da camada bronze
        """
        self._logger.log_mensagem(mensagem="Iniciando transformações")
        tabela_paic = self._remover_linhas_de_categoria_da_tabela_paic(
            tabela_paic=tabela_paic)
        tabela_paic = self._criar_coluna_de_codigo_cnae(tabela_paic=tabela_paic)
        tabela_paic = self._criar_coluna_de_categoria_empresa(
            tabela_paic=tabela_paic)
        tabela_paic = self._mudar_tipo_das_colunas_para_object(
            tabela_paic=tabela_paic)
        tabela_paic = self._remover_espacamento_das_colunas(
            tabela_paic=tabela_paic)
        tabela_paic = self._de_para_coluna_valor(tabela_paic=tabela_paic)
        return tabela_paic
    
    def _remover_linhas_de_categoria_da_tabela_paic(
            self,
            tabela_paic: DataFrame) -> DataFrame:
        """
        Remove a totalização referente a totalização de cada divisão
        """
        self._logger.log_mensagem(mensagem="Removendo totalização de divisao")
        categorias = [
            "Total das empresas",
            "Empresas entre 1 e 4 de PO - total",
            "Empresas entre 5 e 29 de PO - total",
            "Empresas com 30 ou mais de PO - total"]
        return (
            tabela_paic
            .query(expr="pessoal_ocupado_grupos_classes not in @categorias"))

    def _criar_coluna_de_codigo_cnae(self, tabela_paic: DataFrame) -> DataFrame:
        """
        Extrair o dado do código cnae da coluna pessoal_ocupado_grupos_classes
        em uma nova
        """
        self._logger.log_mensagem(mensagem="Criando coluna codigo_cnae")
        tabela_paic.loc[:, ("codigo_cnae")] = (
            tabela_paic
            .loc[:, ("pessoal_ocupado_grupos_classes")]
            .str
            .extract(pat=r"(.+?(?= Empresas))"))
        return tabela_paic

    def _criar_coluna_de_categoria_empresa(
            self,
            tabela_paic: DataFrame) -> DataFrame:
        """
        Extrair o dado da categoria da empresa da coluna
        pessoal_ocupado_grupos_classes em uma nova
        """
        self._logger.log_mensagem(mensagem="Criando coluna categoria_empresa")
        tabela_paic.loc[:, ("categoria_empresa")] = (
            tabela_paic
            .loc[:, "pessoal_ocupado_grupos_classes"]
            .str
            .extract(pat=r"(Empresas.*?PO)"))
        return tabela_paic

    def _mudar_tipo_das_colunas_para_object(
            self,
            tabela_paic: DataFrame) -> DataFrame:
        """
        Muda o tipo de total as colunas para string
        """
        self._logger.log_mensagem(
            mensagem="Mudando o tipo de total as colunas para string")
        relacao_de_tipo_por_coluna = {}
        colunas = tabela_paic.columns
        for coluna in colunas:
            relacao_de_tipo_por_coluna[coluna] = "str"
        return tabela_paic.astype(dtype=relacao_de_tipo_por_coluna)

    def _remover_espacamento_das_colunas(
            self,
            tabela_paic: DataFrame) -> DataFrame:
        """
        Remove espaçamentos a esquerda e a direita de todas as colunas
        """
        self._logger.log_mensagem(mensagem="Removendo espacamento das colunas")
        colunas = tabela_paic.columns
        for coluna in colunas:
            if tabela_paic[coluna].dtype == "str":
                tabela_paic.loc[:, (coluna)] = (
                    tabela_paic
                    .loc[:, coluna]
                    .str
                    .strip())
        return tabela_paic

    def _de_para_coluna_valor(self, tabela_paic: DataFrame) -> DataFrame:
        """
        Faz o de para de valores invalidos de acordo com a legenda da pesquisa
        """
        self._logger.log_mensagem(mensagem="Executando de para coluna valor")
        valor_de_para = {
            "-": "0",
            "..": "0",
            "...": "0",
            "X": "0"
        }
        tabela_paic.loc[:, ("valor")] = (
            tabela_paic.loc[:, ("valor")]
            .replace(to_replace=valor_de_para))
        return tabela_paic

    def _carregar_tabela_paic(
            self,
            tabela_paic: DataFrame,
            nome_da_tabela: str) -> None:
        """
        Carrega a tabela paic na pasta silver do drive
        """
        self._logger.log_mensagem(
            mensagem=f"Carregando tabela {nome_da_tabela}")
        caminho = f"{self._diretorio_raiz}/dados/silver/{nome_da_tabela}.csv"
        colunas = [
            "ano_pesquisa", "variavel", "codigo_cnae", "categoria_empresa",
            "valor", "unidade"
        ]
        tabela_paic.to_csv(
        path_or_buf=caminho,
        sep=";",
        mode="w",
        index=False,
        columns=colunas,
        encoding="utf-8")


if __name__ == "__main__":
    paic_bronze_para_silver = PAICBronzeParaSilver()
    paic_bronze_para_silver.executar_etl()