![test.webp](attachment:test.webp)

In [None]:
# Video/Audio to Text

In [None]:
# ============================
# Step 1: Provide Video URL and Choose Whisper Model
# ============================
VIDEO_URL = "https://www.youtube.com/live/AcK4Q60NVM8?si=Ae01ithMDwpB-uwI"  # @param {type:"string"}
whisper_model = "large"  # @param ["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"]
save_text = True         # @param {type:"boolean"}
save_srt = False         # @param {type:"boolean"}

# ============================
# Step 2: Install System/Whisper/yt-dlp Dependencies
# ============================
!apt-get update
!apt-get install -y ffmpeg

!pip install -q git+https://github.com/openai/whisper.git
!pip install -q yt-dlp

import os
import torch
import whisper
from whisper.utils import get_writer
from pathlib import Path
import yt_dlp

# ============================
# Step 3: Download Video/Audio from the URL
# ============================
def download_video(url):
    """
    Download the audio from the given video URL using yt-dlp.
    This function downloads the best available audio and converts it to an MP3 file.
    Returns the local file path of the downloaded file.
    """
    ydl_opts = {
        'outtmpl': '%(id)s.%(ext)s',  # Output filename template (e.g., videoid.mp4)
        'format': 'bestaudio/best',    # Download the best available audio
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',  # Convert to MP3
            'preferredquality': '192',
        }],
        'quiet': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
        # Adjust filename if the file was post-processed (converted to mp3)
        base, _ = os.path.splitext(filename)
        mp3_filename = base + ".mp3"
        if os.path.exists(mp3_filename):
            print(f"Downloaded and converted to: {mp3_filename}")
            return mp3_filename
        else:
            print(f"Downloaded file: {filename}")
            return filename

# ============================
# Step 4: Transcribe the Downloaded Audio with Whisper
# ============================
def transcribe_audio(model, file_path, save_text=True, save_srt=False):
    file_path = Path(file_path)
    output_directory = file_path.parent

    print("\n=======================")
    print(f"📁 Video File Path: {file_path}")
    print(f"🤖 Whisper Model: {whisper_model}")
    print("=======================")

    print(f"\n==> [transcribe_audio] Transcribing file: {file_path.name}")
    result = model.transcribe(str(file_path), verbose=False)

    # -- Save Transcript as .txt --
    if save_text:
        txt_path = file_path.with_suffix(".txt")
        print(f"  -> Saving transcript to: {txt_path.name}")
        with open(txt_path, "w", encoding="utf-8") as txt:
            txt.write(result["text"])

    # -- Save Subtitle as .srt (if enabled) --
    if save_srt:
        print("  -> Creating .srt file for transcript.")
        srt_writer = get_writer("srt", str(output_directory))
        srt_writer(result, str(file_path.stem))

    # -- Download Transcript Files to Local Machine (if running in Colab) --
    try:
        from google.colab import files
        if save_text and txt_path.exists():
            print(f"  -> Downloading {txt_path.name} to local machine...")
            files.download(str(txt_path))
        if save_srt:
            srt_path = file_path.with_suffix(".srt")
            if srt_path.exists():
                print(f"  -> Downloading {srt_path.name} to local machine...")
                files.download(str(srt_path))
    except ImportError:
        print("  -> Google Colab's files module not found. Skipping file download.")

    print("\n✨ Transcription complete!")
    print("=======================")
    return result

# ============================
# Step 5: Download Audio and Transcribe
# ============================
# Download the audio from the provided VIDEO_URL
downloaded_file = download_video(VIDEO_URL)

# Load the Whisper model
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"\n[System] Using device: {device}")
print(f"[System] Loading Whisper model: {whisper_model}")
model = whisper.load_model(whisper_model).to(device)

# Transcribe the downloaded file
transcription_result = transcribe_audio(model, downloaded_file, save_text=save_text, save_srt=save_srt)

print("\n[System] Process complete!")


In [1]:
# Requirements:
!pip install langchain unstructured[all-docs] pydantic lxml openai chromadb tiktoken pytesseract langchain_google_genai
!pip install langchain-huggingface transformers torch
!pip install -U langchain-community
!pip install pytesseract

!apt-get install -y poppler-utils
!apt-get install -y tesseract-ocr

!pip install google-generativeai


Collecting chromadb
  Downloading chromadb-1.0.5-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.9 kB)
Collecting tiktoken
  Downloading tiktoken-0.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Collecting pytesseract
  Downloading pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Collecting langchain_google_genai
  Downloading langchain_google_genai-2.1.3-py3-none-any.whl.metadata (4.7 kB)
Collecting unstructured[all-docs]
  Downloading unstructured-0.17.2-py3-none-any.whl.metadata (24 kB)
Collecting filetype (from unstructured[all-docs])
  Downloading filetype-1.2.0-py2.py3-none-any.whl.metadata (6.5 kB)
Collecting python-magic (from unstructured[all-docs])
  Downloading python_magic-0.4.27-py2.py3-none-any.whl.metadata (5.8 kB)
Collecting emoji (from unstructured[all-docs])
  Downloading emoji-2.14.1-py3-none-any.whl.metadata (5.7 kB)
Collecting dataclasses-json (from unstructured[all-docs])
  Downloading dataclasses_json-0.6

