In [18]:
from git import Repo

# Loading the repo 
from langchain.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.git import GitLoader

# Parsing through the documents to create chunks
from langchain.text_splitter import Language
from langchain.document_loaders.parsers import LanguageParser
from langchain.text_splitter import RecursiveCharacterTextSplitter


from langchain_openai import OpenAIEmbeddings, ChatOpenAI

from langchain.vectorstores import Chroma

from langchain.memory import ConversationSummaryMemory
from langchain.chains import ConversationalRetrievalChain

import os


In [2]:
repo_path = "test_repo/"
repo_link = "https://github.com/tarang1998/bodyScienceLLM"
repo = Repo.clone_from(repo_link, to_path=repo_path)

GitCommandError: Cmd('git') failed due to: exit code(128)
  cmdline: git clone -v -- https://github.com/tarang1998/bodyScienceLLM test_repo/
  stderr: 'fatal: destination path 'test_repo' already exists and is not an empty directory.
'

In [3]:
# Define supported programming languages and their file extensions
SUPPORTED_LANGUAGES =  {
    Language.PYTHON: {
        'suffixes': ['.py', '.pyx', '.pyi'],
        'language': Language.PYTHON
    },
    Language.JS: {
        'suffixes': ['.js', '.jsx', '.ts', '.tsx'],
        'language': Language.JS
    },
    Language.JAVA: {
        'suffixes': ['.java'],
        'language': Language.JAVA
    },
    Language.CPP: {
        'suffixes': ['.cpp', '.cc', '.cxx', '.h', '.hpp'],
        'language': Language.CPP
    },
    Language.CSHARP: {
        'suffixes': ['.cs'],
        'language': Language.CSHARP
    },
    Language.PHP: {
        'suffixes': ['.php'],
        'language': Language.PHP
    },
    Language.RUBY: {
        'suffixes': ['.rb'],
        'language': Language.RUBY
    },
    Language.RUST: {
        'suffixes': ['.rs'],
        'language': Language.RUST
    },
    Language.GO: {
        'suffixes': ['.go'],
        'language': Language.GO
    },
    Language.SCALA: {
        'suffixes': ['.scala'],
        'language': Language.SCALA
    },
    Language.KOTLIN: {
        'suffixes': ['.kt', '.kts'],
        'language': Language.KOTLIN
    },
    Language.LUA: {
        'suffixes': ['.lua'],
        'language': Language.LUA
    },
    Language.PERL: {
        'suffixes': ['.pl', '.pm'],
        'language': Language.PERL
    },
    Language.ELIXIR: {
        'suffixes': ['.ex', '.exs'],
        'language': Language.ELIXIR
    },
    Language.COBOL: {
        'suffixes': ['.cob', '.cbl'],
        'language': Language.COBOL
    }
}



In [4]:
# Text and document files (use default chunking)
TEXT_FILES = {
    '.txt', '.md', '.markdown', '.rst', '.adoc', '.asciidoc',
    '.json', '.yaml', '.yml', '.toml', '.ini', '.cfg', '.conf',
    '.html', '.htm', '.css', '.scss', '.sass',
    '.csv', '.tsv', '.xml', '.rss', '.atom',
    '.mdx', '.log', '.readme', '.license', '.changelog',
    '.dockerfile', '.dockerignore', '.gitignore', '.gitattributes',
    '.env', '.env.example', '.env.local',
    '.sh', '.bash', '.zsh', '.fish',  # Shell scripts
    '.sql', '.psql', '.mysql',  # SQL files
    '.dockerfile', '.docker-compose.yml', '.docker-compose.yaml',
    '.yaml', '.yml',  # YAML files
    '.json', '.jsonc',  # JSON files
    '.xml', '.xsd', '.xslt',  # XML files
    '.html', '.htm', '.xhtml',  # HTML files
    '.css', '.scss', '.sass', '.less',  # CSS files
    '.md', '.markdown', '.mdown',  # Markdown files
    '.txt', '.text',  # Plain text files
    '.log', '.out', '.err',  # Log files
    '.ini', '.cfg', '.conf', '.config',  # Config files
    '.toml', '.lock',  # TOML files
    '.rss', '.atom', '.feed',  # Feed files
    '.csv', '.tsv', '.tab',  # Data files
    '.readme', '.license', '.changelog', '.contributing',  # Documentation
    '.gitignore', '.gitattributes', '.gitmodules',  # Git files
    '.dockerignore', '.dockerfile',  # Docker files
    '.env', '.env.example', '.env.local', '.env.production',  # Environment files
}


In [5]:
def get_file_extension(file_path):
    """Extract file extension from path"""
    return os.path.splitext(file_path)[1].lower()

