In [1]:
import psycopg2
import pandas as pd
from metrics_dict import metrics_dict

connection = psycopg2.connect(
        host="172.22.0.40",
        port=5432,
        database="metrics",
        user="admin",
        password="admin123"
        )

In [2]:
import pandas as pd
aux = None
with connection.cursor() as cur:
        cur.execute("SELECT * FROM metric")
        for linha in cur.fetchall():
            aux = pd.DataFrame(linha)

In [3]:
connection.close()

In [4]:
with connection.cursor() as cur:
    cur.execute("SELECT * FROM metric ORDER BY time DESC")
    colnames = [desc[0] for desc in cur.description] 
    rows = cur.fetchall()
    df = pd.DataFrame(rows, columns=colnames)

In [8]:
import requests
import json

# Mensagem enviada ao modelo
payload = {
    "model": "llama3.2:latest",
    "prompt": "Explique o que é aprendizado de máquina em termos simples."
}

# Faz a requisição para a API do Ollama
response = requests.post("http://localhost:11434/api/generate", json=payload, stream=True)


resposta_completa = ""

for line in response.iter_lines():
    if line:
        data = json.loads(line.decode("utf-8"))
        resposta_completa += data.get("response", "")
        if data.get("done", False):  
            break

print("Resposta do modelo:")
print(resposta_completa)


Resposta do modelo:
Claro, vou explicar o conceito de aprendizado de máquina de forma simples:

**O que é aprendizado de máquina?**

O aprendizado de máquina é uma técnica de processamento de informações que permite a uma máquina aprender e se adaptar ao mundo ao seu redor. Ela não é apenas limitada à execução de instruções previsíveis, mas sim pode se especializar em tarefas específicas e melhorar suas habilidades com base na experiência.

**Como funciona o aprendizado de máquina?**

Agora vamos entender como isso funciona:

1. **Dado**: São fornecidos dados de entrada que a máquina precisa processar, como imagens, textos ou áudio.
2. **Algoritmo**: Um algoritmo é executado sobre os dados de entrada para transformá-los em um formato mais útil, como um modelo ou uma representação matemática.
3. **Treinamento**: O algoritmo é treinado com base nos dados de entrada e é ajustado para melhorar seu desempenho ao longo do processo.
4. **Teste**: Uma vez que o modelo estiver treinado, ele é t

([{'table_name': 'metric'}], 'Sucesso')

In [2]:
import requests
import json
import logging
import re
from src.ollama_client import OllamaClient

if __name__ == "__main__":
    import sys
    logging.basicConfig(level=logging.INFO)
    
    client = OllamaClient()
    
    print("=== Teste de listagem de modelos ===")
    try:
        models = client.list_models()
        print(f"Modelos disponíveis: {models}")
    except Exception as e:
        print(f"Erro ao listar modelos: {e}")
        sys.exit(1)
    
    if not models:
        print("Nenhum modelo disponível. Servidor Ollama está rodando?")
        sys.exit(1)
    
    print("\n=== Teste de chamada básica ===")
    try:
        response = client.call(
            model=models[0],
            prompt="Explique o que é uma API REST em 1 frase",
            system_prompt="Seja conciso e técnico"
        )
        print(f"Resposta: {response}")
    except Exception as e:
        print(f"Erro na chamada ao modelo: {e}")
        sys.exit(1)
    
    print("\n=== Teste de streaming ===")
    try:
        stream_response = client.call(
            model=models[0],
            prompt="Liste 3 benefícios de beber água",
            stream=True
        )
        print(f"Resposta em streaming: {stream_response}")
    except Exception as e:
        print(f"Erro no streaming: {e}")
        sys.exit(1)
    
    print("\nTodos os testes completados com sucesso!")

=== Teste de listagem de modelos ===
Modelos disponíveis: ['llama3.2:latest', 'sqlcoder:7b-q5_K_M', 'sqlcoder:latest']

=== Teste de chamada básica ===
Resposta: Uma API REST (Representational State of Resource) é um padrão de arquitetura para serviços web que se concentra na reutilização de recursos em um formato de dados simples, facilitando a comunicação entre sistemas e aplicativos.

=== Teste de streaming ===
Resposta em streaming: Claro! Aqui estão três benefícios de beber água:

1. **Maior hidratação e melhor funcionamento do corpo**: A água é essencial para a vida, pois ajuda a manter o equilíbrio da água no corpo. Beber água suficiente ajuda a manter as funções corporais normais, como regulação da temperatura, transporte de nutrientes e oxigênio aos tecidos e eliminação de resíduos.