Collecting langchain-huggingface
  Downloading langchain_huggingface-0.1.2-py3-none-any.whl.metadata (1.3 kB)
Downloading langchain_huggingface-0.1.2-py3-none-any.whl (21 kB)
Installing collected packages: langchain-huggingface
Successfully installed langchain-huggingface-0.1.2
Collecting langchain-community
  Downloading langchain_community-0.3.21-py3-none-any.whl.metadata (2.4 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain-community)
  Downloading pydantic_settings-2.9.1-py3-none-any.whl.metadata (3.8 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain-community)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Downloading langchain_community-0.3.21-py3-none-any.whl (2.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m45.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading httpx_sse-0.4.0-py3-none-any.whl (7.8 kB)
Downloading pydantic_settings-2.9.1-py3-none-any.whl (44 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
#from google.colab import drive
#drive.mount('/content/drive')
#!rm -rf Downloads/

**Downloads Google Drives files**

In [3]:
import gdown
import os

def download_from_gdrive(link, download_path="Downloads"):
    """
    Downloads a file or folder from a Google Drive link.
    :param link: The Google Drive link (file or folder).
    :param download_path: The directory where the file/folder will be downloaded.
    """
    # Create the download directory if it doesn't exist
    if not os.path.exists(download_path):
        os.makedirs(download_path)

    try:
        # Check if the link is for a file or folder
        if "file/d/" in link:
            # It's a file
            print("Detected a file. Downloading...")
            file_id = link.split("file/d/")[1].split("/")[0]
            file_url = f"https://drive.google.com/uc?id={file_id}"
            gdown.download(file_url, output=os.path.join(download_path, "file"), fuzzy=True)
            print(f"File downloaded to: {download_path}")
        elif "drive/folders/" in link:
            # It's a folder
            print("Detected a folder. Downloading...")
            folder_id = link.split("folders/")[1].split("?")[0]
            gdown.download_folder(f"https://drive.google.com/drive/folders/{folder_id}", output=download_path)
            print(f"Folder downloaded to: {download_path}")
        else:
            print("Invalid Google Drive link. Please provide a valid file or folder link.")
    except Exception as e:
        print(f"An error occurred: {e}")
        print("Please check the link and permissions.")

# Example usage
if __name__ == "__main__":
    # Replace with your Google Drive link
    gdrive_link = "https://drive.google.com/drive/folders/1AGvMSrnTkZqON4hRSy6R00mlX3RM04qv?usp=drive_link" # information Security
    # Download to the "Downloads" directory (or specify a custom path)
    download_from_gdrive(gdrive_link, download_path="Downloads")


    #gdrive_link = "https://drive.google.com/drive/folders/1KE8Uyh-xGYyZr6sZGAYwtYmluJVZDyEN?usp=drive_link"  # or Computerr_vision
    #download_from_gdrive(gdrive_link, download_path="Downloads")


Detected a folder. Downloading...


Retrieving folder contents


Retrieving folder 1L4xqKCLcKCk-bK1O-mjU6nRfWsfsTmKE chat_gpt
Processing file 1IjRNGT4y5-GyqYpl2NWngiAfESECaUHV 2.9_gpt.pdf
Processing file 1OSmenWkno13AzYpJFiQMN03wFI6CRD99 2.10_gpt.pdf
Processing file 1VkwNP1UOWJy1KCwuvAkh8uregCzxtgmL 3.1_gpt.pdf
Processing file 1q5yRXLo9WJAHzOUKhjcoFbfZW6zDY3Y8 3.2_explanation_gpt.pdf
Processing file 1qup_-e_iJFNsd-3cFIpRlKzMJzhUsWvv 3.3_summary_gpt.pdf
Processing file 1Sdgz38fbJH7jKIVT24FuZvJcTBZw1yUe 3.4_gpt.pdf
Processing file 1kyIsH3nZcBj6fv8QyQRp6f7KLFWu7LD- 3.4bind_reverse_shell_exploitation.pdf
Processing file 1jrZYEimvFGoslhWVR--QtJSnYsN_-KVk 3.5_gpt.pdf
Processing file 1Z2g1vewfcl-QkeDvXwCnZ7oMSNQ5MFI9 3.6_gpt.pdf
Processing file 1EvG2ZQiVE02hJGJMM9d2Edjapnryn7ua expoit_bof_vulnerability(3.3).pdf
Processing file 1o2HEVEAVSM2iVvBiqe2YNwse8lPBBgAq log_files_gpt.pdf
Processing file 1HgYAefpCwCOoijd8b-ab0uflApBwfn1L ~$dsf21m043_ass_01.docx
Processing file 1spyJ9miy47bfJjAVpBfFBlbQWk1Haa5m 3.1 complet_ctst_gpt.pdf
Processing file 1F03du3PZkZgi66u

Retrieving folder contents completed
Building directory structure
Building directory structure completed
Downloading...
From: https://drive.google.com/uc?id=1IjRNGT4y5-GyqYpl2NWngiAfESECaUHV
To: /content/Downloads/chat_gpt/2.9_gpt.pdf
100%|██████████| 231k/231k [00:00<00:00, 4.91MB/s]
Downloading...
From: https://drive.google.com/uc?id=1OSmenWkno13AzYpJFiQMN03wFI6CRD99
To: /content/Downloads/chat_gpt/2.10_gpt.pdf
100%|██████████| 131k/131k [00:00<00:00, 3.54MB/s]
Downloading...
From: https://drive.google.com/uc?id=1VkwNP1UOWJy1KCwuvAkh8uregCzxtgmL
To: /content/Downloads/chat_gpt/3.1_gpt.pdf
100%|██████████| 669k/669k [00:00<00:00, 7.19MB/s]
Downloading...
From: https://drive.google.com/uc?id=1q5yRXLo9WJAHzOUKhjcoFbfZW6zDY3Y8
To: /content/Downloads/chat_gpt/3.2_explanation_gpt.pdf
100%|██████████| 450k/450k [00:00<00:00, 6.12MB/s]
Downloading...
From: https://drive.google.com/uc?id=1qup_-e_iJFNsd-3cFIpRlKzMJzhUsWvv
To: /content/Downloads/chat_gpt/3.3_summary_gpt.pdf
100%|██████████| 156

Folder downloaded to: Downloads



Download completed


**Handle Also the files Extraction from zip files**

In [5]:
import gdown
import os
import zipfile

def download_and_extract(link, download_path="Downloads"):
    """
    Downloads a file or folder from Google Drive.
    If a ZIP file is found (at any level), it extracts it.
    """
    if not os.path.exists(download_path):
        os.makedirs(download_path)

    try:
        if "file/d/" in link:
            # Download a single file
            file_id = link.split("file/d/")[1].split("/")[0]
            file_url = f"https://drive.google.com/uc?id={file_id}"
            downloaded_file_path = os.path.join(download_path, "file")
            gdown.download(file_url, output=downloaded_file_path, fuzzy=True)
            extract_zip_if_needed(downloaded_file_path, download_path)

        elif "drive/folders/" in link:
            # Download a folder
            folder_id = link.split("folders/")[1].split("?")[0]
            gdown.download_folder(f"https://drive.google.com/drive/folders/{folder_id}", output=download_path)          # Recursively check for ZIP files within the downloaded folder
            for root, _, files in os.walk(download_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    extract_zip_if_needed(file_path, download_path)

        else:
            print("Invalid Google Drive link. Please provide a valid file or folder link.")

    except Exception as e:
        print(f"An error occurred: {e}")
        print("Please check the link and permissions.")

def extract_zip_if_needed(file_path, download_path):
    """Extracts a ZIP file if it's found."""
    if file_path.lower().endswith(".zip"):
        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            extract_path = os.path.join(download_path, os.path.splitext(file_path)[0])
            zip_ref.extractall(extract_path)
            print(f"Extracted ZIP file to: {extract_path}")
        # Remove original ZIP file (optional)
        os.remove(file_path)

if __name__ == "__main__":

    gdrive_link = "https://drive.google.com/drive/folders/1AGvMSrnTkZqON4hRSy6R00mlX3RM04qv?usp=drive_link"  # information-Security
    download_and_extract(gdrive_link, download_path="Downloads2")

Retrieving folder contents


Retrieving folder 1L4xqKCLcKCk-bK1O-mjU6nRfWsfsTmKE chat_gpt
Processing file 1IjRNGT4y5-GyqYpl2NWngiAfESECaUHV 2.9_gpt.pdf
Processing file 1OSmenWkno13AzYpJFiQMN03wFI6CRD99 2.10_gpt.pdf
Processing file 1VkwNP1UOWJy1KCwuvAkh8uregCzxtgmL 3.1_gpt.pdf
Processing file 1q5yRXLo9WJAHzOUKhjcoFbfZW6zDY3Y8 3.2_explanation_gpt.pdf
Processing file 1qup_-e_iJFNsd-3cFIpRlKzMJzhUsWvv 3.3_summary_gpt.pdf
Processing file 1Sdgz38fbJH7jKIVT24FuZvJcTBZw1yUe 3.4_gpt.pdf
Processing file 1kyIsH3nZcBj6fv8QyQRp6f7KLFWu7LD- 3.4bind_reverse_shell_exploitation.pdf
Processing file 1jrZYEimvFGoslhWVR--QtJSnYsN_-KVk 3.5_gpt.pdf
Processing file 1Z2g1vewfcl-QkeDvXwCnZ7oMSNQ5MFI9 3.6_gpt.pdf
Processing file 1EvG2ZQiVE02hJGJMM9d2Edjapnryn7ua expoit_bof_vulnerability(3.3).pdf
Processing file 1o2HEVEAVSM2iVvBiqe2YNwse8lPBBgAq log_files_gpt.pdf
Processing file 1HgYAefpCwCOoijd8b-ab0uflApBwfn1L ~$dsf21m043_ass_01.docx
Processing file 1spyJ9miy47bfJjAVpBfFBlbQWk1Haa5m 3.1 complet_ctst_gpt.pdf
Processing file 1F03du3PZkZgi66u

Retrieving folder contents completed
Building directory structure
Building directory structure completed
Downloading...
From: https://drive.google.com/uc?id=1IjRNGT4y5-GyqYpl2NWngiAfESECaUHV
To: /content/Downloads2/chat_gpt/2.9_gpt.pdf
100%|██████████| 231k/231k [00:00<00:00, 4.80MB/s]
Downloading...
From: https://drive.google.com/uc?id=1OSmenWkno13AzYpJFiQMN03wFI6CRD99
To: /content/Downloads2/chat_gpt/2.10_gpt.pdf
100%|██████████| 131k/131k [00:00<00:00, 3.73MB/s]
Downloading...
From: https://drive.google.com/uc?id=1VkwNP1UOWJy1KCwuvAkh8uregCzxtgmL
To: /content/Downloads2/chat_gpt/3.1_gpt.pdf
100%|██████████| 669k/669k [00:00<00:00, 8.19MB/s]
Downloading...
From: https://drive.google.com/uc?id=1q5yRXLo9WJAHzOUKhjcoFbfZW6zDY3Y8
To: /content/Downloads2/chat_gpt/3.2_explanation_gpt.pdf
100%|██████████| 450k/450k [00:00<00:00, 5.12MB/s]
Downloading...
From: https://drive.google.com/uc?id=1qup_-e_iJFNsd-3cFIpRlKzMJzhUsWvv
To: /content/Downloads2/chat_gpt/3.3_summary_gpt.pdf
100%|██████████

In [10]:
import gdown
import os
import zipfile

def download_and_extract(link, download_path="Downloads"):
    """
    Downloads a file or folder from Google Drive.
    If a ZIP file is found (at any level), it extracts it.
    Skips problematic files and displays a message.
    """
    if not os.path.exists(download_path):
        os.makedirs(download_path)

    try:
        if "file/d/" in link:
            # Download a single file
            file_id = link.split("file/d/")[1].split("/")[0]
            file_url = f"https://drive.google.com/uc?id={file_id}"
            downloaded_file_path = os.path.join(download_path, "file")
            try:
                gdown.download(file_url, output=downloaded_file_path, fuzzy=True)
                extract_zip_if_needed(downloaded_file_path, download_path)
            except Exception as e:
                print(f"Error downloading or extracting file: {file_url}")
                print(f"Error message: {e}")
                print("Skipping this file...\n")

        elif "drive/folders/" in link:
            # Download a folder
            folder_id = link.split("folders/")[1].split("?")[0]
            try:
                gdown.download_folder(f"https://drive.google.com/drive/folders/{folder_id}", output=download_path)
                # Recursively check for ZIP files within the downloaded folder
                for root, _, files in os.walk(download_path):
                    for file in files:
                        file_path = os.path.join(root, file)
                        try:
                            extract_zip_if_needed(file_path, download_path)
                        except Exception as e:
                            print(f"Error extracting ZIP file: {file_path}")
                            print(f"Error message: {e}")
                            print("Skipping this file...\n")
            except Exception as e:
                print(f"Error downloading or extracting folder: {folder_id}")
                print(f"Error message: {e}")
                print("Skipping this folder...\n")

        else:
            print("Invalid Google Drive link. Please provide a valid file or folder link.")

    except Exception as e:
        print(f"An error occurred: {e}")
        print("Please check the link and permissions.")

def extract_zip_if_needed(file_path, download_path):
    """Extracts a ZIP file if it's found."""
    if file_path.lower().endswith(".zip"):
        try:
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                extract_path = os.path.join(download_path, os.path.splitext(file_path)[0])
                zip_ref.extractall(extract_path)
                print(f"Extracted ZIP file to: {extract_path}")
            # Remove original ZIP file (optional)
            os.remove(file_path)
        except Exception as e:
            print(f"Error extracting ZIP file: {file_path}")
            print(f"Error message: {e}")
            print("Skipping this file...\n")


if __name__ == "__main__":
    gdrive_link = "https://drive.google.com/drive/folders/1KE8Uyh-xGYyZr6sZGAYwtYmluJVZDyEN?usp=drive_link"
    download_and_extract(gdrive_link, download_path="Downloads7")

Retrieving folder contents


Retrieving folder 1R1mP3BIEuVCLf8q6W1KaizWLkbN39WNR 2nd_jan
Retrieving folder 1rxjUlNwDkvPPyd4RLQyXPzp8oXu1it4r Pytorch-Action-Recognition-master - BSDSF21M052
Retrieving folder 1YP3JN1awuJ19NxPdMOsEEQhigsOI15oK Pytorch-Action-Recognition-master
Processing file 1R6tIoFMcB0sH0T8B_v_rWjVTd38_GhJV featrues_extraction.py
Processing file 1DYttJpxffB4T9tu9MuasFCttm80o4L_L information to run this project.md
Processing file 164z9LR1xqrfpVgZENrAq5aADwcC9GiLs train_LSTM.py
Retrieving folder 1KMOulLbp9aNKfoNFzQv6OkgjHyfhGn6d Assignments
Retrieving folder 1GklcWmehkcjZXLpCmFDYHQbfz8u62PaN Ass3
Retrieving folder 1IV4kSOmQUbFiqi6xqZequ9953_kZPSsf BSDSF21M043-Ahmar Akram - Assignment 3
Retrieving folder 1UiIDkPvvBouhL7MarwpZ0Dqg2CByIfy6 Assignment 3
Processing file 1KzTEkNQmGr3GA9QG0Xt6x01NxRl9EwZz assignment 3.pdf
Processing file 1kf5aHY-jaa4Q9VzaEyVB7KFv0-fqTPzX grid_blue.jpg
Processing file 1ST6Mbjuj_PWu88QRhQkosxdMJshpyGMQ grid_green.jpg
Processing file 14JTHEhTTeRhMtyELdLl2S1Gp6bNL8rvQ grid_red.

Retrieving folder contents completed
Building directory structure
Building directory structure completed


Error downloading or extracting folder: 1KE8Uyh-xGYyZr6sZGAYwtYmluJVZDyEN
Error message: Failed to retrieve file url:

	Cannot retrieve the public link of the file. You may need to change
	the permission to 'Anyone with the link', or have had many accesses.
	Check FAQ in https://github.com/wkentaro/gdown?tab=readme-ov-file#faq.

You may still be able to access the file from the browser:

	https://drive.google.com/uc?id=1R6tIoFMcB0sH0T8B_v_rWjVTd38_GhJV

but Gdown can't. Please check connections and permissions.
Skipping this folder...



In [None]:
# its a Mirror

!ls /content/drive/MyDrive/GradeMate_Documents/Computer-Vision/

'1. Texture.pptx'		      adv_rag_t_one_note.pdf
'2. Local Image Descriptors.pptx'     Assignments
 2nd_jan			      Contour_line_detection_numericals.pdf
 2nd_jan.zip			     'DA2 Semester Project - Fitness Tracker Data (1).pdf'
 31st_lab			      Lecture-7.pdf
 31st_lab.zip			      Pre-Mid
'3. Corner_contour_shapes.pptx'       segmeataion_concepts.pdf
 4-Segmentation.pptx		      sift_numerical.pdf
'50 Multiple Choice Questions.docx'   texture_numerical_example_part_1.pdf
'5.1Mean Shift_examples.pptx'	      texture_numericals_local_binary_pattern_part_2.pdf
'5-K- means Clustering.pptx'	      WaterSheld_psd.pdf
 6-CNN.pptx


In [None]:
import os
from unstructured.partition.pdf import partition_pdf
import pytesseract
import uuid

#from langchain.embeddings import OpenAIEmbeddings
from sentence_transformers import SentenceTransformer

from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.schema.document import Document
from langchain.storage import InMemoryStore
from langchain.vectorstores import Chroma

import base64
# we are not using the openai paid chatbot
#from langchain.chat_models import ChatOpenAI
from langchain_google_genai import ChatGoogleGenerativeAI


from langchain.schema.messages import HumanMessage, AIMessage
from dotenv import load_dotenv

from langchain.schema.runnable import RunnablePassthrough
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser

load_dotenv()

False

In [None]:
# we will use the googe gemin 1.5 api key
#os.environ["OPENAI_API_KEY"]= gemini_api_key

In [None]:
# pytesseract.pytesseract.tesseract_cmd = r'Downloads\Untitled Folder\tesseract.exe'



In [None]:
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.document_loaders.unstructured import UnstructuredFileLoader

loader = DirectoryLoader(
    path='Downloads',
    glob='*.pdf',
    loader_cls=UnstructuredFileLoader
)

docs = loader.lazy_load()

In [None]:
i = 3
for document in docs:
  print(document.metadata)
  print(document.page_content)
  if i == 0:
    break
  i -= 1

{'source': 'Downloads/Handout 3.6 (Mitigation Techniques for BoF Vulnerability).pdf'}
Department of Computer Science FC College University

Department of Data Science University of the Punjab

Handout: 3.6 Mitigation Techniques for BoF Vulnerability

Overview

Buffer Overflow (BoF) vulnerabilities occur when a program writes more data to a buffer than it can hold, leading to potential overwriting of adjacent memory. This can result in crashes, data corruption, or security exploits, including arbitrary code execution. Various security measures and mitigation techniques have been developed to prevent or mitigate the risk of buffer overflows. We have categorized them into developer-based, OS-based, and compiler-based techniques. By employing a combination of these techniques, systems can be hardened against buffer overflow attacks and other memory corruption vulnerabilities.

Developer-Based Techniques These are techniques that developers can/should use during the design and coding phases

import pytesseract

pytesseract.pytesseract.tesseract_cmd = "/usr/bin/tesseract"

In [None]:
print('a')

In [None]:
input_path = os.getcwd()
output_path = os.path.join(os.getcwd(), "figures")
# is
raw_pdf_elements = partition_pdf(
    filename=os.path.join(input_path, "finstatements.pdf"),
    extract_images_in_pdf=True,
    infer_table_structure=True,
    chunking_strategy="by_title",
    max_characters=4000,
    new_after_n_chars=3800,
    combine_text_under_n_chars=2000,
    image_output_dir_path=output_path,
)

yolox_l0.05.onnx:   0%|          | 0.00/217M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.47k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/115M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/46.8M [00:00<?, ?B/s]

In [None]:
!pip install rarfile

Collecting rarfile
  Downloading rarfile-4.2-py3-none-any.whl.metadata (4.4 kB)
Downloading rarfile-4.2-py3-none-any.whl (29 kB)
Installing collected packages: rarfile
Successfully installed rarfile-4.2


In [None]:
import os
import zipfile
import rarfile
from unstructured.partition.auto import partition

# Function to process a single file and return its elements
def process_file(file_path, output_path):
    try:
        # Use partition to handle all supported file types
        elements = partition(
            filename=file_path,
            extract_images_in_pdf=True,
            infer_table_structure=True,
            chunking_strategy="by_title",
            max_characters=4000,
            new_after_n_chars=3800,
            combine_text_under_n_chars=2000,
            image_output_dir_path=output_path,
        )
        print(f"Processed {file_path}: {len(elements)} elements extracted.")
        return elements
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None

# Function to extract ZIP/RAR files
def extract_archive(archive_path, extract_dir, password=None):
    try:
        if archive_path.endswith(".zip"):
            with zipfile.ZipFile(archive_path, "r") as zip_ref:
                if password:
                    zip_ref.extractall(extract_dir, pwd=password.encode())
                else:
                    zip_ref.extractall(extract_dir)
        elif archive_path.endswith(".rar"):
            with rarfile.RarFile(archive_path, "r") as rar_ref:
                if password:
                    rar_ref.extractall(extract_dir, pwd=password)
                else:
                    rar_ref.extractall(extract_dir)
        else:
            print(f"Unsupported archive format: {archive_path}")
    except Exception as e:
        print(f"Error extracting {archive_path}: {e}")

# Main function to traverse the folder and process files
def process_folder(folder_path, output_path, archive_password=None):
    extracted_elements = []  # List to store elements of each file
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            file_path = os.path.join(root, file)
            file_extension = os.path.splitext(file_path)[1].lower()

            # Handle ZIP/RAR files
            if file_extension in [".zip", ".rar"]:
                print(f"Extracting archive: {file_path}")
                extract_dir = os.path.join(output_path, "extracted", os.path.splitext(file)[0])
                os.makedirs(extract_dir, exist_ok=True)
                extract_archive(file_path, extract_dir, password=archive_password)
                # Process the extracted files
                extracted_elements.extend(process_folder(extract_dir, output_path, archive_password))
            # Process supported file types
            elif file_extension in [".pdf", ".docx", ".pptx", ".ptx", ".doc", ".ppt", ".txt"]:
                print(f"Processing file: {file_path}")
                elements = process_file(file_path, output_path)
                if elements:
                    extracted_elements.append((file_path, elements))  # Store file path and its elements
            else:
                print(f"Skipping unsupported file: {file_path}")
    return extracted_elements

# Input and output paths
input_path = "/content/Downloads/"  # Replace with your actual input folder path
output_path = "/content/Output/"  # Replace with your desired output directory

# Password for protected archives (if any)
archive_password = "your_password_here"  # Replace with the actual password or set to None if no password

# Ensure the output directory exists
os.makedirs(output_path, exist_ok=True)

# Start processing and store extracted elements
extracted_elements = process_folder(input_path, output_path, archive_password)

# Print or process the extracted elements
for file_path, elements in extracted_elements:
    print(f"File: {file_path}")
    print(f"Number of elements: {len(elements)}")
    # You can further process the elements here

Processing file: /content/Downloads/Handout 2.9 (Web App Pen Testing - I).pdf
Error processing /content/Downloads/Handout 2.9 (Web App Pen Testing - I).pdf: tesseract is not installed or it's not in your PATH. See README file for more information.
Processing file: /content/Downloads/Week 09_ Select and Train a Model.pdf
Error processing /content/Downloads/Week 09_ Select and Train a Model.pdf: tesseract is not installed or it's not in your PATH. See README file for more information.
Processing file: /content/Downloads/Assignment 01.pdf
Error processing /content/Downloads/Assignment 01.pdf: tesseract is not installed or it's not in your PATH. See README file for more information.
Processing file: /content/Downloads/Handout 3.4 (Writing Shell Codes).pdf
Error processing /content/Downloads/Handout 3.4 (Writing Shell Codes).pdf: tesseract is not installed or it's not in your PATH. See README file for more information.
Processing file: /content/Downloads/Handout 3.3 (BoF Vulnerability).pdf


In [None]:
raw_pdf_elements

[<unstructured.documents.elements.CompositeElement at 0x7eaa3b5fdc90>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa3b5fc8d0>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa3b5f84d0>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa3b5f8250>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa3b5f9b50>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa3b5e4150>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa3b5f36d0>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa3b5e6e10>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa3b5589d0>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa3b559310>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa3b598910>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa3b587f50>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa8c3b5e50>,
 <unstructured.documents.elements.CompositeElement at 0x7eaa8c3a6ad0>,
 <unst

In [None]:
text_elements = []
table_elements = []
image_elements = []

def encode_image(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')

for element in raw_pdf_elements:
    if 'CompositeElement' in str(type(element)):
        text_elements.append(element)
    elif 'Table' in str(type(element)):
        table_elements.append(element)

table_elements = [i.text for i in table_elements]
text_elements = [i.text for i in text_elements]

# Tables
print("The length of table elements are :", len(table_elements))

# Text
print("The length of text elements are :", len(text_elements))

for image_file in os.listdir(output_path):
    if image_file.endswith(('.png', '.jpg', '.jpeg')):
        image_path = os.path.join(output_path, image_file)
        encoded_image = encode_image(image_path)
        image_elements.append(encoded_image)

# image
print("The length of image elements are :",len(image_elements))

The length of table elements are : 0
The length of text elements are : 27
The length of image elements are : 27


In [None]:
from transformers import BlipProcessor, BlipForConditionalGeneration
import torch
from PIL import Image

# Load processor and model
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to("cuda" if torch.cuda.is_available() else "cpu")


preprocessor_config.json:   0%|          | 0.00/287 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/506 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/711k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/4.56k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/990M [00:00<?, ?B/s]

In [None]:
#chain_gpt = ChatGoogleGenerativeAI(model='gemini-1.5-pro', google_api_key="AIzaSyASv07qroqsPaXxmV0nw3IlZ_zc3TgL95s") # best method

In [None]:
import torch
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration
import base64
import io

# Load BLIP model and processor
#processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
#model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to("cuda" if torch.cuda.is_available() else "cpu")

def summarize_image(encoded_image):
    """
    Uses BLIP to generate a description of an image from a base64-encoded string.

    Args:
        encoded_image (str): Base64-encoded image string.

    Returns:
        str: Generated description of the image.
    """
    # Decode base64 image
    image_bytes = base64.b64decode(encoded_image)
    image = Image.open(io.BytesIO(image_bytes)).convert("RGB")

    # Process image with BLIP
    inputs = processor(image, return_tensors="pt").to("cuda" if torch.cuda.is_available() else "cpu")

    # Generate image description
    out = model.generate(**inputs)
    caption = processor.decode(out[0], skip_special_tokens=True)

    return caption


In [None]:
# text_summaries=[summarize_text(i) for i in text_elements]
# table_summaries=[summarize_table(i) for i in table_elements]
image_summaries=[summarize_image(i) for i in image_elements]

model.safetensors:   0%|          | 0.00/990M [00:00<?, ?B/s]

In [None]:
 image_summaries

['the logo for the new york school',
 'a diagram showing the different types of the fpgt',
 'a line of code code code code code code code code code code code code code code code code code',
 'a diagram of a simple system',
 'a screenshot of a black background with a text description of the text',
 'a program for the program, the program is a program that includes a program that includes a program that',
 'a diagram of the mechanism of a mechanism',
 'passwords are the same numbers',
 'a diagram of a tree with several different types',
 "a black background with a white and blue text that reads, ' ' ' ' ' ' ' '",
 'a diagram showing the different types of the ppp',
 'a diagram showing the different types of the ppp',
 'the apple logo',
 'a diagram showing the different types of the data',
 'a diagram of the system',
 "a screenshot of a computer screen with the text ' ' ' ' ' ' ' ' ' '",
 'a diagram of a data flow',
 'the structure of a process',
 'the logo for the university of cambridge

In [None]:
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.storage import InMemoryStore
from langchain.retrievers.multi_vector import MultiVectorRetriever

# Load a free sentence-transformer model
embedding_function = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")


  embedding_function = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.7k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

1_Pooling%2Fconfig.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [None]:

# Initialize Chroma vector store
vectorstore = Chroma(collection_name="summaris", embedding_function=embedding_function)

# Initialize an in-memory store for metadata
store = InMemoryStore()

# Define ID key
id_key = "doc_id"

# Create Multi-Vector Retriever
retriever = MultiVectorRetriever(vectorstore=vectorstore, docstore=store, id_key=id_key, top_k=1)


  vectorstore = Chroma(collection_name="summaris", embedding_function=embedding_function)


In [None]:
import uuid
from langchain.schema import Document

def add_documents_to_retriever(summaries, original_contents):
    if not summaries:  # Check if summaries is empty
        print("Warning: No summaries provided. Skipping...")
        return

    doc_ids = [str(uuid.uuid4()) for _ in summaries]
    summary_docs = [
        Document(page_content=s, metadata={id_key: doc_ids[i]})
        for i, s in enumerate(summaries)
    ]

    if not summary_docs:  # Check again to prevent adding empty lists
        print("Warning: No valid summary documents to add.")
        return

    # Add to vectorstore only if summaries are not empty
    retriever.vectorstore.add_documents(summary_docs)
    retriever.docstore.mset(list(zip(doc_ids, original_contents)))



In [None]:
import google.generativeai as genai
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser

In [None]:
template = """Answer the question based only on the following context, which can include text, images and tables:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

# model = ChatOpenAI(temperature=0, model="gpt-4o")
#genai.configure(api_key="AIzaSyAoyHcYrG-G2vsQZfu1dTYudDUOvRkJ8M8")
model = ChatGoogleGenerativeAI(model='gemini-1.5-pro', google_api_key="AIzaSyAoyHcYrG-G2vsQZfu1dTYudDUOvRkJ8M8") # best method


chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)
question= "Overview of Operating System"
answer = chain.invoke(question)
print(answer)

An Operating System (OS) is a program, usually called the kernel, that controls application program execution and acts as an interface between the user and computer hardware.  Its primary goal is user convenience, and its secondary goal is efficient computer operation.  Key OS functions include:

* **Resource Management:** Manages hardware resources like CPU, memory, disk space, and I/O devices, preventing interference between programs and users.
* **File Management:** Handles file and directory creation, deletion, organization, permissions, and access control.
* **Process Management:** Manages program execution, including multitasking and resource allocation.
* **User Interface:** Provides a command-line (CLI), graphical (GUI), or Natural User Interface (NUI) for user interaction.
* **Security and Access Control:** Prevents unauthorized access through user accounts, passwords, and permissions.
* **Device Management:** Controls hardware peripherals via device drivers.


In [None]:
ret = chain.first
contexts=ret.invoke("Overview of the Operating System")["context"]
contexts

['Types of Operating Systems\n\nPopular operating systems include Windows, macOS, Linux, Android and iOS. Each of these operating systems has its own set of features and design philosophies, tailored to different types of users and hardware. Here’s a detailed overview of various operating systems, that you need NOT to memorize J, however, do give them a bird’s eye view',
 'Department of Computer Science FC College University\n\nDepartment of Data Science University of the Punjab\n\nHO#1.2: A Recap of OS with Linux\n\nOverview of Operating Systems\n\nAn OS is a program running on the computer (usually called the kernel), that controls the execution of application programs and acts as an interface between the user of a computer and the computer hardware. The primary goal of OS is convenience of user and secondary goal is efficient operation of the computer system. It manages computer hardware and provides services for computer programs.\n\nHere’s a quick rundown of its key functions:\n\n

In [None]:
ret = chain.first
contexts=ret.invoke("What are the main parts and concepts of operating system")["context"]
contexts

['Department of Computer Science FC College University\n\nDepartment of Data Science University of the Punjab\n\nHO#1.2: A Recap of OS with Linux\n\nOverview of Operating Systems\n\nAn OS is a program running on the computer (usually called the kernel), that controls the execution of application programs and acts as an interface between the user of a computer and the computer hardware. The primary goal of OS is convenience of user and secondary goal is efficient operation of the computer system. It manages computer hardware and provides services for computer programs.\n\nHere’s a quick rundown of its key functions:\n\nAe tes tee\n\n• Resource Management:\n\nmanages the computer’s hardware resources, such as the CPU, memory, disk space, and input/output devices. It ensures that different programs and users running on the computer don’t interfere with each other’s operations.\n\n• File Management: It handles the creation, deletion, and organization of files and directories on storage dev

In [None]:
import nltk
from nltk.translate.meteor_score import meteor_score

In [None]:
reference = ground_truth
generated = answer

meteor = meteor_score([reference.split()], generated.split())
print(f"METEOR Score: {meteor:.4f}")

In [None]:
reference_tokens = nltk.word_tokenize(reference.lower())
generated_tokens = nltk.word_tokenize(generated.lower())

reference_set = set(reference_tokens)
generated_set = set(generated_tokens)

In [None]:
# Calculate True Positives (correct matches), False Positives, and False Negatives
tp = len(reference_set.intersection(generated_set))
fp = len(generated_set - reference_set)
fn = len(reference_set - generated_set)

# Precision, Recall, and F1 Score calculations
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

In [None]:
from bert_score import BERTScorer

In [None]:
reference = answer
candidate = ground_truth

In [None]:
scorer = BERTScorer(model_type='bert-base-uncased')
P, R, F1 = scorer.score([candidate], [reference])
print(f"BERTScore Precision: {P.mean():.4f}, Recall: {R.mean():.4f}, F1: {F1.mean():.4f}")

In [None]:
import mlflow
import openai
import os
import pandas as pd
import dagshub

dagshub.init(repo_owner='priyanshusingh8518', repo_name='my-first-repo', mlflow=True)
mlflow.set_tracking_uri("https://dagshub.com/priyanshusingh8518/my-first-repo.mlflow")

eval_data = pd.DataFrame(
    {
        "inputs": [
            "What is MLflow?",
            "What is Spark?",
        ],
        "ground_truth": [
            "MLflow is an open-source platform for managing the end-to-end machine learning (ML) "
            "lifecycle. It was developed by Databricks, a company that specializes in big data and "
            "machine learning solutions. MLflow is designed to address the challenges that data "
            "scientists and machine learning engineers face when developing, training, and deploying "
            "machine learning models.",
            "Apache Spark is an open-source, distributed computing system designed for big data "
            "processing and analytics. It was developed in response to limitations of the Hadoop "
            "MapReduce computing model, offering improvements in speed and ease of use. Spark "
            "provides libraries for various tasks such as data ingestion, processing, and analysis "
            "through its components like Spark SQL for structured data, Spark Streaming for "
            "real-time data processing, and MLlib for machine learning tasks",
        ],
    }
)

mlflow.set_experiment("LLM Evaluation")

with mlflow.start_run() as run:
    system_prompt = "Answer the following question in two sentences"
    # Wrap "gpt-4" as an MLflow model.
    logged_model_info = mlflow.openai.log_model(
        model="gpt-4",
        task=openai.chat.completions,
        artifact_path="model",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": "{question}"},
        ],
    )

    # Configure column mapping for evaluation
    evaluator_config = {
        'col_mapping': {'context': 'inputs'}
    }

    # Use predefined question-answering metrics to evaluate our model.
    results = mlflow.evaluate(
        logged_model_info.model_uri,
        eval_data,
        targets="ground_truth",
        model_type="question-answering",
        extra_metrics=[
            mlflow.metrics.genai.answer_correctness(),
            mlflow.metrics.genai.answer_relevance(),
            mlflow.metrics.genai.faithfulness()
        ],
        evaluator_config=evaluator_config
    )

    print(f"See aggregated evaluation results below: \n{results.metrics}")

    # Evaluation result for each data record is available in `results.tables`.
    eval_table = results.tables["eval_results_table"]
    df = pd.DataFrame(eval_table)
    df.to_csv('eval.csv')
    print(f"See evaluation table below: \n{eval_table}")
