In [14]:
# ! pip install PyPDF2
# ! pip uninstall fitz
# ! pip install pymupdf
# ! pip install tabula
# ! pip install pdfplumber


Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com


ERROR: Could not find a version that satisfies the requirement some_library (from versions: none)
ERROR: No matching distribution found for some_library


In [1]:
from langchain_community.document_loaders import PyPDFLoader
from langchain.embeddings import SentenceTransformerEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.retrievers.multi_vector import MultiVectorRetriever

import pdfplumber
from bs4 import BeautifulSoup
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss

# Function to convert table to text
def table_to_text(table):
    return "\n".join(["\t".join(map(str, row)) for row in table])

# Function to convert HTML table to plain text
def html_to_plain_text(html):
    soup = BeautifulSoup(html, "html.parser")
    return soup.get_text(separator="\n")

# Function to convert text to HTML table
def text_to_html_table(table_text):
    rows = table_text.split("\n")
    soup = BeautifulSoup("<table></table>", "html.parser")
    table_tag = soup.table
    for row in rows:
        row_tag = soup.new_tag("tr")
        table_tag.append(row_tag)
        for cell in row.split("\t"):
            cell_tag = soup.new_tag("td")
            cell_tag.string = cell
            row_tag.append(cell_tag)
    return str(table_tag)

# Load and split text from PDF
def load_and_split_pdf_text(pdf_path):
    loader = PyPDFLoader(pdf_path)
    text = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=0)
    return text_splitter.split_documents(text)

# Extract tables from PDF
def extract_tables_from_pdf(pdf_path):
    with pdfplumber.open(pdf_path) as pdf:
        tables = []
        for page in pdf.pages:
            tables.extend(page.extract_tables())
    all_tables_text = [table_to_text(table) for table in tables if table]
    return [text_to_html_table(table_text) for table_text in all_tables_text]

# Load text from both PDFs
pdf1_text_splits = load_and_split_pdf_text("AUTOSAR_CP_SWS_CANDriver.pdf")  # Generic AUTOSAR document
pdf2_text_splits = load_and_split_pdf_text("path_of_the_pdf")

# Extract tables from both PDFs
pdf1_html_tables = extract_tables_from_pdf("AUTOSAR_CP_SWS_CANDriver.pdf")  # # Generic AUTOSAR document
pdf2_html_tables = extract_tables_from_pdf("path_of_the_pdf")

# Combine text splits and tables
all_text_splits = pdf1_text_splits + pdf2_text_splits
all_html_tables = pdf1_html_tables + pdf2_html_tables

# Embed text
model = SentenceTransformer('all-MiniLM-L6-v2')
text_embeddings = model.encode([split.page_content for split in all_text_splits])
table_plain_texts = [html_to_plain_text(html_table) for html_table in all_html_tables]
table_embeddings = model.encode(table_plain_texts)

# Store text embeddings in FAISS
dimension = text_embeddings.shape[1]
text_index = faiss.IndexFlatL2(dimension)
text_index.add(np.array(text_embeddings))

# Store table embeddings in FAISS
dimension = table_embeddings.shape[1]
table_index = faiss.IndexFlatL2(dimension)
table_index.add(np.array(table_embeddings))

# Implement a multi-vector retriever
class MultiVectorRetriever:
    def __init__(self, text_index, table_index, model):
        self.text_index = text_index
        self.table_index = table_index
        self.model = model
    
    def retrieve(self, query, k=1):
        query_embedding = self.model.encode([query])
        text_distances, text_indices = self.text_index.search(query_embedding, k)
        table_distances, table_indices = self.table_index.search(query_embedding, k)
        
        retrieved_texts = [all_text_splits[idx].page_content for idx in text_indices[0]]
        retrieved_tables = [all_html_tables[idx] for idx in table_indices[0]]
        
        return retrieved_texts, retrieved_tables

retriever = MultiVectorRetriever(text_index, table_index, model)

# Query the retriever
query = "can_StartTimer?"
retrieved_texts, retrieved_tables = retriever.retrieve(query)

for i, text in enumerate(retrieved_texts):
    print(f"Text {i+1}:\n{text}\n")

for i, table in enumerate(retrieved_tables):
    print(f"Table {i+1}:\n{table}\n")


  from tqdm.autonotebook import tqdm, trange


Text 1:
R01UH0517EJ0100 Rev.1.00 Page 2219 of 3050
Jan 06, 2016RH850/P1x-C Section 25 Generic Timer Module (GTM)
25.19.4.7 GTM0MCSiCTRLSTAT
 
For detail, see Section 25.13.9.11, GTM0MCSiCTRLSTAT (i = 0, 1) .Access: This register can be read/written in 32-bit units.
Address: GTM0MCS0CTRLSTAT: <GTM_base> + 30064H
GTM0MCS1CTRLSTAT: <GTM_base> + 31064H
Value after reset: 000x 0000H
B i t 3 13 02 92 82 72 62 52 42 32 22 12 01 91 81 71 6
———————EN_TIM
_FOUT— — ERR_SRC_ID — —HLT_SP
_OFLRAM_
RST
V a l u e  a f t e r  r e s e t 000000000000000–
R / W RRRRRRRRRRRRRRRR
B i t 1 5 1 4 1 3 1 2 1 1 1 0 9876543210
———— S C D _ C H —————— S C D _ M O D E
V a l u e  a f t e r  r e s e t 0000000000000000
R / W RRRR R / W R R / W R / W RRRRRR R / W R / W

