In [73]:
# Step 1: Install necessary packages in Google Colab
!apt-get install -y tesseract-ocr
!pip install pytesseract
!pip install openai
!pip install chromadb
!pip install PyPDF2
!pip install moviepy
!pip install SpeechRecognition
!pip install nltk
!pip install tiktoken
!pip install -U openai-whisper
!pip install pydub
!pip install python-docx
!pip install python-pptx
!pip install ffmpeg-python
!pip install markdown2

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
tesseract-ocr is already the newest version (4.1.1-2.1build1).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [74]:
import os
import re
import logging
import shutil
from typing import List, Dict
from uuid import uuid4
import openai
from chromadb.config import Settings
from chromadb import Client
import PyPDF2
import pytesseract
from PIL import Image, ImageEnhance, ImageFilter
import whisper
from pydub import AudioSegment
import cv2
import nltk
import json

nltk.download('punkt')

# Set up OpenAI API key.
# SECURITY: the key is read from the environment instead of being embedded in
# the source. Any key that has ever been pasted into a notebook or committed
# to version control must be treated as compromised and revoked.
openai.api_key = os.environ.get("OPENAI_API_KEY")
if not openai.api_key:
    # Fail fast with an actionable message rather than letting the first API
    # call fail later with an authentication error.
    raise RuntimeError(
        "OPENAI_API_KEY is not set; define it in the environment "
        "(e.g. os.environ['OPENAI_API_KEY'] = ...) before running this cell."
    )

# Configure logging
def setup_logging(log_file: str = "rag_pipeline.log", log_level: int = logging.INFO):
    """
    Sets up logging configuration to capture logs in a specified file and to the console.

    The root logger is reconfigured even when it already has handlers attached
    (notebook runtimes commonly install one before user code runs). Without
    ``force=True`` a ``logging.basicConfig`` call is silently ignored in that
    situation and the log file would never receive any records.

    Parameters:
    - log_file (str): The name of the log file to store log outputs.
    - log_level (int): The logging level; controls the severity of messages logged.
    """
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[logging.FileHandler(log_file), logging.StreamHandler()],
        # Replace (and close) any handlers already attached to the root logger
        # so that this configuration actually takes effect, including on re-runs.
        force=True,
    )

# Configure logging once at import time, using the default log file and level.
setup_logging()

# Clear existing Chroma directory and initialize Chroma client with persistent storage.
# NOTE: rmtree deletes everything previously stored under CHROMA_PATH on every run,
# so the "persistent" store always starts empty; ignore_errors=True keeps a missing
# directory from raising. The deletion must happen before the client is created.
CHROMA_PATH = "./chroma_vectordb"
shutil.rmtree(CHROMA_PATH, ignore_errors=True)
client = Client(Settings(persist_directory=CHROMA_PATH, is_persistent=True))

# Load Whisper model for transcription.
# Loaded once at module level and reused by transcribe_audio(); "base" is the
# model name handed to whisper.load_model.
whisper_model = whisper.load_model("base")

# Function Definitions

def preprocess_image(image):
    """
    Prepares an image for OCR: grayscale conversion, contrast boost, then median filtering.

    Parameters:
    - image (PIL.Image): The image to preprocess.

    Returns:
    - A new PIL.Image in mode "L" with contrast enhanced by a factor of 2 and a
      median filter applied.
    """
    grayscale = image.convert("L")
    boosted = ImageEnhance.Contrast(grayscale).enhance(2)
    denoised = boosted.filter(ImageFilter.MedianFilter())
    return denoised

def load_image(file_path: str) -> List[str]:
    """
    Loads an image file, preprocesses it, and extracts text using OCR.

    Parameters:
    - file_path (str): Path to the image file.

    Returns:
    - A single-element list containing the text extracted from the image.
    """
    # Image.open keeps the underlying file open; the context manager closes it
    # as soon as preprocessing has produced an independent in-memory copy.
    with Image.open(file_path) as raw_image:
        preprocessed_image = preprocess_image(raw_image)
    return [pytesseract.image_to_string(preprocessed_image)]

