In [None]:
import re

import pandas as pd
import os

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, UniqueConstraint, func
from sqlalchemy.orm import DeclarativeBase, Session

from dotenv import dotenv_values

from municipio_id import municipio_map

## DATA FILTER CLASS

- transformar tudo para o tipo de dado do banco de dados, se possivel com SQLALCHEMY

In [None]:
class InepFilters():
    # csv_path is path source folder + file name
    def __init__(self, csv_path) -> None:
        self.__file_path = csv_path
        source_path, self.__file_name = os.path.split(csv_path)
        if '__' not in self.__file_name:
            pattern = r'(\d{4})_(\w+)_(\w+)_(\w+)\.csv'
        else:
            pattern = r'(\d{4})_(\w+)__(\w+)\.csv'
        self.__filters = re.findall(pattern, self.__file_name)[0]

        self.__correspondents = {
            'EnsinoFundamental': 'EF',
            'AnosIniciais': '1',
            'AnosFinais': '2',
            'TodososValoresdeColunas': 'todos',
            'EnsinoMdio': 'EM',
        }

        self.__year()
        self.__teaching_stage()

    def __year(self):
        self.__year = int(self.__filters[0])

    def __teaching_stage(self):
        self.__teaching_stage = self.__correspondents[self.__filters[1]]
        if self.__teaching_stage == 'EF':
            level = self.__correspondents[self.__filters[2]]
            if level != 'todos':
                self.__teaching_stage += level

    # getters
    def get_year(self):
        return self.__year

    def get_teaching_stage(self):
        return self.__teaching_stage

    def get_file_name(self):
        return self.__file_name

    def get_file_path(self):
        return self.__file_path

    def get_df(self) -> pd.DataFrame:
        return pd.read_csv(self.__file_path, sep=';')

## OFICIAL: Get path from categories and reading and group all file by path

In [None]:
def transform_file_path() -> list:
    source_path_data = os.path.abspath(os.path.join(os.getcwd(), "../oracle_data"))
    category_files = [os.path.join(source_path_data, i) for i in os.listdir(source_path_data)]
    return category_files

all_path_files = transform_file_path()
print(len(all_path_files))


## OFICIAL: GEN LIST INSTANCE FILTER BY PATH_FILE

In [None]:
all_category_objs = [InepFilters(i) for i in all_path_files]
print(len(all_category_objs))

### TEST: full_table_data (DISPOSABLE)

In [None]:
arquivo1 = all_category_objs[2]

In [None]:
df = arquivo1.get_df()
# df = df.sort_values(by='Categoria 1 - Ordenação')
df.rename(columns={'Categoria 1': 'cor_raca',
                   'Matrículas': 'quantidade',
                   'Categoria 2': 'dependencia_administrativa'}, inplace=True)
df.drop(columns=[
    'Etapa de Ensino - Superior', 'Etapa de Ensino', 'Localidade da Escola', 'Categoria 1 - Ordenação', 'Categoria 2 - Ordenação'], inplace=True)
df['etapa_de_ensino'] = arquivo1.get_teaching_stage()
df['ano'] = arquivo1.get_year()
df['municipio_id'] = df['Município'].map(municipio_map)
display(df)
# verifica se todos os valores da coluna tem tal valor
print((df['UF'] == 'MG').all())

In [None]:
lines_with_nan = df[df.isna().any(axis=1)]

In [None]:
display(lines_with_nan)

# OFICIAL CODE:

## Organizing table for insert

In [None]:
def make_full_table(data_filters: InepFilters) -> pd.DataFrame:
    df = data_filters.get_df()
    df.rename(columns={'Categoria 1': 'cor_raca',
                       'Matrículas': 'quantidade',
                       'Categoria 2': 'dependencia_administrativa'}, inplace=True)
    df.drop(columns=[
        'Etapa de Ensino - Superior', 'Etapa de Ensino', 'Localidade da Escola', 'Categoria 1 - Ordenação', 'Categoria 2 - Ordenação'], inplace=True)
    df['etapa_de_ensino'] = data_filters.get_teaching_stage()
    df['ano'] = data_filters.get_year()
    df['municipio_id'] = df['Município'].map(municipio_map)
    return df

## Settings database and create SQLALCHEMY engine

In [None]:
config = dotenv_values("./.env")
username = config.get("DATABASE_USERNAME")
password = config.get("DATABASE_PASSWORD")
dbname = config.get("DATABASE_NAME")
port = config.get("DATABASE_PORT")
host = config.get("DATABASE_HOST")

