In [None]:
!pip install transformers torch numpy google-cloud-storage

# Equipment Profile Vector Generation

This notebook provides **two separate methods** for generating SigLIP image embeddings. Each method exports its results to the same kind of storage it reads from:

## 📋 Notebook Structure

### 1. 🔧 **Common Setup**
- Install dependencies and load the SigLIP model
- **Run this first!**

### 2. 🗂️ **Local Storage Method**
- Process images from a local dataset directory (e.g. `dataset/equipment_train/` or `dataset/equipment_test/`)
- **Export**: 💾 **LOCAL storage**

### 3. ☁️ **Cloud Storage Method (GCS)**
- Process images from Google Cloud Storage
- **Export**: ☁️ **CLOUD storage**

### 4. 🚀 **Execution Cells**
- **Local Storage Execution**: Local files → Local save
- **Cloud Storage Execution**: GCS files → Cloud save

## 🎯 Logical Behavior

### 🗂️ **Local Method** (`generate_siglip_embeddings_local`)
- **Data Source**: Local directories
- **Export**: 💾 Local storage

### ☁️ **Cloud Method** (`generate_siglip_embeddings_gcs`)
- **Data Source**: Google Cloud Storage
- **Export**: ☁️ Cloud storage

## 🏃‍♂️ Quick Start

**For Local Processing:**
```python
# Process local files → Save locally
generate_siglip_embeddings_local(equipment_base, filename)
```

**For Cloud Processing:**
```python
# Process cloud files → Save to cloud
generate_siglip_embeddings_gcs(bucket, gcs_path, gcs_output_path)
```

---

In [7]:
import os
import json
import numpy as np
import torch
from PIL import Image
from transformers import AutoProcessor, AutoModel
from google.cloud import storage
import uuid
import io

# Load SigLIP Model (run once)
# Single source of truth for the checkpoint so processor and model cannot drift apart.
MODEL_NAME = "google/siglip-base-patch16-224"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {DEVICE}")

try:
    processor = AutoProcessor.from_pretrained(MODEL_NAME)
    model = AutoModel.from_pretrained(MODEL_NAME).to(DEVICE)
    # The notebook only runs inference, so switch the model to evaluation mode.
    model.eval()
    print("SigLIP model loaded successfully.")
except Exception as e:
    print(f"Error loading SigLIP model: {e}")
    # Re-raise rather than calling exit(): inside a notebook exit() shuts the
    # kernel down and hides the traceback, whereas an exception stops
    # "Run All" at this cell and shows what went wrong.
    raise

print("‚úÖ Common setup completed - Model and processor ready!")

Using device: cpu
SigLIP model loaded successfully.
‚úÖ Common setup completed - Model and processor ready!


In [8]:
# --- Common Configuration ---
# Google Cloud project passed to storage.Client(...) in the GCS cell.
PROJECT_ID = "629242692180"
# Destination gs:// URI for the cloud method (passed as gcs_output_path in the
# cloud execution cell).
# NOTE(review): the object is named "local_image_vectors" although it receives
# the GCS-sourced vectors, and both outputs are written one JSON object per
# line (JSONL) despite the .json extension - confirm the names are intended.
GCS_OUTPUT_PATH = "gs://axmt_equipment_profile/siglip_vectors/local_image_vectors.json" 
# Destination file for the local method (passed as output_filename in the
# local execution cell). Relative to the notebook's working directory.
LOCAL_OUTPUT_FILENAME = "jsonl/local_image_vectors.json"

In [9]:
# ===== LOCAL STORAGE METHOD =====
# Process images from local directories -> export to LOCAL storage only

# Local Configuration
# Root directory of the dataset; each immediate subdirectory is used as one
# class label by the functions below.
equipment_base_local = "dataset/equipment_train"
# NOTE(review): the commented-out alternative used to be identical to the
# active value. The GCS cell pairs "equipment_test" with "equipment_train",
# so the test split is presumably what was meant here - confirm.
# equipment_base_local = "dataset/equipment_test"  # Alternative for test data

print(f"üóÇÔ∏è LOCAL STORAGE METHOD")
print(f"üìÇ Loading equipment data from local path: {equipment_base_local}")
print(f"üíæ Export: LOCAL storage ONLY")