def create_language_filter(languages=None):
    """Create a file filter for specified languages"""
    if languages is None:
        # Include all supported extensions
        allowed_extensions = []
        for value in SUPPORTED_LANGUAGES.values():
            allowed_extensions.extend(value["suffixes"])

        allowed_extensions.extend(TEXT_FILES)
    else:
        # Include only specified languages
        allowed_extensions = []
        for lang in languages:
            if lang in SUPPORTED_LANGUAGES:
                allowed_extensions.extend(SUPPORTED_LANGUAGES[lang]["suffixes"])
            elif lang == "TEXT":
                allowed_extensions.extend(TEXT_FILES)
    def file_filter(file_path):
        return get_file_extension(file_path) in allowed_extensions
    
    return file_filter



In [6]:

# Load documents for each language
all_documents = []

for language, config in SUPPORTED_LANGUAGES.items():
    try:
        loader = GenericLoader.from_filesystem(
            repo_path,
            glob="**/*",
            suffixes=config['suffixes'],
            parser=LanguageParser(
                language=config['language'],
                parser_threshold=50
            )
        )
        
        documents = loader.load()
        all_documents.extend(documents)
        print(f"Loaded {len(documents)} {language.name} files")
        print(documents)
        
    except Exception as e:
        print(f"Failed to load {language.name} files: {e}")

print(f"Total documents loaded: {len(all_documents)}")

