In [None]:
%pip install --quiet datasets langchain-couchbase langchain-openai crewai python-dotenv

In [None]:
from glob import glob
from math import ceil
import os
from pathlib import Path
from random import choices
import re

import cv2
import matplotlib.pyplot as plt
from PIL import Image


from sentence_transformers import SentenceTransformer # The transformer used to execute the clip model.
from tqdm.notebook import tqdm                        # Nice progress bars

In [None]:
import getpass
import json
import logging
import os
import time
from datetime import timedelta

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.diagnostics import PingState, ServiceType
from couchbase.exceptions import (InternalServerFailureException,
                                  QueryIndexAlreadyExistsException,
                                  ServiceUnavailableException)
from couchbase.management.buckets import CreateBucketSettings
from couchbase.management.search import SearchIndex
from couchbase.options import ClusterOptions
from datasets import load_dataset
from dotenv import load_dotenv

In [None]:
# Root logger: INFO and above, each record prefixed with a timestamp and its level.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
)

# Only let CRITICAL records from the httpx logger through.
logging.getLogger('httpx').setLevel(logging.CRITICAL)

In [None]:
# Read variables from a local .env file (if any) into the process environment.
load_dotenv()


def _env_or_prompt(var_name, prompt, default, secret=False):
    """Resolve one setting: environment variable, then user prompt, then `default`.

    An unset or empty environment variable triggers an interactive prompt
    (via `getpass.getpass` when `secret` is true, `input` otherwise); an
    empty answer selects `default`.
    """
    from_env = os.getenv(var_name)
    if from_env:
        return from_env
    ask = getpass.getpass if secret else input
    answer = ask(prompt)
    return answer if answer else default


CB_HOST = _env_or_prompt('CB_HOST', "Enter Couchbase host (default: couchbase://localhost): ", 'couchbase://localhost')
CB_USERNAME = _env_or_prompt('CB_USERNAME', "Enter Couchbase username (default: Administrator): ", 'Administrator')
CB_PASSWORD = _env_or_prompt('CB_PASSWORD', "Enter Couchbase password (default: password): ", 'password', secret=True)
CB_BUCKET_NAME = _env_or_prompt('CB_BUCKET_NAME', "Enter bucket name (default: vector-search-testing): ", 'vector-search-testing')
INDEX_NAME = _env_or_prompt('INDEX_NAME', "Enter index name (default: vector_search_crew): ", 'vector_search_crew')
SCOPE_NAME = _env_or_prompt('SCOPE_NAME', "Enter scope name (default: shared): ", 'shared')
COLLECTION_NAME = _env_or_prompt('COLLECTION_NAME', "Enter collection name (default: crew): ", 'crew')

print("Configuration loaded successfully")

In [None]:
# Establish the cluster connection used by the following cells.
try:
    auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
    options = ClusterOptions(auth)
    cluster = Cluster(CB_HOST, options)
    # Give the cluster up to five seconds to report ready before continuing.
    cluster.wait_until_ready(timedelta(seconds=5))
    print("Successfully connected to Couchbase")
except Exception as connect_error:
    # Print a readable message, then re-raise so the notebook stops at this cell.
    print(f"Failed to connect to Couchbase: {str(connect_error)}")
    raise

In [None]:
# Load the search index definition from crew_index.json into `index_definition`.
# The encoding is pinned to UTF-8 (as for license_plates.json elsewhere in this
# notebook) so the file is not decoded with a platform-dependent default.
try:
    with open('crew_index.json', 'r', encoding='utf-8') as file:
        index_definition = json.load(file)
except FileNotFoundError as e:
    print(f"Error: crew_index.json file not found: {str(e)}")
    raise
except json.JSONDecodeError as e:
    print(f"Error: Invalid JSON in crew_index.json: {str(e)}")
    raise
except Exception as e:
    # Anything else (e.g. permission or decoding errors) is reported and re-raised.
    print(f"Error loading index definition: {str(e)}")
    raise

In [None]:
# Resolve the index name before the `try` so every handler below can refer to it.
index_name = index_definition["name"]