2. **Melhora a saúde digestiva**: A água ajuda a prevenir a constipação e a indigestão, pois ajuda a dissolver as fibras alimentares e a melhorar a motilidade intestinal. Além disso, beber água an

In [1]:
from src.ollama_client import OllamaClient
from src.database_client import DatabaseClient
import asyncio
import json
import logging
from typing import Dict, Any, Optional, Tuple
from datetime import datetime
import psycopg2
from psycopg2.extras import RealDictCursor
import requests
import re
import sqlglot
from sqlglot.expressions import Column
from sqlglot.errors import ParseError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Pipeline:
    def __init__(self):
        self.ollama_base_url = "http://localhost:11434"
        self.database = DatabaseClient()
        #tem que estar em inglês
        self.table_schema = """
        CREATE TABLE IF NOT EXISTS metric (
            time TIMESTAMPTZ NOT NULL PRIMARY KEY, -- Metric timestamp
            fivegs_amffunction_rm_registeredsubnbr INT, -- Number of registered subscribers
            fivegs_amffunction_rm_regmobreq INT, -- Mobility registration update requests
            fivegs_amffunction_rm_regmobsucc INT, -- Successful mobility registration updates
            fivegs_amffunction_rm_regmobfail INT, -- Failed mobility registration updates
            fivegs_smffunction_sm_sessionnbr INT, -- Number of active sessions
            fivegs_smffunction_sm_pdusessioncreationreq INT, -- PDU session creation requests
            fivegs_smffunction_sm_pdusessioncreationsucc INT, -- Successful PDU session creations
            fivegs_ep_n3_gtp_indatapktn3upf INT, -- Uplink throughput on N3 interface
            fivegs_ep_n3_gtp_outdatapktn3upf INT, -- Downlink throughput on N3 interface
            fivegs_upffunction_upf_sessionnbr INT, -- Average number of PDU sessions
            fivegs_smffunction_upf_qos_flow_nbr INT, -- Number of active QoS flows
            fivegs_upffunction_sm_n4sessionreport INT, -- N4 session reports
            fivegs_upffunction_sm_n4sessionreportsucc INT, -- Successful N4 session reports
            fivegs_upffunction_sm_n4sessionestabreq INT, -- N4 session establishment requests
            fivegs_upffunction_sm_n4sessionestabfail INT, -- Failed N4 session establishments
            softmodern_bler_dl INT, -- Block error rate - downlink
            softmodern_bler_ul INT, -- Block error rate - uplink
            softmodern_rsrp INT, -- Received signal power
            pfcp_sessions_active INT -- Used for testing value changes
        );
        Get the current 5G network performance metrics, including throughput, latency and packet loss rates.
        """
        
    def call_ollama(self, model : str, prompt : str, system_prompt: str = None) -> str:

        try:
            url = f"{self.ollama_base_url}/api/generate"
            payload = {
                "model": model,
                "prompt":prompt,
                "stream":False,
                "options":{
                    "temperature": 0.1,
                    "top_p": 0.9,
                    "num_predict": 1000
                }
            }
            if system_prompt:
                payload["system"] = system_prompt
            response = requests.post(url, json=payload, timeout=120)
            response.raise_for_status()

            return response.json()["response"].strip()
        except Exception as e:
            print(e)

    def translate_and_process_question(self, question: str) -> str:
        system_prompt = """
        Você é um assistente especializado em tradução e processamento de perguntas para consultas SQL.
        Sua tarefa é:
        1. Traduzir a pergunta do português para o inglês
        2. Reformular a pergunta para ser mais específica e clara para geração de SQL
        3. Incluir contexto técnico relevante sobre métricas de redes 5G quando apropriado
        4. Manter o significado original mas torná-la mais precisa
        
        Responda apenas com a pergunta traduzida e processada, sem explicações adicionais.
        """
        
        prompt = f"""
        Pergunta original em português: "{question}"
        
        Traduza para inglês e processe para ser mais específica para geração de SQL sobre métricas de rede 5G.
        """
        
        logger.info("Etapa 1: Traduzindo e processando pergunta...")
        processed_question = self.call_ollama("llama3.2:latest", prompt, system_prompt)
        logger.info(f"Pergunta processada: {processed_question}")
        
        return processed_question
    
    def execute_sql_query(self, sql_query: str) -> Tuple[list, str]:
        try:
            logger.info("Etapa 3: Executando query SQL...")
            return self.database.execute(sql_query)                    
        except Exception as e:
            error_msg = f"Erro ao executar query SQL: {str(e)}"
            logger.error(error_msg)
            return [], error_msg

    def generate_sql_query(self, processed_question: str, max_retries: int = 3) -> str:
        system_prompt = """
        You are an expert SQL query generator for PostgreSQL. Generate precise SQL queries based on the user's question.
        Always use proper SQL syntax and consider performance optimizations.
        Focus on the specific metrics requested and use appropriate aggregations and filters.
        Return only the SQL query without any explanations or markdown formatting.
        """

        available_columns = self._extract_column_names()

        base_prompt = f"""
        Database schema:
        {self.table_schema}

        Available columns in 'metric' table:
        {', '.join(available_columns)}

        User question: {processed_question}

        Generate a SQL query to answer this question. Consider:
        - The table name is 'metric'
        - Use appropriate time filtering when needed
        - Apply proper aggregations (SUM, AVG, COUNT, etc.)
        - Include relevant columns based on the question
        - Use proper PostgreSQL syntax
        - ONLY use column names that exist in the available columns list above

        Important constraints:
        - Table name: 'metric'
        - Time column: 'time' (TIMESTAMPTZ)
        - NEVER use nested aggregate functions like AVG(SUM(...))
        - Use subqueries for complex aggregations
        - Use proper time filtering with PostgreSQL date functions
        - Include appropriate GROUP BY clauses
        - Use LIMIT when showing recent data
        - NEVER use aggregate functions in GROUP BY clause
        - Only reference columns that exist in the available columns list

        Examples of VALID patterns:
        - SELECT AVG(column_name) FROM metric WHERE time >= NOW() - INTERVAL '1 hour'
        - SELECT date_trunc('hour', time), SUM(column_name) FROM metric GROUP BY date_trunc('hour', time)
        - SELECT AVG(daily_total) FROM (SELECT DATE(time), SUM(column_name) as daily_total FROM metric GROUP BY DATE(time)) subq
        - SELECT time, column_name FROM metric ORDER BY time DESC LIMIT 10

        Examples of INVALID patterns (DO NOT USE):
        - SELECT AVG(SUM(column_name)) FROM metric -- INVALID: nested aggregates
        - SELECT SUM(AVG(column_name)) FROM metric -- INVALID: nested aggregates
        - SELECT time, SUM(column_name) FROM metric GROUP BY COUNT(other_column) -- INVALID: aggregate in GROUP BY
        - SELECT nonexistent_column FROM metric -- INVALID: column doesn't exist

        Generate only the SQL query:
        """

        attempt = 0
        while attempt < max_retries:
            logger.info(f"Etapa 2: Tentativa {attempt + 1} de {max_retries} para gerar query SQL...")
            sql_query = self.call_ollama("llama3.2:latest", base_prompt, system_prompt)

            sql_query = re.sub(r'```sql\n?', '', sql_query)
            sql_query = re.sub(r'```\n?', '', sql_query)
            sql_query = sql_query.strip()

            validation_errors = self._validate_sql_query(sql_query, available_columns)
            if not validation_errors:
                logger.info(f"Query SQL gerada com sucesso: {sql_query}")
                return sql_query
            else:
                logger.warning(f"Tentativa {attempt + 1} falhou com erros: {validation_errors}")
                attempt += 1

        logger.error("Falha ao gerar uma query SQL válida após várias tentativas.")
        raise ValueError("Não foi possível gerar uma query SQL válida.")


    def _extract_column_names(self) -> list:
        """Extract column names from the table schema"""
        try:

            columns = []
            if hasattr(self, 'table_schema') and self.table_schema:
                if isinstance(self.table_schema, str):
                    import re
                    column_matches = re.findall(r'(\w+)\s+(?:INTEGER|VARCHAR|TEXT|TIMESTAMPTZ|NUMERIC|FLOAT|DOUBLE)', 
                                            self.table_schema, re.IGNORECASE)
                    columns = column_matches
                
            if not columns:
                try:
                    query = """
                    SELECT column_name 
                    FROM information_schema.columns 
                    WHERE table_name = 'metric' 
                    ORDER BY ordinal_position
                    """
                    result = self.db_client.execute_query(query)
                    columns = [row[0] for row in result]
                except Exception as e:
                    logger.warning(f"Could not fetch column names: {e}")
                    columns = ['time']
            
            return columns
        except Exception as e:
            logger.error(f"Error extracting column names: {e}")
            return ['time']

    def _validate_sql_query(self, sql_query: str, available_columns: list) -> list:
        """Validate the SQL query for common issues"""
        errors = []
        
        if re.search(r'(AVG|SUM|COUNT|MIN|MAX)\s*\(\s*(AVG|SUM|COUNT|MIN|MAX)', sql_query, re.IGNORECASE):
            errors.append("Nested aggregate functions detected")
        
        group_by_match = re.search(r'GROUP BY\s+(.+?)(?:\s+ORDER|\s+HAVING|\s+LIMIT|$)', sql_query, re.IGNORECASE | re.DOTALL)
        if group_by_match:
            group_by_clause = group_by_match.group(1)
            if re.search(r'(AVG|SUM|COUNT|MIN|MAX)\s*\(', group_by_clause, re.IGNORECASE):
                errors.append("Aggregate function in GROUP BY clause")
        

        column_refs = re.findall(r'\b(?!(?:SELECT|FROM|WHERE|GROUP|ORDER|BY|HAVING|LIMIT|AND|OR|NOT|IN|EXISTS|CASE|WHEN|THEN|ELSE|END|AS|DISTINCT|ALL|UNION|INTERSECT|EXCEPT|JOIN|LEFT|RIGHT|INNER|OUTER|ON|USING|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE|GRANT|REVOKE)\b)\w+\b', sql_query, re.IGNORECASE)
        
        sql_keywords = {'metric', 'time', 'now', 'interval', 'date', 'extract', 'date_trunc', 'hour', 'day', 'month', 'year', 'desc', 'asc', 'null', 'true', 'false'}
        potential_columns = [col for col in column_refs if col.lower() not in sql_keywords and not col.isdigit()]
        
        missing_columns = [col for col in potential_columns if col not in available_columns]
        if missing_columns:
            errors.append(f"Potential missing columns: {missing_columns}")
        
        return errors

    def interpret_results(self, original_question: str, sql_query: str, 
                         results: list, execution_status: str) -> str:
        system_prompt = """
        Você é um assistente especializado em análise de dados de redes 5G.
        Sua tarefa é interpretar os resultados de consultas SQL e fornecer uma resposta clara em português.
        
        Diretrizes:
        1. Responda em português brasileiro
        2. Seja claro e objetivo
        3. Forneça insights relevantes sobre os dados
        4. Explique o que os números significam no contexto de redes 5G
        5. Se houver problemas com a query, explique de forma compreensível
        6. Use formatação clara para apresentar os dados
        """
        
        if results:
            results_summary = f"Resultados encontrados: {len(results)} registros\n"
            results_summary += f"Primeiros resultados:\n{json.dumps(results[:5], indent=2, default=str)}"
        else:
            results_summary = "Nenhum resultado encontrado ou erro na execução."
        
        prompt = f"""
        Pergunta original do usuário: "{original_question}"
        
        Query SQL executada: {sql_query}
        
        Status da execução: {execution_status}
        
        {results_summary}
        
        Forneça uma resposta completa em português interpretando estes resultados para o usuário.
        Se houver dados, analise-os e forneça insights relevantes.
        Se houver erros, explique o que aconteceu de forma compreensível.
        """
        
        logger.info("Etapa 4: Interpretando resultados...")
        interpretation = self.call_ollama("llama3.2:latest", prompt, system_prompt)
        
        return interpretation

    def process_question(self, question: str):
        start_time = datetime.now()
        try:
            processed_question = self.translate_and_process_question(question)

            sql_query = self.generate_sql_query(processed_question)

            results, execution_status = self.execute_sql_query(sql_query)

            interpretation = self.interpret_results(
                question, sql_query, results, execution_status
            )

            end_time = datetime.now()
            processing_time = (end_time - start_time).total_seconds()
            
            return {
                "success": True,
                "original_question": question,
                "processed_question": processed_question,
                "sql_query": sql_query,
                "execution_status": execution_status,
                "results_count": len(results),
                "results": results,
                "interpretation": interpretation,
                "processing_time_seconds": processing_time,
                "timestamp": start_time.isoformat()
            }

        except Exception as e:
            print(e)
            error_msg = f"Erro no pipeline: {str(e)}"
            logger.error(error_msg)
            
            return {
                "success": False,
                "error": error_msg,
                "original_question": question,
                "timestamp": start_time.isoformat()
            }
        
    

