In [None]:
# Imports
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.chains import create_retrieval_chain
from langchain_openai import ChatOpenAI
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
from  langchain_community.document_loaders import PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import getpass
import os
from dotenv import load_dotenv
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage, SystemMessage
from duckduckgo_search import DDGS
import requests
import streamlit as st
import pymupdf

import logging
# Logging setup: records at WARNING level and above are sent to two places.
_log_handlers = [
    logging.FileHandler('partition_pdf.log'),  # persisted copy in partition_pdf.log
    logging.StreamHandler(),                   # echoed to the console as well
]
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
    level=logging.WARNING,
)

# Pull variables from a .env file (if any) into the process environment.
load_dotenv()

# Prompt for the API key only when it is missing or empty, so it is never
# hard-coded in the notebook.
if os.environ.get("OPENAI_API_KEY", "") == "":
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API Key: ")

# Chat model used by the LLM calls further down.
model = init_chat_model("o3-mini", model_provider="openai")



In [1]:
# Active sensor configuration: (sensor name, measured parameters, data size).
# NOTE(review): data_size is presumably the raw reading length in bytes - confirm.
sensor_name, sensor_data_key_val, data_size = "MCP9808", "(temperature)", "2"

# Alternative configuration; swap it in to target the AHT20 instead.
# sensor_name, sensor_data_key_val, data_size = "AHT20", "(humidity, temperature)", "6"



In [2]:
# Find and download sensor datasheet
#
# Output: `datasheet_path`, the location of the sensor's PDF in the local
# datasheet database. It is always assigned, even when nothing could be
# downloaded, so later cells fail on a missing file rather than on an
# undefined name.
datasheet_path = f"/home/steven/FYP/v2_LLM_OS/LLM/Datasheet_DB/{sensor_name}.pdf"

if os.path.exists(datasheet_path):
    # Check the local copy first so no web search or download happens needlessly.
    print("Datasheet already exists in the database.")
else:
    search_query = f"{sensor_name} datasheet filetype:pdf"
    search_results = DDGS().text(search_query)
    if not search_results:
        print("No datasheet found for this I2C sensor.")
    else:
        # The first search hit is taken as the datasheet.
        datasheet_url = search_results[0]['href']
        print(f"Datasheet URL: {datasheet_url}")
        print("Downloading datasheet...")
        # A timeout keeps an unresponsive server from hanging the cell forever.
        response = requests.get(datasheet_url, timeout=30)
        if response.status_code != 200:
            print(f"Download failed with HTTP status {response.status_code}.")
        elif b"%PDF" not in response.content[:1024]:
            # Search hits are sometimes HTML landing pages; saving one would
            # poison the local database because the file is never re-fetched.
            print("Downloaded content is not a PDF; nothing was saved.")
        else:
            with open(datasheet_path, "wb") as file:
                file.write(response.content)
            print("Datasheet downloaded!")

# Report the final state instead of unconditionally claiming success.
if os.path.exists(datasheet_path):
    print(f"Datasheet available at: {datasheet_path}")
else:
    print(f"Datasheet NOT available at: {datasheet_path}")

NameError: name 'DDGS' is not defined

In [None]:
# Load and partition the datasheet into elements
# 5 levels of partitioning
import pymupdf4llm
import pathlib
from langchain_text_splitters import RecursiveCharacterTextSplitter, MarkdownTextSplitter
from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain_core.documents import Document

# Convert the datasheet PDF to Markdown (cached on disk per sensor), split the
# Markdown into overlapping chunks, and dump the chunks to a file for inspection.
# Reads `sensor_name` and `datasheet_path` set by earlier cells; produces
# `md_text` and `docs`.
md_path = f"/home/steven/FYP/v2_LLM_OS/LLM/MD_DB/md_{sensor_name}.md"
if not os.path.exists(md_path):
    md_text = pymupdf4llm.to_markdown(datasheet_path)
    pathlib.Path(md_path).write_bytes(md_text.encode("utf-8"))
    print("Datasheet Partition does not exist. Created a new parition")
