# Load wrist watches dataset to Astra

## Create descriptions and embeddings from images, then upload them to Astra

#### Get Claude Sonnet description method

In [249]:
#########################################################
# Method to get a Claude Sonnet description of an image #
#########################################################

def claude_description(image, category='wrist watches', media_type='image/jpeg'):
    """Return a Claude 3 Sonnet product description for a base64-encoded image.

    The image and a text prompt are sent as one user message to the
    ``anthropic.claude-3-sonnet-20240229-v1:0`` model through the module-level
    ``bedrock_runtime`` client.

    Args:
        image: Base64-encoded image data as a ``str``.
        category: Product category inserted into the prompt. Defaults to
            ``'wrist watches'``.
        media_type: MIME type declared for ``image``. Defaults to
            ``'image/jpeg'``; pass the real type (e.g. ``'image/png'``) when
            the images are not JPEGs, since a mismatched declaration may be
            rejected by the model endpoint.

    Returns:
        The ``text`` of the first content item in the model response.
    """
    # Variables for Bedrock API
    model_id = 'anthropic.claude-3-sonnet-20240229-v1:0'
    content_type = 'application/json'
    accept = 'application/json'

    prompt = """
    Identify the following product in the image provided.
    Product Category: {product_category}
    
    Return an enhanced description of the product based on the image for better search results.
    Do not include any specific details that can not be confirmed from the image such as the quality of materials, other color options, or exact measurements.
    """

    # One user turn containing the image first, then the instructions.
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": media_type,
                        "data": image,
                    },
                },
                {
                    "type": "text",
                    "text": prompt.format(product_category=category),
                },
            ],
        }
    ]

    claude_body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1000,
        "messages": messages,
    })

    # Run Bedrock API to invoke Claude 3 model
    claude_response = bedrock_runtime.invoke_model(
        modelId=model_id,
        contentType=content_type,
        accept=accept,
        body=claude_body,
    )

    claude_response_body = json.loads(claude_response.get('body').read())
    return claude_response_body['content'][0]['text']


#### Get embedding with Amazon Titan method

In [279]:
# Generate Embeddings using Amazon Titan

def generate_titan_embedding(input_text, input_image, output_embedding_length=1024):
    """Return an Amazon Titan multimodal embedding for a text and/or an image.

    Args:
        input_text: Text to embed together with the image, or ``None`` to omit.
        input_image: Base64-encoded image as a ``str``, or ``None`` to omit.
        output_embedding_length: Requested vector size, sent as
            ``embeddingConfig.outputEmbeddingLength``. Defaults to 1024, the
            dimension used for the ``watches_catalog`` collection in this
            notebook; the two must agree for vector inserts to work.

    Returns:
        The ``embedding`` value from the model response.

    Raises:
        ValueError: If both ``input_text`` and ``input_image`` are ``None``.
    """
    if input_text is None and input_image is None:
        raise ValueError("Provide input_text, input_image, or both.")

    # Variables for Bedrock API
    embedding_model_id = "amazon.titan-embed-image-v1"
    content_type = 'application/json'
    accept = 'application/json'

    # Omit absent inputs rather than sending JSON null, so text-only and
    # image-only requests stay well-formed.
    request = {}
    if input_text is not None:
        request["inputText"] = input_text
    if input_image is not None:
        request["inputImage"] = input_image
    request["embeddingConfig"] = {"outputEmbeddingLength": output_embedding_length}

    titan_response = bedrock_runtime.invoke_model(
        modelId=embedding_model_id,
        contentType=content_type,
        accept=accept,
        body=json.dumps(request),
    )

    final_response = json.loads(titan_response.get('body').read())
    return final_response['embedding']

#### Final program

In [None]:
import base64
import json
import os
from io import BytesIO

import boto3
import pandas as pd
from astrapy.db import AstraDB, AstraDBCollection
from IPython.display import display
from tqdm.auto import tqdm
    