Loaded 10 PYTHON files
[Document(metadata={'source': 'test_repo\\ test.py', 'language': <Language.PYTHON: 'python'>}, page_content=''), Document(metadata={'source': 'test_repo\\app.py', 'content_type': 'functions_classes', 'language': <Language.PYTHON: 'python'>}, page_content="def index():\n    return render_template('chat.html')"), Document(metadata={'source': 'test_repo\\app.py', 'content_type': 'functions_classes', 'language': <Language.PYTHON: 'python'>}, page_content='def chat():\n    msg = request.form["msg"]\n    input = msg\n    print(input)\n    response = rag_chain.invoke({"input": msg})\n    print("Response : ", response["answer"])\n    return str(response["answer"])'), Document(metadata={'source': 'test_repo\\app.py', 'content_type': 'simplified_code', 'language': <Language.PYTHON: 'python'>}, page_content='from flask import Flask, render_template, jsonify, request\nfrom src.helper import download_hugging_face_embeddings\nfrom langchain_pinecone import PineconeVectorStore\

In [7]:
# Load with GitLoader first
git_loader = GitLoader(
    
    repo_path= repo_path,
    clone_url= repo_link,
    branch = "main",
    file_filter=create_language_filter()  # All languages
)

# Load documents
documents = git_loader.load()
documents



[Document(metadata={'source': ' test.py', 'file_path': ' test.py', 'file_name': ' test.py', 'file_type': '.py'}, page_content=''),
 Document(metadata={'source': 'README.md', 'file_path': 'README.md', 'file_name': 'README.md', 'file_type': '.md'}, page_content='# BodyScienceLLM\r\n\r\n![alt text](./screenshots/image.png)\r\n\r\n### Techology and Tools used\r\n- Python (version 3.9)\r\n- Lang Chain \r\n- Hugging Face Embedding Model \r\n- PineCone\r\n- OpenAI \r\n- Flask\r\n\r\n### Running the application\r\n\r\n- Clone the repository\r\n```\r\ngit clone https://github.com/tarang1998/bodyScienceLLM.git\r\n```\r\n- Create and activate the python environment\r\n\r\n```\r\nconda create -n bodyScience python=3.10 -y\r\nconda activate bodyScience\r\n```\r\n\r\n- Install the requirements \r\n```\r\npip install -r requirements.txt\r\n```\r\n\r\n- Add the OPENAI_API_KEY and PINECONE_API_KEY in the .env file \r\n\r\n- Save the vector embeddings to Pinecone\r\n```\r\npython store_index.py\r\n```\r

In [8]:
# Process documents with language-specific parsing + fallback
processed_docs = []

for doc in documents:
    file_ext = get_file_extension(doc.metadata.get('source', ''))
    
    # Determine language from file extension
    language = None
    for lang, value in SUPPORTED_LANGUAGES.items():
        if file_ext in value["suffixes"]:
            language = lang
            break
    
    # if language:
    #     # Use language-specific parser for supported languages
    #     # Makes code semantics explicit
    #     # Improves AI understanding
    #     # Enables better search and retrieval
    #     # Facilitates code analysis
    #     # Enhances documentation generation
    #     # Improves vector embeddings
    #     # Enables precise code queries
    #     try:
    #         language_parser = LanguageParser(
    #             language=language,
    #             # parser_threshold=500
    #         )
    #         # Parse the content with language-specific parser
    #         parsed_content = language_parser.parse(doc)
    #         doc.page_content = parsed_content
    #         print(f"Used language parser for {file_ext}: {doc.metadata.get('source', 'Unknown')}")
    #     except Exception as e:
    #         print(f"Language parser failed for {file_ext}, using default: {e}")
    #         # Fallback to default parsing (no change to content)
    # else:
    #     # Use default parsing for text files (no language parser)
    #     print(f"Using default parser for {file_ext}: {doc.metadata.get('source', 'Unknown')}")

    
    # Apply text splitting (language-aware if possible, otherwise default)
    # Language specific enables the parser to understand Python code structure (functions, classes, imports, etc.)
    # It can intelligently split code based on Python syntax rather than just arbitrary text breaks
    # The parser will try to break Python code at logical boundaries (like function definitions, class boundaries, etc.)
    # Instead of breaking mid-function or mid-statement, it respects Python syntax
    # Preserves context: Keeps related code together (e.g., a function and its docstring)
    # Maintains structure: Respects Python's indentation and block structure
    # Better embeddings: Creates more meaningful chunks for AI analysis
    try:
        if language:
            # Use language-specific text splitter
            text_splitter = RecursiveCharacterTextSplitter.from_language(
                language=language,
                chunk_size=500,
                chunk_overlap=20
            )
            print("Using language-specific text splitter : ", doc.metadata)
        else:
            # Use default text splitter for non-programming files
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=20
            )
            print("Using default text splitter : ", doc.metadata)
        
        chunks = text_splitter.split_documents([doc])
        processed_docs.extend(chunks)

    except Exception as e:
        print(f"Text splitting failed for {file_ext}, using default: {e}")
        # Fallback to default text splitting
        default_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=20
        )
        chunks = default_splitter.split_documents([doc])
        processed_docs.extend(chunks)

print(f"\nTotal documents loaded: {len(documents)}")
print(f"Total chunks created: {len(processed_docs)}")
print(f"Files processed:")
for doc in documents:
    file_path = doc.metadata.get('source', 'Unknown')
    file_ext = get_file_extension(file_path)
    print(f"  - {file_path} ({file_ext})")


Using language-specific text splitter :  {'source': ' test.py', 'file_path': ' test.py', 'file_name': ' test.py', 'file_type': '.py'}
Using default text splitter :  {'source': 'README.md', 'file_path': 'README.md', 'file_name': 'README.md', 'file_type': '.md'}
Using language-specific text splitter :  {'source': 'app.py', 'file_path': 'app.py', 'file_name': 'app.py', 'file_type': '.py'}
Using default text splitter :  {'source': 'requirements.txt', 'file_path': 'requirements.txt', 'file_name': 'requirements.txt', 'file_type': '.txt'}
Using language-specific text splitter :  {'source': 'setup.py', 'file_path': 'setup.py', 'file_name': 'setup.py', 'file_type': '.py'}
Using language-specific text splitter :  {'source': 'store_index.py', 'file_path': 'store_index.py', 'file_name': 'store_index.py', 'file_type': '.py'}
Using language-specific text splitter :  {'source': 'template.py', 'file_path': 'template.py', 'file_name': 'template.py', 'file_type': '.py'}
Using language-specific text spli

In [11]:
from dotenv import load_dotenv
load_dotenv()

OPENAI_API_KEY=os.environ.get('OPENAI_API_KEY')


In [14]:
embeddings=OpenAIEmbeddings(disallowed_special=())

In [15]:

vectordb = Chroma.from_documents(processed_docs, embedding=embeddings, persist_directory='./db')

In [20]:
# llm = ChatOpenAI(model_name="gpt-4")
llm = ChatOpenAI()

In [23]:
# This is a memory class from LangChain that keeps track of the conversation history, 
# but instead of storing the entire chat log, it stores a summary of the conversation so far. 
# This is useful for long conversations where you want to keep context but not overload the model with too much text.

memory = ConversationSummaryMemory(llm=llm, memory_key="chat_history", return_messages=True)

In [24]:
# This creates a chain that:
# Accepts user questions
# Retrieves relevant documents from a vector database
# Uses an LLM to generate answers, considering both the retrieved documents and the conversation history


qa = ConversationalRetrievalChain.from_llm(
    llm, 
    # MMR stands for Maximum Marginal Relevance.
    # Instead of fetching the most similar documents outright (like normal similarity search), MMR balances:
    # Relevance to your query.
    # Diversity among returned documents.
    retriever=vectordb.as_retriever(search_type="mmr", search_kwargs={"k":8}), 
    memory=memory)


In [26]:
question = "what is download_hugging_face_embeddings funtion?"
result = qa(question)
print(result['answer'])

The `download_hugging_face_embeddings` function is defined in the code to download embeddings using the Hugging Face model called 'sentence-transformers/all-MiniLM-L6-v2'. This function returns the embeddings generated by the specified Hugging Face model, which typically has 384 dimensions.


In [27]:
question = "what is load_pdf_file funtion?"
result = qa(question)
print(result['answer'])

The purpose of the `load_pdf_file` function is to extract data from PDF files. It uses a directory loader to load PDF documents and returns the extracted documents.