def load_pdf(file_path: str) -> List[str]:
    """
    Loads a PDF file and extracts text from each page.

    This is a best-effort loader: any failure (missing file, unreadable or
    malformed PDF) is logged with its traceback and an empty list is returned
    instead of raising.

    Parameters:
    - file_path (str): Path to the PDF file.

    Returns:
    - A list of strings containing the text extracted from each page of the PDF
      (an empty string for pages with no extractable text), or an empty list if
      the file could not be read.
    """
    try:
        with open(file_path, "rb") as pdf_file:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            # extract_text() may return None for pages without a text layer.
            return [page.extract_text() or "" for page in pdf_reader.pages]
    except Exception:
        # Report through the logging setup (file + console) rather than print,
        # so the failure and its traceback end up in the pipeline log.
        logging.exception("Error loading PDF file: %s", file_path)
        return []

def extract_audio_from_video(video_path: str) -> str:
    """
    Writes the audio track of a video to a .wav file next to the video.

    The output path is the video path with its extension replaced by ".wav".

    Parameters:
    - video_path (str): Path to the video file.

    Returns:
    - Path to the extracted .wav audio file.
    """
    stem, _extension = os.path.splitext(video_path)
    audio_path = stem + ".wav"
    soundtrack = AudioSegment.from_file(video_path)
    soundtrack.export(audio_path, format="wav")
    return audio_path

def transcribe_audio(file_path: str) -> str:
    """
    Runs the module-level Whisper model on an audio file and returns its transcript.

    Parameters:
    - file_path (str): Path to the audio file.

    Returns:
    - The value stored under the "text" key of the Whisper transcription result.
    """
    return whisper_model.transcribe(file_path)["text"]

def process_media_file(file_path: str) -> List[str]:
    """
    Processes a media file (audio or video), extracts and transcribes text if audio is present.
    If the extracted audio track is completely silent, extracts text from frames in the video.

    For video files the audio track is first exported to a temporary .wav file;
    that file is always removed again, even when decoding or transcription fails.

    Parameters:
    - file_path (str): Path to the media file.

    Returns:
    - A list of strings containing the extracted and/or transcribed text.
    """
    # Anything that is not a known video container is transcribed directly.
    if not file_path.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
        return [transcribe_audio(file_path)]

    audio_path = extract_audio_from_video(file_path)
    try:
        audio = AudioSegment.from_file(audio_path)
        if audio.dBFS == float('-inf'):  # Check for silence
            return extract_text_from_video_frames(file_path)
        return [transcribe_audio(audio_path)]
    finally:
        # Clean up the intermediate .wav regardless of which path was taken
        # or whether an exception is propagating.
        if os.path.exists(audio_path):
            os.remove(audio_path)

def extract_text_from_video_frames(video_path: str, frame_interval: int = 30) -> List[str]:
    """
    Extracts text from specific frames of a video using OCR.

    Every frame is decoded, but OCR is only run on frames whose position
    (as reported by the capture after the read) is a multiple of
    ``frame_interval``. Frames that yield no text are skipped.

    Parameters:
    - video_path (str): Path to the video file.
    - frame_interval (int): Run OCR on every N-th frame; must be positive.
      Defaults to 30.

    Returns:
    - A list of strings containing text extracted from the sampled frames.

    Raises:
    - ValueError: If frame_interval is not a positive integer.
    """
    if frame_interval <= 0:
        raise ValueError("frame_interval must be a positive integer")

    cap = cv2.VideoCapture(video_path)
    frame_texts = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if int(cap.get(cv2.CAP_PROP_POS_FRAMES)) % frame_interval != 0:
                continue
            # OpenCV delivers BGR; PIL expects RGB.
            image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            text = pytesseract.image_to_string(preprocess_image(image)).strip()
            if text:
                frame_texts.append(text)
    finally:
        # Release the capture even if OCR or colour conversion raises.
        cap.release()
    return frame_texts

