#  deal.II Assistant

In [1]:
# !pip install langchain langchain-community langchain-cohere langchain-chroma tiktoken gradio beautifulsoup4

In [2]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain.load import dumps, loads
import json
import tiktoken
#from langchain_community.document_loaders import RecursiveUrlLoader
#from bs4 import BeautifulSoup as Soup

In [None]:
# Cohere api
import os


def resolve_cohere_api_key(configured_key, environ, placeholder='Your Cohere API Key'):
    """Pick the Cohere API key to use without clobbering one that is already exported.

    Args:
        configured_key: The value typed into this cell.
        environ: Mapping in which ``COHERE_API_KEY`` is looked up (normally ``os.environ``).
        placeholder: The unedited template text, meaning "no key was entered here".

    Returns:
        ``configured_key`` when it differs from the placeholder; otherwise the
        non-empty ``COHERE_API_KEY`` found in ``environ`` if there is one, else
        the placeholder itself.
    """
    if configured_key != placeholder:
        return configured_key
    return environ.get('COHERE_API_KEY') or placeholder


# Either paste a key in place of the placeholder, or leave the placeholder as is
# and export COHERE_API_KEY before starting the kernel.
cohere_api = resolve_cohere_api_key('Your Cohere API Key', os.environ)
os.environ['COHERE_API_KEY'] = cohere_api

In [4]:
def num_tokens_from_string(string: str, encoding_name: str) -> int:
    """Count how many tokens `string` occupies under the tiktoken encoding `encoding_name`."""
    tokens = tiktoken.get_encoding(encoding_name).encode(string)
    return len(tokens)


# Doxygen pages of the tutorial programs: step numbers 1..90 except 73, 80, 84 and 88.
base_url = "https://dealii.org/current/doxygen/deal.II/step_"
skipped_steps = (73, 80, 84, 88)
urls = [
    base_url + str(step) + ".html"
    for step in range(1, 91)
    if step not in skipped_steps
]
#print(urls)

# Scraping code, disabled by wrapping it in a bare string literal (it is never executed).
# For every entry of `urls` it loads the page with RecursiveUrlLoader, serializes the result
# with dumps() and writes it to ./save_urls/step_<N>.json; url[48:-5] is the step number,
# i.e. what remains after the 48-character base URL and before the ".html" suffix.
# To use it again, the two commented-out imports at the top of the notebook are needed.
"""
docs = []
for url in urls:
    loader = RecursiveUrlLoader(url=url, max_depth=11, extractor=lambda x: Soup(x, "html.parser").text)

    doc = loader.load()

    docs.extend(doc)

    string_representation = dumps(doc)

    with open("./save_urls/step_" + url[48:-5] + ".json", "w") as fp:
        json.dump(string_representation, fp)

print(f"No. of webpages: {len(docs)}")
"""

# Read the tutorial pages back from the JSON files under ./save_urls/
# (step numbers 1..90 without 73, 80, 84 and 88, matching the URL list).
docs = []
for step in range(1, 91):
    if step in (73, 80, 84, 88):
        continue
    with open("./save_urls/step_" + str(step) + ".json", "r") as cache_file:
        # Each file holds a JSON-encoded string that loads() turns back into documents.
        docs.extend(loads(json.load(cache_file)))

print(f"No. of webpages: {len(docs)}")



# Cut every tutorial page into chunks of at most 400 tokens, overlapping by 40 tokens
splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=400,
    chunk_overlap=40,
)
splitted_docs = splitter.split_documents(docs)
print(f"No. of splitted documents: {len(splitted_docs)}")


# Total token count of the unsplit pages (cl100k_base encoding)
docs_texts = [page.page_content for page in docs]
counts = [num_tokens_from_string(text, "cl100k_base") for text in docs_texts]

print(f"No. of tokens: {sum(counts)}")

  doc = loads(json.load(fp))


No. of webpages: 86
No. of splitted documents: 11051
No. of tokens: 2334034


In [None]:
from langchain.schema import Document

# Directory containing test files. Built with os.path.join rather than the Windows-only
# literal ".\\tests": on POSIX that literal names a non-existent directory and os.walk
# then silently yields nothing. On Windows the resulting path is unchanged.
directory_path = os.path.join(".", "tests")

# Size limit in bytes. Deliberately huge so that no file is filtered out for now;
# lower it to skip large test files.
size_limit = 1000000000 * 1024

