# Criação de página web para cadastro dos agentes de Vigilância Sanitária

## Criação dos bancos de dados e back-end

In [1]:
#Instalação de bibliotecas necessárias
#!pip install python-multipart
#!pip install nest_asyncio
#!pip install pydantic[email]

In [2]:
# Importações
from fastapi import FastAPI, Depends, HTTPException, status, Request, Query, APIRouter
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import create_engine, Column, String, DateTime, ForeignKey, and_, Integer, select, func, engine_from_config, pool, JSON
from sqlalchemy.orm import sessionmaker, declarative_base, relationship
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.future import select
from datetime import datetime, timedelta, date
from typing import Optional, Annotated, List, AsyncGenerator
from pydantic import BaseModel, field_validator, EmailStr, constr
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
from rotas_opcoes import router_opcoes
import threading
from threading import Thread
import ssl
import asyncio
import nest_asyncio
import uuid
import os
import uvicorn
import pandas as pd


In [3]:
# Configurações
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://neondb_owner:npg_RaZNeP4EuFT8@ep-ancient-rice-a8mud72e-pooler.eastus2.azure.neon.tech/neondb?")
SECRET_KEY = os.getenv("SECRET_KEY", "chave-padrao-insegura")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("TOKEN_EXPIRE", 60))

In [4]:
# Inicar sessão para criação de banco de dados assíncrono no Neon
engine = create_async_engine(DATABASE_URL, connect_args={"ssl": "require"}, echo=True)
async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session
Base = declarative_base()

  Base = declarative_base()


In [5]:
# Auth
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login") # usar tokens JWT em rotas protegidas

### Descrição dos bancos de dados

In [6]:
# Banco de dados de cadastro dos usuários
class Usuario(Base):
    __tablename__ = "cadastro_usuarios"
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    nome = Column(String)
    email = Column(String, unique=True)
    senha = Column(String)
    criado_em = Column(DateTime, default=datetime.utcnow)
    agentes = relationship("Agente", back_populates="usuario")


In [7]:
# Banco de dados de cadastro dos agentes 
class Agente(Base):
    __tablename__ = "agentes"
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    usuario_id = Column(String, ForeignKey("cadastro_usuarios.id"))
    usuario = relationship("Usuario", back_populates="agentes")

    nu_cpf = Column(String, unique=True, nullable=False)
    nu_cns_cnes = Column(String, nullable=False)
    co_especialidade = Column(JSON)
    co_nivel_escolaridade = Column(String, nullable=False)
    co_capacitacao = Column(JSON)
    co_cbo_cnes = Column(JSON)
    ds_vinculo_empregaticio = Column(String, nullable=False)
    ds_cargo = Column(String, nullable=False)
    co_faixa_etaria = Column(String)
    co_genero = Column(String, nullable=False)
    co_unidade_vigilancia_sanitaria = Column(String, nullable=False)
    co_cep = Column(String, nullable=False)
    municipio = Column(String, nullable=False)
    regiao_saude = Column(String)
    crs = Column(String)
    macrorregiao = Column(String)
    criado_em = Column(DateTime, default=datetime.utcnow)
    atualizado_em = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

In [8]:
# Banco de dados de tentativas de login
class TentativaLogin(Base):
    __tablename__ = "tentativas_login"
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    email = Column(String, index=True)
    tentativas = Column(Integer, default=0)
    ultima_tentativa = Column(DateTime, default=datetime.utcnow)

### Validação de dados inseridos no banco "cadastro_usuarios"

In [9]:
# Validação dos dados inseridos no banco "cadastro_usuarios"
class UsuarioCreate(BaseModel):
    nome: str
    email: EmailStr
    senha: str

In [10]:
# Inserção de informações que serão utilizados no retorno (GET) como resposta completa dos cadastros
class UsuarioOut(BaseModel):
    id: str
    nome: str
    email: EmailStr

    class Config:
        from_attributes = True

### Validação de dados inseridos no banco "tentativas_login"

In [11]:
# Modelo Pydantic (para receber dados do frontend)
class LoginSchema(BaseModel):
    email: EmailStr
    senha: str 

### Validação de dados inseridos no banco "agentes"

