In [2]:
import sys

sys.path.append("..")

from function.cloud import list_files

# Sample venue name used to try out `is_processed` in this cell.
venue = "The Thursday Club"


def is_processed(venue):
    """
    Report whether an extracted ``structuredData.json`` exists for a venue.

    Parameters
    ----------
    venue : str
        Venue name, used literally as the folder name under
        ``processed/adobe_extracted/``.

    Returns
    -------
    bool
        True when ``list_files`` returns at least one matching path.
    """
    # Local import: this cell runs before the notebook's own `import re`.
    import re

    # The venue is a literal folder name, not a pattern. Escape it so names
    # containing regex metacharacters (".", "(", "+", ...) match themselves.
    # NOTE(review): assumes `list_files` treats `filter` as a regular
    # expression, as the `.*` filter used elsewhere in this notebook suggests.
    escaped_venue = re.escape(venue)
    matches = list_files(
        filter=rf"processed/adobe_extracted/{escaped_venue}/structuredData\.json"
    )
    return len(matches) > 0


# Check the sample venue; as the cell's last expression the result is displayed.
is_processed(venue)

True

In [3]:
# Paths of every structuredData.json found under processed/adobe_extracted/.
# NOTE(review): presumably `filter` is a regular expression (it contains `.*`) —
# confirm against the `list_files` implementation in use.
all_jsons = list_files(filter=r"processed/adobe_extracted/.*/structuredData.json")


In [4]:
import re

# Captures the folder segment between the extraction prefix and the JSON file name.
pattern = re.compile(r"processed/adobe_extracted/(.*)/structuredData.json")

# Unique venue folder names, taken from the first match in each listed path.
all_processed_venues = {pattern.findall(blob_path)[0] for blob_path in all_jsons}


In [5]:
from function.retriever import get_all_venue_names_on_cloud

# Venue names as returned by the project's retriever helper.
all_venues = get_all_venue_names_on_cloud()

# NOTE(review): disabled per-venue check. It calls `all_venues()` although
# `all_venues` holds the helper's return value (assigned above) — fix before re-enabling.
# is_present = [is_processed(venue) for venue in all_venues()]

In [6]:
# Processed venue folders with no entry in `all_venues`; an empty set means every
# processed folder corresponds to a known venue name.
all_processed_venues - set(all_venues)

set()

In [None]:
import io
import os
import re
import sys
import time
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Dict, List
from zipfile import ZipFile

import pandas as pd
from dotenv import load_dotenv
from google.cloud import storage
from google.cloud.storage import Client, transfer_manager

sys.path.append("..")
from function.pdf_loader import *

load_dotenv()

In [1]:
def list_files(bucket_name, filter=None):
    """
    Return the names of the blobs in a bucket, optionally narrowed by a regex.

    Parameters
    ----------
    bucket_name : str
        Name of the bucket to list.
    filter : str, optional
        Regular expression searched (case-insensitively) within each blob
        name. When None, every blob name is returned.

    Returns
    -------
    list[str]
        Blob names in listing order.
    """
    matcher = re.compile(filter, re.IGNORECASE) if filter is not None else None

    bucket = storage.Client().bucket(bucket_name)

    selected = []
    for blob in bucket.list_blobs():
        if matcher is None or matcher.search(blob.name):
            selected.append(blob.name)
    return selected


def download_file(bucket_name: str, source_blob_name: str, destination_file_name: str):
    """
    Download a single blob from Google Cloud Storage to a local file.

    Parameters
    ----------
    bucket_name : str
        Name of the bucket holding the blob.
    source_blob_name : str
        Name of the blob to download.
    destination_file_name : str
        Local path to write to. Missing parent directories are created.

    Returns
    -------
    str
        ``destination_file_name``, unchanged.
    """
    # A bare file name has an empty dirname and os.makedirs("") raises
    # FileNotFoundError, so only create the parent when there is one.
    parent_dir = os.path.dirname(destination_file_name)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)

    blob.download_to_filename(destination_file_name)
    return destination_file_name


