In [1]:
# Install the notebook's core third-party packages: chromadb, sentence-transformers
# and langchain (used by the vector-store / embedding cells further down).
# NOTE(review): no versions are pinned, so a fresh run may resolve different
# releases than the ones this notebook was last executed with.
!pip install chromadb
!pip install sentence-transformers
!pip install langchain



In [2]:
# Install langchain-community, which later cells import as `langchain_community`
# (vector stores, embeddings, document loaders).
!pip install langchain-community
# This confirmation is printed unconditionally; it does not inspect pip's result.
print("Installed langchain-community.")

Installed langchain-community.


In [3]:
# Force a clean reinstall of the Google Generative AI packages: remove any
# existing copies first, then install the newest google-generativeai and
# langchain-google-genai.
# NOTE(review): the recorded output of this cell shows a pip dependency conflict:
# google-generativeai 0.8.5 requires google-ai-generativelanguage==0.6.15, while
# installing langchain-google-genai 3.2.0 replaces it with 0.9.0. Confirm that
# both packages are really needed side by side.
!pip uninstall -y google-generativeai langchain-google-genai google-ai-generativelanguage
!pip install --upgrade google-generativeai
!pip install --upgrade langchain-google-genai

Found existing installation: google-generativeai 0.8.5
Uninstalling google-generativeai-0.8.5:
  Successfully uninstalled google-generativeai-0.8.5
Found existing installation: langchain-google-genai 3.2.0
Uninstalling langchain-google-genai-3.2.0:
  Successfully uninstalled langchain-google-genai-3.2.0
Found existing installation: google-ai-generativelanguage 0.9.0
Uninstalling google-ai-generativelanguage-0.9.0:
  Successfully uninstalled google-ai-generativelanguage-0.9.0
Collecting google-generativeai
  Using cached google_generativeai-0.8.5-py3-none-any.whl.metadata (3.9 kB)
Collecting google-ai-generativelanguage==0.6.15 (from google-generativeai)
  Using cached google_ai_generativelanguage-0.6.15-py3-none-any.whl.metadata (5.7 kB)
Using cached google_generativeai-0.8.5-py3-none-any.whl (155 kB)
Using cached google_ai_generativelanguage-0.6.15-py3-none-any.whl (1.3 MB)
Installing collected packages: google-ai-generativelanguage, google-generativeai
Successfully installed google-a

Collecting langchain-google-genai
  Using cached langchain_google_genai-3.2.0-py3-none-any.whl.metadata (2.7 kB)
Collecting google-ai-generativelanguage<1.0.0,>=0.9.0 (from langchain-google-genai)
  Using cached google_ai_generativelanguage-0.9.0-py3-none-any.whl.metadata (10 kB)
Using cached langchain_google_genai-3.2.0-py3-none-any.whl (57 kB)
Using cached google_ai_generativelanguage-0.9.0-py3-none-any.whl (1.4 MB)
Installing collected packages: google-ai-generativelanguage, langchain-google-genai
  Attempting uninstall: google-ai-generativelanguage
    Found existing installation: google-ai-generativelanguage 0.6.15
    Uninstalling google-ai-generativelanguage-0.6.15:
      Successfully uninstalled google-ai-generativelanguage-0.6.15
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-generativeai 0.8.5 requires google-ai-generativelanguage==0.6.15

In [4]:
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import SentenceTransformerEmbeddings

def initialize_vector_store_and_embeddings(model_name='all-MiniLM-L6-v2',
                                           collection_name=None,
                                           persist_directory=None):
  """
  Builds the two components of the retrieval layer: a SentenceTransformer
  embedding model and a Chroma vector store that uses it.

  Calling the function without arguments behaves exactly as before: the
  'all-MiniLM-L6-v2' model and a Chroma instance constructed with only the
  embedding function.

  Args:
    model_name (str): Name of the sentence-transformers model to load.
    collection_name (str, optional): Chroma collection to use. When omitted,
      Chroma's own default collection name is used.
    persist_directory (str, optional): Directory in which Chroma should persist
      its data. When omitted, no persistence directory is passed.

  Returns:
    tuple: (embedding_model, vector_store).
  """
  embedding_model = SentenceTransformerEmbeddings(model_name=model_name)

  # Forward only the optional settings that were explicitly supplied, so the
  # default call constructs Chroma with the same arguments as the original code.
  chroma_kwargs = {"embedding_function": embedding_model}
  if collection_name is not None:
    chroma_kwargs["collection_name"] = collection_name
  if persist_directory is not None:
    chroma_kwargs["persist_directory"] = persist_directory

  # NOTE(review): with the default collection name, re-running the indexing cell
  # in the same kernel may add the same chunks a second time (presumably the
  # in-memory collection is shared per process — confirm). Pass a unique
  # collection_name to isolate runs if that turns out to be the case.
  vector_store = Chroma(**chroma_kwargs)

  return embedding_model, vector_store