def get_local_data_paths(equipment_base):
    """Return the visible subdirectories of ``equipment_base``, sorted by name.

    Plain files and entries whose name starts with ``.`` are skipped.
    If ``equipment_base`` does not exist, an empty list is returned.

    :param equipment_base: Directory that contains one folder per equipment class
    :return: List of ``equipment_base/<folder>`` paths
    """
    if not os.path.exists(equipment_base):
        return []

    # Pair every entry name with its joined path so both are available to the filter.
    candidates = (
        (name, os.path.join(equipment_base, name))
        for name in sorted(os.listdir(equipment_base))
    )
    return [
        path
        for name, path in candidates
        if os.path.isdir(path) and not name.startswith('.')
    ]

def get_local_image_paths(local_dirs):
    """Collect the image files found directly inside each of ``local_dirs``.

    Only regular files ending in ``.jpg``, ``.jpeg`` or ``.png`` (any case)
    are returned. Sub-folders are not searched. Directories that do not
    exist are reported with a warning and skipped.

    :param local_dirs: Iterable of directory paths to scan
    :return: List of image paths, grouped by directory in the order given and
             sorted by filename within each directory
    """
    image_extensions = ('.jpg', '.jpeg', '.png')
    image_paths = []
    for local_dir in local_dirs:
        if not os.path.isdir(local_dir):
            print(f"Warning: Directory not found - {local_dir}")
            continue
        # os.listdir returns entries in arbitrary order; sort so the output
        # file is reproducible from run to run.
        for filename in sorted(os.listdir(local_dir)):
            if not filename.lower().endswith(image_extensions):
                continue
            candidate = os.path.join(local_dir, filename)
            # Skip folders that merely have an image-like name.
            if os.path.isfile(candidate):
                image_paths.append(candidate)
    return image_paths

def generate_siglip_embeddings_local(
    equipment_base: str,
    output_filename: str
):
    """
    Create SigLIP embeddings from LOCAL images and export them to LOCAL storage only.

    Every image found one level below ``equipment_base`` is embedded with the
    notebook-level ``processor`` / ``model`` on ``DEVICE``. The embedding is
    L2-normalised and written as one JSON object per line to
    ``output_filename``. The name of the image's parent folder is used as its
    class label. Images that fail to load or embed are reported and skipped.

    :param equipment_base: Local directory containing equipment folders
    :param output_filename: Local file path where results will be saved;
        its parent directory is created if it does not exist
    :return: None
    """
    # Get local data paths
    local_data_paths = get_local_data_paths(equipment_base)
    all_image_paths = get_local_image_paths(local_data_paths)

    print(f"Found {len(local_data_paths)} directories in {equipment_base}:")
    for path in local_data_paths:
        print(f"  - {path}")
    print(f"Total images found: {len(all_image_paths)}")
    print(f"Export destination: LOCAL ({output_filename})")

    if not all_image_paths:
        print("No image files found in local directories. Exiting.")
        return

    # Make sure the destination folder exists; otherwise open() below fails
    # with FileNotFoundError on a fresh checkout.
    output_dir = os.path.dirname(output_filename)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    count = 0

    # Create embeddings file
    with open(output_filename, "w") as f_out:
        for image_path in all_image_paths:
            # Class label is the name of the folder that holds the image.
            label_class = os.path.basename(os.path.dirname(image_path))
            # Random suffix: ids are unique but differ between runs.
            image_id = f"{label_class}_{uuid.uuid4()}"

            try:
                # 1. Load and prepare local image. The context manager closes
                #    the file handle; convert() returns an independent copy.
                with Image.open(image_path) as raw_image:
                    image = raw_image.convert("RGB")
                inputs = processor(images=image, return_tensors="pt").to(DEVICE)

                # 2. Create embedding, scaled to unit L2 norm
                with torch.no_grad():
                    outputs = model.get_image_features(**inputs)
                    image_embedding = outputs / outputs.norm(p=2, dim=-1, keepdim=True)

                # Drop the leading batch axis (one image per call).
                embedding_vector = image_embedding.squeeze(0).tolist()
                embedding_metadata = {
                    "label_class": label_class,
                    "original_path": image_path
                }
                # 3. Create JSONL object
                jsonl_record = {
                    "id": image_id,
                    "embedding": embedding_vector,
                    "restricts": [{'namespace': 'class', 'allow': [label_class]}],
                    "embedding_metadata": embedding_metadata
                }

                # 4. Save locally, one record per line
                f_out.write(json.dumps(jsonl_record) + "\n")
                count += 1

                if count % 100 == 0:
                    print(f"üìà Processed {count} images...")

            except Exception as e:
                # Best effort: one unreadable image must not abort the whole run.
                print(f"‚ùå Error processing {image_path}: {e}")
                continue

    print(f"\n‚úÖ LOCAL Processing complete: {count} items processed")
    print(f"üíæ File saved locally at: {output_filename}")

