In [50]:
import argparse
import os
import shutil
from langchain_community.document_loaders import UnstructuredMarkdownLoader , PyPDFDirectoryLoader , TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.schema.document import Document
from get_embedding_function import get_embedding_function
from langchain_community.vectorstores import Chroma


CHROMA_PATH = "chroma"
DATA_PATH = "data"
DATA_PATH_Md = "data/resources/"

In [1]:
from langchain_openai import AzureOpenAIEmbeddings
import os
from dotenv import load_dotenv

import os

load_dotenv()



def get_embedding_function():
    #os.environ['OPENAI_API_KEY'] = 
    embeddings = AzureOpenAIEmbeddings(
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    #azure_deployment="${}",
    openai_api_version="2025-01-01-preview",  # or whatever version your deployment uses
)
    return embeddings

In [51]:
def main():

    # Check if the database should be cleared (using the --clear flag).
    parser = argparse.ArgumentParser()
    parser.add_argument("--reset", action="store_true", help="Reset the database.")
    args = parser.parse_args()
    if args.reset:
        print("✨ Clearing Database")
        clear_database()

    # Create (or update) the data store.
    documents = load_documents()
    chunks = split_documents(documents)
    print(f"Number of documents: {len(chunks)}")
    add_to_chroma(chunks)


def load_documents():
    document_loader = PyPDFDirectoryLoader(DATA_PATH)
    # Iterate over md files in DATA_PATH_Md path and run UnstructuredMarkdownLoader on each file
    documents = []
    for i in os.listdir(DATA_PATH_Md):
        print(i)
        if i.endswith(".md"):
            document_loader_md = UnstructuredMarkdownLoader(os.path.join(DATA_PATH_Md, i))
            documents += document_loader_md.load()
        if i.endswith(".go"):
            document_loader_txt = TextLoader(os.path.join(DATA_PATH, i))
            documents += document_loader_txt.load()
    #document_loader_md = TextDirectoryLoader(DATA_PATH_Md)
    documents_all = document_loader.load() + document_loader_md.load()
    return documents_all


def split_documents(documents: list[Document]):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=80,
        length_function=len,
        is_separator_regex=False,
    )
    return text_splitter.split_documents(documents)


def add_to_chroma(chunks: list[Document]):
    # Load the existing database.
    db = Chroma(
        persist_directory=CHROMA_PATH, embedding_function=get_embedding_function()
    )

    # Calculate Page IDs.
    chunks_with_ids = calculate_chunk_ids(chunks)

    # Add or Update the documents.
    existing_items = db.get(include=[])  # IDs are always included by default
    existing_ids = set(existing_items["ids"])
    print(f"Number of existing documents in DB: {len(existing_ids)}")

    # Only add documents that don't exist in the DB.
    new_chunks = []
    for chunk in chunks_with_ids:
        if chunk.metadata["id"] not in existing_ids:
            new_chunks.append(chunk)

    if len(new_chunks):
        print(f"👉 Adding new documents: {len(new_chunks)}")
        new_chunk_ids = [chunk.metadata["id"] for chunk in new_chunks]
        db.add_documents(new_chunks, ids=new_chunk_ids)
        db.persist()
    else:
        print("✅ No new documents to add")


def calculate_chunk_ids(chunks):

    # This will create IDs like "data/monopoly.pdf:6:2"
    # Page Source : Page Number : Chunk Index

    last_page_id = None
    current_chunk_index = 0

    for chunk in chunks:
        source = chunk.metadata.get("source")
        page = chunk.metadata.get("page")
        current_page_id = f"{source}:{page}"

        # If the page ID is the same as the last one, increment the index.
        if current_page_id == last_page_id:
            current_chunk_index += 1
        else:
            current_chunk_index = 0

        # Calculate the chunk ID.
        chunk_id = f"{current_page_id}:{current_chunk_index}"
        last_page_id = current_page_id

        # Add it to the page meta-data.
        chunk.metadata["id"] = chunk_id

    return chunks


def clear_database():
    if os.path.exists(CHROMA_PATH):
        shutil.rmtree(CHROMA_PATH)


if __name__ == "__main__":
    main()


usage: ipykernel_launcher.py [-h] [--reset]
ipykernel_launcher.py: error: unrecognized arguments: --f=/Users/unasra/Library/Jupyter/runtime/kernel-v3ea979829268b375332ba3a926181f75d487cea2e.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
import os
os.environ['LANGCHAIN_TRACING'] = 'true'
os.environ['LANGCHAIN_ENDPOINT'] = 'https://api.smith.langchain.com'
os.environ['LANGCHAIN_API_KEY'] = "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"


In [84]:
# Script: Extract Pydantic BaseModel Schemas and Use them with LLM Structured Output

