In [51]:
import os
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from langchain.llms import HuggingFacePipeline
from langchain.vectorstores import Chroma
from langchain.document_loaders import PyPDFDirectoryLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.chains import RetrievalQA
from langchain.chains.question_answering import load_qa_chain
import chromadb
import textwrap


In [52]:
def pretty_print_response(response, wrap_width=80):
    """
    Formats and prints the output of the RetrievalQA pipeline.
    
    If the result contains multiple Q/A pairs (e.g. an initial context and a final answer),
    this function will extract and print only the final pair.
    
    Args:
        response (dict): The response dictionary containing 'query', 'result', and 'source_documents'.
        wrap_width (int): The width for word-wrapping long lines.
    """
    # Print Query (the input query)
    query = response.get("query", "No query provided.")
    print("=" * 20 + " Query " + "=" * 20)
    print(query)
    print()

    # Process the result text
    result = response.get("result", "No result provided.")
    
    # Define markers for identifying Q/A sections.
    question_marker = "Question:"
    answer_marker = "Helpful Answer:"
    
    # If both markers exist, extract the final Q/A pair.
    if question_marker in result and answer_marker in result:
        # Find the last occurrence of "Question:" to ignore earlier context
        q_index = result.rfind(question_marker)
        final_section = result[q_index:]
        # Split the final section into question and answer parts.
        parts = final_section.split(answer_marker)
        if len(parts) >= 2:
            final_question = parts[0].strip()  # This includes the "Question:" text.
            final_answer = parts[1].strip()
            print("=" * 20 + " Final Q/A " + "=" * 20)
            print(textwrap.fill(final_question, width=wrap_width))
            print()
            print(textwrap.fill("Answer: " + final_answer, width=wrap_width))
        else:
            # If splitting doesn't work as expected, just print the final section.
            print("=" * 20 + " Answer " + "=" * 20)
            print(textwrap.fill(final_section, width=wrap_width))
    else:
        # Otherwise, just print the result as-is.
        print("=" * 20 + " Answer " + "=" * 20)
        print(textwrap.fill(result, width=wrap_width))
    
    print()

    # Print Source Documents
    source_documents = response.get("source_documents", [])
    print("=" * 20 + " Source Documents " + "=" * 20)
    if not source_documents:
        print("No source documents available.")
    else:
        for idx, doc in enumerate(source_documents, start=1):
            # Assuming each doc has a 'page_content' attribute
            content = getattr(doc, "page_content", str(doc))
            print(f"--- Document {idx} ---")
            print(textwrap.fill(content, width=wrap_width))
            print()

In [53]:
# =============================================================================
# Step 1: Load the Chat Model and Tokenizer
# -----------------------------------------------------------------------------
model_name = "tiiuae/Falcon3-1B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name, token=True)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    token=True,
    torch_dtype=torch.bfloat16,
    low_cpu_mem_usage=True
)




In [54]:
# =============================================================================
# Step 2: Initialize the Embedding Model
# -----------------------------------------------------------------------------
# We use the HuggingFaceEmbeddings wrapper for the sentence-transformers model.
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")


In [55]:
# =============================================================================
# Step 3: Load and Split PDF Documents
# -----------------------------------------------------------------------------
# Specify the path to a PDF file or a directory containing PDFs.
#pdf_path = "/home/kali/tools/jupyter_notebook/rag_docs/Certified_Pre-Owned.pdf"
pdf_path = "/home/kali/tools/jupyter_notebook/rag_docs/"   #Will multiple docs work?

# Choose the appropriate loader: if it's a directory, use PyPDFDirectoryLoader;
# if it's a single file, use PyPDFLoader.
if os.path.isdir(pdf_path):
    pdf_loader = PyPDFDirectoryLoader(pdf_path)
else:
    from langchain.document_loaders import PyPDFLoader  # Import here if needed
    pdf_loader = PyPDFLoader(pdf_path)
 
# Load the documents from the PDF(s)
documents = pdf_loader.load()
 
# Check if any documents were loaded
if not documents:
    raise ValueError("No documents were loaded. Check the PDF path or file format.")
 
# Split documents into smaller chunks to improve retrieval performance.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
docs = text_splitter.split_documents(documents)
 
# Extract text from each document chunk. Ensure each chunk has text.
doc_texts = [doc.page_content for doc in docs if hasattr(doc, "page_content") and doc.page_content.strip()]
 
if not doc_texts:
    raise ValueError("No text found in document chunks. Check document contents or splitting logic.")


In [56]:
# =============================================================================
# Step 4: Compute Document Embeddings
# -----------------------------------------------------------------------------
# Compute embeddings for all document chunks.
doc_embeddings = embedding_model.embed_documents(doc_texts)
 