In [12]:
# Validação dos dados inseridos no banco "agentes"
class AgenteCreate(BaseModel):
    nu_cpf: str
    data_nascimento: date
    nu_cns_cnes: str
    co_especialidade: List[int]
    co_nivel_escolaridade: str
    co_capacitacao: List[int]
    co_cbo_cnes: List[int]
    ds_vinculo_empregaticio: str
    ds_cargo: str
    co_faixa_etaria: Optional[str] = None
    co_genero: str
    co_unidade_vigilancia_sanitaria: str
    co_cep: str
    municipio: Optional[str] = None
    regiao_saude: Optional[str] = None
    crs: Optional[str] = None
    macrorregiao: Optional[str] = None

In [13]:
# Função para calcular a idade a partir da data de nascimento utilizando o Pydantic Validator (@model_validator)
@field_validator('co_faixa_etaria', mode='before')
@classmethod
def calcular_idade_em_anos(cls, v, values):
        data_nascimento = values.get('data_nascimento')
        if data_nascimento:
            hoje = date.today()
            idade = hoje.year - data_nascimento.year
            if (hoje.month, hoje.day) < (data_nascimento.month, data_nascimento.day):
                idade -= 1  # ainda não fez aniversário este ano
            return f"{idade} anos"
        return v

In [14]:
# Inserção de informações que serão utilizados no retorno (GET) como resposta completa dos agentes
class AgenteOut(AgenteCreate):
    id: str
    criado_em: datetime
    atualizado_em: datetime

    class Config:
        from_attributes = True

In [15]:
# Validação para recebimento de filtros para busca de agentes
class FiltroAgente(BaseModel):
    municipio: Optional[str] = None
    co_especialidade: Optional[str] = None
    co_genero: Optional[str] = None
    nu_cpf: Optional[str] = None
    order_by: Optional[str] = "criado_em"
    order_dir: Optional[str] = "desc"

### Inicializar os bancos de dados assíncronos no Neon

In [16]:
# Inicializa o banco
async def criar_tabelas():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

await criar_tabelas()

