In [1]:
!pip install openai faiss-cpu transformers tiktoken pytesseract pymupdf Pillow




[notice] A new release of pip is available: 25.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
import os
from openai import OpenAI
import openai
from langchain_openai import OpenAIEmbeddings
import base64
from mimetypes import guess_type
from PIL import Image
import fitz
import pytesseract
import tiktoken
import faiss
import numpy as np
from transformers import CLIPProcessor, CLIPModel

os.environ["OPENAI_API_KEY"] = ""
# Set your OpenAI API key
client = OpenAI(api_key=os.environ.get('OPENAI_API_KEY'))

In [22]:
# --------------------------
# Utility Functions
# --------------------------

def extract_text_from_pdf(pdf_path):
    """Extract full text from a PDF using PyMuPDF (fitz)."""
    doc = fitz.open(pdf_path)
    full_text = ""
    for page in doc:
        full_text += page.get_text()
    return full_text

def ocr_image(image_path):
    """Extract text from an image using pytesseract."""
    image = Image.open(image_path)
    return pytesseract.image_to_string(image)

def image_to_data_url(image_path: str) -> str:
    """Convert an image file to a base64 data URL."""
    mime_type, _ = guess_type(image_path)
    if mime_type is None:
        mime_type = 'application/octet-stream'
    with open(image_path, "rb") as img_file:
        encoded = base64.b64encode(img_file.read()).decode("utf-8")
    return f"data:{mime_type};base64,{encoded}"

def split_text_into_chunks(text: str, max_tokens: int = 2000, overlap: int = 100) -> list:
    """Split text into overlapping chunks based on token count using tiktoken."""
    encoding = tiktoken.get_encoding("cl100k_base")
    tokens = encoding.encode(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = start + max_tokens
        chunk = encoding.decode(tokens[start:end])
        chunks.append(chunk)
        start = end - overlap  # overlap for context
    return chunks


In [43]:
# --------------------------
# Embedding and Retrieval Functions
# --------------------------

def get_text_embedding(text: str, model="text-embedding-3-small"): #text-embedding-ada-002
    """Get text embedding from OpenAI."""
    response = OpenAIEmbeddings(model=model)
    return np.array(response)

def build_faiss_index(chunks: list) -> (faiss.IndexFlatL2, list):
    """Compute embeddings for each chunk and build a FAISS index."""
    embeddings = []
    for chunk in chunks:
        emb = get_text_embedding(chunk)
        embeddings.append(emb)
    embedding_dim = len(embeddings[0])
    embedding_matrix = np.array(embeddings)
    index = faiss.IndexFlatL2(embedding_dim)
    index.add(embedding_matrix)
    return index, embeddings

def get_clip_image_embedding(image_path: str, clip_model, clip_processor) -> np.array:
    """Compute the CLIP image embedding for the image."""
    image = Image.open(image_path).convert("RGB")
    inputs = clip_processor(images=image, return_tensors="pt")
    outputs = clip_model.get_image_features(**inputs)
    return outputs.detach().numpy().flatten().astype("float32")

def retrieve_relevant_chunks(image_embedding: np.array, index: faiss.IndexFlatL2, chunks: list, k: int = 3) -> str:
    """Retrieve the top k most relevant chunks based on cosine similarity."""
    # FAISS by default computes L2 distances.
    # Normalize vectors to use cosine similarity.
    def normalize(v):
        return v / np.linalg.norm(v)
    query = normalize(image_embedding).reshape(1, -1)
    # Normalize index vectors manually: if your embeddings are not normalized, do it here.
    distances, indices = index.search(query, k)
    retrieved = [chunks[i] for i in indices[0]]
    return "\n\n".join(retrieved)


In [44]:
# --------------------------
# Final Mapping Function
# --------------------------
    
def map_image_to_pci_requirement(retrieved_context: str, image_path: str, image_text: str) -> str:
    """
    Sends a prompt with the extracted PCI-DSS template text and an image
    (in data URL format) to GPT-4 Vision and returns the model's response.
    """
    # Extracting text from image:
    image_text = ocr_image(image_path)
    
    # Preprocess the client screenshot image
    print("Converting image to data URL...")
    image_data_url = image_to_data_url(image_path)
    
    # Detailed prompt using the PCI-DSS controls text
    prompt = f"""
    You are an expert in PCI-DSS compliance. 
    Below is the relevant context extracted from the PCI-DSS Report on Compliance Template containing the controls and requirements.
    A client has provided a screenshot showing details of their network and security configuration.
    Analyze the image and identify which specific control requirement is being addressed.
    Provide the control requirement code along with a detailed explanation of 
    how the information in the given image satisfies that requirement.
    PCI-DSS Template Excerpt: \n
    {retrieved_context}...\n

    Context text from image: \n
    {image_text}
    Please be as specific as possible in your mapping.
    """

    # Construct the messages for the ChatCompletion API.
    # The user message is given as an array with both a text segment and the image.
    messages = [
        {"role": "system", "content": "You are a PCI-DSS compliance expert."},
        {
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {"type": "image_url", "image_url": {"url": image_data_url}}
            ]
        }
    ]

    # Call the GPT-4 Vision API (model name may vary, e.g., "gpt-4-vision-preview")
    response = client.chat.completions.create(
        model="gpt-4o",  # adjust to your available model identifier
        messages=messages,
        max_tokens=500
    )

    # Extract and return the answer text from the response.
    answer = response.choices[0].message.content
    return answer

In [45]:
# --------------------------
# Main Application Flow
# --------------------------

def main():
    # Paths to files
    pdf_path = "PCI-DSS-ROC-Template.pdf"       # your PCI-DSS template PDF
    image_path = "Connfido Network Diagram.png"  # client's screenshot image

    print("Extracting PDF text...")
    full_pdf_text = extract_text_from_pdf(pdf_path)
    if not full_pdf_text.strip():
        print("No text extracted from PDF. Check the file.")
        return

    # Split PDF text into chunks
    chunks = split_text_into_chunks(full_pdf_text, max_tokens=2000, overlap=100)
    print(f"Total chunks: {len(chunks)}")

    # Build FAISS index for text chunks
    print("Building FAISS index for text chunks...")
    faiss_index, _ = build_faiss_index(chunks)

    # Initialize CLIP model and processor for image embeddings
    clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
    clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

    # Compute image embedding using CLIP
    print("Computing image embedding using CLIP...")
    img_emb = get_clip_image_embedding(image_path, clip_model, clip_processor)

    # Retrieve relevant text chunks using image embedding for retrieval
    print("Retrieving relevant text chunks based on image content...")
    retrieved_context = retrieve_relevant_chunks(img_emb, faiss_index, chunks, k=3)
    print("Retrieved context:")
    print(retrieved_context)

    # Extract OCR text from image (optional extra context)
    image_text = ocr_image(image_path)

    # Map the image to PCI-DSS requirement using GPT-4 Vision
    print("Mapping image to PCI-DSS requirement...")
    final_response = map_image_to_pci_requirement(retrieved_context, image_path, image_text)
    print("Final GPT Response:")
    print(final_response)


In [46]:
if __name__ == "__main__":
    main()

Extracting PDF text...
Total chunks: 89
Building FAISS index for text chunks...


TypeError: len() of unsized object