# NL2SQL com LangGraph!

Olá! Vamos trabalhar juntos para criar um banco de dados sobre produtos, seus preços e a quantidade vendida por filial. Depois, adaptaremos o código fornecido para usar o LangGraph com esse novo banco de dados. Vamos fazer tudo passo a passo e em português brasileiro.

1. Criar o Banco de Dados
Primeiro, vamos criar um banco de dados SQLite chamado vendas.db. Usaremos o módulo sqlite3 para isso.

Passo 1.1: Importar o módulo sqlite3 e conectar ao banco de dados

In [245]:
import sqlite3

# Conecta ao banco de dados (será criado se não existir)
conn = sqlite3.connect('vendas.db')
cursor = conn.cursor()


Passo 1.2: Criar as tabelas

Vamos criar três tabelas: produtos, filiais e vendas.

In [246]:
# Cria a tabela 'produtos' se ela não existir
cursor.execute('''
CREATE TABLE IF NOT EXISTS produtos (
    produto_id INTEGER PRIMARY KEY,
    nome TEXT NOT NULL,
    preco REAL NOT NULL
)
''')

# Cria a tabela 'filiais' se ela não existir
cursor.execute('''
CREATE TABLE IF NOT EXISTS filiais (
    filial_id INTEGER PRIMARY KEY,
    nome TEXT NOT NULL,
    localizacao TEXT NOT NULL
)
''')

# Cria a tabela 'vendas' se ela não existir
cursor.execute('''
CREATE TABLE IF NOT EXISTS vendas (
    venda_id INTEGER PRIMARY KEY,
    produto_id INTEGER,
    filial_id INTEGER,
    quantidade_vendida INTEGER NOT NULL,
    data_venda DATE NOT NULL,
    FOREIGN KEY (produto_id) REFERENCES produtos (produto_id),
    FOREIGN KEY (filial_id) REFERENCES filiais (filial_id)
)
''')

# Confirma as alterações
conn.commit()

Passo 1.3: Inserir dados nas tabelas

Agora, vamos inserir alguns dados de exemplo em nossas tabelas.

In [247]:
# Inserir dados na tabela 'produtos'
produtos = [
    (1, 'Notebook', 3500.00),
    (2, 'Smartphone', 2000.00),
    (3, 'Tablet', 1500.00),
    (4, 'Monitor', 800.00),
    (5, 'Teclado', 150.00)
]

cursor.executemany('INSERT OR IGNORE INTO produtos VALUES (?, ?, ?)', produtos)

# Inserir dados na tabela 'filiais'
filiais = [
    (1, 'Filial Centro', 'São Paulo'),
    (2, 'Filial Norte', 'Rio de Janeiro'),
    (3, 'Filial Sul', 'Porto Alegre')
]

cursor.executemany('INSERT OR IGNORE INTO filiais VALUES (?, ?, ?)', filiais)

# Inserir dados na tabela 'vendas'
vendas = [
    (1, 1, 1, 10, '2023-01-15'),
    (2, 2, 1, 5, '2023-01-16'),
    (3, 3, 2, 7, '2023-01-17'),
    (4, 1, 2, 3, '2023-01-18'),
    (5, 4, 3, 4, '2023-01-19'),
    (6, 5, 3, 20, '2023-01-20'),
    (7, 2, 1, 6, '2023-01-21'),
    (8, 3, 2, 8, '2023-01-22'),
    (9, 4, 3, 2, '2023-01-23'),
    (10, 5, 1, 15, '2023-01-24')
]

cursor.executemany('INSERT OR IGNORE INTO vendas VALUES (?, ?, ?, ?, ?)', vendas)

# Confirma as alterações
conn.commit()

# Aviso sobre dados existentes
print("Dados inseridos ou ignorados se já existentes.")

Dados inseridos ou ignorados se já existentes.


Passo 1.4: Verificar os dados inseridos

Vamos verificar se os dados foram inseridos corretamente.

In [248]:
# Consultar todos os produtos
cursor.execute('SELECT * FROM produtos')
produtos = cursor.fetchall()
print('Produtos:')
for produto in produtos:
    print(produto)

# Consultar todas as filiais
cursor.execute('SELECT * FROM filiais')
filiais = cursor.fetchall()
print('\nFiliais:')
for filial in filiais:
    print(filial)

# Consultar todas as vendas
cursor.execute('SELECT * FROM vendas')
vendas = cursor.fetchall()
print('\nVendas:')
for venda in vendas:
    print(venda)


Produtos:
(1, 'Notebook', 3500.0)
(2, 'Smartphone', 2000.0)
(3, 'Tablet', 1500.0)
(4, 'Monitor', 800.0)
(5, 'Teclado', 150.0)

