In [0]:
# Dataset Item → Numeric Label → Food Name → Our Processing
#      ↓              ↓             ↓            ↓
#    Image +      example['label']  labels[45]   if food_class in 
#    Label 45  →      = 45      →   = "pizza" →  our target list

# Source to Bronze Volume Ingestion

This notebook ingests images directly from the Hugging Face Food-101 dataset into the Bronze Volume, skipping the landing layer for a streamlined approach.


## 1. Libraries & Imports

In [0]:
# Install the Hugging Face `datasets` package that this notebook imports (`load_dataset`).
# NOTE(review): the version is unpinned and the install goes through `!pip`; consider a
# pinned `%pip install datasets==<version>` — confirm against the cluster/runtime setup.
!pip install datasets 

In [0]:
# Import required libraries
import io
import json
import os
import tempfile
import time

import datasets
from datasets import load_dataset
from PIL import Image


## 2. Core Functions

### Clean up function

In [0]:
# # Clean up all files and directories in the landing volume
# def cleanup_landing_volume():
#     landing_dir = "/Volumes/cuisine_vision_catalog/landing/landing_volume/food-101-incremental"
#     status_dir = "/Volumes/cuisine_vision_catalog/landing/landing_volume/ingestion_status"
#     cache_dir = "/Volumes/cuisine_vision_catalog/landing/landing_volume/cache"
    
#     print(f"üßπ Cleaning up landing volume: {landing_dir}")
#     dbutils.fs.rm(landing_dir, recurse=True)
#     print(f"üßπ Cleaning up status directory: {status_dir}")
#     dbutils.fs.rm(status_dir, recurse=True)
#     print(f"üßπ Cleaning up cache directory: {cache_dir}")
#     dbutils.fs.rm(cache_dir, recurse=True)
#     print("‚úÖ Landing volume cleanup complete.")

# cleanup_landing_volume()

### config related function

In [0]:
def load_config_file(filename, config_volume_path):
    """Read a JSON configuration file from the config volume and parse it.

    Args:
        filename: Name of the file inside ``config_volume_path``.
        config_volume_path: Directory (volume path) that holds the config files.

    Returns:
        The parsed JSON value, or ``None`` when reading or parsing fails
        (the error is printed rather than raised).
    """
    try:
        target = f"{config_volume_path}/{filename}"
        print(f"üì• Loading config: {filename}")
        # At most 1,000,000 bytes of the file are requested from dbutils.
        raw_text = dbutils.fs.head(target, max_bytes=1000000)
        parsed = json.loads(raw_text)
    except Exception as err:
        print(f"‚ùå Error loading {filename}: {str(err)}")
        return None
    else:
        return parsed

### ingestion status functions

In [0]:
    
def load_ingestion_status(status_file_path):
    """Load current ingestion status from JSON file.

    The parent directory of ``status_file_path`` is created first, then the
    file is read (up to 1,000,000 bytes) and parsed as JSON.

    Args:
        status_file_path: Full path of the status JSON file.

    Returns:
        The status dictionary keyed by food type. An empty dict is returned
        when the file cannot be read, is not valid JSON, or does not contain
        a JSON object; each case prints its own message so a corrupt file is
        not mistaken for a missing one.
    """
    try:
        # Ensure directory exists
        status_dir = "/".join(status_file_path.split("/")[:-1])
        dbutils.fs.mkdirs(status_dir)

        # Try to read existing status file
        status_content = dbutils.fs.head(status_file_path, max_bytes=1000000)
    except Exception:
        print("üìÑ No existing status file - starting fresh")
        return {}

    try:
        status = json.loads(status_content)
    except (ValueError, TypeError) as e:
        # The file exists but is unreadable as JSON: say so instead of
        # reporting it as missing.
        print(f"‚ö†Ô∏è  Status file is not valid JSON ({e}) - starting fresh")
        return {}

    # Callers index the status by food type and call .get() on it, so anything
    # other than a JSON object cannot be used.
    if not isinstance(status, dict):
        print(f"‚ö†Ô∏è  Status file holds a {type(status).__name__}, expected an object - starting fresh")
        return {}

    print(f"üì• Loaded status for {len(status)} food types")
    return status


