# Generate the JSON database

In [None]:
"""
To rename image files in subdirectories to a standardized format:
<parent_folder_name>-<index>.<original_extension>
"""

import os

# Root folder path
root_dir = '/content/drive/MyDrive/images/'

# Extensions (matched case-insensitively) that identify an image file
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp')

# Loop through all subdirectories and files
for dirpath, dirnames, filenames in os.walk(root_dir):
    # os.walk returns file names in arbitrary order; sorting makes the index
    # assigned to each file reproducible from one run to the next.
    image_files = sorted(f for f in filenames if f.lower().endswith(image_extensions))

    # normpath strips a trailing separator so that files sitting directly in
    # root_dir are prefixed with its folder name instead of an empty string.
    parent_folder = os.path.basename(os.path.normpath(dirpath))

    for idx, filename in enumerate(image_files):
        file_ext = os.path.splitext(filename)[1]
        new_name = f"{parent_folder}-{idx}{file_ext}"

        # Already carries its standardized name (e.g. on a re-run): nothing to do.
        if filename == new_name:
            continue

        old_path = os.path.join(dirpath, filename)
        new_path = os.path.join(dirpath, new_name)

        # os.rename silently replaces an existing destination on POSIX, which
        # would destroy another image. Never overwrite; report and move on.
        if os.path.exists(new_path):
            print(f"Skipped (target already exists): {old_path} → {new_path}")
            continue

        # Rename the file
        os.rename(old_path, new_path)
        print(f"Renamed: {old_path} → {new_path}")

In [None]:
"""
To create a JSON database of products from images in a directory structure:
{
    "database_info": { ... },
    "category_mapping": { ... },
    "products": [ ... ]
}
"""

import os
import json
from datetime import datetime

# Set the root image directory
root_dir = '/content/drive/MyDrive/vis-suggestion/images/'

products_database = []
category_mapping = {}
product_id = 1

# Allowed image file extensions (matched case-insensitively)
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp')

# Traverse <root_dir>/<main_category>/<category>/<image file>.
# os.listdir returns names in arbitrary order, so every level is sorted:
# otherwise product ids (and the prices derived from them) would change
# from one run to the next.
for main_category in sorted(os.listdir(root_dir)):
    main_cat_path = os.path.join(root_dir, main_category)
    if not os.path.isdir(main_cat_path):
        continue

    for category in sorted(os.listdir(main_cat_path)):
        category_path = os.path.join(main_cat_path, category)
        if not os.path.isdir(category_path):
            continue

        # Update category mapping: main category -> list of its sub-categories
        subcategories = category_mapping.setdefault(main_category, [])
        if category not in subcategories:
            subcategories.append(category)

        for filename in sorted(os.listdir(category_path)):
            if not filename.lower().endswith(image_extensions):
                continue

            # Product name is the file stem with underscores shown as spaces
            product_name = os.path.splitext(filename)[0].replace('_', ' ')
            image_path = os.path.join(category_path, filename)

            product = {
                "id": product_id,
                "name": product_name,
                "category": category,
                "main_category": main_category,
                "image_path": image_path,
                "description": f"High-quality {category.replace('_', ' ')} product: {product_name}",
                # Placeholder price derived from the id; always in [50, 550)
                "price": round(50 + (product_id * 7.5) % 500, 2),
                "brand": f"Brand_{category[:3].upper()}",
                # Both fields start as None; nothing in this script fills them
                "imagga_tags": None,
                "confidence_scores": None,
                "created_at": datetime.now().isoformat()
            }

            products_database.append(product)
            product_id += 1

# Build the final JSON structure
output_data = {
    "database_info": {
        "total_products": len(products_database),
        # Number of top-level (main) categories, i.e. keys of category_mapping
        "categories": len(category_mapping),
        "created_at": datetime.now().isoformat(),
        "description": "Visual Product Matcher Database Schema"
    },
    "category_mapping": category_mapping,
    "products": products_database
}

# Save to file (relative to the current working directory)
output_file = 'products_database.json'
with open(output_file, 'w', encoding='utf-8') as f:
    json.dump(output_data, f, indent=2, ensure_ascii=False)

print(f"Total products: {len(products_database)}")
print(f"File: {output_file}")

In [None]:
"""
To process images using Imagga API and update the JSON database with tags and confidence scores.
"""

import json
import requests
import time
import os
from datetime import datetime
from google.colab import userdata