# Confirmation that the definitions above were executed; no images are
# processed until generate_siglip_embeddings_local(...) is called.
print("‚úÖ Local Storage Method functions loaded!")
print("üíæ Purpose: Process LOCAL files ‚Üí Save LOCAL ONLY")
print("ÔøΩ Run the Local Execution cell to start processing")

üóÇÔ∏è LOCAL STORAGE METHOD
üìÇ Loading equipment data from local path: dataset/equipment_train
üíæ Export: LOCAL storage ONLY
‚úÖ Local Storage Method functions loaded!
üíæ Purpose: Process LOCAL files ‚Üí Save LOCAL ONLY
ÔøΩ Run the Local Execution cell to start processing


In [10]:
# ===== LOCAL STORAGE EXECUTION =====
# Execute processing: local files -> save locally only.
# Requires the common setup, configuration and local-method cells to have run.

print("üóÇÔ∏è EXECUTING LOCAL STORAGE METHOD")
print("="*50)

# Embeds every image under equipment_base_local and writes the records to
# LOCAL_OUTPUT_FILENAME (the file is opened in "w" mode, so it is overwritten).
generate_siglip_embeddings_local(
    equipment_base=equipment_base_local,
    output_filename=LOCAL_OUTPUT_FILENAME
)

üóÇÔ∏è EXECUTING LOCAL STORAGE METHOD
Found 21 directories in dataset/equipment_train:
  - dataset/equipment_train/AI1
  - dataset/equipment_train/AI10
  - dataset/equipment_train/AI11
  - dataset/equipment_train/AI12
  - dataset/equipment_train/AI13
  - dataset/equipment_train/AI14
  - dataset/equipment_train/AI15
  - dataset/equipment_train/AI16
  - dataset/equipment_train/AI17
  - dataset/equipment_train/AI18
  - dataset/equipment_train/AI19
  - dataset/equipment_train/AI2
  - dataset/equipment_train/AI21
  - dataset/equipment_train/AI22
  - dataset/equipment_train/AI3
  - dataset/equipment_train/AI4
  - dataset/equipment_train/AI5
  - dataset/equipment_train/AI6
  - dataset/equipment_train/AI7
  - dataset/equipment_train/AI8
  - dataset/equipment_train/AI9
Total images found: 2107
Export destination: LOCAL (jsonl/local_image_vectors.json)
üìà Processed 100 images...
üìà Processed 200 images...
üìà Processed 300 images...
üìà Processed 400 images...
üìà Processed 500 images...

In [None]:
# ===== CLOUD STORAGE METHOD (GCS) =====
# Process images from Google Cloud Storage -> export to CLOUD storage only

# GCS Configuration
GCS_BUCKET_NAME = "axmt_equipment_profile"  # Your GCS bucket name
# Prefix inside the bucket; each sub-prefix directly below it is treated as
# one equipment directory by get_gcs_data_paths().
GCS_EQUIPMENT_BASE = "equipment_test"  # Path in GCS bucket where equipment folders are stored
# GCS_EQUIPMENT_BASE = "equipment_train"  # Alternative path for training data

print(f"‚òÅÔ∏è CLOUD STORAGE METHOD (GCS)")
print(f"ü™£ Loading equipment data from GCS bucket: {GCS_BUCKET_NAME}/{GCS_EQUIPMENT_BASE}")
print(f"‚òÅÔ∏è Export: CLOUD STORAGE ONLY")

# Initialize GCS client
# `storage_client` is also used by upload_to_cloud(); `bucket` is the source
# bucket handed to generate_siglip_embeddings_gcs() in the execution cell.
try:
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    print("‚úÖ GCS client initialized successfully.")
except Exception as e:
    print(f"‚ùå Error initializing GCS client: {e}")
    # Re-raise rather than calling exit(): inside a notebook exit() shuts the
    # kernel down and hides the traceback, whereas an exception stops
    # "Run All" at this cell and shows what went wrong.
    raise