def save_ingestion_status(status, status_file_path):
    """Save current ingestion status to JSON file.

    The status is serialised to a uniquely named local temporary file and then
    copied to ``status_file_path`` with ``dbutils.fs.cp``. The temporary file
    is removed whether or not the copy succeeds.

    Args:
        status: JSON-serialisable status dictionary.
        status_file_path: Destination path of the status file.

    Returns:
        ``True`` when the status was written, ``False`` on any error
        (the error is printed rather than raised).
    """
    temp_file = None
    try:
        # Convert to JSON string
        status_json = json.dumps(status, indent=2)

        # Save to a unique temp file first (delete=False: it must outlive the
        # ``with`` block so dbutils can copy it).
        with tempfile.NamedTemporaryFile(
            mode="w", prefix="ingestion_status_", suffix=".json", delete=False
        ) as f:
            temp_file = f.name
            f.write(status_json)

        # Copy to Unity Catalog Volume
        dbutils.fs.cp(f"file://{temp_file}", status_file_path)

        print(f"üíæ Status saved: {len(status)} food types")
        return True

    except Exception as e:
        print(f"‚ùå Error saving status: {str(e)}")
        return False

    finally:
        # Clean up temp file even when the copy failed
        if temp_file is not None:
            try:
                os.remove(temp_file)
            except OSError:
                pass

def check_status(status_file_path):
    """Print a summary of the stored ingestion status and return it.

    At most the first 20 food types (alphabetical order) are listed; a final
    line reports how many more exist. Entries that are not dictionaries are
    printed verbatim and tagged as old format.

    Args:
        status_file_path: Path of the status JSON file to load.

    Returns:
        The status object returned by ``load_ingestion_status``.
    """
    status = load_ingestion_status(status_file_path)

    if not status:
        print("üìä No data ingested yet")
        return status

    display_limit = 20
    total_types = len(status)

    print(f"üìä Current Status ({total_types} food types):")
    print("=" * 50)

    for name in sorted(status)[:display_limit]:
        entry = status[name]
        if not isinstance(entry, dict):
            # Legacy entries store a bare value (a filename) instead of a dict
            print(f"   {name:<20}: {entry} (old format)")
            continue
        print(f"   {name:<20}: {entry.get('count', 0)} images | last_index: {entry.get('last_index', 'N/A')}")

    hidden = total_types - display_limit
    if hidden > 0:
        print(f"   ... and {hidden} more food types")

    return status

### ingestion functions

In [0]:
def get_last_processed_index(food_type, status):
    """Return the last dataset index already ingested for ``food_type``.

    Args:
        food_type: Food type name used as the key in ``status``.
        status: Ingestion status mapping.

    Returns:
        The stored ``last_index``, or ``-1`` when the food type is unknown,
        has no ``last_index``, or is stored in the old (non-dict) format.
    """
    entry = status[food_type] if food_type in status else None
    if isinstance(entry, dict):
        return entry.get('last_index', -1)
    # Unknown food type or old-format entry: start from the beginning
    return -1

def _write_jpeg_to_volume(image, destination_path, image_quality):
    """Encode ``image`` as JPEG and copy it to ``destination_path`` via dbutils.

    The bytes go through a uniquely named local temporary file, which is
    removed whether or not the copy succeeds.
    """
    # JPEG has no alpha/palette support; Pillow refuses to write modes such as
    # RGBA or P, so anything outside the plain JPEG modes is converted to RGB.
    if image.mode not in ("RGB", "L", "CMYK"):
        image = image.convert("RGB")

    encoded = io.BytesIO()
    image.save(encoded, format='JPEG', quality=image_quality)

    temp_path = None
    try:
        # delete=False: the file must still exist after the ``with`` block so
        # that dbutils can copy it.
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
            temp_path = temp_file.name
            temp_file.write(encoded.getvalue())
        dbutils.fs.cp(f"file://{temp_path}", destination_path)
    finally:
        if temp_path is not None and os.path.exists(temp_path):
            os.remove(temp_path)


