#### Importação das bibliotecas

In [None]:
import os
import re
from dotenv import load_dotenv
from typing import Optional, Tuple

import pandas as pd
import psycopg2 as pg
import sqlalchemy
from sqlalchemy import create_engine
import panel as pn
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')

pn.extension()
pn.extension('tabulator')
pn.extension(notifications=True)

#### Variáveis do ambiente

In [None]:
# carrega as variáveis de ambiente do arquivo .env
load_dotenv()

# ler as variáveis de ambiente que tem as credênciais
DB_HOST = os.getenv("DB_HOST")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")

# cria conexão com o Postgre usando o psycopg2 usando as variáveis de ambiente
# usado para executar comandos SQL
con = pg.connect(host = DB_HOST, dbname = DB_NAME, user = DB_USER, password = DB_PASS)
con.autocommit = False

#### SQLAlchemy

In [None]:
# define a string de conexão para o SQLAlchemy as variáveis de ambiente
cnx = f'postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME}'

# cria o objeto engine do SQLAlchemy que será usado para conectar e executar comandos no banco
engine = sqlalchemy.create_engine(cnx)


#### Validação

In [None]:
# funções de validação de dados

# cnpj
def validar_cnpj(cnpj:str) -> bool:
    cnpj_limpo = re.sub(r'\D', '', cnpj)
    return len(cnpj_limpo) == 14

# email
def validar_email(email:str) -> bool:
    padrao_email = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    return re.match(padrao_email, email) is not None

# validação campos obrigatórios
def validar_obrigatorios(campos: dict) -> Tuple[bool, str]:
    for campo, valor in campos.items():
        if not valor or str(valor).strip() == '':
            return False, f'O campo "{campo}" é obrigatório'
    return True, ''

#### CRUD - Inventor


In [None]:
# consulta incial para testar a conexão

query = "SELECT * FROM instituicao;"
df = pd.read_sql_query(query, cnx)

In [None]:
# ignorar campos vazios
flag = ''

In [None]:
# campo para consulta com filtro
consulta_nome = pn.widgets.TextInput(
    name = 'Consulta por Nome',
    placeholder = "Digite o nome para consulta",
    description = "Em branco mostra todas as instituições cadastradas",
)

# dados da instituição

# nome
nome_oficial = pn.widgets.TextInput(
    name = 'Nome Oficial', 
    placeholder = 'Digite o nome')

# cnpj
cnpj = pn.widgets.TextInput(
    name = 'CNPJ',
    placeholder = "xx.xxx.xxx/xxxx-xx",
)

# email
email = pn.widgets.TextInput(
    name = 'Email',
    placeholder = "exemplo@dominio.com",
)

# cep
cep = pn.widgets.TextInput(
    name = 'CEP',
    placeholder = "xxxxxx-xxx",
)

# estado
estados_brasileiros = [
    'AC', 'AL', 'AP', 'AM', 'BA', 'CE', 'DF', 'ES', 'GO', 'MA', 
    'MT', 'MS', 'MG', 'PA', 'PB', 'PR', 'PE', 'PI', 'RJ', 'RN', 
    'RS', 'RO', 'RR', 'SC', 'SP', 'SE', 'TO'
]

estado = pn.widgets.AutocompleteInput(
    name = 'Estado',
    options = estados_brasileiros,
    placeholder = "Digite o estado (Ex: CE)",
    case_sensitive = False,
    search_strategy = 'includes'
)

# cidade
cidade = pn.widgets.TextInput(
    name = 'Cidade',
    placeholder = "Digite a cidade",
)

# bairro
bairro = pn.widgets.TextInput(
    name = 'Bairro',
    placeholder = 'Digite o bairro',
)

# rua 
rua = pn.widgets.TextInput(
    name = 'Rua',
    placeholder = "Digite a rua",
)

# numero
numero = pn.widgets.TextInput(
    name = 'Número',
    placeholder = "Digite o número",
)

# tipo de instituição
tipo_instituicao = pn.widgets.RadioBoxGroup(
name = 'Tipo de Instituição',
options = ['Empresa', 'Universidade'],
inline = True # lado a lado
)

# campo Empresa
setor_atividade = pn.widgets.TextInput(
    name = 'Setor de Atividade',
    placeholder = "Digite o setor de atividade",
)

# campo Universidade
credenciamento_mec = pn.widgets.IntInput(
    name = 'Credenciamento e-MEC',
    start = 0,
    step = 1,
    placeholder = "",
    visible = True
)