def load_document(file_path: str) -> List[str]:
    """
    Dispatches a file to the matching loader based on its (case-insensitive) extension.

    PDFs go to load_pdf, JPEG/PNG images to load_image, and audio/video files
    to process_media_file.

    Parameters:
    - file_path (str): Path to the document file.

    Returns:
    - A list of strings containing the text extracted from the document.

    Raises:
    - ValueError: If the file extension is not one of the supported formats.
    """
    lowered = file_path.lower()

    if lowered.endswith('.pdf'):
        return load_pdf(file_path)

    image_suffixes = ('.jpg', '.jpeg', '.png')
    if lowered.endswith(image_suffixes):
        return load_image(file_path)

    media_suffixes = ('.mp3', '.wav', '.mp4', '.avi', '.mov', '.mkv')
    if lowered.endswith(media_suffixes):
        return process_media_file(file_path)

    raise ValueError("Unsupported file format")

def preprocess_text(text: str) -> str:
    """
    Collapses every run of whitespace into a single space and trims both ends.

    Parameters:
    - text (str): The text to preprocess.

    Returns:
    - The whitespace-normalized string.
    """
    # Splitting on whitespace drops leading/trailing runs and empty pieces,
    # so re-joining with single spaces yields the normalized text.
    words = text.split()
    return " ".join(words)

def classify_document(text: str, file_hint: str = "") -> str:
    """
    Classifies the document text as one of several predefined document types.
    Uses OpenAI's API for classification with an enhanced prompt.

    Only the first 1000 characters of the text are sent to the model. The
    model's reply is normalized (lower-cased, whitespace collapsed, surrounding
    quotes/backticks/periods removed) before it is compared against the allowed
    labels, so replies such as "'Invoice'" or "Passport." are still recognized.

    Parameters:
    - text (str): The text content to classify.
    - file_hint (str): Optional hint to add to the classification prompt (e.g., "audio transcription").

    Returns:
    - Classification label as a string; "other" when the reply is not one of
      the allowed labels or the request is rejected as invalid.
    """
    valid_classes = {
        "invoice", "passport", "aadhar card", "south africa id",
        "driving license", "audio", "video", "other",
    }
    prompt = (
        "Classify the following document text as one of the following types: 'invoice', 'passport', "
        "'aadhar card', 'south africa id', 'driving license', 'audio', 'video' or 'other'. "
        "Respond only with a single word indicating the classification. "
        "If uncertain, choose 'other'.\n\n"
        f"Hint: {file_hint}\n\nDocument Text:\n\n{text[:1000]}"
    )
    try:
        response = openai.ChatCompletion.create(
            model="gpt-4o-mini",
            messages=[{"role": "system", "content": "You are a document classification assistant."},
                      {"role": "user", "content": prompt}],
            max_tokens=15,  # Slightly increase tokens for clearer responses
            temperature=0
        )
    except openai.error.InvalidRequestError as e:
        logging.error("An error occurred: %s", e)
        return "other"

    raw_label = response['choices'][0]['message']['content']
    # The prompt shows the labels in quotes, so the model may echo the quotes
    # or append a period; strip that decoration before the exact-match check.
    classification = " ".join(raw_label.lower().split()).strip("'\"`. ")

    # Verify if the classification is valid
    return classification if classification in valid_classes else "other"