KeyboardInterrupt: 

In [2]:
import psycopg2
import pandas as pd
from metrics_dict import metrics_dict

connection = psycopg2.connect(
        host="172.22.0.40",
        port=5432,
        database="metrics",
        user="admin",
        password="admin123"
        )

cur = connection.cursor()

cur.execute("""
    SELECT table_schema, table_name
    FROM information_schema.tables
    WHERE table_type = 'BASE TABLE'
        AND table_schema NOT IN ('pg_catalog', 'information_schema')
""")

cur.execute("""
    SELECT * FROM metric LIMIT 10;
""")


result = cur.fetchall()
for index in result:
    print(index)

(datetime.datetime(2025, 7, 10, 22, 47, tzinfo=datetime.timezone.utc), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
(datetime.datetime(2025, 7, 10, 22, 44, 45, tzinfo=datetime.timezone.utc), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
(datetime.datetime(2025, 7, 10, 22, 40, 16, tzinfo=datetime.timezone.utc), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
(datetime.datetime(2025, 7, 10, 22, 42, tzinfo=datetime.timezone.utc), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
(datetime.datetime(2025, 7, 10, 22, 46, tzinfo=datetime.timezone.utc), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
(datetime.datetime(2025, 7, 10, 22, 43, 30, tzinfo=datetime.timezone.utc), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
(datetime.datetime(2025, 7, 10, 22, 40, 30, tzinfo=datetime.timezone.utc), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
(datetime.datetime(2025, 7, 10, 22, 48, tzinfo=datetime.timezone.utc), 0, 0, 0, 0, 0, 0,

In [18]:
def display_results(resultado: Dict[str, Any]) -> None:

        print("=" * 80)
        print("RESULTADO DO PIPELINE")
        print("=" * 80)
        
        if resultado["success"]:
            print(f"Status: Processamento concluído com sucesso")
            print(f"Tempo: {resultado['processing_time_seconds']:.2f} segundos")
            print(f"Registros encontrados: {resultado['results_count']}")
            print()
            
            print("PERGUNTA ORIGINAL:")
            print(f"{resultado['original_question']}")
            print()
            
            print("PERGUNTA PROCESSADA:")
            print(f"{resultado['processed_question']}")
            print()
            
            print("QUERY SQL GERADA:")
            print(f"{resultado['sql_query']}")
            print()
            
            print("INTERPRETAÇÃO DOS RESULTADOS:")
            print("-" * 50)
            interpretation_lines = resultado['interpretation'].split('\n')
            for line in interpretation_lines:
                print(f"   {line}")
            print("-" * 50)
            
        else:
            print(f"Status: Erro no processamento")
            print(f" Erro: {resultado['error']}")
            print(f"Pergunta: {resultado['original_question']}")
        
        print("=" * 80)
        print()


In [None]:
pipe = Pipeline()

perguntas_exemplo = [
        "Quantos assinantes estão registrados atualmente?",
        "Qual é a taxa de sucesso das sessões PDU na última semana?",
        "Mostre o throughput médio de upload e download nas últimas 24 horas",
        "Qual é a taxa de erro de bloco para uplink e downlink hoje?",
        "Quantas sessões ativas existem no momento?"
    ]
for pergunta in perguntas_exemplo:
    resultado = pipe.process_question(pergunta)
    if resultado["success"]:
        print(f"Processamento concluído em {resultado['processing_time_seconds']:.2f}s")
        print(f"Resultados encontrados: {resultado['results_count']}")
        print(f"Query SQL: {resultado['sql_query']}")
        print(f"Interpretação: {resultado['interpretation']}")
    else:
        print(f"Bola furada...")


INFO:__main__:Etapa 1: Traduzindo e processando pergunta...


INFO:__main__:Pergunta processada: "Number of active subscribers in the 5G network."
INFO:__main__:Etapa 2: Tentativa 1 de 3 para gerar query SQL...
ERROR:__main__:Erro no pipeline: expected string or bytes-like object, got 'NoneType'
INFO:__main__:Etapa 1: Traduzindo e processando pergunta...


HTTPConnectionPool(host='localhost', port=11434): Read timed out. (read timeout=120)
expected string or bytes-like object, got 'NoneType'
Bola furada...


INFO:__main__:Pergunta processada: "Which is the session PDU success rate for the last week in 5G network?"
INFO:__main__:Etapa 2: Tentativa 1 de 3 para gerar query SQL...