print("Function 'initialize_vector_store_and_embeddings' defined.")

Function 'initialize_vector_store_and_embeddings' defined.


In [5]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader
from langchain_core.documents import Document
import os

def load_and_index_code(code_input, embedding_model, vector_store,
                        chunk_size=1000, chunk_overlap=200):
  """
  Loads code from a directory or a list of snippets, splits it into chunks,
  and adds the chunks to the vector store.

  Args:
    code_input (str or list): A directory path (string) or a list of code
      snippets (list of strings). For a directory, every file matching
      "**/*.py" is read as plain UTF-8 text.
    embedding_model: Kept for interface compatibility; this function does not
      use it directly (the vector store is handed the chunks as documents).
    vector_store: The vector store instance; must provide `add_documents`.
    chunk_size (int): Maximum chunk size passed to the text splitter.
    chunk_overlap (int): Overlap between consecutive chunks.

  Returns:
    str: A status message. A distinct message is returned when there was
      nothing to index, in which case the vector store is not touched.

  Raises:
    ValueError: If code_input is neither an existing directory path nor a list.
  """
  if isinstance(code_input, str) and os.path.isdir(code_input):
    # Imported here so the block is self-contained; TextLoader reads source
    # files verbatim. Without loader_cls, DirectoryLoader falls back to its
    # default Unstructured-based loader, which this notebook never installs.
    from langchain_community.document_loaders import TextLoader
    loader = DirectoryLoader(
        code_input,
        glob="**/*.py",
        loader_cls=TextLoader,
        loader_kwargs={"encoding": "utf-8"},
    )
    documents = loader.load()
  elif isinstance(code_input, list):
    documents = [Document(page_content=snippet) for snippet in code_input]
  else:
    raise ValueError("code_input must be a directory path (string) or a list of code snippets (list).")

  nothing_indexed_message = "No code found to index; the vector store was left unchanged."
  if not documents:
    return nothing_indexed_message

  text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
  code_chunks = text_splitter.split_documents(documents)

  # Blank snippets/files can produce zero chunks; skip the store call rather
  # than handing it an empty batch.
  if not code_chunks:
    return nothing_indexed_message

  vector_store.add_documents(code_chunks)

  return "Code loaded, chunked, embedded, and added to the vector store successfully."

print("Function 'load_and_index_code' defined.")

Function 'load_and_index_code' defined.


In [6]:
import google.generativeai as genai
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import json

# SECURITY: never hard-code the API key in the notebook. It is read from the
# GOOGLE_API_KEY environment variable, with an interactive prompt as fallback.
# NOTE(review): the key that used to be written literally on this line has been
# exposed to anyone who can read the notebook and should be revoked/rotated.
import os
from getpass import getpass

api_key = os.environ.get("GOOGLE_API_KEY") or getpass("Enter your Google API key: ")

genai.configure(api_key=api_key)