# image_search() queries the index called INDEX_NAME; flag a definition file
# that would register the index under a different name.
if index_name != INDEX_NAME:
    logging.warning(
        f"Index definition is named '{index_name}' but INDEX_NAME is '{INDEX_NAME}'; "
        "searches against INDEX_NAME will not use this index."
    )

try:
    scope_index_manager = cluster.bucket(CB_BUCKET_NAME).scope(SCOPE_NAME).search_indexes()

    # Check if index already exists (informational only; the upsert below runs either way)
    existing_indexes = scope_index_manager.get_all_indexes()

    if any(index.name == index_name for index in existing_indexes):
        logging.info(f"Index '{index_name}' found")
    else:
        logging.info(f"Creating new index '{index_name}'...")

    # Create SearchIndex object from JSON definition
    search_index = SearchIndex.from_json(index_definition)

    # Upsert the index (create if not exists, update if exists)
    scope_index_manager.upsert_index(search_index)
    logging.info(f"Index '{index_name}' successfully created/updated.")

except QueryIndexAlreadyExistsException:
    logging.info(f"Index '{index_name}' already exists. Skipping creation/update.")
except ServiceUnavailableException as e:
    # Chain the original exception so the SDK's details stay in the traceback.
    raise RuntimeError("Search service is not available. Please ensure the Search service is enabled in your Couchbase cluster.") from e
except InternalServerFailureException as e:
    logging.error(f"Internal server error: {str(e)}")
    raise

In [None]:
# Load the CLIP model ("clip-ViT-L-14") through SentenceTransformer.
# Later cells call `model.encode(...)` on both PIL images (when loading)
# and on text search phrases (when querying).
# This may print out warnings, which can be ignored.
model = SentenceTransformer("clip-ViT-L-14")

In [None]:
import os
import json
import requests
from urllib.parse import urlparse

# Ensure the output directory exists
os.makedirs('images', exist_ok=True)