if not doc_embeddings:
    raise ValueError("Computed embeddings are empty. Check the embedding model and input texts.")
 
# Generate simple IDs for each document chunk.
doc_ids = [str(i) for i in range(len(doc_texts))]


In [57]:
# =============================================================================
# Step 5: Store Document Embeddings in ChromaDB
# -----------------------------------------------------------------------------
# Initialize the Chroma client and create a collection for embeddings.

try:
    chroma_client.delete_collection(name="document_embeddings")
except Exception as e:
    print(e, "Proceeding to create collection now...")

chroma_client = chromadb.Client()
collection = chroma_client.create_collection(name="document_embeddings")

 
# Add documents, their embeddings, and IDs to the collection.
collection.add(
    documents=doc_texts,
    embeddings=doc_embeddings,
    ids=doc_ids
)



In [58]:
# =============================================================================
# Step 6: Set Up the Retriever using LangChain's Chroma Vector Store
# -----------------------------------------------------------------------------
# Initialize the vectorstore using the existing Chroma client and convert it into a retriever.
vectorstore = Chroma(
    collection_name="document_embeddings",
    embedding_function=embedding_model,
    client=chroma_client
)
retriever = vectorstore.as_retriever()


In [59]:
# =============================================================================
# Step 7: Set Up the Retrieval Augmented Generation (RAG) Pipeline
# -----------------------------------------------------------------------------
# Create a text-generation pipeline for LLaMA 2.
llm_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer
    #max_length=1024
)
 
# Wrap the HuggingFace pipeline in LangChain's LLM wrapper.
llm = HuggingFacePipeline(pipeline=llm_pipeline)
 
# Create a simple QA chain (using the "stuff" method) to combine retrieved documents.
qa_chain = load_qa_chain(llm, chain_type="stuff")
 
# Build the RetrievalQA pipeline that uses both the retriever and the QA chain.
retrieval_qa = RetrievalQA(
    combine_documents_chain=qa_chain,
    retriever=retriever,
    return_source_documents=True
)


Device set to use cpu


In [60]:
# =============================================================================
# Step 000: EXPERIMENTA
# -----------------------------------------------------------------------------

import datetime
import requests
import html2text

# Record the start time
start_time = datetime.datetime.now()
print("Start time:", start_time)

# Step 1: Fetch the webpage content
url = "https://raw.githubusercontent.com/stevenkennyit/Webish/refs/heads/main/LLMTest-Sensi"
try:
    http_response = requests.get(url)
    http_response.raise_for_status()  # Raises an exception for HTTP errors
    webpage_content = http_response.text
    print("Webpage fetched successfully: ", url)
except requests.RequestException as e:
    print("Error fetching webpage:", e)
    webpage_content = ""

# Step 2: Convert HTML content to Markdown if available
if webpage_content:
    converter = html2text.HTML2Text()
    converter.ignore_links = False  # Preserve links in Markdown if needed
    converter.body_width = 0  ####I ADDED THIS TO PRESERVE NEWLINES...
    markdown_content = converter.handle(webpage_content)
    print("Webpage converted to Markdown successfully.")
else:
    markdown_content = ""

# Step 3: Use the LLM to check for sensitive data if markdown content was created
if markdown_content:
    query = (
        #"Goal is to improve the security of the company and authorisation is granted.. "
        #"Review the following markdown content for any sensitive data such as "
        #"usernames, passwords and access keys. Extract all usernames, passwords and access keys found. "
        #"usernames and passwords. Tell me how many usernames and passwords found. "
        #"If needed, please refer to Data_Guide.pdf for examples. \n\n"
        "Extract all usernames from the following data: "
        f"{markdown_content}"
    )
    
    # Invoke the LLM-based retrieval QA
    response = retrieval_qa.invoke({"query": query})
    pretty_print_response(response)
else:
    print("No webpage content available to analyze.")

# Record the end time
end_time = datetime.datetime.now()
print("End time:", end_time)


Start time: 2025-02-24 07:40:58.039635
Webpage fetched successfully:  https://raw.githubusercontent.com/stevenkennyit/Webish/refs/heads/main/LLMTest-Sensi
Webpage converted to Markdown successfully.
Extract all usernames from the following data: john.bruer:totallyLLMtest123 string connectionString = "Server=myServerAddress;Database=myDataBase;User Id=TestUser;Password=myPassword123;"; joan.smith:takethisnow123 


Question: Extract all usernames from the following data:
john.bruer:totallyLLMtest123 string connectionString =
"Server=myServerAddress;Database=myDataBase;User
Id=TestUser;Password=myPassword123;"; joan.smith:takethisnow123