def analyze_code(query: str, embedding_model, vector_store,
                 top_k: int = 5, model_name: str = "gemini-2.5-pro") -> dict:
  """
  Retrieves the code chunks most similar to `query` from the vector store and
  asks Gemini (via LangChain) to report potential issues in them.

  Args:
    query (str): Natural language description of the analysis wanted; it is
      embedded and used for similarity search.
    embedding_model: Object providing `embed_query(text)`.
    vector_store: Object providing `similarity_search_by_vector(vector, k=...)`.
    top_k (int): Number of chunks to retrieve.
    model_name (str): Gemini model identifier passed to ChatGoogleGenerativeAI.

  Returns:
    dict: The parsed model answer, expected to have the shape
      {"issues": [...]}. {"issues": []} is returned when nothing was retrieved
      or when the model's answer is not usable JSON.
  """
  # 1. Embed the query and fetch the most similar chunks.
  query_embedding = embedding_model.embed_query(query)
  relevant_docs = vector_store.similarity_search_by_vector(query_embedding, k=top_k)

  # Without any retrieved code there is nothing to analyze; do not ask the
  # model to review an empty snippet.
  if not relevant_docs:
    print("Warning: no code snippets were retrieved from the vector store; nothing to analyze.")
    return {"issues": []}

  # 2. The concatenated chunks are what the model analyzes.
  combined_code_for_analysis = "\n\n".join([doc.page_content for doc in relevant_docs])

  # `api_key` is the module-level key defined earlier in this notebook.
  llm = ChatGoogleGenerativeAI(model=model_name, google_api_key=api_key)

  prompt_template = ChatPromptTemplate.from_messages([
      ("system", "You are a helpful assistant that analyzes code for potential issues and returns the analysis in a structured JSON format."),
      ("human", "Analyze the following code snippet for potential issues. \nProvide the analysis in a structured JSON format, including 'title', 'type', 'severity' (e.g., 'Low', 'Medium', 'High', 'Critical'), 'lineNumber', 'description', and 'suggestedFix' for each issue found.\n\nCode:\n```python\n{code_snippet}\n```\n\nExample JSON format for issues:\n{{\"issues\": [{{\"title\": \"Issue Title\", \"type\": \"Bug\", \"severity\": \"High\", \"lineNumber\": 10, \"description\": \"Detailed description of the issue.\", \"suggestedFix\": \"Recommended fix for the issue.\"}}]}}")
  ])

  chain = prompt_template | llm | StrOutputParser()
  response = chain.invoke({"code_snippet": combined_code_for_analysis})

  # 3. Strip an optional Markdown fence (``` or ```json) around the answer.
  # Whitespace is trimmed first so a trailing newline does not defeat the check.
  cleaned = response.strip()
  if cleaned.startswith('```'):
    _, _, cleaned = cleaned.partition('\n')  # drop the opening fence line
    cleaned = cleaned.rstrip()
    if cleaned.endswith('```'):
      cleaned = cleaned[:-3]

  try:
    parsed_response = json.loads(cleaned)
  except json.JSONDecodeError:
    print(f"Error: Invalid JSON string received from model: {response}")
    return {"issues": []}

  # Normalize the shape: a bare list is treated as the list of issues, and any
  # other non-object JSON value is rejected.
  if isinstance(parsed_response, list):
    return {"issues": parsed_response}
  if not isinstance(parsed_response, dict):
    print(f"Error: Unexpected JSON structure received from model: {response}")
    return {"issues": []}
  return parsed_response

print("Function 'analyze_code' updated for RAG integration.")

Function 'analyze_code' updated for RAG integration.


In [7]:
import json
import google.generativeai as genai
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