def download_files(bucket_name: str, files: list[str]):
    """
    Download several blobs in parallel, mirroring blob names as local paths.

    Parameters
    ----------
    bucket_name : str
        Name of the bucket holding the blobs.
    files : list[str]
        Blob names; each one is also used as the local destination path.

    Returns
    -------
    list
        Whatever ``transfer_manager.download_many`` returns for the batch.
    """
    client = Client()
    bucket = client.bucket(bucket_name)

    # Each blob is written to a local path identical to its blob name.
    downloads = [(bucket.blob(file_name), file_name) for file_name in files]

    # Create each distinct parent directory once. Blob names without a "/"
    # have an empty dirname, which os.makedirs rejects, so those are skipped.
    for parent_dir in {os.path.dirname(dest_path) for _, dest_path in downloads}:
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    # Use transfer_manager to download files in parallel
    results = transfer_manager.download_many(
        downloads,
        max_workers=10,  # adjust number of workers as needed
    )

    return results


def get_storage_client() -> storage.Client:
    """Create a new Google Cloud Storage client and return it."""
    gcs_client = storage.Client()
    return gcs_client


def upload_file(source_file_path: str, destination_blob_name: str | None = None) -> str:
    """
    Upload a single file to Google Cloud Storage.

    Parameters
    ----------
    source_file_path : str
        Local path to the file to upload
    destination_blob_name : str, optional
        Destination path in the bucket. If None, uses the source filename

    Returns
    -------
    str
        The public URL of the uploaded file
    """
    if destination_blob_name is None:
        destination_blob_name = Path(source_file_path).name

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_path)

    return blob.public_url


def upload_files(file_pairs: list[tuple[str, str]]) -> list[str]:
    """
    Upload multiple files to Google Cloud Storage in parallel.

    Parameters
    ----------
    file_pairs : list[tuple[str, str]]
        List of (source_path, destination_path) tuples

    Returns
    -------
    list[str]
        List of public URLs for the uploaded files
    """
    client = Client()
    bucket = client.bucket(bucket_name)

    # Create list of (source_file, destination_blob) tuples
    uploads = [
        (file_path, bucket.blob(dest_path)) for file_path, dest_path in file_pairs
    ]

    # Upload files in parallel
    results = transfer_manager.upload_many(
        uploads,
        max_workers=10,
    )

    # Return public URLs of uploaded files
    return [blob.public_url for _, blob in uploads]


def upload_directory(local_directory: str, bucket_prefix: str = "") -> list[str]:
    """
    Upload an entire directory and its contents recursively.

    Parameters
    ----------
    local_directory : str
        Path to local directory to upload
    bucket_prefix : str, optional
        Prefix to add to all files in the bucket

    Returns
    -------
    list[str]
        Whatever ``upload_files`` returns for the collected
        (local path, destination name) pairs.
    """
    # Local import: posixpath is not part of the notebook's import cell.
    import posixpath

    local_dir = Path(local_directory)

    # Destination names are built with posixpath so they always use "/",
    # whichever path separator the local platform uses.
    all_files = [
        (
            str(path),
            posixpath.join(bucket_prefix, path.relative_to(local_dir).as_posix()),
        )
        for path in local_dir.rglob("*")
        if path.is_file()
    ]

    return upload_files(all_files)