####################################
# Create AstraDB vector collection #
####################################
# Fill these in, or export them as environment variables, before running.
# Keeping tokens out of the notebook avoids committing them by accident.
ASTRA_DB_ENDPOINT = os.environ.get("ASTRA_DB_ENDPOINT", "")
ASTRA_DB_REGION = os.environ.get("ASTRA_DB_REGION", "")
ASTRA_DB_TOKEN = os.environ.get("ASTRA_DB_TOKEN", "")
ASTRA_DB_KEYSPACE = os.environ.get("ASTRA_DB_KEYSPACE", "")

# Data API client; both collection creation and lookup below need it.
astra_db = AstraDB(
    token=ASTRA_DB_TOKEN,
    api_endpoint=ASTRA_DB_ENDPOINT,
    # An empty keyspace falls back to astrapy's default namespace.
    namespace=ASTRA_DB_KEYSPACE or None,
)

# Creation of AstraDB collection (dimension must match the Titan embedding length)
#catalog_collection = astra_db.create_collection(collection_name = "watches_catalog", dimension=1024)
#print("Collection creation ended.")

# If using an existing collection
print("Getting AstraDB collection...")
catalog_collection = AstraDBCollection(
    collection_name="watches_catalog",
    astra_db=astra_db,
)

#######################
# Set AWS credentials #
#######################
# establish Bedrock client
# NOTE: replace the placeholders below before running. Prefer an AWS profile or
# environment variables over hard-coded secrets; boto3 picks those up through
# its default credential chain when the credential arguments are omitted.
bedrock_runtime = boto3.client(
    service_name="bedrock-runtime",
    aws_access_key_id="your_accesskey",
    aws_secret_access_key="your_secret",
    aws_session_token="Get the temporal token and put it here",
    endpoint_url="your service endpoint",
    region_name="us-east-1",
)

def _advance_progress(progress_bar, max_steps, label):
    """Advance ``progress_bar`` one step and label it ``"<value>/<max_steps> - <label>"``."""
    progress_bar.value += 1
    progress_bar.description = str(progress_bar.value) + "/" + str(max_steps) + " - " + label


def insert_catalog(row, new_df, index, max_steps, errors, progress_bar, category='wrist watches'):
    """Process one catalog row: fetch its image, describe it, embed it and store it.

    Each successful step advances ``progress_bar`` by one:
    S3 download, Claude description, Titan embedding, dataframe append, and
    Astra DB insert into the module-level ``catalog_collection``.

    Args:
        row: Metadata row providing ``brand``, ``name``, ``price`` and ``image_name``.
        new_df: DataFrame that receives one row per processed item (mutated).
        index: Index of ``row`` in the source dataframe; stored as ``df_index``.
        max_steps: Total step count, used only for the progress text.
        errors: List that receives a message when any step fails (mutated).
        progress_bar: Widget with mutable ``value``/``description``; reset to 0 here.
        category: Value stored in the document's ``category`` field.

    Exceptions are recorded in ``errors`` instead of raised, so one bad row
    does not stop the batch. The dataframe row is appended before the Astra
    insert, so a failed insert leaves that row only in ``new_df``.
    """
    progress_bar.value = 0
    try:
        # --- Get the image from S3 ---
        bucket_name = "name_of_your_bucket"
        s3_path = "catalog/"
        full_obj_path = f"{s3_path}{row['image_name']}"
        s3 = boto3.client('s3')
        img_data = s3.get_object(Bucket=bucket_name, Key=full_obj_path)['Body'].read()
        _advance_progress(progress_bar, max_steps, "s3")

        # --- Get description from Claude (expects base64 text) ---
        b64_img_data = base64.b64encode(img_data).decode("utf-8")
        img_description = claude_description(b64_img_data)
        _advance_progress(progress_bar, max_steps, "Claude")

        # --- Get embedding with Titan from description + image ---
        embedding = generate_titan_embedding(img_description, b64_img_data)
        _advance_progress(progress_bar, max_steps, "Titan")

        # --- Add row to dataframe ---
        new_df.loc[len(new_df.index)] = [
            row['brand'], row['name'], row['price'], full_obj_path, img_description, embedding,
        ]
        _advance_progress(progress_bar, max_steps, "Dataframe")

        # --- Add new document to AstraDB ---
        catalog_collection.insert_one({
            "df_index": index,
            "brand": row['brand'],
            "product_name": row['name'],
            "price": row['price'],
            "category": category,
            "file_path": full_obj_path,
            "description": img_description,
            "$vector": embedding,
        })
        _advance_progress(progress_bar, max_steps, "Astra")

    except Exception as error:
        errors.append(f"Error at IX {index} {error}")