Filiais:
(1, 'Filial Centro', 'São Paulo')
(2, 'Filial Norte', 'Rio de Janeiro')
(3, 'Filial Sul', 'Porto Alegre')

Vendas:
(1, 1, 1, 10, '2023-01-15')
(2, 2, 1, 5, '2023-01-16')
(3, 3, 2, 7, '2023-01-17')
(4, 1, 2, 3, '2023-01-18')
(5, 4, 3, 4, '2023-01-19')
(6, 5, 3, 20, '2023-01-20')
(7, 2, 1, 6, '2023-01-21')
(8, 3, 2, 8, '2023-01-22')
(9, 4, 3, 2, '2023-01-23')
(10, 5, 1, 15, '2023-01-24')


Passo 1.5: Fechar a conexão

Após verificar os dados, podemos fechar a conexão com o banco de dados.

In [249]:
# Fecha a conexão com o banco de dados
conn.close()


Passo 2.1: Aplicando LangGraph 


Instalar as bibliotecas necessárias

Certifique-se de que as bibliotecas langchain e langgraph estejam instaladas. Se não estiverem, instale-as usando:

In [250]:
#!pip install langchain langgraph
#!pip install langgraph==0.2.1

#!pip install langchain-community langchain-core langgraph openai


Passo 2.2: Importar as bibliotecas

In [251]:
import warnings
warnings.filterwarnings('ignore')

import json
import os
from langchain.chat_models import ChatOpenAI
from langgraph.graph import StateGraph, END
from langchain.schema import SystemMessage, HumanMessage
from typing import TypedDict, List


Passo 2.3: Configurar o modelo de linguagem

In [252]:
os.environ['OPENAI_API_KEY'] = ''
model = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)


Passo 2.4: Definir o estado do agente

In [253]:
class AgentState(TypedDict):
    question: str
    table_schemas: str
    database: str
    sql: str
    reflect: List[str]
    accepted: bool
    revision: int
    max_revision: int


Passo 2.5: Obter o esquema do banco de dados

Vamos criar uma função para obter o esquema do nosso banco de dados.

In [254]:
def get_database_schema(db_path):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()
    schema = ''
    for table_name in tables:
        table_name = table_name[0]
        cursor.execute(f"PRAGMA table_info({table_name});")
        columns = cursor.fetchall()
        schema += f"Table: {table_name}\n"
        schema += "Columns:\n"
        for column in columns:
            schema += f" - {column[1]} ({column[2]})\n"
        schema += '\n'
    conn.close()
    return schema

# Obter o esquema do banco de dados
db_schema = get_database_schema('vendas.db')
print(db_schema)


Table: produtos
Columns:
 - produto_id (INTEGER)
 - nome (TEXT)
 - preco (REAL)

Table: filiais
Columns:
 - filial_id (INTEGER)
 - nome (TEXT)
 - localizacao (TEXT)

Table: vendas
Columns:
 - venda_id (INTEGER)
 - produto_id (INTEGER)
 - filial_id (INTEGER)
 - quantidade_vendida (INTEGER)
 - data_venda (DATE)




Passo 2.6: Definir as funções dos agentes

Função do Search Engineer

Como estamos trabalhando com um banco de dados fixo, podemos simplificar esta função.

In [255]:
def search_engineer_node(state: AgentState):
    # Fornecemos o esquema do banco de dados diretamente
    state['table_schemas'] = db_schema
    state['database'] = 'vendas.db'
    return {"table_schemas": state['table_schemas'], "database": state['database']}


In [256]:
def senior_sql_writer_node(state: AgentState):
    role_prompt = """
Você é um especialista em SQL. Sua tarefa é escrever **apenas** a consulta SQL que responda à pergunta do usuário. A consulta deve:

- Usar a sintaxe SQL padrão em inglês.
- Utilizar os nomes das tabelas e colunas conforme definidos no esquema do banco de dados.
- Não incluir comentários, explicações ou qualquer texto adicional.
- Não utilizar formatação de código ou markdown.
- Retornar apenas a consulta SQL válida.
"""
    instruction = f"Esquema do banco de dados:\n{state['table_schemas']}\n"
    if len(state['reflect']) > 0:
        instruction += f"Considere os seguintes feedbacks:\n{chr(10).join(state['reflect'])}\n"
    instruction += f"Escreva a consulta SQL que responda à seguinte pergunta: {state['question']}\n"
    messages = [
        SystemMessage(content=role_prompt), 
        HumanMessage(content=instruction)
    ]
    response = model.invoke(messages)
    return {"sql": response.content.strip(), "revision": state['revision'] + 1}