def get_code_metrics(query: str, embedding_model, vector_store,
                     top_k: int = 5, model_name: str = "gemini-2.5-pro") -> dict:
  """
  Retrieves the code chunks most similar to `query` from the vector store and
  asks Gemini (via LangChain) for summary metrics and an issue distribution.

  Args:
    query (str): Natural language description of the metrics wanted; it is
      embedded and used for similarity search.
    embedding_model: Object providing `embed_query(text)`.
    vector_store: Object providing `similarity_search_by_vector(vector, k=...)`.
    top_k (int): Number of chunks to retrieve.
    model_name (str): Gemini model identifier passed to ChatGoogleGenerativeAI.

  Returns:
    dict: A dictionary that always contains the keys 'summary_metrics' and
      'issue_distribution'. Both are empty dicts when nothing was retrieved or
      when the model's answer is not usable JSON.
  """
  empty_metrics = {
      "summary_metrics": {},
      "issue_distribution": {}
  }

  # 1. Embed the query and fetch the most similar chunks.
  query_embedding = embedding_model.embed_query(query)
  relevant_docs = vector_store.similarity_search_by_vector(query_embedding, k=top_k)

  # Without any retrieved code there is nothing to measure; do not ask the
  # model to score an empty snippet.
  if not relevant_docs:
    print("Warning: no code snippets were retrieved from the vector store; nothing to measure.")
    return empty_metrics

  # 2. The concatenated chunks are what the model analyzes.
  combined_code_for_metrics = "\n\n".join([doc.page_content for doc in relevant_docs])

  # The key is passed explicitly to the LangChain client, so no global
  # genai.configure(...) call is needed inside this function.
  # `api_key` is the module-level key defined earlier in this notebook.
  llm = ChatGoogleGenerativeAI(model=model_name, google_api_key=api_key)

  prompt_template = ChatPromptTemplate.from_messages([
      ("system", "You are a helpful assistant that analyzes code and provides summary metrics and issue distribution in a structured JSON format."),
      ("human", """Analyze the following code snippet and provide the analysis in a structured JSON format. I need two main sections: 'summary_metrics' and 'issue_distribution'.\n\nFor 'summary_metrics', include:\n- 'code_quality_score' (an integer from 0-100 where higher is better)\n- 'security_rating' (an integer from 0-100 where higher is better)\n- 'bug_density' (count of bugs/runtime errors)\n- 'critical_issue_count' (count of critical severity issues)\n\nFor 'issue_distribution', include:\n- 'security_vulnerabilities' (count of security/vulnerability issues)\n- 'code_smells' (count of code smell issues)\n- 'best_practices' (count of best practice violations, if any)\n- 'performance_issues' (count of performance-related issues, if any)\n\nEnsure the output is a single JSON object. Here's an example of the desired JSON format:\n```json\n{{\n  "summary_metrics": {{\n    "code_quality_score": 85,\n    "security_rating": 90,\n    "bug_density": 1,\n    "critical_issue_count": 0\n  }},\n  "issue_distribution": {{\n    "security_vulnerabilities": 0,\n    "code_smells": 2,\n    "best_practices": 1,\n    "performance_issues": 0\n  }}\n}}\n```\n\nCode:\n```python\n{code_snippet}\n```""")
  ])

  chain = prompt_template | llm | StrOutputParser()
  response = chain.invoke({"code_snippet": combined_code_for_metrics})

  # 3. Strip an optional Markdown fence (``` or ```json) around the answer.
  # Whitespace is trimmed first so a trailing newline does not defeat the check.
  cleaned = response.strip()
  if cleaned.startswith('```'):
    _, _, cleaned = cleaned.partition('\n')  # drop the opening fence line
    cleaned = cleaned.rstrip()
    if cleaned.endswith('```'):
      cleaned = cleaned[:-3]

  try:
    parsed_response = json.loads(cleaned)
  except json.JSONDecodeError:
    print(f"Error: Invalid JSON string received from model: {response}")
    return empty_metrics

  if not isinstance(parsed_response, dict):
    print(f"Error: Unexpected JSON structure received from model: {response}")
    return empty_metrics

  # Guarantee both documented sections exist even if the model omitted one.
  parsed_response.setdefault("summary_metrics", {})
  parsed_response.setdefault("issue_distribution", {})
  return parsed_response

print("Function 'get_code_metrics' updated for RAG integration.")

Function 'get_code_metrics' updated for RAG integration.


In [8]:
import json
import google.generativeai as genai
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