engine = create_engine(
    f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{dbname}", echo=True)

# inserting in database -> refactore

## necessity apply design patterns: [Design Patterns](https://refactoring.guru/)

In [None]:
class Base(DeclarativeBase):
    ...

In [None]:
class Municipio(Base):
    __tablename__ = 'Municipio'
    id = Column(Integer, primary_key=True, autoincrement=True)
    nome = Column(String, nullable=False)
    UF = Column(String, nullable=False)


class Filtro(Base):
    __tablename__ = 'Filtro'

    id = Column(Integer, primary_key=True, autoincrement=True)
    municipio_id = Column(Integer, ForeignKey('Municipio.id'), nullable=False)
    etapa_de_ensino = Column(String(5), nullable=False)
    ano = Column(Integer, nullable=False)

    __table_args__ = (UniqueConstraint(
        'municipio_id', 'etapa_de_ensino', 'ano', name='unique_municipio_etapa_ano'),)


class Matricula(Base):
    __tablename__ = 'Matricula'
    id = Column(Integer, primary_key=True, autoincrement=True)
    id_filtro = Column(Integer, ForeignKey('Filtro.id'), nullable=False)
    cor_raca = Column(String(20), nullable=True)
    dependencia_administrativa = Column(String(20), nullable=True)
    quantidade = Column(Integer, nullable=True)


# Configuração do banco de dados
Base.metadata.create_all(engine)

### Verifica se o filtro existe. Se não existir, cria e retorna o ID

In [None]:
def get_existing_filters(session):
    # Obtém todos os filtros existentes no banco de dados
    existing_filtros = session.query(Filtro).all()
    # Converte em um dicionário para acesso rápido
    filter_dict = {(f.municipio_id, f.etapa_de_ensino, f.ano): f.id for f in existing_filtros}
    return filter_dict

### Define checkpoints functions

In [None]:
script_path = os.path.dirname(os.path.realpath(__file__))
CHECKPOINT_FILE = os.path.abspath(os.path.join(script_path, '.data_checkpoint.log'))

In [None]:
def save_checkpoint(file_name):
    with open(CHECKPOINT_FILE, "w") as f:
        f.write(f"{file_name}\n")


def load_checkpoint():
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, "r") as f:
            content = f.read().strip()
            if content:
                file_name = content
                return file_name
    return None

In [None]:
def bulk_insert_data(full_table_data: pd.DataFrame, current_file: str=None):
    # Verifica o progresso do checkpoint
    with Session(engine) as session:
        filter_dict = get_existing_filters(session)
        matricula_objs = []

        # Processar cada linha do DataFrame
        for _, row in full_table_data.iterrows():
            # Obter ou criar o filtro e obter seu ID

            filter_key = (row['municipio_id'],
                          row['etapa_de_ensino'], row['ano'])

            if filter_key in filter_dict:
                # Filtro já existe, usar o ID existente.
                id_filter = filter_dict[filter_key]
            else:
                # Criar novo e adicionar no dicionario
                filtro = Filtro(
                    municipio_id=row['municipio_id'], etapa_de_ensino=row['etapa_de_ensino'], ano=row['ano'])
                session.add(filtro)
                session.flush()
                id_filter = filtro.id
                filter_dict[filter_key] = id_filter

            matricula = Matricula(id_filtro=id_filter,
                                  cor_raca=row['cor_raca'],
                                  dependencia_administrativa=row['dependencia_administrativa'],
                                  quantidade=row['quantidade'])
            matricula_objs.append(matricula)
            if current_file:
                save_checkpoint(current_file)

        # Inserir em lote
        session.bulk_save_objects(matricula_objs)
        session.commit()
    if current_file:
        os.remove(CHECKPOINT_FILE)

In [None]:
def alternate_file_and_insert():
    checkpoint_file = load_checkpoint()
    # garante que a partir de um certo index em all_category_objs, todos serao varridos
    was_not_scanned = False
    if not checkpoint_file:
        was_not_scanned = True
    for matricula_data in all_category_objs:
        if was_not_scanned:
            df_full_table = make_full_table(matricula_data)
            bulk_insert_data(
                df_full_table, matricula_data.get_file_path())
        elif checkpoint_file == matricula_data.get_file_path():
            was_not_scanned = True

In [None]:
alternate_file_and_insert()