def list_gcs_directories(bucket, prefix):
    """Return the sub-prefixes ("directories") directly below ``prefix``.

    The bucket is listed with ``/`` as delimiter and the prefixes reported by
    every result page are collected, stripped of their trailing slash and
    returned in sorted order.

    :param bucket: Bucket object providing ``list_blobs``
    :param prefix: Parent prefix to list
    :return: Sorted list of sub-prefixes without trailing ``/``
    """
    listing = bucket.list_blobs(prefix=prefix, delimiter='/')
    found = []
    for page in listing.pages:
        for sub_prefix in page.prefixes:
            found.append(sub_prefix.rstrip('/'))
    found.sort()
    return found

def list_gcs_images(bucket, directory_path):
    """List the image objects stored under one GCS "directory".

    Object names ending in ``.jpg``, ``.jpeg`` or ``.png`` (any case) are
    returned, including objects in nested sub-prefixes.

    :param bucket: Bucket object providing ``list_blobs``
    :param directory_path: Prefix of the directory, with or without trailing ``/``
    :return: List of matching object names, in listing order
    """
    # Terminate the prefix with "/" so that e.g. ".../AI1" does not also
    # match ".../AI10/..." and return those images a second time.
    prefix = directory_path.rstrip('/')
    if prefix:
        prefix += '/'

    image_extensions = ('.jpg', '.jpeg', '.png')
    return [
        blob.name
        for blob in bucket.list_blobs(prefix=prefix)
        if blob.name.lower().endswith(image_extensions)
    ]

def download_image_from_gcs(bucket, blob_name):
    """Fetch one object from GCS and decode it as an RGB PIL image.

    Any failure (download or decoding) is printed and reported to the caller
    as ``None`` instead of raising.

    :param bucket: Bucket object providing ``blob``
    :param blob_name: Name of the object to download
    :return: PIL image converted to RGB, or ``None`` on error
    """
    try:
        raw_bytes = bucket.blob(blob_name).download_as_bytes()
        # Decode straight from memory; nothing is written to disk.
        return Image.open(io.BytesIO(raw_bytes)).convert("RGB")
    except Exception as e:
        print(f"‚ùå Error downloading image {blob_name}: {e}")
    return None

def get_gcs_data_paths(bucket, gcs_equipment_base):
    """Discover the equipment directories below a GCS prefix and their images.

    :param bucket: Bucket object to list
    :param gcs_equipment_base: Prefix (without trailing ``/``) holding the equipment folders
    :return: Tuple ``(directories, image_names)``; image names are concatenated
             in directory order
    """
    equipment_dirs = list_gcs_directories(bucket, gcs_equipment_base + '/')
    collected = [
        blob_name
        for directory in equipment_dirs
        for blob_name in list_gcs_images(bucket, directory)
    ]
    return equipment_dirs, collected

