In [None]:
import json
import os
import time
from pathlib import Path
import vertexai
from vertexai.generative_models import GenerativeModel, Part
from google.cloud import storage
from google.cloud.storage import transfer_manager

# Configuration
# Google Cloud project and region handed to vertexai.init() in extract().
project_id = "kiaraerica"
location = "us-central1"
# Bucket that is listed for source documents in extract() and that receives
# the uploaded results in copy_to_GCS().
bucket_name = "us_climate"
# Object-name prefix under which extract() looks for the source PDFs.
raw_folder = "initial-loads/ccus_facility/raw/"
# Used twice by this script: as the local output directory for the .jsonl
# files written by extract(), and as the destination prefix inside the bucket
# when the entry point calls copy_to_GCS().
llm_folder = "initial-loads/ccus_facility/llm_text/"
# Model identifier passed to GenerativeModel() in extract().
model_name = "gemini-1.5-flash-001"

# Updated prompt for CCS data extraction
# Sent to the model together with each PDF. It asks for one JSON object per
# line (NDJSON); extract() parses the reply line by line and drops any line
# that is not valid JSON, so the "one per line / no extra text" instructions
# matter for how much of the reply survives.
prompt = """Extract structured Carbon Capture and Storage (CCS) facility data from this document.
Identify and return the following fields:
1. Facility Number (ID) - the number assigned to each facility in the dataset.
2. Facility Name
3. Organization
4. City
5. State
6. Category (e.g., Capture, Storage, Transport)
7. Status (e.g., Operational, In Development, Proposed, Under Construction, Inactive)
8. Industry (e.g., Power Generation, Hydrogen Production, Ethanol Production)

Return each facility as a separate JSON object, one per line (NDJSON format), like:
{"id": "integer", "facility": "string", "organization": "string", "city": "string", "state": "string", "category": "string", "status": "string", "industry": "string"}

Do not return an array. Do not add extra text. Return only NDJSON.
"""

def _parse_ndjson_records(text):
    """Return the JSON objects contained in a model reply as one-line JSON strings.

    Markdown code fences are stripped first. If the whole reply is a single
    JSON document (one object, or an array returned despite the prompt), that
    document is used; otherwise the reply is parsed line by line and lines
    that are not valid JSON are reported and dropped. Arrays are flattened,
    and any value that is not a JSON object is reported and dropped, so every
    returned string is exactly one NDJSON record.

    Args:
        text: Raw text of the model reply.

    Returns:
        A list of compact JSON strings, one per extracted object (possibly empty).
    """
    # Remove any extraneous Markdown artifacts
    cleaned = text.strip().replace("```json", "").replace("```", "")

    parsed_values = []
    try:
        # Multi-line NDJSON fails here ("extra data") and falls through to the
        # per-line path; a lone object or a (pretty-printed) array succeeds.
        parsed_values.append(json.loads(cleaned))
    except json.JSONDecodeError:
        for line in cleaned.split("\n"):
            line = line.strip()
            if not line:
                continue  # Skip empty lines
            try:
                parsed_values.append(json.loads(line))
            except json.JSONDecodeError:
                print(f"⚠️ Skipping malformed JSON line: {line}")

    records = []
    for value in parsed_values:
        items = value if isinstance(value, list) else [value]
        for item in items:
            if isinstance(item, dict):
                records.append(json.dumps(item))
            else:
                # Bare numbers/strings/null parse as JSON but are not records.
                print(f"⚠️ Skipping non-object JSON value: {item!r}")
    return records