def inject_bugs(query: str, embedding_model, vector_store, bug_type: str, severity_level: int, num_bugs: int,
                top_k: int = 5, model_name: str = "gemini-2.5-pro") -> dict:
  """
  Retrieves the code chunks most similar to `query` from the vector store and
  asks Gemini (via LangChain) to inject bugs into them.

  Args:
    query (str): Natural language description of the code to target; it is
      embedded and used for similarity search.
    embedding_model: Object providing `embed_query(text)`.
    vector_store: Object providing `similarity_search_by_vector(vector, k=...)`.
    bug_type (str): The type of bug to inject (e.g., 'SQL Injection', 'Division by Zero').
    severity_level (int): The severity level of the bugs (e.g., 1 for low, 5 for critical).
    num_bugs (int): The number of bugs to inject.
    top_k (int): Number of chunks to retrieve.
    model_name (str): Gemini model identifier passed to ChatGoogleGenerativeAI.

  Returns:
    dict: A dictionary that always contains 'buggy_code' (the modified code)
      and 'bugs_injected' (a list describing each injected bug). When the
      model's answer is not usable JSON, 'buggy_code' holds the unmodified
      retrieved code and 'bugs_injected' is empty.
  """
  # 1. Embed the query and fetch the most similar chunks.
  query_embedding = embedding_model.embed_query(query)
  relevant_docs = vector_store.similarity_search_by_vector(query_embedding, k=top_k)

  # 2. The concatenated chunks are what the model injects bugs into.
  combined_code_for_injection = "\n\n".join([doc.page_content for doc in relevant_docs])

  # Without any retrieved code there is nothing to modify; do not ask the
  # model to invent code.
  if not relevant_docs:
    print("Warning: no code snippets were retrieved from the vector store; nothing to inject bugs into.")
    return {
        "buggy_code": combined_code_for_injection,
        "bugs_injected": []
    }

  # The key is passed explicitly to the LangChain client, so no global
  # genai.configure(...) call is needed inside this function.
  # `api_key` is the module-level key defined earlier in this notebook.
  llm = ChatGoogleGenerativeAI(model=model_name, google_api_key=api_key)

  prompt_template = ChatPromptTemplate.from_messages([
      ("system", "You are a helpful assistant that injects bugs into code based on given parameters and returns the modified code and bug details in JSON format."),
      ("human", """Inject {num_bugs} bugs of type '{bug_type}' with severity level {severity_level} into the following Python code snippet.
Provide the output in a structured JSON format with two keys: 'buggy_code' (containing the full modified code) and 'bugs_injected' (an array of objects, where each object describes an injected bug with 'type', 'line_number', and 'description').

Code:
```python
{code_snippet}
```

Example JSON format:
{{
  "buggy_code": "def example_function():\n    # Some example code without further template variables\n    return 0",
  "bugs_injected": [
    {{
      "type": "{bug_type}", "line_number": 2, "description": "Description of the injected bug."
    }}
  ]
}}
""")
  ])

  chain = prompt_template | llm | StrOutputParser()

  response = chain.invoke({
      "code_snippet": combined_code_for_injection,
      "num_bugs": num_bugs,
      "bug_type": bug_type,
      "severity_level": severity_level
  })

  # 3. Strip an optional Markdown fence (``` or ```json) around the answer.
  # Whitespace is trimmed first so a trailing newline does not defeat the check.
  cleaned = response.strip()
  if cleaned.startswith('```'):
    _, _, cleaned = cleaned.partition('\n')  # drop the opening fence line
    cleaned = cleaned.rstrip()
    if cleaned.endswith('```'):
      cleaned = cleaned[:-3]

  fallback = {
      "buggy_code": combined_code_for_injection, # Return original code retrieved from RAG on error
      "bugs_injected": []
  }

  try:
    parsed_response = json.loads(cleaned)
  except json.JSONDecodeError:
    print(f"Error: Invalid JSON string received from model: {response}")
    return fallback

  if not isinstance(parsed_response, dict):
    print(f"Error: Unexpected JSON structure received from model: {response}")
    return fallback

  # Callers index both keys directly, so make sure they are always present.
  parsed_response.setdefault("buggy_code", combined_code_for_injection)
  parsed_response.setdefault("bugs_injected", [])
  return parsed_response

print("Function 'inject_bugs' updated for RAG integration.")

Function 'inject_bugs' updated for RAG integration.


In [12]:
import json

# Sample program used as the test fixture for the RAG functions below.
# It is deliberately flawed: every intentional flaw is labelled by a comment
# inside the sample. The text itself must stay syntactically valid Python so
# the analysis concentrates on those labelled flaws (a stray ')' on the
# Fibonacci print line previously made the whole sample unparseable).
long_sample_code = """import sqlite3
import threading
import time

class UserManager:
    def __init__(self, db_path="users.db"):
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()
        self.cache = {}

    def add_user(self, username, password):
        # Logic error: storing password in plain text
        self.cursor.execute(f"INSERT INTO users (name, password) VALUES ('{username}', '{password}')")
        self.conn.commit()

    def get_user(self, username):
        # Security vulnerability: SQL injection risk
        query = f"SELECT * FROM users WHERE name = '{username}'"
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def cache_user(self, username):
        # Potential memory leak: cache never cleared
        user = self.get_user(username)
        self.cache[username] = user

def worker_task(manager, username):
    # Concurrency bug: race condition on shared cache
    for _ in range(1000):
        manager.cache_user(username)

def heavy_computation(n):
    # Performance issue: inefficient recursion
    if n <= 1:
        return n
    return heavy_computation(n-1) + heavy_computation(n-2)

def main():
    manager = UserManager()
    manager.add_user("Alice", "password123")
    manager.add_user("Bob", "hunter2")

    # Start multiple threads (race condition risk)
    threads = []
    for name in ["Alice", "Bob"]:
        t = threading.Thread(target=worker_task, args=(manager, name))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    # Trigger performance bottleneck
    print("Fibonacci(30):", heavy_computation(30))

    # Crash bug: division by zero
    print("Divide:", 10 / 0)

if __name__ == "__main__":
    main()
"""