else:
    # Read back with the same encoding the cache was written in; relying on the
    # locale default can fail or garble non-ASCII characters on non-UTF-8 systems.
    md_text = pathlib.Path(md_path).read_text(encoding="utf-8")
    print("Datasheet partition exists. Loaded from local file")

# Chunks of up to 500 characters, with 100 characters shared between neighbours.
splitter = MarkdownTextSplitter(chunk_size=500, chunk_overlap=100)

docs = splitter.create_documents([md_text])

print(len(docs))
# Join all document contents into one string, with a visible separator between chunks
all_text = "\n\n---------XXXX----------\n\n".join(doc.page_content for doc in docs)

# Save to a single file
output_file = f"/home/steven/FYP/v2_LLM_OS/LLM/MD_DB/split_md_{sensor_name}.md"
with open(output_file, "w", encoding="utf-8") as f:
    f.write(all_text)



Datasheet partition exists. Loaded from local file
60


In [None]:
# Embed the datasheet chunks and keep them in a FAISS index stored on disk per sensor.
# TODO: We might want to use multiple datasheets for the same sensor
vector_db_path = f"/home/steven/FYP/v2_LLM_OS/LLM/Datasheet_Vector_DB/{sensor_name}"
embeddings = OpenAIEmbeddings(
    model="text-embedding-ada-002",
    openai_api_key=os.getenv("OPENAI_API_KEY"),
)

if os.path.exists(vector_db_path):
    # An existing index is reused as-is; it is not compared against `docs`.
    # NOTE(review): allow_dangerous_deserialization permits pickle-based loading,
    # which is only safe for index files this notebook produced itself.
    vector_db = FAISS.load_local(vector_db_path, embeddings, allow_dangerous_deserialization=True)
    print("Vector DB found, loaded from local file")
else:
    vector_db = FAISS.from_documents(docs, embeddings)
    vector_db.save_local(vector_db_path)
    print("Vector DB not found, created and saved a new Vector DB")

Vector DB found, loaded from local file


In [None]:
# Retrieve the 3 most similar chunks (k=3) from the vector DB for the query.
# NOTE(review): the similarity metric is whatever the FAISS store uses by
# default, which may not be cosine similarity - confirm.
query = "Formula to convert raw sensor data to measurement units"
retriever = vector_db.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3},
)
# Sample answers kept for reference (they look like register bit ranges, so they
# presumably belong to a different query - TODO confirm):
# Response: humidity: [8:15, 16:23, 24:27], temperature: [28:31, 32:39, 40:47]
# ['humidity: [8:27]', 'temperature: [28:47]']
retrieved_chunk = retriever.invoke(query)

In [None]:
# Chunk validation: for every retrieved chunk, ask the LLM whether it helps answer
# the query, and collect only the chunks it approves in `validated_chunks`.
validation_prompt = ChatPromptTemplate.from_template(
    """
    You are an assistant that validates if a provided document chunk is helpful in answering the user's query.

    QUERY:
    {query}

    CHUNK:
    {chunk}

    Is this chunk helpful for answering the query? Respond ONLY with 'Yes' or 'No'.
    """
)

validated_chunks = []

# Print each chunk and the model's verdict so the filtering can be inspected.
for idx, chunk in enumerate(retrieved_chunk):
    print(f"Retrieved Chunk {idx+1}: {chunk.page_content}")
    prompt = validation_prompt.format_messages(query=query, chunk=chunk.page_content)
    response = model.invoke(prompt).content.strip().lower()
    print(response)
    # Decide on the first word of the reply instead of a substring search: a
    # substring test would also accept any reply that merely contains "yes"
    # (e.g. inside another word or later in a sentence that starts with "no").
    # Non-letters are turned into spaces so quotes and punctuation are ignored.
    words = "".join(ch if ch.isalpha() else " " for ch in response).split()
    if words and words[0] == "yes":
        validated_chunks.append(chunk)
        print("YES. Chunk is helpful, proceeding with the next steps")
    else:
        print("NO. Chunk not helpful, moving to next chunk")