def generate_siglip_embeddings_gcs(
    bucket,
    gcs_equipment_base: str,
    gcs_output_path: str
):
    """
    Create SigLIP embeddings from GCS images and export them to CLOUD storage only.

    Images are downloaded one at a time, embedded with the notebook-level
    ``processor`` / ``model`` on ``DEVICE``, L2-normalised and written as one
    JSON object per line to a temporary local file. That file is uploaded to
    ``gcs_output_path`` and deleted after a successful upload. If no image
    could be embedded, nothing is uploaded.

    :param bucket: GCS bucket object
    :param gcs_equipment_base: GCS path containing equipment folders
    :param gcs_output_path: GCS path where results will be stored
    :return: None
    """
    # Get GCS data paths
    gcs_equipment_paths, all_image_paths = get_gcs_data_paths(bucket, gcs_equipment_base)

    print(f"Found {len(gcs_equipment_paths)} directories in GCS {bucket.name}/{gcs_equipment_base}:")
    for path in gcs_equipment_paths:
        print(f"  - {path}")
    print(f"Total images found: {len(all_image_paths)}")
    print(f"‚òÅÔ∏è Export destination: CLOUD STORAGE ({gcs_output_path})")

    if not all_image_paths:
        print("No image files found in GCS. Exiting.")
        return

    count = 0
    # Staging file in the current working directory (uploaded, then deleted).
    temp_filename = "temp_gcs_processing.jsonl"

    with open(temp_filename, "w") as f_out:
        for gcs_image_path in all_image_paths:
            # Class label is the object's parent "folder" in the GCS path.
            path_parts = gcs_image_path.split('/')
            label_class = path_parts[-2] if len(path_parts) >= 2 else "unknown"
            # Random suffix: ids are unique but differ between runs.
            image_id = f"{label_class}_{uuid.uuid4()}"

            try:
                # 1. Download and prepare image from GCS
                image = download_image_from_gcs(bucket, gcs_image_path)
                if image is None:
                    # The helper has already printed the reason.
                    continue

                inputs = processor(images=image, return_tensors="pt").to(DEVICE)

                # 2. Create embedding, scaled to unit L2 norm
                with torch.no_grad():
                    outputs = model.get_image_features(**inputs)
                    image_embedding = outputs / outputs.norm(p=2, dim=-1, keepdim=True)

                # Drop the leading batch axis (one image per call).
                embedding_vector = image_embedding.squeeze(0).tolist()

                # 3. Create JSONL object
                # NOTE(review): generate_siglip_embeddings_local nests these two
                # fields under "embedding_metadata" and adds "restricts"; confirm
                # whether the two record layouts are meant to differ.
                jsonl_record = {
                    "id": image_id,
                    "embedding": embedding_vector,
                    "original_path": gcs_image_path,
                    "label_class": label_class
                }

                # 4. Save to temporary local file, one record per line
                f_out.write(json.dumps(jsonl_record) + "\n")
                count += 1

                if count % 100 == 0:
                    print(f"üìà Processed {count} images...")

            except Exception as e:
                # Best effort: one bad image must not abort the whole run.
                print(f"‚ùå Error processing {gcs_image_path}: {e}")
                continue

    print(f"\n‚úÖ GCS Processing complete: {count} items processed")

    if count == 0:
        # Every image failed: do not replace existing cloud results with an
        # empty file, and do not leave the empty staging file behind.
        os.remove(temp_filename)
        print("No embeddings were generated; skipping upload.")
        return

    # Upload to cloud storage and delete temporary file
    print("üì§ Uploading to cloud storage...")
    if upload_to_cloud(temp_filename, gcs_output_path):
        os.remove(temp_filename)
        print(f"üóëÔ∏è Temporary file deleted after successful upload")
        print(f"‚òÅÔ∏è Results saved to cloud: {gcs_output_path}")
    else:
        # Keep the staging file so the upload can be retried by hand.
        print(f"‚ùå Upload failed. Temporary file remains at: {temp_filename}")

def upload_to_cloud(local_path: str, gcs_path: str):
    """Upload a local file to the object named by a ``gs://bucket/object`` URI.

    Uses the notebook-level ``storage_client``. Any error is printed and
    reported through the return value instead of being raised.

    :param local_path: Path of the local file to upload
    :param gcs_path: Destination URI; the first path segment is the bucket,
        the remainder is the object name
    :return: ``True`` if the upload succeeded, ``False`` otherwise
    """
    try:
        # Strip the scheme, then split into bucket (first segment) and object name (rest).
        segments = gcs_path.replace("gs://", "").split("/")
        target_bucket_name = segments[0]
        target_blob_name = "/".join(segments[1:])

        target_blob = storage_client.bucket(target_bucket_name).blob(target_blob_name)
        target_blob.upload_from_filename(local_path)

        print(f"‚òÅÔ∏è File uploaded to GCS successfully")
        return True
    except Exception as e:
        print(f"‚ùå ERROR: GCS upload failed: {e}")
        return False

# Confirmation that the definitions above were executed; no images are
# processed until generate_siglip_embeddings_gcs(...) is called.
print("‚úÖ Cloud Storage Method functions loaded!")
print("‚òÅÔ∏è Purpose: Process GCS files ‚Üí Save to CLOUD ONLY")
print("üìù Run the Cloud Execution cell to start processing")

In [None]:
# ===== CLOUD STORAGE EXECUTION =====
# Execute processing: GCS files -> save to cloud only.
# Requires the common setup, configuration and cloud-method cells to have run
# (they define `bucket`, GCS_EQUIPMENT_BASE and GCS_OUTPUT_PATH).

print("‚òÅÔ∏è EXECUTING CLOUD STORAGE METHOD")
print("="*50)

# Embeds every image found under GCS_EQUIPMENT_BASE in `bucket` and uploads
# the resulting records to GCS_OUTPUT_PATH.
generate_siglip_embeddings_gcs(
    bucket=bucket,
    gcs_equipment_base=GCS_EQUIPMENT_BASE,
    gcs_output_path=GCS_OUTPUT_PATH
)