def load_tf_script(context: str,user_prompt: str):

    import os
    import importlib.util
    import inspect
    import importlib
    import sys
    import dns_config
    from pathlib import Path
    from typing import  Type , Any, ClassVar, Dict, List, Optional
    from pydantic import BaseModel , StrictBool, StrictInt, StrictStr
    from datetime import datetime
    from langchain.llms import OpenAI
    from langchain.output_parsers import PydanticOutputParser
    from langchain_openai import AzureChatOpenAI
    from langchain.prompts import ChatPromptTemplate



    model_registry: dict[str, Type[BaseModel]] = {}
    loaded_modules = {}

    SCHEMAS_PACKAGE = "dns_config"  # Should contain __init__.py importing all models

    model_registry: Dict[str, Type[BaseModel]] = {}

    try:
        schemas_module = importlib.import_module(SCHEMAS_PACKAGE)
    except Exception as e:
        raise ImportError(f"Could not import {SCHEMAS_PACKAGE}: {e}")

    # Extract BaseModels and rebuild them
    for name, obj in inspect.getmembers(schemas_module):
        if inspect.isclass(obj) and issubclass(obj, BaseModel):
            try:
                obj.model_rebuild(force=True)  # Rebuild to resolve forward refs
                model_registry[name.lower()] = obj  # store by lowercase name for keyword matching
            except Exception as e:
                print(f"Failed to rebuild model {name}: {e}")

    # 2. Accept user input and select appropriate schema
    #user_prompt = "Create a Server"

    matched_models = []
    for keyword, model_class in model_registry.items():
        if keyword in user_prompt.lower():
            matched_models.append((keyword, model_class))

    if not matched_models:
        raise ValueError("No matching schema found for user input.")


    # 3. Use with LangChain structured output
    llm = AzureChatOpenAI(model_name="gpt-4.1", temperature=0,api_key=os.getenv("AZURE_OPENAI_API_KEY"),api_version="2025-01-01-preview")
    combined_outputs = {}

    # 3. Generate structured outputs for each matched model
    for keyword, matched_model in matched_models:
        print(f"\nUsing schema: {matched_model.__name__}")
        #print(matched_model.model_json_schema())
        #matched_model.model_rebuild()
        parser = PydanticOutputParser(pydantic_object=matched_model)

        prompt = f"""
        Generate a valid terraform HCL matching this requirements:
        You are an expert at converting sanitzing Terraform HCL Code
        You are given a schema and an unsanitized HCL code block. Your task is to generate a sanitized HCL code block that matches the schema.

        {parser.get_format_instructions()}

        Unsanitized Code Block : {context}
        
        """
        prompt_template = ChatPromptTemplate.from_template(prompt)
        prompt2 = prompt_template.format(context=context)
        # Only generate the output relevant to: {keyword}
        #prompt_template = ChatPromptTemplate.from_template(prompt)
        response = llm.invoke(prompt2)
        #print(f"Response: {response.pretty_print()}")
        #parsed_output = parser.parse(response.content)
        return response.content

    # 4. Print combined output
    print("\nCombined Output:")
    print(combined_outputs)


In [86]:
import argparse
from langchain.vectorstores.chroma import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain_community.llms.ollama import Ollama
from langchain_openai import AzureChatOpenAI
from dotenv import load_dotenv
import os

load_dotenv()

from get_embedding_function import get_embedding_function

CHROMA_PATH = "chroma"


PROMPT_TEMPLATE = """
Answer the question based only on the following context:

{context}

---

Answer the question based on the above context: {question}
"""


def main():
    # Create CLI.
    # parser = argparse.ArgumentParser()
    # parser.add_argument("query_text", type=str, help="The query text.")
    # args = parser.parse_args()
    # query_text = args.query_text
    query_text = "Create a DNS View"
    query_rag(query_text)


def query_rag(query_text: str):
    # Prepare the DB.
    embedding_function = get_embedding_function()
    db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)

    # Search the DB.
    results = db.similarity_search_with_score(query_text, k=5)

    context_text = "\n\n---\n\n".join([doc.page_content for doc, _score in results])
    prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
    prompt = prompt_template.format(context=context_text, question=query_text)
    # print(prompt)


    model = AzureChatOpenAI(model_name="gpt-4.1", temperature=0,api_key=os.getenv("AZURE_OPENAI_API_KEY"),api_version="2025-01-01-preview")
    response_text = model.invoke(prompt)

    #sources = [doc.metadata.get("id", None) for doc, _score in results]
    formatted_response = f"Response: {response_text}"
    #print(formatted_response)

    print(load_tf_script(response_text.content,query_text))
    
    return response_text


if __name__ == "__main__":
    main()



Using schema: View


KeyError: '"properties"'