## Empowering Data Analysis with LangChain-OpenAI in Microsoft Fabric
**by Jesus Lopez Martin - https://www.syntax.es - January 2025**

In modern data analysis, combining large language models such as OpenAI's GPT-4o with Spark SQL and Microsoft Fabric opens up a new way to query data. This notebook shows how to integrate LangChain-OpenAI into a Microsoft Fabric notebook running PySpark, using a Lakehouse to store and query the data. With this setup, a question asked in plain English is translated into a Spark SQL query, including joins across tables where needed, and run directly against the Lakehouse.
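At its core, the technique is prompt assembly. The notebook in this article collects each table's column names and places them, together with the user's question, into a fixed instruction template. The sketch below reproduces that step with plain `str.format` instead of LangChain's `PromptTemplate`, so it runs without Azure or Spark. `build_prompt` is a hypothetical helper, and the schema is illustrative rather than read from a real Lakehouse.

```python
# Same instruction text as the notebook's sql_prompt, rendered with str.format
TEMPLATE = (
    "You have access to the following tables and columns:\n"
    "{tables_info}\n"
    "Generate a valid SQL query to answer the following question: {question}. "
    "Make sure to include the necessary joins between tables if applicable. "
    "Only include the Spark SQL query without additional explanations."
)


def build_prompt(question: str, tables_info: dict[str, list[str]]) -> str:
    """Render one 'table: col1, col2' line per table, then fill the template."""
    tables_str = "\n".join(
        f"{table}: {', '.join(columns)}" for table, columns in tables_info.items()
    )
    return TEMPLATE.format(question=question, tables_info=tables_str)


# Illustrative schema only; in the notebook, load_tables() reads it from the Lakehouse
schema = {
    "sales": ["OrderKey", "OrderDate", "CustomerKey", "NetPrice"],
    "customer": ["CustomerKey", "GivenName", "Surname"],
}
print(build_prompt("How many orders were generated in each year?", schema))
```

The model sees only column names, not data types or sample values, so it has to infer what each column means from its name alone.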

**Note:** Before running this notebook, attach a Lakehouse that contains your tables.

In [4]:
```python
%pip install --upgrade --quiet langchain langchain-openai
```

In [5]:
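```python
# Sample parameters: keep one table list and one question active.
# Example 1: a single table
# tables = "sales"
# question = "How many orders were generated in each year?"

# Example 2: two tables, so the query needs a join (this pair is used below)
tables = "sales, customer"
question = "Get the top 10 customers with the most orders, along with their total amount."
```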
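`load_tables` splits `tables` on commas and passes each name straight to Spark. A small check such as the one below rejects empty entries and anything that is not a plain or dotted identifier before it reaches Spark SQL. `parse_tables` is a hypothetical helper that is not part of the original notebook, and the identifier pattern is an assumption; adjust it if your table names need backtick quoting.

```python
import re

# Hypothetical rule: letters, digits and underscores, optionally dotted (lakehouse.table)
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*")


def parse_tables(tables: str) -> list[str]:
    """Split a comma-separated table list, dropping blanks and duplicates."""
    names = []
    for raw in tables.split(","):
        name = raw.strip()
        if not name:
            continue  # tolerate trailing commas such as "sales, customer,"
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"Unsupported table name: {name!r}")
        if name not in names:
            names.append(name)
    return names


print(parse_tables("sales, customer"))  # ['sales', 'customer']
```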


In [6]:
```python
from pyspark.sql import SparkSession
from langchain_openai import AzureChatOpenAI
from langchain.prompts import PromptTemplate
from notebookutils import credentials
import certifi
import os
import re

# Point Python's SSL stack at certifi's CA bundle
os.environ['SSL_CERT_FILE'] = certifi.where()

# API key and endpoint, stored as secrets in Azure Key Vault
key_vault_url = "https://<your_key_vault>.vault.azure.net/"
openai_api_key = credentials.getSecret(key_vault_url, "OPENAI-API-KEY")
azure_endpoint = credentials.getSecret(key_vault_url, "OPENAI-API-BASE")
openai_version = "2024-05-01-preview"

# Configure the language model; azure_deployment is the name of your Azure OpenAI deployment
llm = AzureChatOpenAI(
    azure_deployment="gpt-4o",
    azure_endpoint=azure_endpoint,
    openai_api_key=openai_api_key,
    api_version=openai_version
)

# Get the notebook's Spark session (Fabric already provides one)
spark = SparkSession.builder.getOrCreate()

# Map each table name in the comma-separated list to its column names
def load_tables(tables):
    tables_info = {}
    for table in tables.split(","):
        table = table.strip()
        if not table:
            continue  # skip empty entries, e.g. from a trailing comma
        # .columns only reads the schema; no Spark job is run
        tables_info[table] = spark.table(table).columns
    return tables_info

# Prompt to generate SQL
sql_prompt = PromptTemplate(
    input_variables=["question", "tables_info"],
    template=(
        "You have access to the following tables and columns:\n"
        "{tables_info}\n"
        "Generate a valid SQL query to answer the following question: {question}. "
        "Make sure to include the necessary joins between tables if applicable. "
        "Only include the Spark SQL query without additional explanations."
    ),
)

# Run a SQL statement; errors propagate so the caller reports the real Spark message
def execute_query(query):
    print(f"Executing SQL:\n{query}")
    return spark.sql(query)

# Extract the SQL from the model's reply: use a fenced code block if present,
# otherwise treat the whole reply as SQL (the prompt asks for the query only)
def extract_query(sql_text):
    fenced = re.findall(r"```(?:sql)?\s*\n(.*?)```", sql_text, re.DOTALL | re.IGNORECASE)
    query = (fenced[0] if fenced else sql_text).strip().rstrip(";").strip()
    return query or None

# Answer a natural-language question with a generated Spark SQL query
def answer_question(question, tables):
    try:
        tables_info = load_tables(tables)
        tables_str = "\n".join(f"{table}: {', '.join(columns)}" for table, columns in tables_info.items())
        prompt = sql_prompt.format(question=question, tables_info=tables_str)

        # Ask the model for a query and pull out the SQL text
        response = llm.invoke(prompt).content.strip()
        query = extract_query(response)
        if not query:
            return "Could not extract a valid query from the response."
        # Run the query and collect the (small) result as a pandas DataFrame
        return execute_query(query).toPandas()
    except Exception as e:
        return f"Error processing the query: {e}"

response = answer_question(question, tables)
display(response)
```
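The notebook runs whatever SQL the model returns. Spark SQL in a Lakehouse can also change data (for example `INSERT`, `DELETE` or `DROP TABLE` on Delta tables), so it may be worth rejecting anything that is not a single read-only statement before calling `execute_query`. The hypothetical `is_read_only` helper below is a keyword-based heuristic, not a SQL parser and not a security boundary; it can also reject some legitimate queries (for example, a string literal containing `;` or the word `update`), and workspace permissions remain the real control.

```python
import re

# Hypothetical deny-list of statements that modify data or metadata
_WRITE_KEYWORDS = re.compile(
    r"\b(insert|update|delete|merge|drop|alter|create|truncate|grant|revoke)\b",
    re.IGNORECASE,
)


def is_read_only(query: str) -> bool:
    """Heuristic check that a generated query is a single SELECT/WITH statement."""
    statement = query.strip().rstrip(";").strip()
    if ";" in statement:
        return False  # more than one statement
    if not re.match(r"(select|with)\b", statement, re.IGNORECASE):
        return False  # must start with SELECT or WITH
    return _WRITE_KEYWORDS.search(statement) is None


print(is_read_only("SELECT * FROM sales LIMIT 10;"))  # True
print(is_read_only("DROP TABLE sales"))               # False
```

For example, `answer_question` could return an error message when `is_read_only(query)` is false instead of calling `execute_query`.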


```text
Executing SQL:
SELECT
  c.CustomerKey,
  c.GivenName,
  c.Surname,
  COUNT(s.OrderKey) AS TotalOrders,
  SUM(s.NetPrice) AS TotalAmount
FROM
  sales s
JOIN
  customer c
ON
  s.CustomerKey = c.CustomerKey
GROUP BY
  c.CustomerKey,
  c.GivenName,
  c.Surname
ORDER BY
  TotalOrders DESC
LIMIT 10
```
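It is worth reading the generated SQL before trusting the result. Here `COUNT(s.OrderKey)` counts rows of `sales`, which equals the number of orders only if the table holds one row per order; if it stores one row per order line, orders with several lines are over-counted and `COUNT(DISTINCT s.OrderKey)` is the right measure. Likewise, whether `SUM(s.NetPrice)` is the total amount depends on whether `NetPrice` is a line total or a unit price. The toy example below uses Python's built-in `sqlite3` as a stand-in for Spark SQL, with made-up rows and a hypothetical `LineNumber` column, to show the counting difference.

```python
import sqlite3

# Toy order-line table: order 1 has two lines, order 2 has one (made-up data)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (OrderKey INTEGER, LineNumber INTEGER, CustomerKey INTEGER)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?, ?)",
    [(1, 0, 10), (1, 1, 10), (2, 0, 10)],
)

# COUNT counts rows (order lines); COUNT(DISTINCT ...) counts orders
row_count, order_count = conn.execute(
    "SELECT COUNT(OrderKey), COUNT(DISTINCT OrderKey) FROM sales WHERE CustomerKey = 10"
).fetchone()
print(row_count, order_count)  # 3 2
conn.close()
```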


