# Installing Libraries

In [None]:
pip install huggingface_hub


In [None]:
pip install google-cloud-storage


# Inporting and using required libraries

In [None]:
import os
from huggingface_hub import snapshot_download
from google.cloud import storage
import subprocess
import shutil

In [None]:
# Function to snapshot the gaia-benchmark/GAIA dataset to local

In [None]:
import os
from huggingface_hub import snapshot_download

# Function to download the dataset to a desired local path
def download_dataset_to_local(repo_id, download_path):
   
    print(f"Downloading dataset from Hugging Face to {download_path}...")
    local_dir = snapshot_download(repo_id, repo_type="dataset", local_dir=download_path, revision='main')
    print(f"Dataset downloaded to {local_dir}")
    return local_dir


HUGGING_FACE_REPO_ID = "gaia-benchmark/GAIA" 
DESIRED_LOCAL_PATH = "/Users/shubhamagarwal/Documents/Northeastern/Big_Data_Assignment_2/gaia_dataset"  


local_dataset_path = download_dataset_to_local(HUGGING_FACE_REPO_ID, DESIRED_LOCAL_PATH)


# Fuction to scan the dataset for malware clamscan library 

In [None]:
import os
import subprocess

# Function to scan a file for malware using ClamAV
def scan_for_malware(file_path):
    """Scans a file for malware using ClamAV."""
    result = subprocess.run(['clamscan', file_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output = result.stdout.decode()

    if "Infected files: 0" in output:
        return True 
    else:
        return False  

# Function to scan all files in the local dataset directory
def scan_local_dataset(local_dir):

    total_files = 0
    clean_files = 0
    infected_files = 0
    total_dirs = 0
    
    print("Scanning files for malware...")
    for root, dirs, files in os.walk(local_dir):
        total_dirs += 1
        for file in files:
            total_files += 1
            file_path = os.path.join(root, file)

            # Scan each file and record the result
            if scan_for_malware(file_path):
                clean_files += 1
            else:
                infected_files += 1
    
    print(f"Scanning complete. Directories scanned: {total_dirs}, Files scanned: {total_files}, Clean files: {clean_files}, Infected files: {infected_files}")
    return total_dirs, total_files, clean_files, infected_files

local_dataset_path = "/Users/shubhamagarwal/Documents/Northeastern/Big_Data_Assignment_2/gaia_dataset"

scan_local_dataset(local_dataset_path)


# Setup to use GCP Service account key

# Function to upload scanned files to GCP Object bucket path

In [None]:
import os
from google.cloud import storage

# Function to upload files to GCP while preserving folder structure
def upload_to_gcp(bucket_name, source_file_path, destination_blob_name, project_id):
    """Uploads a file to GCP bucket."""
    storage_client = storage.Client(project=project_id)
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Upload the file
    blob.upload_from_filename(source_file_path)
    print(f"File {source_file_path} uploaded to {destination_blob_name}.")

# Function to upload all files to GCP, preserving folder structure
def upload_all_files_to_gcp(local_dir, gcp_bucket_name, target_gcp_dir="", project_id=None):

    if project_id is None:
        raise ValueError("Project ID is required to upload files to GCP.")

    print(f"Uploading files from {local_dir} to GCP bucket {gcp_bucket_name}...")

    for root, dirs, files in os.walk(local_dir):
        for file in files:
            file_path = os.path.join(root, file)

            # Create the relative path from the base local directory
            relative_path = os.path.relpath(file_path, local_dir)

            # Add the target directory in GCP if provided
            destination_blob_name = os.path.join(target_gcp_dir, relative_path) if target_gcp_dir else relative_path

            # Upload the file
            upload_to_gcp(gcp_bucket_name, file_path, destination_blob_name, project_id)

    print("All files uploaded successfully.")

# Example usage
GCP_BUCKET_NAME = "bigdataia_fall2024_team9_assignment1_bucket" 
LOCAL_DIR = "/Users/shubhamagarwal/Documents/Northeastern/Big_Data_Assignment_2/gaia_dataset" 
TARGET_GCP_DIR = "project_2"  
PROJECT_ID = "civil-tube-436417-k8" 

upload_all_files_to_gcp(LOCAL_DIR, GCP_BUCKET_NAME, TARGET_GCP_DIR, project_id=PROJECT_ID)


In [None]:
! pip install google-cloud-documentai


In [None]:
# !pip install --upgrade protobuf
# ! pip install --upgrade google-cloud-storage
# ! pip install --upgrade google-cloud-documentai
# ! pip install --upgrade protobuf
# ! pip install protobuf


In [None]:
! pip install protobuf


In [15]:
# pip install PyPDF2


Collecting PyPDF2
  Downloading pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Downloading pypdf2-3.0.1-py3-none-any.whl (232 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.6/232.6 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: PyPDF2
Successfully installed PyPDF2-3.0.1
Note: you may need to restart the kernel to use updated packages.


# Uploading Combined JSON Results to Google Cloud Storage

In [8]:
import os
from google.cloud import storage, documentai_v1 as documentai
from PyPDF2 import PdfReader, PdfWriter

# Initialize the Document AI client
def get_document_ai_client():
    return documentai.DocumentProcessorServiceClient()

# Process a PDF file using Document AI
def process_document(project_id, location, processor_id, file_path):
    client = get_document_ai_client()

    # Read the PDF from local storage
    with open(file_path, "rb") as pdf_file:
        content = pdf_file.read()

    # Configure the request for Document AI API
    raw_document = {"content": content, "mime_type": "application/pdf"}
    name = f"projects/{project_id}/locations/{location}/processors/{processor_id}"

    # Call the API to process the document
    request = documentai.ProcessRequest(name=name, raw_document=raw_document)
    result = client.process_document(request=request)
    return result.document

# Upload combined JSON results to GCP bucket
def upload_json_to_gcp(bucket_name, destination_blob_name, content):
    storage_client = storage.Client(project="civil-tube-436417-k8")
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Upload the JSON content
    blob.upload_from_string(content)
    print(f"Uploaded combined result to {destination_blob_name} in bucket {bucket_name}")

# Split the PDF into chunks of a maximum of 15 pages
def split_pdf(input_pdf_path, output_dir, chunk_size=15):
    pdf_reader = PdfReader(input_pdf_path)
    total_pages = len(pdf_reader.pages)
    chunk_paths = []

    for i in range(0, total_pages, chunk_size):
        pdf_writer = PdfWriter()
        chunk_filename = os.path.join(output_dir, f"split_{i // chunk_size + 1}.pdf")

        for j in range(i, min(i + chunk_size, total_pages)):
            pdf_writer.add_page(pdf_reader.pages[j])

        with open(chunk_filename, 'wb') as output_pdf:
            pdf_writer.write(output_pdf)
        
        chunk_paths.append(chunk_filename)

    return chunk_paths

# Process each chunked PDF, combine the results, and upload the combined result
def process_and_upload_combined_chunks(project_id, location, processor_id, bucket_name, pdf_file, output_dir):
    # Split the large PDF into chunks
    chunk_paths = split_pdf(pdf_file, "/tmp")

    combined_text = ""
    for chunk_path in chunk_paths:
        print(f"Processing chunk: {chunk_path}")
        processed_doc = process_document(project_id, location, processor_id, chunk_path)

        # Combine the text from each processed chunk
        combined_text += processed_doc.text

    # Upload the combined result as one JSON file
    json_output_path = f"{output_dir}/{os.path.basename(pdf_file).replace('.pdf', '_combined.json')}"
    full_output_path = os.path.join("project_2", json_output_path)  
    upload_json_to_gcp(bucket_name, full_output_path, combined_text)

# Function to list PDF files in nested directories of the GCP bucket
def list_pdfs_in_bucket(bucket_name, prefix):
    storage_client = storage.Client(project="civil-tube-436417-k8")
    bucket = storage_client.bucket(bucket_name)

    # Recursively list all blobs in the bucket with the specified prefix (directory)
    blobs = bucket.list_blobs(prefix=prefix)
    pdf_files = [blob.name for blob in blobs if blob.name.endswith(".pdf")]
    return pdf_files

# Main function to process PDFs from GCP bucket, combine results, and upload the combined result
def process_pdfs_in_bucket(bucket_name, prefix, project_id, location, processor_id, output_dir):
    # List all PDF files in the GCP bucket
    pdf_files = list_pdfs_in_bucket(bucket_name, prefix)

    # Process each PDF and combine results
    for pdf_file in pdf_files:
        print(f"Processing file: {pdf_file}")

        # Download the file to local temporarily
        local_pdf_path = f"/tmp/{os.path.basename(pdf_file)}"
        storage_client = storage.Client(project="civil-tube-436417-k8")
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(pdf_file)
        blob.download_to_filename(local_pdf_path)

        # Process the PDF using Document AI API in chunks and combine results
        process_and_upload_combined_chunks(project_id, location, processor_id, bucket_name, local_pdf_path, output_dir)

# Parameters
# PROJECT_ID = "civil-tube-436417-k8"  
# LOCATION = "us" 
# PROCESSOR_ID = "d32ef0b7fb707d"  
# BUCKET_NAME = "bigdataia_fall2024_team9_assignment1_bucket" 
# PREFIX = "project_2" 
# OUTPUT_DIR = "gcp_document_api_processed"  

# Call the function to process and store the parsed results
process_pdfs_in_bucket(BUCKET_NAME, PREFIX, PROJECT_ID, LOCATION, PROCESSOR_ID, OUTPUT_DIR)


Processing file: project_2/2023/test/021a5339-744f-42b7-bd9b-9368b3efda7a.pdf
Processing chunk: /tmp/split_1.pdf
Uploaded combined result to project_2/gcp_document_api_processed/021a5339-744f-42b7-bd9b-9368b3efda7a_combined.json in bucket bigdataia_fall2024_team9_assignment1_bucket
Processing file: project_2/2023/test/32f386b9-73ee-4455-b412-ddad508aa979.pdf
Processing chunk: /tmp/split_1.pdf
Uploaded combined result to project_2/gcp_document_api_processed/32f386b9-73ee-4455-b412-ddad508aa979_combined.json in bucket bigdataia_fall2024_team9_assignment1_bucket
Processing file: project_2/2023/test/4044eab7-1282-42bd-a559-3bf3a4d5858e.pdf
Processing chunk: /tmp/split_1.pdf
Uploaded combined result to project_2/gcp_document_api_processed/4044eab7-1282-42bd-a559-3bf3a4d5858e_combined.json in bucket bigdataia_fall2024_team9_assignment1_bucket
Processing file: project_2/2023/test/634fca59-03b2-4cdf-9ce4-0205df22f256.pdf
Processing chunk: /tmp/split_1.pdf
Uploaded combined result to project_2/