#######################
# Let the magic begin #
#######################

# Load the dataset CSV
df = pd.read_csv("dataset/metadata.csv", header=0)
new_df = pd.DataFrame(columns=['brand', 'name', 'price', 'file_path', 'description', 'embedding'])

##########################################################
# Define limits and restrictions (to make partial loads) #
##########################################################
# Rows in the positional range [skip, load_to) are processed. load_to is an
# exclusive bound, so len(df) includes the last row.
skip = 0
load_to = len(df)
max_steps = 5  # per-row steps: s3, Claude, Titan, Dataframe, Astra

errors = []
tqdm.pandas()
# NOTE(review): IntProgress is not imported anywhere in this notebook;
# presumably it is ipywidgets.IntProgress from an earlier cell — confirm.
ip = IntProgress(min=0, max=max_steps)  # one bar, reset at the start of each row
display(ip)

rows_to_load = df.iloc[skip:load_to]
for index, row in tqdm(rows_to_load.iterrows(), total=len(rows_to_load), desc='Loading with Astrapy'):
    insert_catalog(row, new_df, index, max_steps, errors, ip)

# Export the resulting dataframe to a CSV file
new_df.to_csv("dataset/watches_dataset.csv", sep=',')

print("Finished")
print(f"Errors: {len(errors)}")

In [None]:
print(f"{errors}")  # show every per-row error message collected during the load

In [None]:
# Quick sanity check: first rows of the generated dataframe and its row count.
preview = new_df.head()
print(preview)
print(new_df.shape[0])

In [None]:
### Store everything in AstraDB (for later use in vector search)

In [None]:

# Load to vector store
def load_to_astra(df, collection):
    """Insert every row of ``df`` into an Astra DB collection, one document per row.

    Args:
        df: DataFrame with the columns read below: product_name, link,
            product_images, price, details, category, gender and embeddings.
            NOTE(review): the dataframes built earlier in this notebook use
            other columns (brand, name, price, image_name, ...); confirm which
            dataframe this is meant for. If it was read back from CSV,
            ``embeddings`` will hold strings and must be parsed into lists first.
        collection: Collection object exposing ``insert_one`` (e.g. an astrapy
            ``AstraDBCollection``).

    Documents are keyed by row position (``_id``). A duplicate-id error is
    reported and skipped, so re-running is safe. Any other insertion error is
    printed rather than silently dropped.
    """
    len_df = len(df)
    f = IntProgress(min=0, max=len_df)  # instantiate the bar
    display(f)
    for i in range(len_df):
        f.value += 1
        f.description = str(f.value) + "/" + str(len_df)

        # Positional access, so a non-default index does not break the lookup.
        record = df.iloc[i]
        document = {
            "_id": i,
            "product_name": record["product_name"],
            "link": record["link"],
            "product_images": record["product_images"],
            "price": record["price"],
            "details": record["details"],
            "category": record["category"],
            "gender": record["gender"],
            "$vector": record["embeddings"],
        }

        try:
            collection.insert_one(document)
        except Exception as error:
            # Match on the error text instead of json-parsing it: the message
            # shape is not guaranteed, and a parse failure here would abort
            # the whole load.
            if "DOCUMENT_ALREADY_EXISTS" in str(error):
                print("Document already exists in the database. Skipping.")
            else:
                print(f"Failed to insert row {i}: {error}")

# catalog_collection is the Astra collection opened earlier in this notebook.
# NOTE(review): confirm `df` has the columns load_to_astra reads.
load_to_astra(df, catalog_collection)