# ID da instituição
id_instituicao = pn.widgets.IntInput(
    name = 'ID da Instituição',
    start = 0,
    step = 1,
    placeholder = "ID para exclusão/atualização",
)

# criação dos botões
buttonConsultar = pn.widgets.Button(name='Consultar', button_type='default')
buttonInserir = pn.widgets.Button(name='Inserir', button_type='default')
buttonExcluir = pn.widgets.Button(name='Excluir', button_type='default')
buttonAtualizar = pn.widgets.Button(name='Atualizar', button_type='default')


##### Consultas

In [None]:
# função de consulta
def on_consultar():
    try:
        termo_consulta = consulta_nome.value.strip() if consulta_nome.value else ''

        if termo_consulta:  # consulta com filtragem por nome
            query = """
                SELECT
                    i.id_instituicao,
                    i.nome_oficial,
                    i.cnpj,
                    i.email,
                    i.cep,
                    i.cidade,
                    i.estado,
                    CASE
                        WHEN e.id_instituicao IS NOT NULL THEN 'Empresa'
                        WHEN u.id_instituicao IS NOT NULL THEN 'Universidade'
                        ELSE 'Indefinido'
                    END as tipo
                FROM instituicao i
                LEFT JOIN empresa e ON i.id_instituicao = e.id_instituicao
                LEFT JOIN universidade u ON i.id_instituicao = u.id_instituicao
                WHERE i.nome_oficial ILIKE %s
                ORDER BY i.id_instituicao DESC
            """
            df = pd.read_sql_query(query, engine, params=(f'%{termo_consulta}%',))

            if df.empty:
                return pn.pane.Markdown(f'**Nenhuma instituição encontrada com termo:** "{termo_consulta}"')
            
            pn.state.notifications.success(f'{len(df)} instituição(ões) encontrada(s)')

        else:  # consulta SEM filtragem (todas as instituições)
            query = """
                SELECT
                    i.id_instituicao,
                    i.nome_oficial,
                    i.cnpj,
                    i.email,
                    i.cep,
                    i.rua,
                    i.numero,
                    i.bairro,
                    i.cidade,
                    i.estado,
                    e.setor_atividade,
                    u.credenciamento_mec,
                    CASE
                        WHEN e.id_instituicao IS NOT NULL THEN 'Empresa'
                        WHEN u.id_instituicao IS NOT NULL THEN 'Universidade'
                        ELSE 'Indefinido'
                    END as tipo
                FROM instituicao i
                LEFT JOIN empresa e ON i.id_instituicao = e.id_instituicao
                LEFT JOIN universidade u ON i.id_instituicao = u.id_instituicao
                ORDER BY i.id_instituicao DESC
            """
            df = pd.read_sql_query(query, engine)

            if df.empty:
                return pn.pane.Markdown('**Nenhuma instituição cadastrada.**')


        return pn.widgets.Tabulator(
            df,  
            page_size = 15,
            pagination ='local',
            layout = 'fit_columns',
            sizing_mode = 'stretch_width'
        )

    except Exception as e:
        pn.state.notifications.error(f'Erro ao consultar: {str(e)}')
        return pn.pane.Alert(f'Erro: {str(e)}', alert_type='danger')

##### Inserção