In [257]:

#Função do QA Engineer
def senior_qa_engineer_node(state: AgentState):
    role_prompt = """
Você é um engenheiro de QA especializado em SQL. Sua tarefa é verificar se a consulta SQL fornecida responde corretamente à pergunta do usuário.
"""
    instruction = f"Com base no seguinte esquema de banco de dados:\n{state['table_schemas']}\n"
    instruction += f"E na seguinte consulta SQL:\n{state['sql']}\n"
    instruction += f"Verifique se a consulta SQL pode completar a tarefa: {state['question']}\n"
    instruction += "Responda 'ACEITO' se estiver correta ou 'REJEITADO' se não estiver.\n"
    messages = [
        SystemMessage(content=role_prompt), 
        HumanMessage(content=instruction)
    ]
    response = model(messages)
    return {"accepted": 'ACEITO' in response.content.upper()}


In [258]:
#Função do Chief DBA
def chief_dba_node(state: AgentState):
    role_prompt = """
Você é um DBA experiente. Sua tarefa é fornecer feedback detalhado para melhorar a consulta SQL fornecida.
"""
    instruction = f"Com base no seguinte esquema de banco de dados:\n{state['table_schemas']}\n"
    instruction += f"E na seguinte consulta SQL:\n{state['sql']}\n"
    instruction += f"Por favor, forneça recomendações úteis e detalhadas para ajudar a melhorar a consulta SQL para a tarefa: {state['question']}\n"
    messages = [
        SystemMessage(content=role_prompt), 
        HumanMessage(content=instruction)
    ]
    response = model(messages)
    return {"reflect": [response.content]}


Passo 2.7: Construir o grafo

In [259]:
builder = StateGraph(AgentState)

# Adicionar nós
builder.add_node("search_engineer", search_engineer_node)
builder.add_node("sql_writer", senior_sql_writer_node)
builder.add_node("qa_engineer", senior_qa_engineer_node)
builder.add_node("chief_dba", chief_dba_node)

# Adicionar arestas
builder.add_edge("search_engineer", "sql_writer")
builder.add_edge("sql_writer", "qa_engineer")
builder.add_edge("chief_dba", "sql_writer")

# Adicionar arestas condicionais
builder.add_conditional_edges(
    "qa_engineer", 
    lambda state: END if state['accepted'] or state['revision'] >= state['max_revision'] else "reflect", 
    {END: END, "reflect": "chief_dba"}
)

# Definir ponto de entrada
builder.set_entry_point("search_engineer")

# Compilar o grafo
graph = builder.compile()


2.7.1. Importar e Configurar o Checkpointer

In [260]:
from langgraph.checkpoint.memory import MemorySaver

# Configurar o checkpointer usando MemorySaver
memory = MemorySaver()


2.7.2. Compilar o Grafo com o Checkpointer

In [261]:
# Compilar o grafo com o checkpointer
graph = builder.compile(checkpointer=memory)


2.8 Executar o Grafo

In [262]:
# Definir a pergunta
question = "Qual produto teve a maior quantidade vendida na Filial Norte?"

# Inicializar o estado
initial_state = {
    'question': question,
    'table_schemas': '',  # Será preenchido pelo 'search_engineer_node'
    'database': '',       # Será preenchido pelo 'search_engineer_node'
    'sql': '',
    'reflect': [],
    'accepted': False,
    'revision': 0,
    'max_revision': 2
}

# Executar o grafo
thread = {"configurable": {"thread_id": "1"}}
for s in graph.stream(initial_state, thread):
    pass  # O processamento é feito internamente

# Obter o estado final
state = graph.get_state(thread)


In [263]:
print('Banco de dados:', state.values['database'])
print('Consulta SQL:\n', state.values['sql'])


Banco de dados: vendas.db
Consulta SQL:
 SELECT p.nome
FROM produtos p
JOIN vendas v ON p.produto_id = v.produto_id
JOIN filiais f ON v.filial_id = f.filial_id
WHERE f.nome = 'Filial Norte'
GROUP BY p.nome
ORDER BY SUM(v.quantidade_vendida) DESC
LIMIT 1


In [265]:
import re

def extract_sql(query_text):
    # Remove trechos de texto antes e depois da consulta SQL
    # Procura por conteúdo entre ```sql e ```
    match = re.search(r'```sql\s*(.*?)\s*```', query_text, re.DOTALL | re.IGNORECASE)
    if match:
        return match.group(1).strip()
    else:
        # Se não encontrar, tenta extrair a partir de "SELECT"
        match = re.search(r'(SELECT .*?);', query_text, re.DOTALL | re.IGNORECASE)
        if match:
            return match.group(1).strip() + ';'
        else:
            # Caso não consiga extrair, retorna o texto original
            return query_text.strip()