2025-08-06 20:43:25,941 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-08-06 20:43:25,941 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-08-06 20:43:26,663 INFO sqlalchemy.engine.Engine select current_schema()
2025-08-06 20:43:26,663 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-08-06 20:43:27,316 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-08-06 20:43:27,318 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-08-06 20:43:27,861 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-08-06 20:43:27,867 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = $1::VARCHAR AND pg_catalog.pg_class.relkind = ANY (ARRAY[$2::VARCHAR, $3::VARCHAR, $4::VARCHAR, $5::VARCHAR, $6::VARCHAR]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != $7::VARCHAR


### Criação da API e limitação de requisições

In [17]:
# App FastAPI
app = FastAPI()

# Configurar o CORS (Cross-Origin Resource Sharing)

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # ou "*" pra permitir tudo (somente dev)
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"])

In [18]:
router = APIRouter()

In [19]:
# Limitação de requisições
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

In [20]:
# Redirecionar todos os acessos para HTTPS (só na produção)
if os.getenv("ENV") == "production":
    app.add_middleware(HTTPSRedirectMiddleware)

In [21]:
# Tratamento de erro de limite de requisições
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    return JSONResponse(status_code=429, content={"detail": "Muitas tentativas, tente novamente mais tarde."})

### Auxiliares de autenticação

In [22]:
# Abre uma sessão assíncrona com o banco de dados.
async def get_db() -> AsyncSession:
    async with async_session() as session:
        yield session

In [23]:
# Hash de senha
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # criptografar/verificar senhas

# Cria um hash de uma senha de forma segura.
def criar_hash_senha(senha: str) -> str:
    return pwd_context.hash(senha)

In [24]:
# Compara uma senha enviada pelo usuário com o hash da senha armazenado no banco
def verificar_senha(senha, hash):
    return pwd_context.verify(senha, hash)

In [25]:
# Gera um token JWT para o usuário, incluindo o conteúdo de data + uma data de expiração (exp).
def criar_token(data: dict, expira_min=15):
    dados = data.copy()
    expira = datetime.utcnow() + timedelta(minutes=expira_min)
    dados.update({"exp": expira})
    return jwt.encode(dados, SECRET_KEY, algorithm=ALGORITHM)

In [26]:
# Consulta o banco buscando um Usuario pelo e-mail.
async def obter_usuario_email(db: AsyncSession, email: str):
    result = await db.execute(select(Usuario).where(Usuario.email == email))
    return result.scalar_one_or_none()

In [27]:
# Decodifica o token JWT enviado pelo front-end e busca no banco o usuário correspondente.
# Se o token for inválido, expirado ou não tiver o e-mail, dispara um erro 401 Unauthorized.
async def obter_usuario_token(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_async_session)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email = payload.get("sub")
        if email is None:
            raise HTTPException(status_code=401)
        usuario = await obter_usuario_email(db, email)
        if usuario is None:
            raise HTTPException(status_code=401)
        return usuario
    except JWTError:
        raise HTTPException(status_code=401)

In [28]:
# Garante que apenas usuários autenticados possam acessar a rota /perfil
@router.get("/perfil")
async def perfil(usuario: Usuario = Depends(obter_usuario_token)):
    return {"nome": usuario.nome}

### Rotas de autenticação

In [29]:
# Renovação do token
@app.post("/auth/refresh")
async def renovar_token(refresh_token: str, db: AsyncSession = Depends(get_async_session)):
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
        email = payload.get("sub")
        if email is None:
            raise HTTPException(status_code=401)

        result = await db.execute(select(Usuario).filter(Usuario.email == email))
        usuario = result.scalars().first()
        if usuario is None:
            raise HTTPException(status_code=401)

        novo_token = criar_token({"sub": usuario.email}, expira_min=15)
        return {"access_token": novo_token, "token_type": "bearer"}
    except JWTError:
        raise HTTPException(status_code=401, detail="Token inválido ou expirado")

### Rotas de manipulação dos dados

In [30]:
# Configuração para cadastro de usuários
@router.post("/auth/cadastrar", response_model=UsuarioOut)
async def cadastrar_usuario(usuario: UsuarioCreate, db: AsyncSession = Depends(get_async_session)):
    try:
        resultado = await db.execute(select(Usuario).where(Usuario.email == usuario.email))
        usuario_existente = resultado.scalar_one_or_none()

        if usuario_existente:
            raise HTTPException(status_code=400, detail="Email já cadastrado")

        novo_usuario = Usuario(
            id=str(uuid.uuid4()),
            nome=usuario.nome,
            email=usuario.email,
            senha=criar_hash_senha(usuario.senha),
            criado_em=datetime.utcnow()
        )

        db.add(novo_usuario)
        await db.commit()
        await db.refresh(novo_usuario)

        return UsuarioOut(
            id=novo_usuario.id,
            nome=novo_usuario.nome,
            email=novo_usuario.email
        )
    except Exception as e:
        print("❌ ERRO AO CADASTRAR:", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Erro interno ao cadastrar usuário"
        )

In [31]:
# Autenticação de login
MAX_TENTATIVAS = 5
BLOQUEIO_MINUTOS = 15

@router.post("/auth/login")
async def login(dados: LoginSchema, db: AsyncSession = Depends(get_async_session)):
    # Verifica tentativas anteriores
    resultado = await db.execute(select(TentativaLogin).where(TentativaLogin.email == dados.email))
    tentativa = resultado.scalars().first()

    if tentativa:
        tempo_desde_ultima = datetime.utcnow() - tentativa.ultima_tentativa
        if tentativa.tentativas >= MAX_TENTATIVAS and tempo_desde_ultima < timedelta(minutes=BLOQUEIO_MINUTOS):
            raise HTTPException(status_code=429, detail="Muitas tentativas. Tente novamente mais tarde.")

    # Verifica credenciais
    resultado = await db.execute(select(Usuario).where(Usuario.email == dados.email))
    usuario = resultado.scalars().first()

    if not usuario or not verificar_senha(dados.senha, usuario.senha):
        if not tentativa:
            tentativa = TentativaLogin(email=dados.email, tentativas=1, ultima_tentativa=datetime.utcnow())
            db.add(tentativa)
        else:
            tentativa.tentativas += 1
            tentativa.ultima_tentativa = datetime.utcnow()
        await db.commit()
        raise HTTPException(status_code=401, detail="Credenciais inválidas")

    # Login válido
    if tentativa:
        await db.delete(tentativa)
        await db.commit()

    access_token = criar_token({"sub": usuario.email}, expira_min=15)
    refresh_token = criar_token({"sub": usuario.email}, expira_min=60 * 24 * 7)
    await db.commit()

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }

