This notebook mainly tests RAG (retrieval-augmented generation) approaches for this type of problem. I may try to implement a custom GraphRAG later.

In [12]:
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
import torch
from transformers.utils import is_flash_attn_2_available

import os
import pandas as pd
from openai import OpenAI
from neo4j import GraphDatabase
import ast
import re

In [2]:
load_dotenv()
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "password")
model_id = "gemma-2-9b-it"

device = "cuda" if torch.cuda.is_available() else "cpu"
client = OpenAI(
    base_url="http://localhost:1234/v1",
    api_key="lm-studio"
)
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", NEO4J_PASSWORD))

Creating vector database embeddings

In [3]:
embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)

df = pd.read_csv('mathinfo.csv')

embeddings = embedding_model.encode(df['Problem'].tolist(), convert_to_tensor=True, device=device)
df['embedding'] = embeddings.tolist()

In [4]:
embeddings = torch.tensor(df['embedding'].tolist(), dtype=torch.float32, device=device)
embeddings.shape

torch.Size([50, 384])

In [5]:
query = "What is the derivative of x^2?"
print(f"Query: {query}")

query_embedding = embedding_model.encode(query, convert_to_tensor=True, device=device)

# These scores are cosine similarities (normalized), not raw dot products.
cos_scores = torch.nn.functional.cosine_similarity(query_embedding, embeddings)
top_k = 5
top_scores, top_indices = torch.topk(cos_scores, top_k)
print(f"Top {top_k} most similar sentences:")
for score, idx in zip(top_scores.tolist(), top_indices.tolist()):
    print(f"Score: {score:.4f}, Problem: {df['Problem'][idx]}")

Query: What is the derivative of x^2?
Top 5 most similar sentences:
Score: 0.5572, Problem: Find the derivative of tan(x)
Score: 0.5399, Problem: Find the derivative of arcsin(x)
Score: 0.4920, Problem: Find the derivative of ln(sin(x))
Score: 0.3991, Problem: Solve ∫ 1/(x^2 + 1) dx
Score: 0.3675, Problem: Solve ∫ x ln(x) dx
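
The ranking above boils down to two steps: score every stored problem against the query with cosine similarity, then keep the k best. A minimal pure-Python sketch of that idea follows. The helper names `cosine` and `rank_top_k` and the toy 3-dimensional vectors are made up for illustration; the notebook itself uses torch and 384-dimensional embeddings.

```python
import math

def cosine(u, v):
    """Cosine similarity: dot product divided by the product of the norms."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

def rank_top_k(query_vec, doc_vecs, k):
    """Return (index, score) pairs for the k highest-scoring documents."""
    scored = [(i, cosine(query_vec, d)) for i, d in enumerate(doc_vecs)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]

# Toy "embeddings" (hypothetical values, not taken from mathinfo.csv).
toy_docs = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [1.0, 1.0, 0.0]]
toy_query = [2.0, 0.0, 0.0]

ranked = rank_top_k(toy_query, toy_docs, k=2)
for index, score in ranked:
    print(f"doc {index}: {score:.4f}")
```

Because cosine similarity divides by both norms, the length of `toy_query` does not affect the ranking, only its direction does.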


In [6]:
def cosine_similarity(query_embedding, embeddings):
    # Return the scores as a tensor (for topk) and as a plain list (for printing).
    scores = torch.nn.functional.cosine_similarity(query_embedding, embeddings)
    return scores, scores.tolist()

def retrieve_relevant_resources(query, df, embedding_model, top_k=5):
    query_embedding = embedding_model.encode(query, convert_to_tensor=True, device=device)
    # Reuse the embeddings cached in df['embedding'] instead of re-encoding every problem per query.
    embeddings = torch.tensor(df['embedding'].tolist(), dtype=torch.float32, device=device)
    scores, score_list = cosine_similarity(query_embedding, embeddings)
    # torch.topk raises if k exceeds the number of rows, so clamp it.
    top_k_indices = torch.topk(scores, min(top_k, len(df))).indices
    return top_k_indices, score_list
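
Top-k retrieval always returns k items, even when some are only loosely related: in the derivative query output above, the last two hits are integrals with noticeably lower scores. One possible refinement, sketched here as an assumption rather than something the notebook does, is to drop hits below a minimum score. The helper `filter_by_score` and the cutoff 0.45 are hypothetical; the problems and scores are copied from that output.

```python
def filter_by_score(hits, min_score):
    """Keep only (problem, score) pairs whose score is at least min_score."""
    return [(problem, score) for problem, score in hits if score >= min_score]

# (problem, score) pairs copied from the derivative query output.
hits = [
    ("Find the derivative of tan(x)", 0.5572),
    ("Find the derivative of arcsin(x)", 0.5399),
    ("Find the derivative of ln(sin(x))", 0.4920),
    ("Solve ∫ 1/(x^2 + 1) dx", 0.3991),
    ("Solve ∫ x ln(x) dx", 0.3675),
]

# 0.45 is an arbitrary illustrative cutoff, not a tuned value.
kept = filter_by_score(hits, min_score=0.45)
for problem, score in kept:
    print(f"{score:.4f}  {problem}")
```

With this cutoff only the three derivative problems survive. A suitable threshold depends on the embedding model and the dataset, so it would need to be chosen empirically.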

In [7]:
if (is_flash_attn_2_available()) and (torch.cuda.get_device_capability(0)[0] >= 8):
    attn_implementation = "flash_attention_2"
else:
    attn_implementation = "sdpa"
print(f"Using {attn_implementation} attention")

Using sdpa attention
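
The prompt-building cell that follows parses the `Steps` column with `ast.literal_eval` after escaping apostrophes. A small sketch of why that escape matters, assuming the column holds a stringified Python list of single-quoted steps (the CSV itself is not shown, so the `raw` string below is a guess at its shape, built from two steps that appear in the generated prompt):

```python
import ast
import re

# Assumed shape of one 'Steps' cell: a list literal stored as text.
raw = "['Find the derivative: A'(x) = 10 - 2x', 'Set A'(x) = 0: 10 - 2x = 0, x = 5']"

# Parsing the raw text fails: the apostrophe in A'(x) ends the string early.
try:
    ast.literal_eval(raw)
    parsed_raw = True
except (SyntaxError, ValueError):
    parsed_raw = False

# Escape an apostrophe that sits between a letter and an opening parenthesis.
fixed = re.sub(r"([A-Za-z])'(\()", r"\1\\'\2", raw)
steps = ast.literal_eval(fixed)

print(parsed_raw)
for step in steps:
    print(step)
```

The pattern only covers the `f'(x)` style of prime notation. An apostrophe elsewhere in a step (for example in "L'Hôpital") would still break parsing, so storing the steps as JSON may be a more robust option.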


In [13]:
prompt = """A ball is thrown vertically upward with an initial velocity of 20 m/s. The height of the ball after t seconds is given by the function:

h(t) = 20t - 5t^2

Question:
What is the maximum height reached by the ball?"""

def generate_system_prompt(examples):
    system_prompt = "You are a helpful assistant that helps solve mathematical problems. " \
                    "Use the following examples to understand the format and type of responses expected:\n\n"
    for i, example in enumerate(examples):
        system_prompt += f"Example {i + 1}:\n"
        system_prompt += f"Problem: {example['Problem']}\n"
        system_prompt += "Solution:\n"
        for step in safe_parse_steps(example['Steps']):
            system_prompt += f"{step.strip()}\n"
        system_prompt += "\n"
    system_prompt += "\nYou will be given a new problem. Use the format shown in the examples to provide a step-by-step solution.\n"
    return system_prompt

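def safe_parse_steps(steps_str):
    """Parse a stringified list of steps; return [] if it cannot be parsed."""
    # Escape apostrophes in derivative notation such as A'(x), which would
    # otherwise end a single-quoted string early.
    fixed_str = re.sub(r"([A-Za-z])'(\()", r"\1\\'\2", steps_str)
    try:
        return ast.literal_eval(fixed_str)
    except Exception as e:
        print(f"Error parsing steps: {e}. Original string: {steps_str}")
        return []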

top_k = 2
top_k_indices, dot_list = retrieve_relevant_resources(prompt, df, embedding_model, top_k=top_k)

examples = [{'Problem': df['Problem'][i.item()], 'Steps': df['Steps'][i.item()]} for i in top_k_indices]
system_prompt = generate_system_prompt(examples)
print("Generated System Prompt:")
print(system_prompt)

messages = [{"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt}]
response = client.chat.completions.create(
    model=model_id,
    messages=messages
)
results = response.choices[0].message.content
print(results)

Generated System Prompt:
You are a helpful assistant that helps solve mathematical problems. Use the following examples to understand the format and type of responses expected:

Example 1:
Problem: A ladder is leaning against a wall. The ladder is 10 feet long, and the base is 6 feet away from the wall. How high up the wall does the ladder reach?
Solution:
Use the Pythagorean theorem: a^2 + b^2 = c^2
Substitute values: 6^2 + b^2 = 10^2
Solve for b: b = √(10^2 - 6^2) = 8
Result: 8 feet

Example 2:
Problem: Maximize the area of a rectangle with a perimeter of 20 units
Solution:
Let length = x and width = y, and use the perimeter constraint: 2x + 2y = 20
Express y in terms of x: y = 10 - x
Area = x * y = x(10 - x) = 10x - x^2
Find the derivative: A'(x) = 10 - 2x
Set A'(x) = 0: 10 - 2x = 0, x = 5
Substitute x = 5 into y = 10 - x: y = 5
Result: Maximum area is 5 * 5 = 25


You will be given a new problem. Use the format shown in the examples to provide a step-by-step solution.

Solution:
Th
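
Whatever the model returns for the ball problem can be checked independently, since h(t) = 20t - 5t^2 is a downward-opening parabola whose maximum sits at the vertex t = -b / (2a). A minimal sketch of that check; the helper `quadratic_vertex` is a hypothetical name and is not part of the notebook.

```python
def quadratic_vertex(a, b, c=0.0):
    """Return (t, h) at the vertex of h(t) = a*t**2 + b*t + c."""
    t = -b / (2 * a)
    return t, a * t**2 + b * t + c

# h(t) = 20t - 5t^2, so a = -5 and b = 20.
t_peak, h_max = quadratic_vertex(a=-5.0, b=20.0)
print(f"t = {t_peak} s, h = {h_max} m")
```

This gives a peak at t = 2 s with a height of 20 m, which is the value the generated solution should reach.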