In [266]:
# Extrair a consulta SQL limpa
clean_sql = extract_sql(state.values['sql'])
print('Consulta SQL Extraída:\n', clean_sql)

# Executar a consulta SQL
try:
    cursor.execute(clean_sql)
    results = cursor.fetchall()
    print('\nResultados da consulta:')
    for result in results:
        print(result)
except Exception as e:
    print('Erro ao executar a consulta SQL:', e)


Consulta SQL Extraída:
 SELECT p.nome
FROM produtos p
JOIN vendas v ON p.produto_id = v.produto_id
JOIN filiais f ON v.filial_id = f.filial_id
WHERE f.nome = 'Filial Norte'
GROUP BY p.nome
ORDER BY SUM(v.quantidade_vendida) DESC
LIMIT 1
Erro ao executar a consulta SQL: Cannot operate on a closed database.


In [267]:
import sqlite3

# Conectar ao banco de dados
conn = sqlite3.connect(state.values['database'])
cursor = conn.cursor()

# Executar a consulta SQL
try:
    cursor.execute(state.values['sql'])
    results = cursor.fetchall()
    print('\n Resultados da consulta:')
    for result in results:
        print(result)
except Exception as e:
    print('Erro ao executar a consulta SQL:', e)

# Fechar a conexão
conn.close()



 Resultados da consulta:
('Tablet',)


In [268]:
# Definir a pergunta
question = "Qual produto teve a maior quantidade vendida na Filial Sul?"

# Inicializar o estado
initial_state = {
    'question': question,
    'table_schemas': '',  # Será preenchido pelo 'search_engineer_node'
    'database': '',       # Será preenchido pelo 'search_engineer_node'
    'sql': '',
    'reflect': [],
    'accepted': False,
    'revision': 0,
    'max_revision': 2
}

# Executar o grafo
thread = {"configurable": {"thread_id": "1"}}
for s in graph.stream(initial_state, thread):
    pass  # O processamento é feito internamente

# Obter o estado final
state = graph.get_state(thread)


In [269]:
# Extrair a consulta SQL limpa
clean_sql = extract_sql(state.values['sql'])
print('Consulta SQL Extraída:\n', clean_sql)

# Executar a consulta SQL
try:
    cursor.execute(clean_sql)
    results = cursor.fetchall()
    print('\nResultados da consulta:')
    for result in results:
        print(result)
except Exception as e:
    print('Erro ao executar a consulta SQL:', e)


Consulta SQL Extraída:
 SELECT p.nome
FROM produtos p
JOIN vendas v ON p.produto_id = v.produto_id
JOIN filiais f ON v.filial_id = f.filial_id
WHERE f.nome = 'Filial Sul'
GROUP BY p.nome
ORDER BY SUM(v.quantidade_vendida) DESC
LIMIT 1
Erro ao executar a consulta SQL: Cannot operate on a closed database.


In [270]:
import sqlite3

# Conectar ao banco de dados
conn = sqlite3.connect(state.values['database'])
cursor = conn.cursor()

# Executar a consulta SQL
try:
    cursor.execute(state.values['sql'])
    results = cursor.fetchall()
    print('\n Resultados da consulta:')
    for result in results:
        print(result)
except Exception as e:
    print('Erro ao executar a consulta SQL:', e)

# Fechar a conexão
conn.close()


 Resultados da consulta:
('Teclado',)


In [272]:
# Definir a pergunta
question = "Qual produto tem o maior preço entre todos as Filiais?"

# Inicializar o estado
initial_state = {
    'question': question,
    'table_schemas': '',  # Será preenchido pelo 'search_engineer_node'
    'database': '',       # Será preenchido pelo 'search_engineer_node'
    'sql': '',
    'reflect': [],
    'accepted': False,
    'revision': 0,
    'max_revision': 2
}

# Executar o grafo
thread = {"configurable": {"thread_id": "1"}}
for s in graph.stream(initial_state, thread):
    pass  # O processamento é feito internamente

# Obter o estado final
state = graph.get_state(thread)


In [273]:
# Extrair a consulta SQL limpa
clean_sql = extract_sql(state.values['sql'])
print('Consulta SQL Extraída:\n', clean_sql)

# Executar a consulta SQL
try:
    cursor.execute(clean_sql)
    results = cursor.fetchall()
    print('\nResultados da consulta:')
    for result in results:
        print(result)