# Number of leading lines dropped from every file (the license notice).
license_header_lines = 13

# Prefix that turns a repository-relative path into a link to the file on GitHub.
base_github_url = 'https://github.com/dealii/dealii/blob/master/'

test_docs = []

# Use os.walk to go through all subdirectories and files
for root, _, files in os.walk(directory_path):
    for filename in files:
        if not filename.endswith(".cc"):  # Process only .cc files
            continue

        file_path = os.path.join(root, filename)

        # Check if file size is within the specified limit
        if os.path.getsize(file_path) > size_limit:
            continue

        with open(file_path, "r", encoding="utf-8") as file:
            lines = file.readlines()
        content = "".join(lines[license_header_lines:]).strip()

        # normpath drops the leading "./"; the platform separator is then turned into
        # "/" so the path can be appended to the GitHub URL.
        linux_path = os.path.normpath(file_path).replace(os.sep, "/")

        # Create a Document object with file content and metadata
        doc = Document(
            page_content=content,
            metadata={"source": base_github_url + linux_path}
        )
        test_docs.append(doc)

test_docs_texts = [d.page_content for d in test_docs]
test_counts = [num_tokens_from_string(d, "cl100k_base") for d in test_docs_texts]
print(f"No. of tokens for all of test docs: {sum(test_counts)}")
print(f"No. of test files: {len(test_counts)}")

No. of tokens for all of test docs: 5830042
No. of test files: 6310


In [6]:
from langchain_cohere import CohereEmbeddings

# Embedding model handed to both Chroma stores below.
embedding_function = CohereEmbeddings(model="embed-english-v3.0")

sagemaker.config INFO - Not applying SDK defaults from location: C:\ProgramData\sagemaker\sagemaker\config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: C:\Users\mehdi\AppData\Local\sagemaker\sagemaker\config.yaml


In [7]:
# Chroma store for the tutorial chunks, backed by ./dealii_db_400_40/ and using the Cohere embeddings.
# NOTE(review): the "400_40" suffix presumably mirrors chunk_size=400 / chunk_overlap=40 — confirm.
db_tutorials = Chroma(persist_directory='./dealii_db_400_40/', embedding_function=embedding_function)

# Rebuild recipe, kept commented out: drop the collection, then embed splitted_docs into the same directory.
#db_tutorials.delete_collection()

#db_tutorials = Chroma.from_documents(splitted_docs, embedding_function, persist_directory='./dealii_db_400_40')

In [8]:
# Chroma store for the test-suite files, backed by ./dealii_db_tests/ and using the Cohere embeddings.
db_tests = Chroma(persist_directory='./dealii_db_tests/', embedding_function=embedding_function)

# Rebuild recipe, kept commented out: drop the collection, then embed test_docs into the same directory.
#db_tests.delete_collection()

#db_tests = Chroma.from_documents(test_docs, embedding_function, persist_directory='./dealii_db_tests')

In [9]:
from langchain_cohere import ChatCohere

# Chat model used by every chain below; temperature is set to 0.
llm = ChatCohere(model='command-r', temperature=0.)

In [10]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.load import dumps, loads
from operator import itemgetter
from langchain_core.runnables import RunnableParallel
import gradio as gr

In [11]:
def get_unique_union(documents: list[list]):
    """Merge several lists of retrieved documents into one list without duplicates.

    Args:
        documents: A list of result lists (one per query), each holding documents
            that ``dumps`` can serialize.

    Returns:
        The distinct documents, in the order in which they were first seen.
        Two documents count as the same when their ``dumps`` serialization is equal.
    """
    # Serialize so that documents can be compared and used as dict keys.
    serialized = (dumps(doc) for sublist in documents for doc in sublist)
    # dict.fromkeys deduplicates while keeping insertion order; a set would make the
    # order depend on string hashing and therefore differ between runs.
    return [loads(doc) for doc in dict.fromkeys(serialized)]

def create_history(history):
    """Render a chat history as numbered "Question n: ... / Answer n: ..." lines.

    Args:
        history: List of message dicts with a 'content' key, read pairwise:
            even positions are treated as questions, odd positions as answers.

    Returns:
        One line per message, each terminated by a newline; an empty string for
        an empty history. A trailing question that has no answer yet is still
        included (without an "Answer" line) instead of raising IndexError.
    """
    lines = []
    for turn, start in enumerate(range(0, len(history), 2), start=1):
        lines.append(f"Question {turn}: {history[start]['content']}\n")
        # The last pair may be incomplete when the history has an odd length.
        if start + 1 < len(history):
            lines.append(f"Answer {turn}: {history[start + 1]['content']}\n")
    return "".join(lines)