In [None]:
# função para inserção
def on_inserir():
    cursor = None
    try:
        # validação dos campos obrigatórios
        campos_obrigatorios = {
            'Nome Oficial': nome_oficial.value,
            'CNPJ': cnpj.value,
            'Email': email.value,
            'Tipo de Instituição': tipo_instituicao.value
        }

        valido, mensagem = validar_obrigatorios(campos_obrigatorios)
        if not valido:
            pn.state.notifications.warning(mensagem)
            return on_consultar()
        
        # validações de formato
        if not validar_cnpj(cnpj.value):
            pn.state.notifications.warning('CNPJ inválido! Use o formato XX.XXX.XXX/XXXX-XX')
            return on_consultar()
        
        if not validar_email(email.value):
            pn.state.notifications.warning('Email inválido!')
            return on_consultar()
        
        # Validações específicas por tipo
        if tipo_instituicao.value == 'Empresa':
            if not setor_atividade.value or setor_atividade.value.strip() == '':
                pn.state.notifications.warning('Preencha o Setor de Atividade!')
                return on_consultar()
        
        elif tipo_instituicao.value == 'Universidade':
            if not credenciamento_mec.value or credenciamento_mec.value <= 0:
                pn.state.notifications.warning('Preencha o Credenciamento e-MEC!')
                return on_consultar()
        
        cursor = con.cursor()

        # inserção na tabela instituicao
        
        cursor.execute("""
            INSERT INTO instituicao 
            (nome_oficial, email, cnpj, cidade, bairro, numero, rua, estado, cep)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            RETURNING id_instituicao
        """, (
            nome_oficial.value.strip(),
            email.value.strip(),
            cnpj.value.strip(),
            cidade.value.strip(),
            bairro.value.strip() if bairro.value else None,
            numero.value.strip() if numero.value else None,
            rua.value.strip() if rua.value else None,
            estado.value if estado.value else None,
            cep.value.strip() if cep.value else None
        ))

        id_inst = cursor.fetchone()[0]
        
        # Inserir na tabela específica
        if tipo_instituicao.value == 'Empresa':
            cursor.execute("""
                INSERT INTO empresa (id_instituicao, setor_atividade)
                VALUES (%s, %s)
            """, (id_inst, setor_atividade.value.strip()))
        
        elif tipo_instituicao.value == 'Universidade':
            cursor.execute("""
                INSERT INTO universidade (id_instituicao, credenciamento_mec)
                VALUES (%s, %s)
            """, (id_inst, credenciamento_mec.value))
        
        con.commit()
        pn.state.notifications.success(f'Instituição inserida com sucesso! ID: {id_inst}')
        return on_consultar()
    
    except Exception as e:
        if con:
            con.rollback()
        pn.state.notifications.error(f'Erro ao inserir: {str(e)}')
        return on_consultar()
    
    finally:
        if cursor:
            cursor.close()

##### Exclusão

In [None]:
# função para exclusão - utiliza o id da instituição
def on_excluir():
    cursor = None
    try:
        # validação do ID
        if not id_instituicao.value or id_instituicao.value <= 0:
            pn.state.notifications.error('Preencha o ID da instituição!')
            return on_consultar()
        
        cursor = con.cursor()
        
        # verifica a existência do ID
        cursor.execute(
            "SELECT nome_oficial FROM instituicao WHERE id_instituicao = %s",
            (id_instituicao.value,)
        )

        resultado = cursor.fetchone()
        if not resultado:
            pn.state.notifications.error('ID não encontrado!')
            cursor.close()
            return on_consultar()
        
        nome = resultado[0]
        
        # remover dependências nas tabelas
        cursor.execute("DELETE FROM empresa WHERE id_instituicao = %s", (id_instituicao.value,))
        cursor.execute("DELETE FROM universidade WHERE id_instituicao = %s", (id_instituicao.value,))
    
        # remover instituicao
        cursor.execute("DELETE FROM instituicao WHERE id_instituicao = %s", (id_instituicao.value,))

        con.commit()
        pn.state.notifications.success('Instituição excluída com sucesso!')
        return on_consultar()
    
    except Exception as e:
        if con:
            con.rollback()
        pn.state.notifications.error(f'Erro ao excluir instituição: {e}')
        return on_consultar()
    
    finally:
        if cursor:
            cursor.close()  

##### Atualização