def ingest_food_images_to_bronze(food_types_list, bronze_volume_path, status_file_path, images_per_food_type=10, image_quality=95):
    """
    Ingest images for specific food types directly to bronze volume, continuing from last processed dataset index

    The Food-101 train split is streamed from the start on every call; items at
    or before a food type's stored ``last_index`` are skipped. Each saved image
    is written to ``<bronze_volume_path>/<food_type>/<food_type>_idx_<index>.jpg``.
    The status is saved once the scan ends, including when the scan is
    interrupted by an exception, so already written images stay recorded.
    The dataset is not scanned at all when there is no valid food type or
    ``images_per_food_type`` is not positive.

    Args:
        food_types_list: List of food types to ingest (e.g., ["pizza", "sushi"])
        bronze_volume_path: Path to bronze volume directory (direct save)
        status_file_path: Path to status JSON file
        images_per_food_type: Number of images to add per food type (default: 10)
        image_quality: JPEG quality 1-100 (default: 95)

    Returns:
        Dictionary with results: ``total_saved`` (int), ``progress``
        (new images per valid food type) and ``status_saved`` (bool)
    """
    print(f"üöÄ Starting direct ingestion to bronze volume for {len(food_types_list)} food types")
    print(f"üéØ Target: {images_per_food_type} images per food type")
    print(f"üé® Image quality: {image_quality}%")
    print(f"üì¶ Bronze volume: {bronze_volume_path}")

    # Load current status
    status = load_ingestion_status(status_file_path)

    # Load Food-101 dataset
    print("üì° Loading Food-101 dataset...")
    dataset = load_dataset("food101", split="train", streaming=True)
    labels = dataset.features['label'].names

    # Validate food types (duplicates are collapsed, first occurrence wins)
    invalid = [ft for ft in food_types_list if ft not in labels]
    if invalid:
        print(f"‚ö†Ô∏è  Invalid food types (skipped): {invalid}")
    valid_food_types = list(dict.fromkeys(ft for ft in food_types_list if ft in labels))

    print(f"‚úÖ Processing {len(valid_food_types)} valid food types")

    # Show starting plan and remember where each food type resumes.
    # Dataset indices only grow during the scan, so the resume point can be
    # looked up once instead of on every item.
    print("\nüìã Ingestion Plan:")
    resume_after = {}
    for food_type in valid_food_types:
        last_index = get_last_processed_index(food_type, status)
        resume_after[food_type] = last_index
        entry = status.get(food_type)
        current_count = entry.get('count', 0) if isinstance(entry, dict) else 0
        print(f"   {food_type}: last_index={last_index} | current_count={current_count} | target=+{images_per_food_type}")

    # Track progress
    progress = {food_type: 0 for food_type in valid_food_types}
    total_saved = 0

    # Food types that still need images; empty means there is nothing to do
    pending = set(valid_food_types) if images_per_food_type > 0 else set()
    prepared_dirs = set()

    if pending:
        print(f"\nüîÑ Processing images...")
        stream = enumerate(dataset)
    else:
        # Without this guard every item of the dataset would be streamed and
        # skipped, which downloads the whole split for no result.
        print("\n‚ö†Ô∏è  Nothing to ingest - skipping dataset scan")
        stream = ()

    try:
        for idx, example in stream:
            # Get food class name
            food_class = labels[example['label']]

            # Skip food types that are not targeted or already complete
            if food_class not in pending:
                continue

            # Skip indices already ingested in a previous run
            if idx <= resume_after[food_class]:
                continue

            # Create filename using dataset index
            filename = f"{food_class}_idx_{idx:06d}.jpg"

            # Show processing details
            print(f"üì∏ Processing: {food_class} | Dataset_Index: {idx} | Saving as: {filename} | Size: {example['image'].size}")

            try:
                # Create the food type directory once per run, not per image
                food_dir = f"{bronze_volume_path}/{food_class}"
                if food_dir not in prepared_dirs:
                    dbutils.fs.mkdirs(food_dir)
                    prepared_dirs.add(food_dir)

                # Copy directly to bronze volume (not landing)
                _write_jpeg_to_volume(example['image'], f"{food_dir}/{filename}", image_quality)
            except Exception as e:
                print(f"‚ùå Error saving {food_class}/{filename}: {str(e)}")
                continue

            # Update progress and status
            progress[food_class] += 1
            total_saved += 1

            # Update status with new format (old-format entries are replaced)
            entry = status.get(food_class)
            if not isinstance(entry, dict):
                entry = {'count': 0, 'last_index': -1}
                status[food_class] = entry
            entry['count'] = entry.get('count', 0) + 1
            entry['last_index'] = idx
            entry['last_filename'] = filename

            if progress[food_class] >= images_per_food_type:
                pending.discard(food_class)

            # Progress reporting
            if total_saved % 10 == 0:
                completed_types = len(valid_food_types) - len(pending)
                print(f"üìä {total_saved} images saved, {completed_types}/{len(valid_food_types)} food types complete")

            # Check if all food types are complete
            if not pending:
                print("‚úÖ All target images collected!")
                break
    finally:
        # Save final status, also when the scan was interrupted, so images
        # already written are not ingested again on the next run
        save_success = save_ingestion_status(status, status_file_path)

    # Summary
    print(f"\nüìä Direct Bronze Volume Ingestion Complete!")
    print(f"‚úÖ Total NEW images saved: {total_saved}")
    print(f"üìù Status saved: {'‚úÖ' if save_success else '‚ùå'}")

    print(f"\nüìà Results by food type:")
    for food_type in valid_food_types:
        saved_count = progress[food_type]
        food_data = status.get(food_type, {})
        if isinstance(food_data, dict):
            total_count = food_data.get('count', 0)
            last_index = food_data.get('last_index', 'N/A')
            print(f"   {food_type}: +{saved_count} new | {total_count} total | last_index: {last_index}")
        else:
            print(f"   {food_type}: +{saved_count} new images")

    return {
        'total_saved': total_saved,
        'progress': progress,
        'status_saved': save_success
    }

