In [3]:
from opensearchpy import OpenSearch, helpers


def opensearch_client(host, port, auth, use_ssl=True, verify_certs=False):
    """Build an OpenSearch client, query the cluster once and return the client.

    Args:
        host: Host name or IP address of the OpenSearch node.
        port: Port of the OpenSearch HTTP endpoint.
        auth: Value forwarded as ``http_auth`` (the notebook passes a
            ``(user, password)`` tuple).
        use_ssl: Forwarded to ``OpenSearch(use_ssl=...)``. Defaults to True.
        verify_certs: Forwarded to ``OpenSearch(verify_certs=...)``. The
            default stays False so existing callers keep working against
            clusters with self-signed certificates; pass True whenever the
            cluster presents a certificate that can be validated.

    Returns:
        The constructed ``OpenSearch`` client.
    """
    client = OpenSearch(
        hosts=[{'host': host, 'port': port}],
        http_auth=auth,
        use_ssl=use_ssl,
        verify_certs=verify_certs,
    )
    # info() queries the cluster, so connection problems surface here
    # instead of on the first indexing call.
    version = client.info()['version']
    print(f"Welcome to {version['distribution']} {version['number']}!")
    return client

def opensearch_bulk_sync(client, index_name, df, mapping=None):
    """Recreate ``index_name`` and bulk-index every row of ``df`` into it.

    WARNING: an existing index with the same name is deleted first, so any
    documents already stored in it are lost.

    All actions are materialised in a list before being handed to
    ``helpers.bulk``.

    Args:
        client: OpenSearch client used for the index and bulk calls.
        index_name: Name of the index to (re)create and fill.
        df: pandas DataFrame; each row becomes one document and the row's
            index label becomes the document ``_id``.
        mapping: Optional body passed to ``indices.create``. When falsy the
            index is created without a body.

    Returns:
        The ``(success, failed)`` pair returned by ``helpers.bulk``.
    """
    if client.indices.exists(index=index_name):
        client.indices.delete(index=index_name)

    if mapping:
        client.indices.create(index=index_name, body=mapping)
    else:
        client.indices.create(index=index_name)

    # itertuples() keeps each column's own type. iterrows() would squeeze
    # every row into one Series and upcast mixed numeric columns, so integer
    # columns would be indexed as floats.
    columns = list(df.columns)
    data = [
        {"_index": index_name, "_id": row[0], "_source": dict(zip(columns, row[1:]))}
        for row in df.itertuples(index=True, name=None)
    ]

    success, failed = helpers.bulk(client=client, actions=data)
    return success, failed

def opensearch_bulk_async(client, index_name, df, mapping=None):
    """Recreate ``index_name`` and stream the rows of ``df`` into it.

    Despite the name, the call is blocking: the difference from
    ``opensearch_bulk_sync`` is that actions come from the
    ``dataframe_to_actions`` generator instead of a pre-built list.

    An existing index with the same name is deleted before the new one is
    created. ``mapping``, when truthy, is passed as the ``body`` of the
    create call.

    Returns the ``(success, failed)`` pair produced by ``helpers.bulk``.
    """
    indices = client.indices

    # Start from a clean index every time.
    if indices.exists(index=index_name):
        indices.delete(index=index_name)

    # Only send a body when a mapping was actually supplied.
    create_kwargs = {"index": index_name}
    if mapping:
        create_kwargs["body"] = mapping
    indices.create(**create_kwargs)

    indexed, errors = helpers.bulk(
        client=client,
        actions=dataframe_to_actions(df, index_name),
    )
    return indexed, errors

def dataframe_to_actions(df, index_name):
    """Lazily yield one bulk-index action per row of ``df``.

    Args:
        df: pandas DataFrame whose rows become documents.
        index_name: Value placed in each action's ``_index`` field.

    Yields:
        Dicts with ``_index``, ``_id`` (the row's index label) and
        ``_source`` (a column-name -> value mapping for the row).
    """
    columns = list(df.columns)
    # itertuples(index=True, name=None) produces plain tuples of
    # (index_label, value_0, value_1, ...) and keeps each column's own type.
    # iterrows() would build a Series per row and upcast mixed numeric
    # columns, so integer columns would come out as floats.
    for label, *values in df.itertuples(index=True, name=None):
        yield {
            "_index": index_name,
            "_id": label,
            "_source": dict(zip(columns, values)),
        }



In [4]:
import json
import os
from getpass import getpass

import pandas as pd
from opensearchpy.helpers import bulk

# Connection settings are read from the environment so that no secret is
# stored in the notebook. Host, port and user fall back to the local test
# cluster; the password has no fallback and is prompted for when
# OPENSEARCH_PASSWORD is not set.
host = os.environ.get("OPENSEARCH_HOST", "192.168.0.111")
port = int(os.environ.get("OPENSEARCH_PORT", "9200"))
user = os.environ.get("OPENSEARCH_USER", "admin")
password = os.environ.get("OPENSEARCH_PASSWORD") or getpass(f"OpenSearch password for {user}: ")
auth = (user, password)

client = opensearch_client(host, port, auth)

BASE_DIR = "../../../txtai/data"
df = pd.read_parquet(f"{BASE_DIR}/interns_sample.parquet")

# Load documents into OpenSearch. The 'interns' index is dropped and
# recreated on every run. opensearch_bulk_sync(client, 'interns', df) is a
# drop-in alternative that builds the full action list in memory first.
success, _ = opensearch_bulk_async(client, 'interns', df)

print(f"Successfully indexed {success} documents.")




Welcome to opensearch 2.16.0!




Successfully indexed 3972 documents.