# Step 1: build the retrieval components (embedding model + Chroma store).
# Relies on initialize_vector_store_and_embeddings from an earlier cell.
embedding_model, vector_store = initialize_vector_store_and_embeddings()
print("RAG system initialized.")

# Step 2: index the sample program as a single-snippet list.
load_and_index_code(
    code_input=[long_sample_code],
    embedding_model=embedding_model,
    vector_store=vector_store,
)
print("Long sample code loaded into vector store.")

# --- Test analyze_code ---
print("\n=== Testing analyze_code ===")
# Alternative, narrower query:
# query_analyze_long_code = "Analyze the UserManager class and main function for all potential issues."
# The query text previously contained a mis-encoded hyphen ("bestâ€‘practice");
# it is written with a plain ASCII hyphen so the embedded query is clean text.
query_analyze_long_code = "Analyze the code for potential bugs, security risks, performance issues, and best-practice violations."
analysis_result_long = analyze_code(
    query=query_analyze_long_code,
    embedding_model=embedding_model,
    vector_store=vector_store
)
print("\n--- Analysis Result (long_sample_code) ---\n")
print(json.dumps(analysis_result_long, indent=2))

# --- Exercise get_code_metrics on the indexed sample ---
print("\n=== Testing get_code_metrics ===")
# A deliberately broad query, so retrieval is not tied to one class or function.
query_metrics_long_code = "Provide overall code quality, security metrics, bug density, and issue distribution."
metrics_result_long = get_code_metrics(query_metrics_long_code, embedding_model, vector_store)
# Heading and pretty-printed JSON, emitted as two consecutive lines of output.
print(
    "\n--- Metrics Result (long_sample_code) ---\n",
    json.dumps(metrics_result_long, indent=2),
    sep="\n",
)

# --- Test inject_bugs ---
print("\n=== Testing inject_bugs ===")
# Alternative, more targeted query:
# query_inject_bug_long_code = "Inject a security vulnerability into the get_user function and a performance issue into heavy_computation."
query_inject_bug_long_code = "Inject bugs into the code and return details."
bug_injection_result_long = inject_bugs(
    query=query_inject_bug_long_code,
    embedding_model=embedding_model,
    vector_store=vector_store,
    bug_type="Security Vulnerability",
    severity_level=5,
    num_bugs=2
)
print("\n--- Bug Injection Result (long_sample_code) ---\n")

# The result comes from a language model, so its shape is not guaranteed:
# read the expected keys defensively instead of indexing them, and format the
# code with an f-string so a non-string value cannot break the concatenation.
injection_details = bug_injection_result_long if isinstance(bug_injection_result_long, dict) else {}
injected_code = injection_details.get('buggy_code', '')
injected_bugs = injection_details.get('bugs_injected', [])

print(f"Buggy Code:\n```python\n{injected_code}\n```")
print("\nBugs Injected:\n")
print(json.dumps(injected_bugs, indent=2))

RAG system initialized.
Long sample code loaded into vector store.

=== Testing analyze_code ===

--- Analysis Result (long_sample_code) ---

{
  "issues": [
    {
      "title": "Storing Passwords in Plain Text",
      "type": "Security",
      "severity": "Critical",
      "lineNumber": 161,
      "description": "The `add_user` method stores user passwords directly in the database without any hashing. This is a major security risk, as a database breach would expose all user passwords.",
      "suggestedFix": "Hash passwords using a strong, salted hashing algorithm like Argon2, scrypt, or bcrypt before storing them. Use a library like `passlib` for robust password handling."
    },
    {
      "title": "SQL Injection Vulnerability",
      "type": "Security",
      "severity": "Critical",
      "lineNumber": 161,
      "description": "The database queries in `add_user` (line 161) and `get_user` (line 166) are constructed using f-strings, which directly embeds user input into the SQL st