def remove_empty_string(list_of_questions):
    """Drop blank entries from a list of questions, in place.

    Entries that are empty or consist only of whitespace (for example the "\r"
    or " " left over after splitting model output on newlines) are removed; the
    remaining entries are kept unchanged and in their original order.

    Args:
        list_of_questions: List of strings; it is modified in place.

    Returns:
        The same list object that was passed in.
    """
    # Slice assignment keeps the caller's list object, as the original in-place deletion did.
    list_of_questions[:] = [q for q in list_of_questions if q.strip()]
    return list_of_questions

# Answering prompt. Placeholders: {question}, the retrieved {context} and the conversation {history}.
# It asks for code examples where possible and for the sources to be cited at the end of the answer.
template1 = """You are an expert assistant for question-answering tasks for deal.II library, \
an open-source C++ finite element library. The library website can be accessed at https://dealii.org. \
Use the following pieces of retrieved context and history of the conversation to answer the question. \
Provide the code examples where possible. Use contexts sourced from step-* tutorial programs to explain concepts. \
Use the code sourced from *.cc files for code examples. If you don't know the answer, just say that you don't know. \
If you know the answer, cite the source of your answer at the end. REMEMBER to add the sources of your answers at the end.

Question: {question}

Context: {context}

History of questions and answers between user and assistant: {history}

Answer:
"""
prompt1 = ChatPromptTemplate.from_template(template1)

# Query-expansion prompt. Its only placeholder is {question}; the model is asked to output four
# rephrasings plus the original question, one per line (the output is later split on newlines).
template2 = """You are an expert assistant for question-answering tasks for a finite element library. \
Using this library, one can numerically solve ordinary differential equations \
and partial differential equations on mathematical domains for variety of problems. \
Your task is to generate four different versions of the given question. \
JUST output each question in one line. Add the original question also. \
Nothing else should be mentioned in the output, just questions separated by newlines.

Question: {question}
"""

prompt2 = ChatPromptTemplate.from_template(template2)

# Reformulation prompt. Placeholders: {question} and {history}. The model is asked to rewrite the
# latest question so that it can be understood without the history, and not to answer it.
template3 = """Given a chat history and the latest user question \
which might reference context in the chat history, \
reformulate a standalone question which can be understood \
without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is. \
Do not return anything else.

Question: {question}

History of questions and answers between user and assistant: {history}
"""
prompt3 = ChatPromptTemplate.from_template(template3)

# Retrievers over the two Chroma stores (tutorials and test-suite files).
tut_retriever = db_tutorials.as_retriever()
tests_retriever = db_tests.as_retriever()

# Rewrites the latest question into a standalone one; the output is a plain string.
reformulate_chain = prompt3 | llm | StrOutputParser()

# Asks the LLM for question variants, splits the reply on newlines and drops the empty lines.
multi_question_chain = prompt2 | llm | StrOutputParser() | (lambda x: x.split("\n")) | remove_empty_string

# Both chains are invoked with a list of questions; .map() presumably runs the retriever once
# per question (TODO confirm against the LangChain docs). get_unique_union then merges the
# per-question result lists and removes duplicates.
tut_retrieval_chain = tut_retriever.map() | get_unique_union
tests_retrieval_chain = tests_retriever.map() | get_unique_union

def multi_retrieval_chain(input_dict):
    """Retrieve context for all question variants from both document stores.

    Reads the list under ``input_dict['multi_question']``, queries the tutorial
    chain first and the test-suite chain second, and returns the tutorial
    results followed by the test-suite results in a single list.
    """
    queries = input_dict['multi_question']
    tutorial_hits = tut_retrieval_chain.invoke(queries)
    test_hits = tests_retrieval_chain.invoke(queries)
    return [*tutorial_hits, *test_hits]

# End-to-end pipeline. It is invoked with a dict holding 'question' and 'history'
# and produces the answer as a plain string.
rag_chain = (
    # 1) Replace the raw question by its standalone reformulation; pass the history through.
    {"question": reformulate_chain, "history": itemgetter('history')}
    # 2) Add the list of question variants under 'multi_question'.
    | RunnableParallel({"multi_question": multi_question_chain, "question": itemgetter('question'), "history": itemgetter('history')})
    # 3) Retrieve context for all variants from both stores and store it under 'context'.
    | RunnableParallel({"context": multi_retrieval_chain, "question": itemgetter('question'), "history": itemgetter('history')})
    # 4) Fill the answering prompt, call the LLM and keep only the text of the reply.
    | prompt1
    | llm
    | StrOutputParser()
)

