# Data Ingestion

In [None]:
pip install opensearch-py

In [None]:
pip install requests

In [1]:
# Importing the packages

import os

from opensearchpy import OpenSearch, RequestsHttpConnection
from opensearchpy.helpers import bulk

In [2]:
# Connection settings. Each value can be overridden through an environment
# variable; the fallbacks are the values this notebook was originally written with.
host = os.environ.get('OPENSEARCH_HOST', 'opensearch-node1')
port = int(os.environ.get('OPENSEARCH_PORT', '9200'))
auth = (
    os.environ.get('OPENSEARCH_USER', 'admin'),
    os.environ.get('OPENSEARCH_PASSWORD', 'admin'),
)

# Create the client with TLS enabled, but with certificate and hostname
# verification disabled. NOTE(review): skipping verification is only
# appropriate for a local/demo cluster with self-signed certificates.
client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_auth = auth,
    connection_class=RequestsHttpConnection,
    http_compress = False, # gzip compression of request bodies is off; set True to enable it
    use_ssl = True,
    verify_certs = False,
    ssl_assert_hostname = False,
    ssl_show_warn = False
)
# Round-trip to the cluster to confirm the connection and credentials work.
print(client.info())

{'name': 'opensearch-node1', 'cluster_name': 'opensearch-cluster', 'cluster_uuid': 'uav9xBZET9CFi1XQpXN7dQ', 'version': {'distribution': 'opensearch', 'number': '2.6.0', 'build_type': 'tar', 'build_hash': '7203a5af21a8a009aece1474446b437a3c674db6', 'build_date': '2023-02-24T18:57:04.388618985Z', 'build_snapshot': False, 'lucene_version': '9.5.0', 'minimum_wire_compatibility_version': '7.10.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'The OpenSearch Project: https://opensearch.org/'}


## Importing data using pandas
### Reading the CSV and manually mapping the columns

In [3]:
import pandas as pd

# Load the Iris CSV, discard any row with a missing value, and renumber
# the remaining rows from 0 so the index is contiguous.
iris_df = (
    pd.read_csv("Iris.csv")
    .dropna()
    .reset_index(drop=True)
)

# Explicit index mapping: an integer id, four float measurement columns,
# and the species label stored as a text field.
body = {
    "mappings": {
        "properties": {
            "Id": {"type": "integer"},
            **{
                measurement: {"type": "float"}
                for measurement in (
                    "SepalLengthCm",
                    "SepalWidthCm",
                    "PetalLengthCm",
                    "PetalWidthCm",
                )
            },
            "Species": {"type": "text"},
        }
    }
}

### Creating the Iris dataset (OpenSearch index `iris`)

In [4]:
# Create the `iris` index with the explicit mapping defined in `body`.
# Creating an index that already exists raises an error, so check first;
# this keeps the cell safe to re-run against the same cluster.
if client.indices.exists(index="iris"):
    response = None
    print("\nIndex 'iris' already exists; skipping creation.")
else:
    response = client.indices.create(index="iris", body=body)
    print('\nCreating dataset:')
    print(response)


Creating dataset:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'iris'}


### Ingesting data into the dataset from the CSV
#### Using explicit mapping

In [5]:
# Build one bulk action per DataFrame row. The row label becomes the
# document `_id`, and the listed columns are copied into `_source`.
bulk_data = [
    {
        "_index": "iris",
        "_id": row_number,
        "_source": {
            column: record[column]
            for column in (
                "Id",
                "SepalLengthCm",
                "SepalWidthCm",
                "PetalLengthCm",
                "PetalWidthCm",
                "Species",
            )
        },
    }
    for row_number, record in iris_df.iterrows()
]
# Send all actions through the bulk helper.
bulk(client, bulk_data)

# Refresh the index before counting, then display the document count
# as the cell's output.
client.indices.refresh(index="iris")
client.cat.count(index="iris", format="json")

[{'epoch': '1680036929', 'timestamp': '20:55:29', 'count': '150'}]