In [2]:
def process_pdf_from_cloud_with_temp_file(
    bucket_name: str,
    blob_name: str,
    output_folder: str,
    client_id: str,
    client_secret: str,
    max_retries: int = 3,
    retry_delay: int = 5,
):
    """
    Run one PDF from the bucket through ``adobeLoader`` and store the results.

    The PDF is downloaded to a temporary file and passed to ``adobeLoader``.
    The resulting ZIP is uploaded to ``processed/adobe_result/<pdf>/sdk.zip``
    and each ZIP member to ``processed/adobe_extracted/<pdf>/<member>``.
    Everything is written to the bucket named by ``bucket_name``.

    Parameters
    ----------
    bucket_name : str
        Bucket that holds the PDF and receives the results.
    blob_name : str
        Name of the PDF blob; its base name without extension names the
        output folders.
    output_folder : str
        Local folder under which ``<pdf>/sdk.zip`` is written.
    client_id : str
        Passed through to ``adobeLoader``.
    client_secret : str
        Passed through to ``adobeLoader``.
    max_retries : int
        Maximum number of attempts.
    retry_delay : int
        Seconds to sleep between attempts.

    Returns
    -------
    bool
        True when processing succeeded or extracted files already existed,
        False when every attempt failed.
    """
    # NOTE(review): `adobeLoader` is not defined in this notebook; presumably
    # it comes from the star import of function.pdf_loader — confirm.
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)

    # Get PDF name and create subfolder
    pdf_name = os.path.splitext(os.path.basename(blob_name))[0]
    output_pdf_dir = os.path.join(output_folder, pdf_name)  # Organized by PDF name
    os.makedirs(output_pdf_dir, exist_ok=True)

    output_zip_path = os.path.join(output_pdf_dir, "sdk.zip")  # ZIP inside subfolder
    temp_file_path = None
    success = False

    # Check if extracted folder already exists in GCS. One result is enough to
    # decide, so do not list the whole prefix.
    gcs_extracted_prefix = f"processed/adobe_extracted/{pdf_name}/"
    blobs = list(bucket.list_blobs(prefix=gcs_extracted_prefix, max_results=1))
    if blobs:  # Skip if extracted files already exist
        print(f"Skipping {pdf_name}: Extracted files already exist in GCS.")
        return True

    # Retry mechanism
    for attempt in range(1, max_retries + 1):
        try:
            print(f"Processing PDF: {pdf_name} (Attempt {attempt}/{max_retries})...")

            # Step 1: Download PDF to a temp file. The handle is closed before
            # the download writes to the path, because some platforms refuse
            # to reopen a file that is still held open.
            with NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
                temp_file_path = temp_file.name
            blob.download_to_filename(temp_file_path)

            # Step 2: Process PDF with adobeLoader
            adobeLoader(
                temp_file_path,
                output_zip_path,
                client_id=client_id,
                client_secret=client_secret,
            )

            # Step 3: Upload the processed ZIP file to GCS. Use the bucket
            # built from the `bucket_name` argument so the ZIP lands in the
            # same bucket as the extracted files, not in whatever a global
            # variable points at.
            gcs_destination = f"processed/adobe_result/{pdf_name}/sdk.zip"
            zip_blob = bucket.blob(gcs_destination)
            zip_blob.upload_from_filename(output_zip_path)
            public_url = zip_blob.public_url
            print(f"Uploaded to GCS: {public_url}")

            # Step 4: Unzip and upload extracted files to GCS
            print(f"Extracting ZIP for {pdf_name} and uploading files to GCS...")
            with ZipFile(output_zip_path) as zip_file:
                for file_name in zip_file.namelist():
                    file_data = zip_file.read(file_name)  # Read extracted content
                    extracted_blob_name = (
                        f"processed/adobe_extracted/{pdf_name}/{file_name}"
                    )

                    # Upload extracted file to GCS
                    extracted_blob = bucket.blob(extracted_blob_name)
                    extracted_blob.upload_from_string(file_data)
                    print(f"Uploaded extracted file to GCS: {extracted_blob_name}")

            success = True
            break  # Exit loop on success

        except Exception as e:
            # Best effort by design: report the failure and retry, never raise.
            print(f"Error processing PDF {pdf_name} (Attempt {attempt}): {e}")
            if attempt < max_retries:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
            else:
                print(f"Failed to process PDF {pdf_name} after {max_retries} attempts.")

        finally:
            # Cleanup temporary file (runs after every attempt, including success)
            if temp_file_path and os.path.exists(temp_file_path):
                os.remove(temp_file_path)
            temp_file_path = None

    return success


In [8]:
# Re-read the .env file; override=True lets its values replace variables that are
# already set in the process environment.
load_dotenv(override=True)

True