In [None]:
def response_function(message, history):
    """Stream the assistant's reply for the chat UI.

    The history is flattened to text with create_history and passed to
    rag_chain together with the new message. After each streamed chunk the
    generator yields the whole answer accumulated so far.
    """
    payload = {'question': message, 'history': create_history(history)}

    answer_so_far = ""
    for chunk in rag_chain.stream(payload):
        answer_so_far = answer_so_far + chunk
        yield answer_so_far

# Example prompts offered in the chat UI.
examples = [
    "How can you help me?",
    "What is FE_Nothing?",
    "How to write a loop over all cells?",
    "How to construct Lagrange elements?",
    "What is a preconditioner?",
    "How to use IncrementalFunction function?",
    "How to use to_spherical?",
    "How to add indices to an index set?",
    "How to calculate the distance between 2 points?",
]

# The tutorial list skips step numbers 73, 80, 84 and 88, so 86 tutorial programs are
# indexed; the text therefore no longer claims "all 90 tutorials".
description = """This assistant helps with questions about the functionality of the deal.II library, including its applications, use cases, and specific functions or classes. \
It draws information from the 86 tutorial programs available among step-1 through step-90 in the deal.II documentation and from the available test suite, though it does not cover the entire documentation."""

# Start the Gradio chat UI; history is passed to response_function as a list of message dicts.
gr.ChatInterface(response_function, type="messages", title="deal.II Assistant", description=description, examples=examples).launch() #share=True


* Running on local URL:  http://127.0.0.1:7861

To create a public link, set `share=True` in `launch()`.




In [13]:
# Two-turn run of the chain outside the UI: the follow-up question refers to "it"
# and therefore relies on the history built from the first turn.
history = []

# Turn 1
question = "What is FE_Nothing?"
history_str = create_history(history)
result1 = rag_chain.invoke({'question': question, 'history': history_str})
history.extend([
    {"role": "user", "content": question},
    {"role": "assistant", "content": result1},
])

# Turn 2
question2 = "Where can I use it?"
history_str = create_history(history)
result2 = rag_chain.invoke({'question': question2, 'history': history_str})
history.extend([
    {"role": "user", "content": question2},
    {"role": "assistant", "content": result2},
])

## Answers:

In [14]:
# Answer to the first question ("What is FE_Nothing?").
print(result1)

FE_Nothing is a special finite element class with exactly zero degrees of freedom per cell. The local basis on each cell for FE_Nothing is the empty set. This finite element is used when one does not need to perform any computations with shape functions, but only needs the JxW values from an FEValues object. 

An example of its usage can be found in the step-10 tutorial program. Here, FE_Nothing is used together with MappingQ to set up an FEValues object which only computes the JxW values upon calling the reinit function.
```cpp
// Create a finite element.
const MappingQ<dim> mapping(degree);
const FE_Nothing<dim> fe;

// Set up the FEValues object.
FEValues<dim> fe_values(mapping,
                      fe,
                      quadrature,
                      update_values | update_gradients | update_JxW_values);
```
The FE_Nothing class is also used in the step-46 and step-69 tutorial programs.

The term FE_Nothing also appears in some of the test programs for the deal.II library, 

In [15]:
# Answer to the follow-up question ("Where can I use it?"), produced with the first turn as history.
print(result2)

FE_Nothing finite elements are used when one does not need to perform any computations with shape functions, but only requires the JxW values from an FEValues object. In such cases, FE_Nothing is a handy finite element choice as it has zero degrees of freedom per cell. This means that there is no need to compute other quantities upon calling the reinit function on the FEValues object, thereby saving computational time.

FE_Nothing is employed in situations where the goal is to extend functions to the entire domain, keeping them zero on a particular cell. Two use cases of FE_Nothing in the deal.II library are:

1. **Extending functions by zero to the entire domain**: In step-46, FE_Nothing is used to define a finite dimensional function space of functions that are constantly zero. This technique is employed to handle multiphysics problems where each of the physics involved is defined on different subdomains. By extending the functions to the entire domain, it becomes possible to use the