In [1]:
"""
Servidor MCP para Recomenda√ß√£o de Filmes com RAG
Exp√µe ferramentas para o LLama 3 buscar filmes no PostgreSQL
"""

import asyncio
import logging
from typing import List, Dict, Any
import psycopg2
import numpy as np
from sentence_transformers import SentenceTransformer
from mcp.server import Server
from mcp.types import Tool, TextContent

# Configura√ß√£o de logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ============================================
# CONFIGURA√á√ïES
# ============================================

DB_CONFIG = {
    'dbname': 'filmes_rag',
    'user': 'postgres',
    'password': 'senha123',  # ‚Üê Altere se necess√°rio
    'host': 'localhost',
    'port': '5432'
}

MODELO_EMBEDDING = 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2'

# ============================================
# GERENCIADOR DE RECURSOS
# ============================================

class RecursosRAG:
    """Gerencia conex√£o com banco e modelo de embeddings"""

    def __init__(self):
        self.conn = None
        self.cursor = None
        self.modelo = None

    def inicializar(self):
        """Inicializa recursos"""
        logger.info("üîå Conectando ao PostgreSQL...")
        self.conn = psycopg2.connect(**DB_CONFIG)
        self.cursor = self.conn.cursor()
        logger.info("‚úì Conectado ao PostgreSQL")

        logger.info("ü§ñ Carregando modelo de embeddings...")
        self.modelo = SentenceTransformer(MODELO_EMBEDDING)

        if self.modelo.device.type == 'cuda':
            logger.info("‚úì Modelo carregado na GPU")
        else:
            logger.info("‚úì Modelo carregado na CPU")

    def buscar_filmes(self, query: str, top_k: int = 3) -> List[Dict[str, Any]]:
        """
        Busca filmes usando busca vetorial sem√¢ntica

        Args:
            query: Descri√ß√£o do que o usu√°rio procura
            top_k: N√∫mero de resultados

        Returns:
            Lista de dicion√°rios com informa√ß√µes dos filmes
        """
        logger.info(f"üîç Buscando: '{query}'")

        # Gera embedding da query
        embedding_query = self.modelo.encode([query], convert_to_numpy=True)[0]

        # Busca no banco
        sql = """
        SELECT
            titulo,
            chunk_texto,
            chunk_index,
            1 - (vetor_embedding <=> %s::vector) as similaridade
        FROM filmes
        ORDER BY vetor_embedding <=> %s::vector
        LIMIT %s;
        """

        embedding_list = embedding_query.tolist()
        self.cursor.execute(sql, (embedding_list, embedding_list, top_k * 2))
        resultados_raw = self.cursor.fetchall()

        # Agrupa por filme e pega o melhor trecho de cada
        filmes_dict = {}
        for titulo, chunk, idx, sim in resultados_raw:
            if titulo not in filmes_dict:
                filmes_dict[titulo] = {
                    'titulo': titulo,
                    'sinopse': chunk[:500],  # Limita tamanho
                    'similaridade': sim,
                    'chunk_index': idx
                }

        # Pega os top_k filmes √∫nicos
        filmes_unicos = sorted(
            filmes_dict.values(),
            key=lambda x: x['similaridade'],
            reverse=True
        )[:top_k]

        logger.info(f"‚úì Encontrados {len(filmes_unicos)} filmes")

        return filmes_unicos

    def listar_filmes(self) -> List[str]:
        """
        Lista todos os filmes no banco de dados

        Returns:
            Lista de t√≠tulos dos filmes
        """
        logger.info("üìã Listando todos os filmes...")

        self.cursor.execute("""
            SELECT DISTINCT titulo
            FROM filmes
            ORDER BY titulo;
        """)

        titulos = [row[0] for row in self.cursor.fetchall()]
        logger.info(f"‚úì {len(titulos)} filmes no banco")

        return titulos

    def estatisticas(self) -> Dict[str, Any]:
        """
        Retorna estat√≠sticas do banco de dados

        Returns:
            Dicion√°rio com estat√≠sticas
        """
        logger.info("üìä Obtendo estat√≠sticas...")

        # Total de chunks
        self.cursor.execute("SELECT COUNT(*) FROM filmes;")
        total_chunks = self.cursor.fetchone()[0]

        # Total de filmes
        self.cursor.execute("SELECT COUNT(DISTINCT titulo) FROM filmes;")
        total_filmes = self.cursor.fetchone()[0]

        stats = {
            'total_filmes': total_filmes,
            'total_chunks': total_chunks,
            'media_chunks': round(total_chunks / total_filmes, 1) if total_filmes > 0 else 0
        }

        logger.info(f"‚úì Stats: {stats}")
        return stats

    def fechar(self):
        """Fecha conex√µes"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("‚úì Recursos liberados")


# ============================================
# SERVIDOR MCP
# ============================================

# Inst√¢ncia global dos recursos
recursos = RecursosRAG()

# Criar servidor MCP
server = Server("filmes-rag-server")

@server.list_tools()
async def list_tools() -> List[Tool]:
    """
    Lista as ferramentas dispon√≠veis para o LLM
    """
    return [
        Tool(
            name="buscar_filmes",
            description=(
                "Busca filmes no banco de dados usando busca sem√¢ntica. "
                "Use esta ferramenta quando o usu√°rio pedir recomenda√ß√µes de filmes, "
                "perguntar sobre filmes espec√≠ficos, ou buscar por g√™nero/tema. "
                "A busca entende linguagem natural."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": (
                            "Descri√ß√£o do que o usu√°rio procura. "
                            "Exemplos: 'filme de a√ß√£o', 'com√©dia rom√¢ntica', "
                            "'algo emocionante', 'filme com espionagem'"
                        )
                    },
                    "top_k": {
                        "type": "integer",
                        "description": "N√∫mero de filmes a retornar (padr√£o: 3)",
                        "default": 3,
                        "minimum": 1,
                        "maximum": 10
                    }
                },
                "required": ["query"]
            }
        ),
        Tool(
            name="listar_todos_filmes",
            description=(
                "Lista todos os filmes dispon√≠veis no banco de dados. "
                "Use quando o usu√°rio perguntar 'quais filmes voc√™ tem?', "
                "'me mostre todos os filmes', ou similar."
            ),
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        Tool(
            name="estatisticas_banco",
            description=(
                "Retorna estat√≠sticas sobre o banco de filmes: "
                "quantos filmes est√£o indexados, total de trechos, etc. "
                "Use quando o usu√°rio perguntar sobre o tamanho do banco."
            ),
            inputSchema={
                "type": "object",
                "properties": {}
            }
        )
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> List[TextContent]:
    """
    Executa uma ferramenta quando chamada pelo LLM

    Args:
        name: Nome da ferramenta
        arguments: Argumentos da ferramenta

    Returns:
        Resultado da ferramenta em formato TextContent
    """
    logger.info(f"üîß Ferramenta chamada: {name}")
    logger.info(f"   Argumentos: {arguments}")

    try:
        if name == "buscar_filmes":
            query = arguments.get("query", "")
            top_k = arguments.get("top_k", 3)

            if not query:
                return [TextContent(
                    type="text",
                    text="Erro: query n√£o pode ser vazia"
                )]

            # Busca filmes
            filmes = recursos.buscar_filmes(query, top_k)

            if not filmes:
                return [TextContent(
                    type="text",
                    text="Nenhum filme encontrado para esta busca."
                )]

            # Formata resultado
            resultado = f"Encontrados {len(filmes)} filmes para '{query}':\n\n"

            for i, filme in enumerate(filmes, 1):
                resultado += f"{i}. **{filme['titulo']}**\n"
                resultado += f"   Relev√¢ncia: {filme['similaridade']:.1%}\n"
                resultado += f"   Sinopse: {filme['sinopse']}\n\n"

            return [TextContent(type="text", text=resultado)]

        elif name == "listar_todos_filmes":
            titulos = recursos.listar_filmes()

            if not titulos:
                return [TextContent(
                    type="text",
                    text="Banco de dados vazio. Nenhum filme indexado."
                )]

            resultado = f"Total de {len(titulos)} filmes no banco:\n\n"
            resultado += "\n".join(f"‚Ä¢ {titulo}" for titulo in titulos)

            return [TextContent(type="text", text=resultado)]

        elif name == "estatisticas_banco":
            stats = recursos.estatisticas()

            resultado = "üìä Estat√≠sticas do Banco de Filmes:\n\n"
            resultado += f"‚Ä¢ Total de filmes: {stats['total_filmes']}\n"
            resultado += f"‚Ä¢ Total de trechos: {stats['total_chunks']}\n"
            resultado += f"‚Ä¢ M√©dia de trechos por filme: {stats['media_chunks']}\n"

            return [TextContent(type="text", text=resultado)]

        else:
            return [TextContent(
                type="text",
                text=f"Erro: Ferramenta '{name}' n√£o encontrada"
            )]

    except Exception as e:
        logger.error(f"‚ùå Erro ao executar ferramenta: {e}")
        return [TextContent(
            type="text",
            text=f"Erro ao executar ferramenta: {str(e)}"
        )]


# ============================================
# MAIN
# ============================================

async def main():
    """Fun√ß√£o principal para iniciar o servidor"""
    try:
        logger.info("üöÄ Iniciando servidor MCP para recomenda√ß√£o de filmes...")

        # Inicializa recursos
        recursos.inicializar()

        # Inicia servidor
        logger.info("‚úì Servidor MCP pronto!")
        logger.info("   Aguardando conex√µes do cliente...")

        # Mant√©m o servidor rodando
        from mcp.server.stdio import stdio_server

        async with stdio_server() as (read_stream, write_stream):
            await server.run(
                read_stream,
                write_stream,
                server.create_initialization_options()
            )

    except KeyboardInterrupt:
        logger.info("\n‚ö† Interrompido pelo usu√°rio")
    except Exception as e:
        logger.error(f"‚ùå Erro fatal: {e}")
        raise
    finally:
        recursos.fechar()
        logger.info("üëã Servidor encerrado")


if __name__ == "__main__":
    asyncio.run(main())

ModuleNotFoundError: Could not import module 'CodeCarbonCallback'. Are this object's requirements defined correctly?