## Insert 'Todos' Columns

In [None]:
def check_and_create_all_municipios_id(session : Session):
    # Verificar se o município com ID 1 existe e, se não, inseri-lo
    municipio = session.query(Municipio).filter_by(id=1).first()
    if not municipio:
        municipio = Municipio(id=1, nome='Todos', UF='MG')
        session.add(municipio)
        session.commit()

def municipios_sum_query_df(session: Session):
    query = (
        session.query(
            Filtro.etapa_de_ensino,
            Filtro.ano,
            Matricula.cor_raca,
            Matricula.dependencia_administrativa,
            func.sum(Matricula.quantidade).label('quantidade')
        )
        .join(Matricula, Filtro.id == Matricula.id_filtro)
        .filter(Filtro.municipio_id != 1)
        .filter(Filtro.etapa_de_ensino != 'Todos')
        .group_by(
            Filtro.etapa_de_ensino,
            Filtro.ano,
            Matricula.cor_raca,
            Matricula.dependencia_administrativa
        )
        .order_by(
            Filtro.ano,
            Filtro.etapa_de_ensino,
            Matricula.cor_raca,
            Matricula.dependencia_administrativa
        )
    )

    # Executar a query
    results = query.all()
    # Converter resultados em DataFrame
    df = pd.DataFrame(results, columns=['etapa_de_ensino', 'ano', 'cor_raca', 'dependencia_administrativa', 'quantidade'])
    df['municipio_id'] = 1
    
    display(df)
    return df

def etapa_de_ensino_sum(session: Session):
    query = (
        session.query(
            Filtro.municipio_id,
            Filtro.ano,
            Matricula.cor_raca,
            Matricula.dependencia_administrativa,
            func.sum(Matricula.quantidade).label('quantidade')
        )
        .join(Matricula, Filtro.id == Matricula.id_filtro)
        .filter(Filtro.municipio_id != 1)
        .filter(Filtro.etapa_de_ensino != 'Todos')
        .group_by(
            Filtro.municipio_id,
            Filtro.ano,
            Matricula.cor_raca,
            Matricula.dependencia_administrativa
        )
        .order_by(
            Filtro.ano,
            Filtro.municipio_id,
            Matricula.cor_raca,
            Matricula.dependencia_administrativa
        )
    )
    
    # Executar a query
    results = query.all()
    # Converter resultados em DataFrame
    df = pd.DataFrame(results, columns=['municipio_id', 'ano', 'cor_raca', 'dependencia_administrativa', 'quantidade'])
    df['etapa_de_ensino'] = 'Todos'
    
    display(df)
    return df

def municipios_etapa_de_ensino_sum(session: Session):
    query = (
        session.query(
            Filtro.ano,
            Matricula.cor_raca,
            Matricula.dependencia_administrativa,
            func.sum(Matricula.quantidade).label('quantidade')
        )
        .join(Matricula, Filtro.id == Matricula.id_filtro)
        .filter(Filtro.municipio_id != 1)
        .filter(Filtro.etapa_de_ensino != 'Todos')
        .group_by(
            Filtro.ano,
            Matricula.cor_raca,
            Matricula.dependencia_administrativa
        )
        .order_by(
            Filtro.ano,
            Matricula.cor_raca,
            Matricula.dependencia_administrativa
        )
    )

    # Executar a query
    results = query.all()
    # Converter resultados em DataFrame
    df = pd.DataFrame(results, columns=['ano', 'cor_raca', 'dependencia_administrativa', 'quantidade'])
    df['etapa_de_ensino'] = 'Todos'
    df['municipio_id'] = 1
    
    display(df)
    return df

def send_combination(session: Session):
    check_and_create_all_municipios_id(session=session)
    query_df = municipios_etapa_de_ensino_sum(session=session)
    bulk_insert_data(full_table_data=query_df)

def send_etapa_de_ensino_sum_to_db(session: Session):
    query_df = etapa_de_ensino_sum(session=session)
    bulk_insert_data(full_table_data=query_df)

def send_municipios_sum_to_db(session: Session):
    check_and_create_all_municipios_id(session=session)
    query_df = municipios_sum_query_df(session=session)
    bulk_insert_data(full_table_data=query_df)

def insert_total():
    with Session(engine) as session:
        send_municipios_sum_to_db(session=session)
        send_etapa_de_ensino_sum_to_db(session=session)
        send_combination(session=session)

In [None]:
insert_total()