In [None]:
# função para atualização
def on_atualizar():
    cursor = None
    try:
        # validação do ID
        if not id_instituicao.value or id_instituicao.value <= 0:
            pn.state.notifications.error('Preencha o ID da instituição!')
            return on_consultar()

        cursor = con.cursor()

        # verifica se o ID existe
        # erro consertado
        cursor.execute(
            "SELECT email, cnpj FROM instituicao WHERE id_instituicao = %s",
            (id_instituicao.value,)
            )
        # versão errada: está esperando receber dois valores quando só recebe um
        # cursor.execute(
            #"SELECT 1 FROM instituicao WHERE id_instituicao = %s",
            #(id_instituicao.value,)
            #)
        #
        registro_atual = cursor.fetchone()
        if not registro_atual:
            pn.state.notifications.error('ID não encontrado!')
            return on_consultar()

        email_atual, cnpj_atual = registro_atual

        # validação dos campos obrigatórios
        campos_obrigatorios = {
            'Nome Oficial': nome_oficial.value,
            'CNPJ': cnpj.value,
            'Email': email.value,
            'Tipo de Instituição': tipo_instituicao.value
        }

        valido, mensagem = validar_obrigatorios(campos_obrigatorios)
        if not valido:
            pn.state.notifications.warning(mensagem)
            return on_consultar()

        # validações de formato
        if not validar_cnpj(cnpj.value):
            pn.state.notifications.warning('CNPJ inválido!')
            return on_consultar()

        if not validar_email(email.value):
            pn.state.notifications.warning('Email inválido!')
            return on_consultar()

        #validações de duplicidade (unique)

        # email
        if email.value.strip() != email_atual:
            cursor.execute("""
                SELECT 1
                FROM instituicao
                WHERE email = %s
                  AND id_instituicao <> %s
            """, (
                email.value.strip(),
                id_instituicao.value
            ))

            if cursor.fetchone():
                pn.state.notifications.warning('Este email já está sendo usado')
                return on_consultar()

        # cnpj
        if cnpj.value.strip() != cnpj_atual:
            cursor.execute("""
                SELECT 1
                FROM instituicao
                WHERE cnpj = %s
                  AND id_instituicao <> %s
            """, (
                cnpj.value.strip(),
                id_instituicao.value
            ))

            if cursor.fetchone():
                pn.state.notifications.warning('Este CNPJ já está sendo usado')
                return on_consultar()

        # validações específicas por tipo
        if tipo_instituicao.value == 'Empresa':
            if not setor_atividade.value or setor_atividade.value.strip() == '':
                pn.state.notifications.warning('Preencha o Setor de Atividade!')
                return on_consultar()

        elif tipo_instituicao.value == 'Universidade':
            if not credenciamento_mec.value or credenciamento_mec.value <= 0:
                pn.state.notifications.warning('Preencha o Credenciamento e-MEC!')
                return on_consultar()

        # update da tabela principal
        cursor.execute("""
            UPDATE instituicao
            SET nome_oficial = %s,
                email = %s,
                cnpj = %s,
                cidade = %s,
                bairro = %s,
                numero = %s,
                rua = %s,
                estado = %s,
                cep = %s
            WHERE id_instituicao = %s
        """, (
            nome_oficial.value.strip(),
            email.value.strip(),
            cnpj.value.strip(),
            cidade.value.strip() if cidade.value else None,
            bairro.value.strip() if bairro.value else None,
            numero.value.strip() if numero.value else None,
            rua.value.strip() if rua.value else None,
            estado.value if estado.value else None,
            cep.value.strip() if cep.value else None,
            id_instituicao.value
        ))

        # atualização do tipo de instituição
        cursor.execute(
            "DELETE FROM empresa WHERE id_instituicao = %s",
            (id_instituicao.value,)
        )

        cursor.execute(
            "DELETE FROM universidade WHERE id_instituicao = %s",
            (id_instituicao.value,)
        )

        if tipo_instituicao.value == 'Empresa':
            cursor.execute("""
                INSERT INTO empresa (id_instituicao, setor_atividade)
                VALUES (%s, %s)
            """, (
                id_instituicao.value,
                setor_atividade.value.strip()
            ))

        elif tipo_instituicao.value == 'Universidade':
            cursor.execute("""
                INSERT INTO universidade (id_instituicao, credenciamento_mec)
                VALUES (%s, %s)
            """, (
                id_instituicao.value,
                credenciamento_mec.value
            ))

        con.commit()
        pn.state.notifications.success('Instituição atualizada com sucesso!')
        return on_consultar()

    except Exception as e:
        if con:
            con.rollback()
        pn.state.notifications.error(f'Erro ao atualizar instituição: {e}')
        return on_consultar()

    finally:
        if cursor:
            cursor.close()

#### Interface e Gráfico

In [None]:
button_gerar_grafico = pn.widgets.Button(
    name = 'Gerar Gráfico - Instituições por Estado',
    button_type = 'default', width = 300)

In [None]:
# função consulta por agregação
def get_instituicoes_por_estado():
    query = """
        SELECT estado, COUNT(*) AS total
        FROM instituicao
        WHERE estado IS NOT NULL AND estado != ''
        GROUP BY estado
        ORDER BY total DESC;
    """
    return pd.read_sql_query(query, engine)