def extract() -> None:
    """Extract CCS facility data from every PDF under `raw_folder` with Gemini.

    For each PDF object in `bucket_name` below `raw_folder`, the document and
    `prompt` are sent to the model and the reply is written as NDJSON to
    ``<llm_folder>/<pdf stem>.jsonl`` on the local filesystem.

    Files are skipped when:
      * the object is a folder placeholder or not a PDF,
      * the local output file already exists (the check is local only, it
        does not look at the bucket),
      * the model returns no usable text or no valid JSON object.

    Note: the output name is derived from the PDF's stem only, so two PDFs
    with the same file name in different sub-folders map to the same output
    file and the second one is treated as already processed.
    """
    vertexai.init(project=project_id, location=location)
    model = GenerativeModel(model_name)

    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket_name, prefix=raw_folder)

    for blob in blobs:
        if blob.name.endswith("/"):
            continue  # Skip folder placeholders (including raw_folder itself)

        # Everything is sent to the model as application/pdf, so other object
        # types under the prefix would only produce errors or garbage.
        is_pdf = (
            blob.name.lower().endswith(".pdf")
            or blob.content_type == "application/pdf"
        )
        if not is_pdf:
            print(f"Skipping non-PDF object: {blob.name}")
            continue

        # Generate correct NDJSON file path (.jsonl is the NDJSON extension)
        json_filename = Path(llm_folder) / (Path(blob.name).stem + ".jsonl")

        # Skip if already processed
        if json_filename.exists():
            print(f"{json_filename} already exists, skipping...")
            continue

        print(f"Processing {blob.name}...")
        file_content = Part.from_uri(f"gs://{bucket_name}/{blob.name}", "application/pdf")
        resp = model.generate_content([file_content, prompt])

        try:
            # .text raises ValueError when the reply has no usable candidate
            # (e.g. blocked by safety filters); one bad file must not abort
            # the remaining ones.
            resp_text = resp.text
        except ValueError as err:
            print(f"❌ Error: No usable response for {blob.name} ({err}). Skipping file.")
            continue

        valid_json_lines = _parse_ndjson_records(resp_text)

        if not valid_json_lines:
            print(f"❌ Error: No valid JSON extracted for {blob.name}. Skipping file.")
            continue  # Move to next file without saving invalid data

        # Ensure folder exists
        json_filename.parent.mkdir(parents=True, exist_ok=True)

        # Save as newline-delimited JSON (NDJSON)
        print(f"Saving NDJSON file to: {json_filename}")
        with open(json_filename, "w", encoding="utf-8") as f:
            f.write("\n".join(valid_json_lines) + "\n")

        print(f"✅ Successfully saved NDJSON: {json_filename}")

def copy_to_GCS(local_folder, gcs_folder, file_extension):
    """Upload every matching file below a local folder to the `bucket_name` bucket.

    Args:
        local_folder: Local directory that is searched recursively.
        gcs_folder: Destination prefix inside the bucket. A trailing slash is
            optional; an empty value (or just "/") uploads to the bucket root.
        file_extension: Glob pattern handed to ``Path.rglob``, e.g. ``"*.jsonl"``.

    Returns:
        None. The outcome of each upload is printed; a failed upload is
        reported and does not stop the remaining ones.
    """
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    directory_as_path_obj = Path(local_folder).resolve()
    print(f"Checking for JSONL files in: {directory_as_path_obj}")

    # rglob() also yields directories whose name matches the pattern; only
    # regular files can be uploaded.
    file_paths = [
        path for path in directory_as_path_obj.rglob(file_extension) if path.is_file()
    ]
    print(f"Found JSONL files: {file_paths}")

    if not file_paths:
        print(f"⚠️ No JSONL files found in {local_folder}, skipping upload.")
        return

    # Normalise to exactly one trailing slash, but keep an empty prefix empty
    # so object names never start with "/".
    trimmed_prefix = gcs_folder.rstrip("/")
    correct_gcs_folder = f"{trimmed_prefix}/" if trimmed_prefix else ""

    # The relative name becomes part of the blob name, so it must use "/"
    # regardless of the local OS path separator.
    relative_names = [
        path.relative_to(directory_as_path_obj).as_posix() for path in file_paths
    ]

    results = transfer_manager.upload_many_from_filenames(
        bucket,
        relative_names,
        source_directory=str(directory_as_path_obj),
        blob_name_prefix=correct_gcs_folder,
        max_workers=5
    )

    # Results come back in the same order as the submitted file names; a
    # failed upload is returned as the exception instance instead of raised.
    for name, result in zip(file_paths, results):
        if isinstance(result, Exception):
            print(f"⚠️ Failed to upload {name} due to {result}")
        else:
            print(f"✅ Uploaded {name} to {bucket.name}/{correct_gcs_folder}")

if __name__ == "__main__":
    # Step 1: run every PDF under raw_folder through the model and write the
    # NDJSON results to the local llm_folder directory.
    extract()
    # Step 2: push the generated *.jsonl files to the bucket. The same
    # relative path is used as the local directory and as the bucket prefix.
    copy_to_GCS(
        local_folder=llm_folder,
        gcs_folder=llm_folder,
        file_extension="*.jsonl",
    )


Saving NDJSON file to: initial-loads/ccus_facility/llm_text/CCSMap25.jsonl
Processing initial-loads/ccus_facility/raw/CCSMap25.pdf...
⚠️ Skipping malformed JSON line: {"id": 136, "facility": "Natural Gas", "organization": "Natural Gas", "city": "Cotton Cove", "state": "TX", "category": "Capture, Storage", "status": "Operational", "industry": "Natural Gas
✅ Successfully saved NDJSON: initial-loads/ccus_facility/llm_text/CCSMap25.jsonl
Checking for JSONL files in: /content/initial-loads/ccus_facility/llm_text
Found JSONL files: [PosixPath('/content/initial-loads/ccus_facility/llm_text/CCSMap25.jsonl')]
✅ Uploaded /content/initial-loads/ccus_facility/llm_text/CCSMap25.jsonl to us_climate/initial-loads/ccus_facility/llm_text/