# Load the JSON data
with open('license_plates.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

# Extract all non-null image URLs
image_urls = [
    entry['p_license_plate_time_in_img_location']
    for entry in data
    if entry.get('p_license_plate_time_in_img_location')
]

print(f"Found {len(image_urls)} images to download.")

# One session for the whole batch so connections to the same host are reused.
with requests.Session() as session:
    for url in image_urls:
        try:
            # Get the filename from the URL
            filename = os.path.basename(urlparse(url).path)
            if not filename:
                # A URL path ending in "/" has no basename; without this guard
                # the code below would try to open the images directory itself.
                print(f"Skipping {url}: no filename in URL path")
                continue
            dest_path = os.path.join('images', filename)

            # Re-running the cell should not fetch files that are already on disk.
            if os.path.isfile(dest_path) and os.path.getsize(dest_path) > 0:
                print(f"Already downloaded: {filename}")
                continue

            # Download and save the image
            resp = session.get(url, timeout=30)
            resp.raise_for_status()
            with open(dest_path, 'wb') as f:
                f.write(resp.content)
            print(f"Downloaded: {filename}")
        except Exception as e:
            # Best effort: report the failure and carry on with the next URL.
            print(f"Failed to download {url}: {e}")


In [None]:
import os
import getpass
from random import choices
from glob import glob
from PIL import Image
from tqdm import tqdm
import re
from dotenv import load_dotenv
from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions
from couchbase.exceptions import DocumentExistsException


# Connect to Couchbase and resolve the collection that image documents are stored in.
auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
cluster = Cluster(CB_HOST, ClusterOptions(auth))
# Same readiness check as the notebook's first connection cell, so a connection
# problem is reported here instead of on the first collection operation.
# (`timedelta` comes from the notebook's earlier import cell.)
cluster.wait_until_ready(timedelta(seconds=5))
bucket = cluster.bucket(CB_BUCKET_NAME)
cb_collection = bucket.scope(SCOPE_NAME).collection(COLLECTION_NAME)

def load_images(image_count=1000):
    """
    Load up to `image_count` distinct images into Couchbase, creating an embedding for each.

    Images are discovered recursively with the pattern ``images/**/*.jpg``.
    Each selected image is encoded with the notebook-level `model` and
    inserted into `cb_collection` as ``{"embedding": <list>, "path": <file path>}``.
    The document key is the file's path relative to the ``images`` directory,
    written with forward slashes.

    Args:
        image_count: Maximum number of images to load. When more files are
            available, a random subset of this size is drawn without
            replacement. Values below 1 load nothing.

    Returns:
        None. Prints a message and returns early when no images are found.

    Documents whose key already exists are left untouched, so the function
    can be re-run without failing on previously loaded images.
    """
    # Function-scope imports: these names are not imported elsewhere in the notebook.
    from pathlib import PurePath
    from random import sample

    # Fix the file extension to .jpg instead of .JPEG
    image_paths = glob("images/**/*.jpg", recursive=True)

    if not image_paths:
        print("No images found. Please check that the images folder exists and contains .jpg files.")
        return

    # sample() draws WITHOUT replacement, so no image is picked twice.
    # The count is clamped to [0, len(image_paths)] because sample() rejects
    # a negative k or a k larger than the population.
    selected_count = max(0, min(len(image_paths), image_count))
    selected_paths = sample(image_paths, k=selected_count)

    for path in tqdm(selected_paths):
        # Close the file handle as soon as the embedding has been computed.
        with Image.open(path) as img:
            emb = model.encode(img)

        # Strip only the leading "images" directory; as_posix() keeps the key
        # identical regardless of the platform's path separator.
        doc_id = PurePath(path).relative_to("images").as_posix()

        try:
            cb_collection.insert(
                key=doc_id,
                value={
                    "embedding": emb.tolist(),
                    "path": path  # Optional: Store original path if needed
                }
            )
        except DocumentExistsException:
            # Already loaded by an earlier run; keep the existing document.
            continue

# Run the load. load_images() never selects more paths than the glob finds,
# so this value acts as an upper bound on the number of images processed.
NUMBER_OF_IMAGES_TO_LOAD = 200  # Set this to your desired number
load_images(NUMBER_OF_IMAGES_TO_LOAD)

In [None]:
from datetime import timedelta
from couchbase.search import SearchRequest, MatchNoneQuery
from couchbase.options import SearchOptions  # Updated import location
from couchbase.vector_search import VectorSearch, VectorQuery

def image_search(search_phrase, limit=9):
    """
    Find stored images whose embeddings best match a text phrase.

    The phrase is embedded with the notebook-level `model`, and that vector
    is sent to the `INDEX_NAME` search index of scope `SCOPE_NAME` as a
    vector query on the "embedding" field. For every hit the document is
    fetched from `cb_collection` to read its stored "path".

    Args:
        search_phrase: Text describing the image to look for.
        limit: Maximum number of hits requested from the search service.

    Returns:
        A list of dicts with the keys "id", "score" and "path", in the order
        the search service returned the hits. A hit whose document cannot be
        fetched is reported with a printed message and left out of the list.
    """
    phrase_embedding = model.encode(search_phrase)

    # Vector-only search: the text part of the request matches nothing.
    base_request = SearchRequest.create(MatchNoneQuery())
    vector_query = VectorQuery("embedding", phrase_embedding.tolist(), num_candidates=100)
    request = base_request.with_vector_search(VectorSearch.from_vector_query(vector_query))

    options = SearchOptions(timeout=timedelta(seconds=5.0), limit=limit)
    hits = bucket.scope(SCOPE_NAME).search(INDEX_NAME, request, options)

    matches = []
    for hit in hits.rows():
        try:
            stored = cb_collection.get(hit.id, timeout=timedelta(seconds=2.0)).value
            matches.append({"id": hit.id, "score": hit.score, "path": stored.get("path")})
        except Exception as fetch_error:
            print(f"Error fetching document {hit.id}: {str(fetch_error)}")

    return matches

In [None]:
# Example query. With the default `limit` of 9 this requests at most nine hits;
# the cell output is the returned list of {"id", "score", "path"} dicts.
image_search("white colored car")