In [32]:
app.include_router(router, tags=["Autenticação"])

In [33]:
# Configuração da rota para obter dados do município 
@router.get("/municipio_info/{municipio}")
async def get_municipio_info(municipio: str, db: AsyncSession = Depends(get_async_session)):
    info = await db.execute(
        select(MunicipioInfo).where(MunicipioInfo.municipio == municipio)
    )
    info = info.scalar_one_or_none()
    if not info:
        raise HTTPException(status_code=404, detail="Município não encontrado")
    
    return {
        "municipio": info.municipio,
        "regiao_saude": info.regiao_saude,
        "crs": info.crs,
        "macrorregiao": info.macrorregiao
    }

In [34]:
@app.post("/agentes", response_model=AgenteOut)
@limiter.limit("10/minute")
async def criar_agente(
    request: Request,
    agente_in: AgenteCreate,
    usuario: Usuario = Depends(obter_usuario_token),
    db: AsyncSession = Depends(get_async_session),
):
    faixa_etaria_calculada = calcular_idade_em_anos(agente_in.data_nascimento)

    info = await db.execute(
        select(MunicipioInfo).where(MunicipioInfo.municipio == agente_in.municipio)
    )
    info = info.scalar_one_or_none()
    if not info:
        raise HTTPException(status_code=404, detail="Município não encontrado")

    novo_agente = Agente(
        usuario_id=usuario.id,
        nu_cpf=agente_in.nu_cpf,
        nu_cns_cnes=agente_in.nu_cns_cnes,
        co_especialidade=agente_in.co_especialidade,
        co_nivel_escolaridade=agente_in.co_nivel_escolaridade,
        co_capacitacao=agente_in.co_capacitacao,
        co_cbo_cnes=agente_in.co_cbo_cnes,
        ds_vinculo_empregaticio=agente_in.ds_vinculo_empregaticio,
        ds_cargo=agente_in.ds_cargo,
        co_faixa_etaria=faixa_etaria_calculada,
        co_genero=agente_in.co_genero,
        co_unidade_vigilancia_sanitaria=agente_in.co_unidade_vigilancia_sanitaria,
        co_cep=agente_in.co_cep,
        municipio=agente_in.municipio,
        regiao_saude=info.regiao_saude,
        crs=info.crs,
        macrorregiao=info.macrorregiao,
    )

    db.add(novo_agente)
    await db.commit()
    await db.refresh(novo_agente)
    return novo_agente

In [35]:
# Listagem com filtros
@app.post("/agentes/filtrar")
async def listar_agentes_filtrados(
    filtros: FiltroAgente,
    pagina: int = Query(1, ge=1),
    tamanho: int = Query(10, ge=1),
    usuario: Usuario = Depends(obter_usuario_token),
    db: AsyncSession = Depends(get_async_session)
):
    query_base = select(Agente).where(Agente.usuario_id == usuario.id)
    condicoes = []

    if filtros.municipio:
        condicoes.append(Agente.municipio.ilike(f"%{filtros.municipio}%"))
    if filtros.co_especialidade:
        condicoes.append(Agente.co_especialidade == filtros.co_especialidade)
    if filtros.co_genero:
        condicoes.append(Agente.co_genero == filtros.co_genero)
    if filtros.nu_cpf:
        condicoes.append(Agente.nu_cpf == filtros.nu_cpf)

    if condicoes:
        query_base = query_base.where(*condicoes)

    ordenaveis = {
        "municipio": Agente.co_unidade_vigilancia_sanitaria,
        "criado_em": Agente.criado_em,
        "nome": Agente.nu_cpf,  # Ajuste se tiver campo real de nome
        "cpf": Agente.nu_cpf
    }

    campo_ordenacao = ordenaveis.get(filtros.order_by, Agente.criado_em)
    if filtros.order_dir == "asc":
        query_base = query_base.order_by(campo_ordenacao.asc())
    else:
        query_base = query_base.order_by(campo_ordenacao.desc())

    total = await db.scalar(select(func.count()).select_from(query_base.subquery()))
    resultado = await db.execute(
        query_base.offset((pagina - 1) * tamanho).limit(tamanho)
    )
    agentes = resultado.scalars().all()

    return {
        "total": total,
        "pagina": pagina,
        "tamanho": tamanho,
        "itens": agentes
    }