Retrieved Chunk 1: 5. Calculate the temperature and humidity values.

Note: The calibration status check in the first step
only needs to be checked at power-on. No operation
is required during the normal acquisition process.

Trigger measurement data

|Col1|Col2|Col3|Col4|Col5|Col6|Col7|Col8|Col9|Col10|Col11|Col12|Col13|Col14|Col15|Col16|Col17|Col18|Col19|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
||I²C address + write|||||||||Trigger measurement 0xAC|||||||||
no
NO. Chunk not helpful, moving to next chunk
Retrieved Chunk 2: Table 11 Soft Reset– The grey part is controlled by AHT20. 6.1 Relative humidity transformation

Relative humidity RH can be calculated according to
the relative humidity signal SRH output from SDA by
the following equation.
(The result is expressed in% RH) 6.2 Temperature transformation

Temperature T can be calculated by substituting the
temperature output signal ST into the following formula.
(The results are expressed as temp

In [None]:
# Merge the validated chunks into one numbered text block (numbering starts at 1,
# one newline-terminated entry per chunk).
consolidated_chunks = "".join(
    f"{position}. {validated.page_content}\n"
    for position, validated in enumerate(validated_chunks, start=1)
)

print(f"Consolidated Chunks: {consolidated_chunks}")


Consolidated Chunks: 1. Table 11 Soft Reset– The grey part is controlled by AHT20. 6.1 Relative humidity transformation

Relative humidity RH can be calculated according to
the relative humidity signal SRH output from SDA by
the following equation.
(The result is expressed in% RH) 6.2 Temperature transformation

Temperature T can be calculated by substituting the
temperature output signal ST into the following formula.
(The results are expressed as temperature ℃ T)



In [None]:
# Chain-of-thought prompt: ask the LLM to derive the formula that converts the
# sensor's raw data into measurement units, using the consolidated chunks as context.
# https://www.datacamp.com/tutorial/chain-of-thought-prompting
# We might want to iterate through all the sensor_data_key_val and find the formula for each of them
# The parameters have already been extracted into a uint32_t; this step converts
# that raw value into measurement units.
prompt_i2c_template = ChatPromptTemplate.from_template(
    """
    You are a helpful assistant and an expert in I2C sensors.

    Raw context (might be inaccurate):
    {chunk}

    Create a MATH formula to convert the {sensor_name} data into measurement units.

    Rules:
    1. I have extracted each {sensor_data_key_val} into uint32_t. We call this variable x.
    2. Valid operators are: arithmetic and bitwise operators and modulo.
    3. DO NOT use IF statements.
    4. Use decimal or float numbers. Do not use hex or binary numbers.
    ONLY use x as variable. From your knowledge, explain your reasoning step by step.

    """
)

# Only the placeholders that appear in the template are supplied. The former
# `size` argument had no matching placeholder, so it never reached the prompt.
prompt_i2c = prompt_i2c_template.format_messages(
    chunk=consolidated_chunks,
    sensor_name=sensor_name,
    sensor_data_key_val=sensor_data_key_val,
)

CoT_response = model.invoke(prompt_i2c).content.strip()
print(f"Response: {CoT_response}")

# Observation: the context is correct but the output is wrong, whereas the ChatGPT
# website gives the correct output for the same question.
# Maybe they are using reasoning and chain of thought, which might be super helpful.

Response: The AHT20 outputs a 20‐bit value for each measurement. That is, the sensor uses 2^20 = 1,048,576 steps for its full scale. In common practice, the conversion formulas are:

 • Relative Humidity (%RH) = (x * 100.0) / 1,048,576.0  
 • Temperature (°C) = ((x * 200.0) / 1,048,576.0) - 50.0

Here’s the reasoning step by step:

1. The sensor provides 20 bits of data, so the maximum value is 1,048,575. To obtain a normalized value, you divide by 1,048,576 (which represents the total number of distinct values).

2. For relative humidity, multiplying by 100 converts the normalized fraction into a percentage:
  RH = (x / 1,048,576) * 100

3. For temperature, the sensor’s range is 200°C wide (from -50°C to +150°C). Multiplying the normalized value by 200 gives the span, and then subtracting 50 shifts the range to the correct offset:
  T = (x / 1,048,576) * 200 - 50

In these formulas we use only the variable x (the raw uint32_t value) along with allowed arithmetic operations.


In [None]:
# Second pass: hand the chain-of-thought answer back to the LLM and ask it to
# rewrite each conversion formula in reverse Polish notation, with X as the raw data.
prompt_i2c_feedback_template = ChatPromptTemplate.from_template(
    """
    You are a helpful assistant and an expert in I2C Sensors.

    My expert told me:
    {i2c_CoT_response}

    Please provide the reverse polish notation for the conversion formula.
    Represent the raw_data as X.
    Provide one reverse polish notation for each parameter: {sensor_data_key_val}.
    """
)

# Only the placeholders that appear in the template are supplied. The former
# `sensor_name` argument had no matching placeholder, so it never reached the prompt.
prompt_i2c_feedback = prompt_i2c_feedback_template.format_messages(
    i2c_CoT_response=CoT_response,
    sensor_data_key_val=sensor_data_key_val,
)
i2c_feedback_response = model.invoke(prompt_i2c_feedback).content.strip()
print(f"Response: {i2c_feedback_response}")

Response: For relative humidity, the conversion formula is:
  RH = (X * 100.0) / 1048576.0
In Reverse Polish Notation, you can express this as:

  X 100.0 * 1048576.0 /

For temperature, the conversion formula is:
  T = ((X * 200.0) / 1048576.0) - 50.0
In Reverse Polish Notation, this becomes:

  X 200.0 * 1048576.0 / 50.0 -

These two RPN expressions use the raw data value X along with the arithmetic operations required for each conversion.


In [None]:
import re

# Third pass: ask the LLM to restate the RPN answer in one fixed, parenthesised
# "(parameter: "rpn", ...)" layout.
prompt_i2c_cleanup_template = ChatPromptTemplate.from_template(
    """
    You are a helpful assistant and an expert in I2C Sensors.

    My expert told me:
    {i2c_feedback_response}

    X is the raw data. For each parameter from {sensor_data_key_val}, please arrange it as follows:
    ONLY FILL IN the sentence, the measurement values are arranged as: (parameter1: "reverse_polish_notation1", parameter2: "reverse_polish_notation2", ...)
    """
)
# The example layout pairs parameter2 with its own notation. It previously
# repeated "reverse_polish_notation1", which could suggest reusing the first formula.
prompt_i2c_cleanup = prompt_i2c_cleanup_template.format_messages(
    i2c_feedback_response=i2c_feedback_response,
    sensor_data_key_val=sensor_data_key_val,
)
i2c_cleanup_response = model.invoke(prompt_i2c_cleanup).content.strip()
print(i2c_cleanup_response)


(humidity: "X 100.0 * 1048576.0 /", temperature: "X 200.0 * 1048576.0 / 50.0 -")


In [None]:
# Keep only the text inside the first pair of parentheses in the cleanup response;
# fall back to an empty string when the response contains no parenthesised group.
matches = re.findall(r'\((.*?)\)', i2c_cleanup_response)

if matches:
    extracted_content = matches[0]
else:
    extracted_content = ""

print(f"Response: {extracted_content}")

Response: humidity: "X 100.0 * 1048576.0 /", temperature: "X 200.0 * 1048576.0 / 50.0 -"