class ImageProcessor:
    """Tag product images through the Imagga tagging endpoint.

    `process_single_image` uploads one file and returns a status dict;
    `process_database` runs that over every product in a JSON database and
    writes the annotated copy next to the input file.
    """

    def __init__(self, api_key, api_secret, timeout=30):
        """Initialize with Imagga API credentials.

        Args:
            api_key: Imagga API key (used as the HTTP basic-auth user).
            api_secret: Imagga API secret (used as the basic-auth password).
            timeout: Seconds to wait for each HTTP request before giving up.
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.base_url = "https://api.imagga.com/v2/tags"
        self.timeout = timeout

    def process_single_image(self, image_path):
        """Upload a single image and return its tagging result.

        Never raises: any failure (unreadable file, network error, non-200
        response, malformed body) is reported in the returned dict so that a
        batch run can carry on with the next image.

        Returns:
            dict with 'status' ('success' or 'error') and 'processed_at'
            (ISO timestamp), plus 'tags' on success or 'error' on failure.
        """
        try:
            with open(image_path, 'rb') as image_file:
                response = requests.post(
                    self.base_url,
                    auth=(self.api_key, self.api_secret),
                    files={'image': image_file},
                    # Without a timeout one stalled request blocks the batch forever.
                    timeout=self.timeout,
                )

            if response.status_code == 200:
                data = response.json()
                tags = data.get('result', {}).get('tags', [])
                return {
                    'status': 'success',
                    'tags': tags,
                    'processed_at': datetime.now().isoformat()
                }

            return {
                'status': 'error',
                'error': f"API Error: {response.status_code}",
                'processed_at': datetime.now().isoformat()
            }

        except Exception as e:
            # Deliberate best-effort boundary: report the failure to the caller
            # instead of aborting the whole batch.
            return {
                'status': 'error',
                'error': str(e),
                'processed_at': datetime.now().isoformat()
            }

    def process_database(self, json_file_path):
        """Tag every product image listed in a JSON database.

        Each product's 'imagga_tags' becomes a list of up to five
        {'tag', 'confidence'} dicts on success, or an error dict on failure.
        Successful products also get comma-joined 'primary_tags' and
        'confidence_scores' strings.

        Args:
            json_file_path: Path to the database JSON. A missing or empty file
                is treated as a database with no products.

        Returns:
            Path of the written file: the input path with '_processed'
            inserted before its extension.
        """
        # Load the database. The file is written as UTF-8 with ensure_ascii=False,
        # so it must be read back as UTF-8 regardless of the platform default.
        if os.path.exists(json_file_path) and os.path.getsize(json_file_path) > 0:
            with open(json_file_path, 'r', encoding='utf-8') as f:
                database = json.load(f)
        else:
            database = {}

        # Tolerate databases that lack either top-level section.
        products = database.setdefault('products', [])
        database_info = database.setdefault('database_info', {})

        total_products = len(products)
        processed_count = 0

        for i, product in enumerate(products):
            image_path = product['image_path']

            # Check if image exists (no API call, hence no rate-limit pause)
            if not os.path.exists(image_path):
                print(f"Image not found: {image_path}")
                product['imagga_tags'] = {'status': 'error', 'error': 'Image file not found'}
                continue

            # Process the image
            print(f"Processing {i+1}/{total_products}: {product['name']}")
            result = self.process_single_image(image_path)

            if result['status'] == 'success':
                # Keep the first five tags as returned by the API
                top_tags = [
                    {
                        'tag': tag['tag']['en'],
                        'confidence': tag['confidence']
                    } for tag in result['tags'][:5]
                ]
                product['imagga_tags'] = top_tags
                product['primary_tags'] = ','.join(entry['tag'] for entry in top_tags)
                product['confidence_scores'] = ','.join(str(entry['confidence']) for entry in top_tags)
                processed_count += 1

                # A 200 response may carry no tags at all; indexing [0] would
                # then crash the run before anything is saved.
                if top_tags:
                    best = top_tags[0]
                    print(f"Success: {product['name']} - Top tag: {best['tag']} ({best['confidence']:.1f}%)")
                else:
                    print(f"Success: {product['name']} - no tags returned")
            else:
                product['imagga_tags'] = result
                print(f"Failed: {product['name']} - {result.get('error', 'Unknown error')}")

            # Rate limiting: 1 request per second for free tier
            if i < total_products - 1:
                time.sleep(1)

        # Update the database with processed results
        database_info['last_processed'] = datetime.now().isoformat()
        database_info['processed_count'] = processed_count

        # Save updated database. splitext touches only the final extension,
        # so a '.json' elsewhere in the path is left alone and a path without
        # the suffix can no longer overwrite the input file.
        stem, extension = os.path.splitext(json_file_path)
        output_file = f"{stem}_processed{extension}"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(database, f, indent=2, ensure_ascii=False)

        print(f"Successfully processed: {processed_count}/{total_products} images")
        print(f"Updated database saved as: {output_file}")

        return output_file

def main():
    """Tag every image referenced by products_database.json.

    Credentials are read from the Colab secret store first; the database
    file in the working directory is then processed if it is present,
    otherwise a message is printed and nothing else happens.
    """
    credentials = (userdata.get('imagga_api'), userdata.get('imagga_secret'))
    tagger = ImageProcessor(*credentials)

    database_path = "products_database.json"

    # Guard clause: nothing to do without the database file.
    if not os.path.exists(database_path):
        print(f"Database file not found: {database_path}")
        return

    tagger.process_database(database_path)


# Run only when executed as a script / notebook cell, not when imported.
if __name__ == "__main__":
    main()

In [None]:
"""
To upload images to Cloudinary and update the JSON database with new image URLs and rounded confidence scores.
"""

import os
import json
import cloudinary
import cloudinary.uploader
from dotenv import load_dotenv
from google.colab import userdata

# Load variables from a .env file, if one exists, into the environment.
load_dotenv()

# Cloudinary credentials are read from the Colab secret store.
_cloudinary_settings = {
    "cloud_name": userdata.get('cloudinary_user'),
    "api_key": userdata.get('cloudinary_api'),
    "api_secret": userdata.get('cloudinary_api_sec'),
    "secure": True,
}
cloudinary.config(**_cloudinary_settings)

# Database read by main() and the file it writes.
INPUT_JSON = "products_database_processed.json"
OUTPUT_JSON = "products_with_cloudinary.json"

def round_confidences(tags):
    """Round each tag's "confidence" to two decimals, in place.

    Args:
        tags: iterable of dicts, each holding a numeric "confidence" entry.

    Returns:
        The same `tags` object that was passed in, after modification.
    """
    for entry in tags:
        rounded = round(entry["confidence"], 2)
        entry.update(confidence=rounded)
    return tags

def main():
    """Upload every product image to Cloudinary and save the updated database.

    Reads INPUT_JSON and uploads each product's local image into
    products/<main_category>/<category>. The resulting URL is stored under
    "image_url", or None when the upload failed. Tag confidences are rounded
    to two decimals and the result is written to OUTPUT_JSON.
    """
    # Load existing JSON
    with open(INPUT_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)

    products = data["products"]
    total = len(products)

    for idx, prod in enumerate(products, start=1):
        local_path = prod["image_path"]
        print(f"[{idx}/{total}] Uploading {local_path}...")

        # Upload to Cloudinary, preserving folder structure
        folder = f"products/{prod['main_category']}/{prod['category']}"
        try:
            upload = cloudinary.uploader.upload(
                local_path,
                folder=folder,
                public_id=os.path.splitext(os.path.basename(local_path))[0],
                overwrite=True,
                resource_type="image"
            )
        except Exception as e:
            # Best effort: one failed upload must not abort the whole batch.
            print(f"Upload failed: {e}")
            # Use the same key as the success path so every product in the
            # output carries "image_url" (None marks a failed upload).
            prod["image_url"] = None
            continue

        # Set new URL
        prod["image_url"] = upload["secure_url"]

        # Round tag confidence scores. "imagga_tags" is a list of tag dicts
        # only when tagging succeeded; it can also be None or an error dict.
        # Leave those untouched instead of crashing before the JSON is saved.
        tags = prod.get("imagga_tags")
        if isinstance(tags, list):
            prod["imagga_tags"] = round_confidences(tags)
            prod["confidence_scores"] = ",".join(
                f"{t['confidence']:.2f}" for t in prod["imagga_tags"]
            )

        print(f"Uploaded to {prod['image_url']}")

    with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

    print(f"Done! Updated JSON saved as {OUTPUT_JSON}")


# Run only when executed as a script / notebook cell, not when imported.
if __name__ == "__main__":
    main()