Table 1:
<table><tr><td>ECNT_</td></tr><tr><td>RESET</td><td>ISL</td><td>DSL</td><td>CNTS_</td></tr><tr><td>SEL</td><td>GPR1_SEL</td><td>GPR0_SEL</td><td>—</td><td>CICTRL</td><td>ARU_E</td></tr><tr><td>N</td><td>OSM</td><td>TIM_MODE</td><td>TIM_EN</td></

In [2]:
! ollama -v

ollama version is 0.1.42


In [3]:
! ollama list

NAME            	ID          	SIZE  	MODIFIED   
codellama:latest	8fdf8f752f6e	3.8 GB	2 days ago	
codegemma:latest	0c96700aaada	5.0 GB	2 days ago	


In [7]:
import subprocess
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import ChatPromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

# Define the function to handle retriever queries (dummy function for now)
def retrieve_query(query):
    # Dummy function to simulate retrieval
    if "GPT" in query:
        return {"retrieved_texts": ["Dummy retrieval for " + query], "retrieved_tables": []}
    else:
        # Implement retrieval logic for other queries
        pass

# Define the few-shot learning examples
few_shot_examples = [
    {
        "query": "Generate C and H code for GPIO initialization on STM32F4",
        "desired_output": {
            "c_code": """
                #include "stm32f4_gpio.h"
                
                void GPIO_Init(GPIO_TypeDef* GPIOx, GPIO_InitTypeDef* GPIO_InitStruct) {
                    // Implementation for GPIO initialization
                }
                """,
            "h_code": """
                #ifndef STM32F4_GPIO_H
                #define STM32F4_GPIO_H
                
                // Header file content for GPIO initialization
                
                #endif // STM32F4_GPIO_H
                """
        }
    },
    {
        "query": "Generate C code for GPT driver",
        "desired_output": {
            "c_code": """
                // C code for GPT driver implementation
                // Include necessary headers and define functions
                """,
            "h_code": """
                // Header file for GPT driver
                // Define data structures and function prototypes
                """
        }
    },
    {
        "query": "Generate C and H code for PORT driver",
        "desired_output": {
            "c_code": """
                // C code for PORT driver implementation
                // Include necessary headers and define functions
                """,
            "h_code": """
                // Header file for PORT driver
                // Define data structures and function prototypes
                """
        }
    },
    {
        "query": "Generate C and H code for SPI driver",
        "desired_output": {
            "c_code": """
                // C code for SPI driver implementation
                // Include necessary headers and define functions
                """,
            "h_code": """
                // Header file for SPI driver
                // Define data structures and function prototypes
                """
        }
    },
    {
        "query": "Generate C and H code for CAN driver",
        "desired_output": {
            "c_code": """
                // C code for CAN driver implementation
                // Include necessary headers and define functions
                """,
            "h_code": """
                // Header file for CAN driver
                // Define data structures and function prototypes
                """
        }
    }
]

# Define a function to retrieve desired output based on query
def retrieve_desired_output(query):
    for example in few_shot_examples:
        if example["query"].lower() == query.lower():
            # Retrieve desired output
            desired_output = example["desired_output"]
            
            # Filter out non-header lines from the .h code
            h_code_lines = desired_output["h_code"].split('\n')
            filtered_h_code_lines = [line for line in h_code_lines if line.strip().startswith('#') or line.strip().startswith('typedef') or line.strip().startswith('struct')]
            desired_output["h_code"] = '\n'.join(filtered_h_code_lines)
            
            return desired_output
    return None

# Start ollama as a background process with interactive mode enabled
command = "nohup ollama serve --interactive &"
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print("Process ID:", process.pid)

# Prompt template
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

# Local LLM
ollama_llm = "codellama"
model_local = ChatOllama(model=ollama_llm)

# Define the processing chain
chain = (
    {"context": RunnableLambda(retrieve_query), "question": RunnablePassthrough()}
    | prompt
    | model_local
    | StrOutputParser()
)

# Function to generate C and H code based on user query
def generate_mcal_code(query):
    retrieved_output = chain.invoke(query)
    return retrieved_output

# Function to print generated code instead of saving to a file
def print_generated_code(query):
    generated_code = generate_mcal_code(query)
    print("Generated C and H Code for " + query + ":\n")
    print(generated_code)

# Test the code generation and print for the driver
query = "Generate C and H code for CAN"
print_generated_code(query)


Process ID: 15756
Generated C and H Code for Generate C and H code for CAN:


C Code:
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <canlib.h>

int main() {
    // Initialize the CAN controller and channel
    int handle = canOpenChannel(0, 0);
    if (handle >= 0) {
        printf("Successfully opened channel.\n");
    } else {
        printf("Failed to open channel. Error code: %d\n", handle);
    }

    // Set the baud rate of the CAN controller
    int ret = canSetBusParams(handle, 500000, 8, 2, 1);
    if (ret != 0) {
        printf("Failed to set bus params. Error code: %d\n", ret);
    } else {
        printf("Successfully set bus params.\n");
    }

    // Send a CAN message
    CAN_message_t msg;
    msg.id = 0x123;
    msg.ext = 0;
    msg.len = 8;
    msg.data[0] = 0xde;
    msg.data[1] = 0xad;
    msg.data[2] = 0xbe;
    msg.data[3] = 0xef;
    int res = canWrite(handle, &msg, CAN_SEND_MSG);
    if (res < 0) {
        printf("Failed to send messag