In [None]:
# gerar gráfico
def gerar_grafico(event=None):
    """
    Gera um gráfico de barras mostrando a quantidade de instituições por estado
    Utiliza consulta SQL com agregação (GROUP BY e COUNT)
    """
    try:
        # Busca os dados usando agregação SQL
        df = get_instituicoes_por_estado()
        
        if df.empty:
            pn.state.notifications.warning('Não há dados para gerar o gráfico')
            return pn.pane.Markdown('**Sem dados disponíveis para análise**')
        
        # Cria o gráfico de barras
        fig, ax = plt.subplots(figsize=(12, 6))
        
        bars = ax.bar(df['estado'], df['total'], 
                     color='steelblue', 
                     edgecolor='navy', 
                     alpha=0.7)
        
        ax.set_xlabel('Estado', fontsize=12, fontweight='bold')
        ax.set_ylabel('Quantidade de Instituições', fontsize=12, fontweight='bold')
        ax.set_title('Total de Instituições por Estado', fontsize=14, fontweight='bold', pad=20)
        
        plt.xticks(rotation=45, ha='right')
        ax.grid(axis='y', alpha=0.3, linestyle='--')
        
        # Adiciona valores no topo das barras
        for bar in bars:
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2., height,
                   f'{int(height)}',
                   ha='center', va='bottom', 
                   fontsize=10, fontweight='bold')
        
        plt.tight_layout()
        
        pn.state.notifications.success(f'Gráfico gerado! Total: {len(df)} estados')
        
        return pn.Column(
            pn.pane.Matplotlib(fig, tight=True, sizing_mode='stretch_width'),
            pn.pane.Markdown('### Dados da Consulta com Agregação (GROUP BY estado)'),
            pn.widgets.Tabulator(
                df,
                layout='fit_columns',
                sizing_mode='stretch_width',
                page_size=15,
                titles={'estado': 'Estado', 'total': 'Total de Instituições'}
            )
        )
        
    except Exception as e:
        pn.state.notifications.error(f'Erro ao gerar gráfico: {str(e)}')
        return pn.pane.Alert(f'Erro: {str(e)}', alert_type='danger')


In [None]:
# chama a função apropriada dependendo do botão clicado.
def table_creator(cons, ins, atu, exc):

    if cons:
        return on_consultar()
    if ins:
        return on_inserir()
    if atu:
        return on_atualizar()
    if exc:
        return on_excluir()

interactive_table = pn.bind(table_creator,
                             buttonConsultar.param.clicks,
                             buttonInserir.param.clicks,
                             buttonAtualizar.param.clicks,
                             buttonExcluir.param.clicks
                             )

# bind do botão gerar gráfico
interactive_grafico = pn.bind(gerar_grafico, button_gerar_grafico.param.clicks)

##### Montagem do Layout

In [None]:
sidebar_crud = pn.Column(
    consulta_nome,
    pn.pane.Markdown('Digite o nome'),
    buttonConsultar,

    pn.layout.Divider(), # divisao dos dados basicos
    nome_oficial,
    cnpj,
    email,

    pn.layout.Divider(), # divisao dos dados de endereco
    rua,
    numero,
    bairro,
    cidade,
    estado,
    cep,

    pn.layout.Divider(), # divisao do tipo de instituicao
    tipo_instituicao,
    setor_atividade,
    credenciamento_mec,     

    pn.layout.Divider(),
    id_instituicao,
    pn.pane.Markdown('Digite o ID para exclusão/atualização'),

    pn.layout.Divider(),
    pn.Row(
        buttonInserir,
        buttonAtualizar,
        buttonExcluir
    ),
    width = 350
)   

main_area_crud = pn.Column(
    pn.pane.Markdown('Use o campo de busca para filtra por nome'),
    interactive_table,
    sizing_mode = 'stretch_both'
)

# Sidebar para Análise
sidebar_analise = pn.Column(
    '## Gráfico de Análise',
    pn.pane.Markdown('**Consulta com Agregação SQL**'),

    pn.layout.Divider(),
    
    pn.pane.Markdown(
        """
        Este gráfico apresenta a **quantidade total de instituições 
        cadastradas agrupadas por estado**.
        """,
        styles={'font-size': '12px', 'background': '#f0f0f0', 'padding': '15px', 'border-radius': '5px'}
    ),
    
    pn.layout.Divider(),
    button_gerar_grafico,
    width=400
)

main_analise = pn.Column(
    pn.pane.Markdown('**Gráfico gerado a partir de consulta SQL com agregação (GROUP BY)**'),
    interactive_grafico,
    sizing_mode = 'stretch_both'
)

# criar abas
tabs = pn.Tabs(
    ('CRUD - Gerenciamento', pn.Row(sidebar_crud, main_area_crud)),
    ('Gráfico', pn.Row(sidebar_analise, main_analise)),
    dynamic = True
)

# layout final (finalmente)
layout = pn.template.FastListTemplate(
    title = 'Gerenciamento Tabela de Instituições',
    main = [tabs],
    header_background = '#1f77b4'
)

layout.servable()