In [36]:
# Listagem com filtros via URL
@app.get("/agentes")
async def listar_agentes(
    filtro: Annotated[FiltroAgente, Depends()],
    skip: int = 0,
    limit: int = 10,
    usuario: Usuario = Depends(obter_usuario_token),
    db: AsyncSession = Depends(get_async_session)
):
    query = select(Agente).where(Agente.usuario_id == usuario.id)

    if filtro.nu_cpf:
        query = query.where(Agente.nu_cpf == filtro.nu_cpf)
    if filtro.municipio:
        query = query.where(Agente.municipio == filtro.municipio)
    if filtro.co_genero:
        query = query.where(Agente.co_genero == filtro.co_genero)
    if filtro.co_especialidade:
        query = query.where(Agente.co_especialidade == filtro.co_especialidade)

    total_query = await db.execute(
        select(func.count()).select_from(query.subquery())
    )
    total = total_query.scalar()

    order_col = getattr(Agente, filtro.order_by, Agente.criado_em)
    order_func = order_col.desc() if filtro.order_dir == "desc" else order_col.asc()
    query = query.order_by(order_func).offset(skip).limit(limit)

    result = await db.execute(query)
    agentes = result.scalars().all()

    return {
        "total": total,
        "page": (skip // limit) + 1,
        "per_page": limit,
        "data": agentes
    }

In [37]:
# Inclui também o router de opções, se já existir
app.include_router(router_opcoes, prefix="/opcoes", tags=["Opções"])

### Iniciar servidor

In [38]:
# Garantir a ausência de conflitos de loop de eventos
nest_asyncio.apply()

# Iniciar o servidor Uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)

INFO:     Started server process [23344]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


2025-08-06 20:43:44,806 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-08-06 20:43:44,813 INFO sqlalchemy.engine.Engine SELECT cadastro_usuarios.id, cadastro_usuarios.nome, cadastro_usuarios.email, cadastro_usuarios.senha, cadastro_usuarios.criado_em 
FROM cadastro_usuarios 
WHERE cadastro_usuarios.email = $1::VARCHAR
2025-08-06 20:43:44,814 INFO sqlalchemy.engine.Engine [generated in 0.00102s] ('lucasmachado@hotmail.com',)
2025-08-06 20:43:45,536 INFO sqlalchemy.engine.Engine INSERT INTO cadastro_usuarios (id, nome, email, senha, criado_em) VALUES ($1::VARCHAR, $2::VARCHAR, $3::VARCHAR, $4::VARCHAR, $5::TIMESTAMP WITHOUT TIME ZONE)
2025-08-06 20:43:45,536 INFO sqlalchemy.engine.Engine [generated in 0.00097s] ('efde432d-2e45-41ea-80f0-1cf2a4631540', 'Lucas Machado', 'lucasmachado@hotmail.com', '$2b$12$F2f/CZQMKFoFGowv3dZqVewe37llHjJG5Q9xEaK4OA/FbzDOosQ0i', datetime.datetime(2025, 8, 6, 23, 43, 45, 536718))


  criado_em=datetime.utcnow()


2025-08-06 20:43:45,837 INFO sqlalchemy.engine.Engine COMMIT
2025-08-06 20:43:46,011 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-08-06 20:43:46,013 INFO sqlalchemy.engine.Engine SELECT cadastro_usuarios.id, cadastro_usuarios.nome, cadastro_usuarios.email, cadastro_usuarios.senha, cadastro_usuarios.criado_em 
FROM cadastro_usuarios 
WHERE cadastro_usuarios.id = $1::VARCHAR
2025-08-06 20:43:46,014 INFO sqlalchemy.engine.Engine [generated in 0.00110s] ('efde432d-2e45-41ea-80f0-1cf2a4631540',)
2025-08-06 20:43:46,471 INFO sqlalchemy.engine.Engine ROLLBACK
INFO:     127.0.0.1:64615 - "POST /auth/cadastrar HTTP/1.1" 200 OK
INFO:     127.0.0.1:64621 - "POST /auth/login HTTP/1.1" 422 Unprocessable Entity
INFO:     127.0.0.1:64736 - "POST /auth/login HTTP/1.1" 422 Unprocessable Entity
INFO:     127.0.0.1:64740 - "POST /auth/login HTTP/1.1" 422 Unprocessable Entity


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [23344]