except Exception as e:
    print('Erro ao executar a consulta SQL:', e)


Consulta SQL Extraída:
 SELECT nome FROM produtos WHERE preco = (SELECT MAX(preco) FROM produtos)
Erro ao executar a consulta SQL: Cannot operate on a closed database.


In [274]:
import sqlite3

# Conectar ao banco de dados
conn = sqlite3.connect(state.values['database'])
cursor = conn.cursor()

# Executar a consulta SQL
try:
    cursor.execute(state.values['sql'])
    results = cursor.fetchall()
    print('\n Resultados da consulta:')
    for result in results:
        print(result)
except Exception as e:
    print('Erro ao executar a consulta SQL:', e)

# Fechar a conexão
conn.close()


 Resultados da consulta:
('Notebook',)


Aqui está o texto formatado em Markdown de forma organizada e bonita:

# LangGraph: Lembrando Conceitos Fundamentais



## O Text2sql ou NL2sql abaixo com LangGraph é um exemplo clássico de fluxos de execução, como a transformação de perguntas em linguagem natural para SQL (NL2SQL). Cada "nó" do grafo representa uma etapa do fluxo, e as "arestas" entre esses nós representam a transição de um estado para outro, de acordo com condições definidas.

## Funções e Componentes do LangGraph no Código

### 1. StateGraph

- **Descrição**: Coração do LangGraph, permite definir um gráfico de estados que controla o fluxo de execução entre diferentes nós.
- **Função no código**: Estrutura o processo de transformação de uma pergunta em linguagem natural para SQL.
- **Uso em outros contextos**: Chatbots complexos, agentes de IA multitarefa, sistemas de recomendação.

### 2. add_node

- **Descrição**: Adiciona "nós" ao gráfico, representando funções ou tarefas específicas no fluxo.
- **Função no código**: Representa agentes no pipeline de geração de SQL (ex: search_engineer_node, senior_sql_writer_node).
- **Uso em outros contextos**: Modelagem de pipelines de tomada de decisão em chatbots ou sistemas de processamento.

### 3. add_edge

- **Descrição**: Define a conexão entre dois nós, determinando o fluxo de execução.
- **Função no código**: Conecta nós em uma sequência lógica (ex: search_engineer ao sql_writer).
- **Uso em outros contextos**: Controle de fluxo em sistemas de automação empresarial ou pipelines de machine learning.

### 4. add_conditional_edges

- **Descrição**: Adiciona arestas condicionais ao gráfico, permitindo diferentes caminhos de execução.
- **Função no código**: Verifica a aceitação da consulta SQL pelo agente de QA, redirecionando o fluxo se necessário.
- **Uso em outros contextos**: Sistemas de decisão complexos, como diagnósticos médicos automáticos ou controle de qualidade.

### 5. compile

- **Descrição**: Transforma o gráfico de estado em um sistema funcional, pronto para execução.
- **Função no código**: Prepara o grafo para execução, integrando o checkpointer.
- **Uso em outros contextos**: Construção de gráficos de tarefas em sistemas de atendimento automatizado.

### 6. MemorySaver

- **Descrição**: Tipo de checkpointer usado para persistir o estado durante a execução do grafo.
- **Função no código**: Armazena o estado do grafo entre execuções.
- **Uso em outros contextos**: Sistemas interativos ou contínuos, como jogos ou assistentes virtuais.

### 7. get_state e stream

- **Descrição**: Recuperam o estado atual de execução e processam o gráfico de forma contínua.
- **Função no código**: Executam o fluxo definido e obtêm o estado final após a execução.
- **Uso em outros contextos**: Sistemas de recomendação em tempo real, agentes autônomos, sistemas de workflow.

## Casos de Uso Adicionais do LangGraph

1. **Chatbots Complexos**: Estruturação de chatbots com múltiplas interações e respostas personalizadas.
2. **Sistemas de Automação Empresarial**: Criação de fluxos de trabalho complexos com múltiplas etapas de aprovação.
3. **Pipelines de Machine Learning**: Controle de fluxo entre diferentes fases do pipeline (pré-processamento, treinamento, validação).
4. **Sistemas de Diagnóstico**: Verificação de sintomas, históricos médicos e resultados de exames em diagnósticos médicos.

## Conclusão

O LangGraph permite criar fluxos de execução complexos e interativos, modelando processos com clareza e flexibilidade. Sua modularidade facilita o desenvolvimento de sistemas escaláveis e compreensíveis, tornando-o uma ferramenta poderosa para estruturar fluxos de decisão e automação com IA em diversos domínios.