In [10]:
def main():
    """
    Process the PDFs under ``venues/`` in the bucket and print a summary.

    Reads ``ADOBE_CLIENT_ID`` and ``ADOBE_CLIENT_SECRET`` from the environment
    (after reloading ``.env``), lists the bucket's PDF blobs, skips the first
    195 of them, and runs each remaining one through
    ``process_pdf_from_cloud_with_temp_file``.

    Raises
    ------
    ValueError
        If a required credential is missing from the environment.
    """
    load_dotenv(override=True)

    # Get configuration
    bucket_name = "wedding-venues-001"
    client_id = os.getenv("ADOBE_CLIENT_ID")
    client_secret = os.getenv("ADOBE_CLIENT_SECRET")
    if not all([bucket_name, client_id, client_secret]):
        raise ValueError("Missing required environment variables")
    # Show only a short prefix: the full id would otherwise be saved in the
    # notebook's output and shared with it.
    print(f"client id is {client_id[:4]}****")

    # List all files
    print("Listing files in bucket...")
    files = list_files(bucket_name=bucket_name, filter="venues/")
    pdf_files = [f for f in files if f.lower().endswith(".pdf")]
    # Skip the first 195 PDFs in listing order and process the rest.
    # NOTE(review): presumably a resume offset from an earlier run — confirm.
    start_index = 195
    pdf_files = pdf_files[start_index:]
    print(f"Found {len(pdf_files)} PDF files")

    # Local folder for the ZIP results
    output_folder = "processed/adobe_result"

    # Process each PDF
    results = {"successful": 0, "failed": 0, "failed_files": []}

    for pdf_file in pdf_files:
        success = process_pdf_from_cloud_with_temp_file(
            bucket_name=bucket_name,
            blob_name=pdf_file,
            output_folder=output_folder,
            client_id=client_id,
            client_secret=client_secret,
            max_retries=3,
            retry_delay=5,
        )

        if success:
            results["successful"] += 1
        else:
            results["failed"] += 1
            results["failed_files"].append(pdf_file)

    # Print summary
    print("\nProcessing Summary:")
    print(f"Successfully processed: {results['successful']}")
    print(f"Failed: {results['failed']}")
    if results["failed_files"]:
        print("\nFailed files:")
        for file in results["failed_files"]:
            print(f"- {file}")


In [11]:
# Module-level bucket name. upload_file() and upload_files() in this notebook read
# a global called `bucket_name`, so it must be defined before main() runs.
bucket_name = "wedding-venues-001"
if __name__ == "__main__":
    main()

client id is cd7c3784d3954b52bfd060cdf1ec04f0
Listing files in bucket...
Found 85 PDF files
Processing PDF: San Ysidro Ranch (Attempt 1/3)...
Uploaded to GCS: https://storage.googleapis.com/wedding-venues-001/processed/adobe_result/San%20Ysidro%20Ranch/sdk.zip
Extracting ZIP for San Ysidro Ranch and uploading files to GCS...
Uploaded extracted file to GCS: processed/adobe_extracted/San Ysidro Ranch/structuredData.json
Uploaded extracted file to GCS: processed/adobe_extracted/San Ysidro Ranch/figures/fileoutpart9.png
Uploaded extracted file to GCS: processed/adobe_extracted/San Ysidro Ranch/figures/fileoutpart4.png
Uploaded extracted file to GCS: processed/adobe_extracted/San Ysidro Ranch/figures/fileoutpart3.png
Uploaded extracted file to GCS: processed/adobe_extracted/San Ysidro Ranch/figures/fileoutpart6.png
Uploaded extracted file to GCS: processed/adobe_extracted/San Ysidro Ranch/figures/fileoutpart8.png
Uploaded extracted file to GCS: processed/adobe_extracted/San Ysidro Ranch/fig

In [12]:
import re

from google.cloud import storage


def list_main_folders(bucket_name, filter=None):
    """
    List the distinct parent "folders" of the blobs in a bucket.

    A blob's folder is its name up to the last "/". Folders whose path ends in
    "/figures" or "/tables" are left out; blobs without a "/" are ignored.

    Parameters
    ----------
    bucket_name : str
        Name of the bucket to scan.
    filter : str, optional
        Regular expression searched (case-insensitively) within each folder
        path. When None, no filtering is applied.

    Returns
    -------
    list[str]
        Sorted, de-duplicated folder paths.
    """
    matcher = None if filter is None else re.compile(filter, re.IGNORECASE)

    bucket = storage.Client().bucket(bucket_name)

    excluded_suffixes = ("/figures", "/tables")
    folders = set()
    for blob in bucket.list_blobs():
        blob_name = blob.name
        if "/" not in blob_name:
            continue  # no folder-like structure

        parent = blob_name.rsplit("/", 1)[0]  # drop the file component
        if matcher is not None and not matcher.search(parent):
            continue
        if parent.endswith(excluded_suffixes):
            continue  # per-venue sub-folder, not a main folder

        folders.add(parent)

    return sorted(folders)


# Example usage: count the folders under processed/adobe_extracted, leaving out
# the "/figures" and "/tables" sub-folders that list_main_folders excludes.
bucket_name = "wedding-venues-001"
folders = list_main_folders(bucket_name=bucket_name, filter="processed/adobe_extracted")
len(folders)


271