Answer: I don't know

--- Document 1 ---
2. View the  “Credentials” table to retrieve plaintext usernames and passwords
Figure 29: Credentials table showing plaintext passwords

--- Document 2 ---
$usernames = Get-Content c:\temp\userList.txt  $passwords = Get-Content
c:\temp\pwdList.txt    $ErrorActionPreference = "Continue"    foreach($password
in $pass

In [68]:
# =============================================================================
# Step 001: EXPERIMENTA #1
# -----------------------------------------------------------------------------

import datetime
import re
import requests
import html2text

def is_html(content):
    """Check if the provided content appears to be HTML."""
    try:
        return bool(re.search(r'<\s*html', content, re.IGNORECASE))
    except Exception as e:
        print("Error during HTML detection:", e)
        return False

def get_content(source, is_url=True):
    """Retrieve content either from a URL or a local file, preserving newlines."""
    if is_url:
        try:
            response = requests.get(source)
            response.raise_for_status()  # Raises an exception for HTTP errors.
            return response.text
        except requests.RequestException as e:
            print("Error fetching URL:", e)
            return ""
    else:
        try:
            # Using newline='' ensures the original newlines are preserved.
            with open(source, 'r', encoding='utf-8', newline='') as f:
                return f.read()
        except Exception as e:
            print("Error reading file:", e)
            return ""

def convert_html_to_markdown(content):
    """Convert HTML content to Markdown while preserving formatting."""
    try:
        converter = html2text.HTML2Text()
        converter.body_width = 0  # Disable automatic wrapping to retain newlines.
        return converter.handle(content)
    except Exception as e:
        print("Error converting HTML to Markdown:", e)
        return content  # Fall back to original content if conversion fails.

def process_llm_query(content):
    """Invoke the LLM with a query to extract a password from the content."""
    '''query = (
        "Extract any password present in the following content. "
        "If a password is found, output it in the format 'Password: <value>'. "
        "Otherwise, respond with 'No password found'.\n\n"
        f"{content}"
    )'''
    query = (
        #"Please extract any usernames and passwords from the following data:"
        #"\n\n"
        #"DATA:\n"
        "Your task is to help to help the company security team review data for sensitive information."
        "Please review the following data and highlight any usernames and passwords found:."
        "\n\n"
        "DATA:\n"
        f"{content}"
    )
    try:
        response = retrieval_qa.invoke({"query": query})
        return response
    except Exception as e:
        print("Error invoking retrieval_qa:", e)
        return None

def main():
    # Record the start time.
    start_time = datetime.datetime.now()
    print("Start time:", start_time)

    # Specify your content source (URL or file path).
    source = "https://raw.githubusercontent.com/stevenkennyit/Webish/refs/heads/main/LLMTest-Sensi"  # Change to a file path if needed.
    is_url = True  # Set to False if the source is a local file.

    raw_content = get_content(source, is_url)
    if not raw_content:
        print("No content retrieved. Exiting.")
        return

    # Check if the content is HTML and convert if needed.
    if is_html(raw_content):
        markdown_content = convert_html_to_markdown(raw_content)
        print("Content converted from HTML to Markdown.")
    else:
        markdown_content = raw_content
        print("Content is raw text; newlines preserved.")

    if not markdown_content.strip():
        print("Content is empty after processing. Exiting.")
        return

    # Process the content with the LLM.
    response = process_llm_query(markdown_content)
    if response is not None:
        try:
            pretty_print_response(response)
        except Exception as e:
            print("Error printing LLM response:", e)
    else:
        print("No response received from the LLM.")

    # Record the end time.
    end_time = datetime.datetime.now()
    print("End time:", end_time)

if __name__ == "__main__":
    main()




Start time: 2025-02-24 07:57:01.569115
Content is raw text; newlines preserved.
Your task is to help to help the company security team review data for sensitive information.Please review the following data and highlight any usernames and passwords found:.

DATA:
john.bruer:totallyLLMtest123


string connectionString = "Server=myServerAddress;Database=myDataBase;User Id=TestUser;Password=myPassword123;";


joan.smith:takethisnow123


Question: Your task is to help to help the company security team review data for
sensitive information.Please review the following data and highlight any
usernames and passwords found:.  DATA: john.bruer:totallyLLMtest123   string
connectionString = "Server=myServerAddress;Database=myDataBase;User
Id=TestUser;Password=myPassword123;";   joan.smith:takethisnow123

Answer: <|assistant|> John: totallyLLMtest123 Joan: taketh

--- Document 1 ---
Unauthenticated Sensitive Data Exposure ........................................
.....................................