Now that we have configured OpenSearch and created an ingestion pipeline, we will ingest the **Visual Genome** dataset that we created in the [first notebook](01_load_data.ipynb).

In [1]:
import ast
import json
import requests
import pandas as pd
from opensearchpy import OpenSearch

### Read the dataset

In [2]:
df = pd.read_csv('../data/vgenome_sample_1k.csv', index_col=0)
df.shape

(1000, 3)

In [3]:
df.head()

Unnamed: 0,image_id,image_url,tags
53559,2366867,http://crowdfile.blob.core.chinacloudapi.cn/46...,"['birds', 'camera', 'bird', 'sky', 'neck', 'pe..."
41981,2378980,http://crowdfile.blob.core.chinacloudapi.cn/46...,"['window', 'line', 'double door', 'front winds..."
89244,2329504,http://crowdfile.blob.core.chinacloudapi.cn/46...,"['window', 'head', 'man', 'flag', 'pole', 'lig..."
71785,2347788,http://crowdfile.blob.core.chinacloudapi.cn/46...,"['headband', 'kid', 'child', 'whisk', 'visor',..."
100014,2318253,http://crowdfile.blob.core.chinacloudapi.cn/46...,"['bear', 'eye', 'claws', 'face', 'grass', 'mou..."


In [4]:
type(df.iloc[0, 2])

str

The `tags` column was read in as a string. Let's change that into a list of strings.

In [5]:
df = df.rename(columns={'tags': 'tags_list'})

df['tags_list'] = df['tags_list'].apply(ast.literal_eval)

df.head()

Unnamed: 0,image_id,image_url,tags_list
53559,2366867,http://crowdfile.blob.core.chinacloudapi.cn/46...,"[birds, camera, bird, sky, neck, peach sky, me..."
41981,2378980,http://crowdfile.blob.core.chinacloudapi.cn/46...,"[window, line, double door, front windshield, ..."
89244,2329504,http://crowdfile.blob.core.chinacloudapi.cn/46...,"[window, head, man, flag, pole, light, tie, bu..."
71785,2347788,http://crowdfile.blob.core.chinacloudapi.cn/46...,"[headband, kid, child, whisk, visor, container..."
100014,2318253,http://crowdfile.blob.core.chinacloudapi.cn/46...,"[bear, eye, claws, face, grass, mouth, these, ..."
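
As a minimal sketch of what the conversion above does to a single cell, the snippet below parses one string with `ast.literal_eval`. The sample string is made up for illustration and only mimics the shape of the values in the `tags` column.

```python
import ast

# Hypothetical cell value, shaped like the strings in the `tags` column
raw = "['birds', 'camera', 'peach sky']"

# literal_eval only evaluates Python literals (lists, strings, numbers, ...)
parsed = ast.literal_eval(raw)

print(type(parsed).__name__)  # list
print(parsed)                 # ['birds', 'camera', 'peach sky']

# Unlike eval(), literal_eval refuses anything that is not a plain literal
try:
    ast.literal_eval("__import__('os').getcwd()")
except ValueError as exc:
    print("rejected:", type(exc).__name__)
```

This is why `ast.literal_eval` is a reasonable choice over `eval` here: the column is expected to contain nothing but list literals.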


Let's also create a column that joins all tags of an image into a single, space-separated string.

In [6]:
df['tags'] = df['tags_list'].apply(lambda x: " ".join(x))

df.head()

Unnamed: 0,image_id,image_url,tags_list,tags
53559,2366867,http://crowdfile.blob.core.chinacloudapi.cn/46...,"[birds, camera, bird, sky, neck, peach sky, me...",birds camera bird sky neck peach sky metal pol...
41981,2378980,http://crowdfile.blob.core.chinacloudapi.cn/46...,"[window, line, double door, front windshield, ...",window line double door front windshield back ...
89244,2329504,http://crowdfile.blob.core.chinacloudapi.cn/46...,"[window, head, man, flag, pole, light, tie, bu...",window head man flag pole light tie bus car sk...
71785,2347788,http://crowdfile.blob.core.chinacloudapi.cn/46...,"[headband, kid, child, whisk, visor, container...",headband kid child whisk visor container chips...
100014,2318253,http://crowdfile.blob.core.chinacloudapi.cn/46...,"[bear, eye, claws, face, grass, mouth, these, ...",bear eye claws face grass mouth these stone th...


### Ingest data

Before we ingest data, let's update the OpenSearch configuration by setting `tags-ingest-pipeline` as the default pipeline. To use the tags embedding processor that we defined in our pipeline, we also need to create a _k-NN_ index. Since we named our embedding vector field `tag_embedding`, that is the field we need to map as a _k-NN_ vector. We also map the `tags` field as a text field.

In [7]:
json_file_path = '../configs/schema.json'

with open(json_file_path, 'r') as file:
    schema = json.load(file)

new_settings = {
    "settings": {
        "index.knn": True,
        "default_pipeline": "tags-ingest-pipeline"
    }
}

new_mappings = {
    "mappings": {
        "properties": {
            "tag_embedding": {
                "type": "knn_vector",
                "dimension": 384,
                "method": {
                    "engine": "lucene",
                    "space_type": "l2",
                    "name": "hnsw",
                    "parameters": {}
                }
            },
            "tags": {
                "type": "text"
            }
        }
    }
}

settings = {**new_settings['settings'], **schema['settings']}
properties = {**new_mappings['mappings']['properties'], **schema['mappings']['properties']}

schema['settings'] = settings
schema['mappings']['properties'] = properties

print(schema)

{'settings': {'index.knn': True, 'default_pipeline': 'tags-ingest-pipeline', 'index': {'number_of_shards': 1, 'number_of_replicas': 0, 'mapping.total_fields.limit': 20000}, 'analysis': {'filter': {'english_minimal_stem': {'type': 'stemmer', 'language': 'minimal_english'}}, 'analyzer': {'english_stemmed': {'type': 'custom', 'tokenizer': 'standard', 'filter': ['lowercase', 'english_minimal_stem']}}}}, 'mappings': {'properties': {'tag_embedding': {'type': 'knn_vector', 'dimension': 384, 'method': {'engine': 'lucene', 'space_type': 'l2', 'name': 'hnsw', 'parameters': {}}}, 'tags': {'type': 'text'}, 'product_id': {'type': 'keyword'}, 'product_title': {'type': 'text', 'analyzer': 'english_stemmed'}, 'product_description': {'type': 'text', 'analyzer': 'english_stemmed'}, 'product_bullet_point': {'type': 'text', 'analyzer': 'english_stemmed', 'fielddata': True}, 'product_brand': {'type': 'text', 'analyzer': 'english_stemmed', 'fielddata': True}, 'product_color': {'type': 'text', 'analyzer': 'e
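
One detail of the merge above is worth spelling out: with `{**a, **b}`, keys from the right-hand dictionary win on conflict, so values loaded from `schema.json` would override the new ones if both defined the same key. The printed output suggests there is no such conflict here. The sketch below uses toy dictionaries with hypothetical values to show the precedence.

```python
# Toy dictionaries (hypothetical values) to illustrate merge precedence
new_settings = {"index.knn": True, "default_pipeline": "tags-ingest-pipeline"}
file_settings = {"default_pipeline": "some-other-pipeline", "analysis": {}}

# Later operands win: keys from file_settings override new_settings
merged = {**new_settings, **file_settings}
print(merged["default_pipeline"])  # some-other-pipeline

# Reversing the order keeps our values when both sides define a key
merged_reversed = {**file_settings, **new_settings}
print(merged_reversed["default_pipeline"])  # tags-ingest-pipeline
```

Note that the merge is shallow: nested dictionaries are replaced as a whole, not combined key by key.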

#### Create an index for ingestion

This index, which we will call `tags_db`, will have `tags-ingest-pipeline` as its default pipeline.

In [8]:
url = "http://localhost:9200/tags_db"

headers = {'Content-Type': 'application/json'}

payload = schema

# delete if it already exists
response = requests.delete(url, headers=headers)

response = requests.put(url, headers=headers, data=json.dumps(payload))

print(response.json())

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'tags_db'}


_Now_ we are ready to ingest the documents.

In [9]:
host = 'localhost'
port = 9200

client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,  # enable HTTP compression
    use_ssl=False,       # set to True if SSL is enabled on your cluster
    verify_certs=False   # set to True if SSL certificates should be verified
)

index_name = 'tags_db'

In [10]:
# input data (to be indexed): one record per image
docs = df[['image_id', 'tags']].to_dict(orient='records')

# the Bulk API expects newline-delimited JSON:
# an action line followed by the document source, for every document
json_lines = []
for doc in docs:
    # cast to a plain int so that json.dumps never sees a NumPy integer
    index_entry = {"index": {"_index": index_name, "_id": int(doc["image_id"])}}
    tags_entry = {"tags": doc["tags"]}
    json_lines.append(json.dumps(index_entry))
    json_lines.append(json.dumps(tags_entry))

# join JSON lines into a single string; the bulk body must end with a newline
result = "\n".join(json_lines) + "\n"
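
The same conversion can be wrapped in a small helper, which makes the layout of a bulk request body easier to see: each document contributes an action line and a source line. This is only a sketch; the function name `to_bulk_ndjson`, its parameters, and the two sample records are hypothetical and not part of the original notebook.

```python
import json

def to_bulk_ndjson(records, index_name, id_field="image_id", fields=("tags",)):
    """Build a Bulk API body: one action line plus one source line per record."""
    lines = []
    for record in records:
        # The id is sent as a string here, which also sidesteps NumPy integer types
        action = {"index": {"_index": index_name, "_id": str(record[id_field])}}
        source = {name: record[name] for name in fields}
        lines.append(json.dumps(action))
        lines.append(json.dumps(source))
    # The Bulk API expects the body to end with a newline
    return "\n".join(lines) + "\n"

# Two made-up records, shaped like df[['image_id', 'tags']].to_dict(orient='records')
sample = [
    {"image_id": 1, "tags": "bird sky"},
    {"image_id": 2, "tags": "window door"},
]
body = to_bulk_ndjson(sample, "tags_db")
print(body)
```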

In [11]:
# bulk upload (since our dataset is pretty small)

response = client.bulk(index=index_name, body=result)
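
The bulk call returns a per-document report, so it is worth inspecting `response` before moving on. Below is a minimal sketch of a summary helper. The name `summarize_bulk_response` is hypothetical, and the response shown is hand-written for illustration, not output from the cluster used in this notebook. An `updated` result means a document with that `_id` already existed and was overwritten, so repeated ids lower the final document count.

```python
from collections import Counter

def summarize_bulk_response(response):
    """Count per-item results and collect failed items from a Bulk API response."""
    results = Counter()
    failures = []
    for item in response.get("items", []):
        # Each item has a single key naming the action, e.g. "index"
        (action, detail), = item.items()
        if "error" in detail:
            results["failed"] += 1
            failures.append((detail.get("_id"), detail["error"].get("reason")))
        else:
            results[detail.get("result", "unknown")] += 1
    return results, failures

# Hand-written response for illustration only
mock_response = {
    "errors": True,
    "items": [
        {"index": {"_id": "1", "status": 201, "result": "created"}},
        {"index": {"_id": "1", "status": 200, "result": "updated"}},
        {"index": {"_id": "2", "status": 400,
                   "error": {"type": "mapper_parsing_exception",
                             "reason": "failed to parse"}}},
    ],
}
results, failures = summarize_bulk_response(mock_response)
print(dict(results))
print(failures)
```

In the notebook, the helper would be called as `summarize_bulk_response(response)` on the value returned by `client.bulk`.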

### Perform default search

In [12]:
url = "http://localhost:9200/tags_db/_search"

payload = {
    "query": {
        "match_all": {}
    },
    "track_total_hits": True,
    "size": 0
}

response = requests.post(url, headers=headers, data=json.dumps(payload))

print(response.json())

{'took': 554, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 989, 'relation': 'eq'}, 'max_score': None, 'hits': []}}


In [13]:
df.shape[0], response.json()['hits']['total']['value']

(1000, 989)

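Only 989 of the 1,000 images were indexed, so 11 are missing. We have not investigated the cause here. Possible explanations include duplicate `image_id` values (a repeated `_id` overwrites the earlier document) or documents rejected during the bulk request; the `response` returned by `client.bulk` records the outcome for each document. The gap is acceptable for this exercise. The next step is to search using specific keywords, so let's move on to the [next notebook](03_search_and_eval.ipynb).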