# 2) Index ESCI Data

After the OpenSearch preparations are done we can move towards indexing products.

We're using the [ESCI](https://github.com/amazon-science/esci-data) dataset.

In [155]:
import json
import os

import mercury as mr
import numpy as np
import pandas as pd
import requests
from opensearchpy import OpenSearch

In [156]:
# Directory that holds the ESCI parquet files.
# Set the ESCI_DATA_DIR environment variable to point at your own checkout;
# when it is unset the original local path is used as the fallback.
# os.path.join(..., '') guarantees a trailing path separator, because the
# loading cell builds file paths by plain string concatenation (DATA_DIR + name).
DATA_DIR = os.path.join(
    os.environ.get(
        'ESCI_DATA_DIR',
        '/Users/danielwrigley/work/Testing/git_repos/esci-data/shopping_queries_dataset/',
    ),
    '',
)

# Load Data

In [157]:
# Read the full ESCI product catalogue (all locales) from its parquet file
products_file = DATA_DIR + 'shopping_queries_dataset_products.parquet'
df_products = pd.read_parquet(products_file)

In [158]:
# Preview the first five products to get a feel for the columns
df_products.head(n=5)

Unnamed: 0,product_id,product_title,product_description,product_bullet_point,product_brand,product_color,product_locale
0,B079VKKJN7,"11 Degrees de los Hombres Playera con Logo, Ne...",Esta playera con el logo de la marca Carrier d...,11 Degrees Negro Playera con logo\nA estrenar ...,11 Degrees,Negro,es
1,B079Y9VRKS,Camiseta Eleven Degrees Core TS White (M),,,11 Degrees,Blanco,es
2,B07DP4LM9H,11 Degrees de los Hombres Core Pull Over Hoodi...,La sudadera con capucha Core Pull Over de 11 G...,11 Degrees Azul Core Pull Over Hoodie\nA estre...,11 Degrees,Azul,es
3,B07G37B9HP,11 Degrees Poli Panel Track Pant XL Black,,,11 Degrees,,es
4,B07LCTGDHY,11 Degrees Gorra Trucker Negro OSFA (Talla úni...,,,11 Degrees,Negro (,es


In [159]:
# Total number of products across all locales
len(df_products)

1814924

In [160]:
# Only use the products whose product_locale is 'us'
is_us_locale = df_products['product_locale'] == 'us'
df_products_us = df_products[is_us_locale]

# Number of products left after the locale filter
len(df_products_us)

1215854

## Augment the existing ecommerce schema
Read in the existing ecommerce schema and augment it with the knn settings and the new ingestion pipeline for generating embeddings.
This ensures that when we reindex data, we are running the `nlp-ingest-pipeline` to get the embeddings on the title field.


Note the number of dimensions is set to 384, the third time we encounter the number. This makes sure we can store the embeddings generated by the model.

In [170]:
# Path to the baseline ecommerce index schema (settings + mappings)
json_file_path = './hybrid-search-baseline-comparison/schema.json'

with open(json_file_path, 'r') as schema_file:
    schema = json.load(schema_file)

# Index-level additions: enable k-NN on the index and make the embedding
# ingest pipeline the default pipeline for every indexed document
new_settings = {
  "settings": {
    "index.knn": True,
    "default_pipeline": "nlp-ingest-pipeline"
  }
}

# Mapping addition: the vector field that stores the title embeddings.
# The dimension must match the size of the vectors produced by the model.
new_mappings = {
  "mappings": {
    "properties": {
      "title_embedding": {
        "type": "knn_vector",
        "dimension": 384,
        "method": {
          "engine": "lucene",
          "space_type": "l2",
          "name": "hnsw",
          "parameters": {}
        }
      }
    }
  }
}

# Shallow merge: start from the additions, then lay the schema file's entries
# on top. On a key collision the value from the schema file wins.
settings = dict(new_settings['settings'])
settings.update(schema['settings'])

properties = dict(new_mappings['mappings']['properties'])
properties.update(schema['mappings']['properties'])

# Write the merged sections back and display the resulting schema
schema['settings'] = settings
schema['mappings']['properties'] = properties
mr.JSON(schema, level=2)

In [171]:
# Recreate the 'ecommerce' index from scratch with the augmented schema
url = "http://localhost:9200/ecommerce"
headers = {'Content-Type': 'application/json'}
payload = schema

# Drop any previous version of the index; the result of this call is not inspected
response = requests.delete(url, headers=headers)

# Create the index with the merged settings and mappings, then show the reply
response = requests.put(url, headers=headers, data=json.dumps(payload))
mr.JSON(response.json(), level=4)

In [172]:
# Create the OpenSearch client used for bulk indexing
host = 'localhost'
port = 9200

client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,  # Enable HTTP compression
    use_ssl=False,       # Set to True if SSL is enabled on your cluster
    verify_certs=False   # Set to True if SSL certificates should be verified
)

# Index the US products into OpenSearch in batches through the _bulk API
index_name = 'ecommerce'
batch_size = 1000  # Number of documents sent per bulk request

total_docs = len(df_products_us)
indexed_docs = 0   # Documents accepted by OpenSearch so far
failed_items = []  # Per-document failures reported in the bulk responses

# Slice the frame positionally instead of using iterrows(): far cheaper for
# ~1.2M rows, and the last (possibly partial) batch needs no special handling.
for start in range(0, total_docs, batch_size):
    batch = df_products_us.iloc[start:start + batch_size]

    actions = []  # One entry per document: action line + source line
    for doc in batch.to_dict('records'):
        # Map missing values to None so json.dumps emits null rather than the
        # invalid bare NaN token (columns are assumed to hold scalar values).
        doc = {key: (None if pd.isna(value) else value) for key, value in doc.items()}
        # Serialise the action metadata with json.dumps so the id is always
        # correctly quoted/escaped instead of being spliced into a string.
        action_line = json.dumps({"index": {"_index": index_name, "_id": doc['product_id']}})
        actions.append(action_line + "\n" + json.dumps(doc))

    response = client.bulk(index=index_name, body=actions)

    # A bulk request can succeed as a whole while individual items fail;
    # collect those failures instead of silently dropping them.
    batch_failures = []
    if response.get('errors'):
        batch_failures = [
            item['index'] for item in response.get('items', [])
            if 'error' in item.get('index', {})
        ]
        failed_items.extend(batch_failures)

    # Report progress only after the batch has actually been sent
    indexed_docs += len(actions) - len(batch_failures)
    print(f"{indexed_docs} documents indexed")

if failed_items:
    print(f"{len(failed_items)} of {total_docs} documents failed to index; "
          f"first error: {failed_items[0].get('error')}")

1000 documents indexed
2000 documents indexed
3000 documents indexed
4000 documents indexed
5000 documents indexed
6000 documents indexed
7000 documents indexed
8000 documents indexed
9000 documents indexed
10000 documents indexed
11000 documents indexed
12000 documents indexed
13000 documents indexed
14000 documents indexed
15000 documents indexed
16000 documents indexed
17000 documents indexed
18000 documents indexed
19000 documents indexed
20000 documents indexed
21000 documents indexed
22000 documents indexed
23000 documents indexed
24000 documents indexed
25000 documents indexed
26000 documents indexed
27000 documents indexed
28000 documents indexed
29000 documents indexed
30000 documents indexed
31000 documents indexed
32000 documents indexed
33000 documents indexed
34000 documents indexed
35000 documents indexed
36000 documents indexed
37000 documents indexed
38000 documents indexed
39000 documents indexed
40000 documents indexed
41000 documents indexed
42000 documents indexed
4

## Check that all documents were indexed


In [164]:
# Expected number of documents in the index: one per US product
len(df_products_us)

1215854

In [173]:
# Ask the index for its exact document count: match everything, return no
# hits (size 0) and request full hit tracking so the reported total is exact
url = "http://localhost:9200/ecommerce/_search"
payload = {"query": {"match_all": {}}, "track_total_hits": "true", "size": 0}

# Reuses the JSON content-type headers defined when the index was created
response = requests.post(url, headers=headers, data=json.dumps(payload))
mr.JSON(response.json(), level=3)