def query_parameters(text: str, doc_type: str, max_context_chars: int = 1000,
                     max_tokens: int = 50) -> Dict[str, str]:
    """
    Extracts relevant fields from the document based on its type using prompts to OpenAI's API.

    One request is sent per field. Document types without a question list
    produce an empty result and a logged warning. When the model stops because
    it hit ``max_tokens`` the (truncated) answer is still returned, but a
    warning is logged so that cut-off values are not mistaken for complete ones.

    Parameters:
    - text (str): The text content to query.
    - doc_type (str): Type of the document which dictates the fields to extract.
    - max_context_chars (int): Number of leading characters of ``text`` sent as
      context with each question. Defaults to 1000.
    - max_tokens (int): Upper bound on the length of each answer. Defaults to 50;
      raise it for long fields such as transcriptions or summaries.

    Returns:
    - A dictionary where keys are field names and values are the extracted information
      (None for a field whose request was rejected as invalid).
    """
    questions = {
        "passport": [
            ("passport number", "What is the passport number? Respond with only the number."),
            ("name", "What is the name on the passport? Respond with only the name."),
            ("nationality", "What is the nationality on the passport? Respond with only the nationality."),
            ("date of birth", "What is the date of birth on the passport? Respond with only the date."),
            ("expiration date", "What is the expiration date on the passport? Respond with only the date.")
        ],
        "invoice": [
            ("invoice number", "What is the invoice number? Respond with only the number."),
            ("date", "What is the date on the invoice? Respond with only the date."),
            ("total amount", "What is the total amount on the invoice? Respond with only the amount."),
            ("billing address", "What is the billing address on the invoice? Respond with only the address.")
        ],
        "south africa id": [
            ("id number", "What is the id number? Respond with only the number."),
            ("name", "What is the name on the id? Respond with only the name."),
            ("nationality", "What is the nationality on the id? Respond with only the nationality."),
            ("date of birth", "What is the date of birth on the id? Respond with only the date."),
            ("expiration date", "What is the expiration date on the id? Respond with only the date.")
        ],
        # Add questions for other document types as needed

        "audio": [
            ("transcription", "What is the transcription of the audio? Respond with only the transcription."),
            ("name", "What is the name of the speaker? Respond with only the name."),

        ],
        "video": [
            ("transcription", "What is the transcription of the video? Respond with only the transcription."),
            ("video text", "Explain the summury of the text in the video?"),
            ("name", "What is the name of the language? Respond with only the name."),

        ],
    }

    doc_questions = questions.get(doc_type)
    if not doc_questions:
        # Make the "nothing to extract" case visible instead of silently returning {}.
        logging.warning("No extraction questions defined for document type '%s'.", doc_type)
        return {}

    context = text[:max_context_chars]  # identical for every question; slice once
    extracted_params = {}
    for field_name, question in doc_questions:
        prompt = f"Context: {context}\n\nQuestion: {question}"
        try:
            response = openai.ChatCompletion.create(
                model="gpt-4o-mini",
                messages=[{"role": "system", "content": "Extract information from documents."},
                          {"role": "user", "content": prompt}],
                max_tokens=max_tokens, temperature=0
            )
        except openai.error.InvalidRequestError as e:
            logging.error("Error querying '%s': %s", question, e)
            extracted_params[field_name] = None
            continue

        choice = response['choices'][0]
        if choice.get('finish_reason') == 'length':
            # The answer was cut off by the token limit, not finished by the model.
            logging.warning(
                "Answer for field '%s' was truncated at max_tokens=%d.", field_name, max_tokens
            )
        extracted_params[field_name] = choice['message']['content'].strip()
    return extracted_params

# Run the pipeline
file_path = "/content/Why You Should Learn C++.mp4"  # Update this path to your document file

all_pages = load_document(file_path)
doc_text = " ".join(all_pages)

# Determine document type
# Media files are labelled directly from their extension; everything else is
# classified from its extracted text.
lower_path = file_path.lower()
if lower_path.endswith(('.mp3', '.wav')):
    doc_type = "audio"
elif lower_path.endswith(('.mp4', '.avi', '.mov', '.mkv')):
    doc_type = "video"
else:
    # load_document only accepts PDFs and images besides media files, so the
    # hint describes how the text was obtained for those two cases (it must not
    # claim the text is an audio/video transcription).
    if lower_path.endswith('.pdf'):
        file_hint = "text extracted from a PDF document"
    else:
        file_hint = "OCR text extracted from an image"
    doc_type = classify_document(doc_text, file_hint=file_hint)

# Extract parameters
if doc_type in ["invoice", "passport", "aadhar card", "south africa id", "driving license", "audio", "video"]:
    params = query_parameters(doc_text, doc_type)

    # Print the document type and extracted parameters
    print(f"Document type: {doc_type}")
    print(json.dumps(params, indent=4))
else:
    print("Document type is not recognized; skipping parameter extraction.")


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
  checkpoint = torch.load(fp, map_location=device)


Document type: video
{
    "transcription": "So moving on to my second recommendation. This is something very different from Python and this is C++. Now C++ is objectively a very powerful programming language. It has really unlimited use cases. You want to make a AAA game? Yeah, you can",
    "video text": "The speaker recommends C++ as a powerful programming language with a wide range of applications, including game development, web development, and compiler creation. However, they caution that C++ is more challenging and not beginner-friendly, suggesting it may not be the best",
    "name": "C++"
}