## 3. Get Food types from `Food-101`

### Notebook params

In [0]:
# Register the text widgets that supply runtime settings: (name, default value, label)
_WIDGET_SPECS = (
    ("images_per_run", "5", "Images per food type per run"),
    ("image_quality", "95", "JPEG quality (1-100)"),
    ("CONFIG_VOLUME_PATH", "/Volumes/cuisine_vision_catalog/config/config_volume", "CONFIG_VOLUME_PATH"),
    ("STATUS_FILE_PATH", "/Volumes/cuisine_vision_catalog/bronze/bronze_volume/ingestion_status/ingestion_status.json", "STATUS_FILE_PATH"),
    ("BRONZE_VOLUME_PATH", "/Volumes/cuisine_vision_catalog/bronze/bronze_volume", "BRONZE_VOLUME_PATH"),
)
for _widget_name, _widget_default, _widget_label in _WIDGET_SPECS:
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

### Fetch the configs

In [0]:
# ----------------------------------------------------------------------------
# Configuration and execution setup: read the widget values, echo them, and
# load the list of food types from the config volume.
# ----------------------------------------------------------------------------
_SEPARATOR = "=" * 50
print(_SEPARATOR)

# Read the widget values (the two numeric settings are converted to int)
images_per_run = int(dbutils.widgets.get("images_per_run"))
image_quality = int(dbutils.widgets.get("image_quality"))
CONFIG_VOLUME_PATH = dbutils.widgets.get("CONFIG_VOLUME_PATH")
STATUS_FILE_PATH = dbutils.widgets.get("STATUS_FILE_PATH")
BRONZE_VOLUME_PATH = dbutils.widgets.get("BRONZE_VOLUME_PATH")

# Echo the resolved paths
print("üìÅ Paths configured:")
print(f"   ‚öôÔ∏è  Config volume: {CONFIG_VOLUME_PATH}")
print(f"   üìÑ Status file: {STATUS_FILE_PATH}")
print(f"   ü•â Bronze volume: {BRONZE_VOLUME_PATH}")

# Echo the runtime parameters
print("\nüìä Runtime Parameters:")
print(f"   üì∏ Images per run: {images_per_run}")
print(f"   üé® Image quality: {image_quality}%")

# Load the food types from the config volume; stop if they are missing or empty
print("\nüìã Loading Configuration Files...")
food_types = load_config_file("food_types.json", CONFIG_VOLUME_PATH)
if not food_types:
    raise ValueError(f"Food types configuration not available at `{CONFIG_VOLUME_PATH}/food_types.json`")
print(f"‚úÖ Food types loaded: {len(food_types)} types")

print("\n‚úÖ Configuration loaded - ready for direct bronze volume ingestion!")
print(_SEPARATOR)

In [0]:
import os
# Point the Hugging Face datasets cache at a `cache` folder inside the bronze volume.
# NOTE(review): `datasets` is already imported earlier in this notebook, and the library
# may read HF_DATASETS_CACHE only at import time — confirm this assignment takes effect.
os.environ["HF_DATASETS_CACHE"] = f"{BRONZE_VOLUME_PATH}/cache"
# Disable the datasets library's progress bars.
datasets.utils.logging.disable_progress_bar()

### Ingestion starts here

In [0]:
# ============================================================================
# üöÄ EXECUTE DIRECT BRONZE VOLUME INGESTION
# ============================================================================

print("üöÄ Executing Direct Bronze Volume Ingestion")
print("=" * 50)

# Collect every argument explicitly, then run the ingestion
_ingestion_args = {
    "food_types_list": food_types,             # food selection loaded from config
    "bronze_volume_path": BRONZE_VOLUME_PATH,  # images go straight to the bronze volume
    "status_file_path": STATUS_FILE_PATH,      # where ingestion progress is persisted
    "images_per_food_type": images_per_run,    # widget value
    "image_quality": image_quality,            # widget value
}
result = ingest_food_images_to_bronze(**_ingestion_args)

# Report what this run did
print("\nüìä Bronze Volume Ingestion Results:")
print(f"‚úÖ Total NEW images saved: {result['total_saved']}")
print(f"üìù Status saved successfully: {result['status_saved']}")

# Print the persisted status as a final check
print("\nüìã Current Status Check:")
status_summary = check_status(STATUS_FILE_PATH)

In [0]:
# # Quick Test: Start with 3 representative foods
# test_foods = ["pizza", "sushi", "hamburger"]  # Italian, Japanese, American

# print("üß™ Quick Test Run (3 foods)")
# result = ingest_food_images(
#     food_types_list=test_foods,
#     images_per_food_type=5  # Small test batch
# )