# Inserting a Pandas DataFrame into Elasticsearch

In this example, we're going to load a pandas DataFrame into Elasticsearch, cleaning up missing values along the way so they don't cause problems once the data is in ES.

https://towardsdatascience.com/exporting-pandas-data-to-elasticsearch-724aa4dd8f62

First, we need to import some libraries.

In [1]:
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import pandas as pd
import tqdm


For our *analytical ecosystem*, each service we add is reachable by the name we give that service in the `docker-compose` file. In this case, Elasticsearch is available at the hostname `elasticsearch`.

In [2]:
es = Elasticsearch(['elasticsearch'])


And just for good measure, we'll make sure we can connect.

In [3]:
es.cluster.health()


{'cluster_name': 'docker-cluster',
 'status': 'yellow',
 'timed_out': False,
 'number_of_nodes': 1,
 'number_of_data_nodes': 1,
 'active_primary_shards': 9,
 'active_shards': 9,
 'relocating_shards': 0,
 'initializing_shards': 0,
 'unassigned_shards': 1,
 'delayed_unassigned_shards': 0,
 'number_of_pending_tasks': 0,
 'number_of_in_flight_fetch': 0,
 'task_max_waiting_in_queue_millis': 0,
 'active_shards_percent_as_number': 90.0}
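A `yellow` status is not a failure. It means every primary shard is assigned but at least one replica shard is not. On a single-node cluster like this one (`number_of_nodes: 1`), that is expected: Elasticsearch won't place a replica on the same node as its primary, so the replica stays unassigned (`unassigned_shards: 1`). The sketch below reads the dict returned by `es.cluster.health()` and explains the status; `explain_health` is a hypothetical helper, not part of the client.

```python
# Hypothetical helper: interpret the plain dict returned by es.cluster.health().
def explain_health(health: dict) -> str:
    status = health["status"]
    if status == "green":
        return "all primary and replica shards are assigned"
    if status == "yellow":
        # Yellow means all primaries are assigned, so any unassigned shards are replicas.
        return (f"all primary shards are assigned; "
                f"{health['unassigned_shards']} replica shard(s) unassigned")
    # "red": at least one primary shard is unassigned, so some data is unavailable.
    return "at least one primary shard is unassigned"

# The response shown above, trimmed to the fields this helper reads.
health = {"cluster_name": "docker-cluster", "status": "yellow",
          "number_of_nodes": 1, "unassigned_shards": 1}
print(explain_health(health))
```

In a live session, `es.cluster.health(wait_for_status="yellow")` can also be used to block until the cluster reaches at least yellow before loading data.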

Moving data to Elasticsearch from a DataFrame isn't as simple as it should be, at least in my opinion. I should probably fix that someday and create a pandas library to package some of this up. Until then, we'll define a few functions to get us going.

In [4]:
# Yield one bulk-index action per DataFrame row, targeting the given ES index.
def doc_generator(df, index):
    for _, document in df.iterrows():
        yield {
            "_index": index,
            # Series.to_json() serializes the row; missing values (NaN) become JSON null.
            "_source": document.to_json(),
        }

# Return a parsed date, or a placeholder date (the Unix epoch) when the value is missing (e.g. `NaT`).
def safe_date(date_value):
    return (
        pd.to_datetime(date_value) if not pd.isna(date_value)
        else datetime(1970, 1, 1, 0, 0)
    )

# Return the string value, or the placeholder "Other" when the value is missing (NaN/None).
def safe_value(field_val):
    return field_val if not pd.isna(field_val) else "Other"
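To see what `doc_generator` actually hands to the `bulk` helper, it can be run on a tiny DataFrame without touching Elasticsearch. The rows below are made-up sample data in the shape of the dataset used later, and the generator is copied from the cell above so the snippet runs on its own. Note how the missing `horsepower` value comes out as JSON `null`.

```python
import json
import pandas as pd

# Copy of the notebook's doc_generator, so this snippet is self-contained.
def doc_generator(df, index):
    for _, document in df.iterrows():
        yield {"_index": index, "_source": document.to_json()}

# Made-up sample rows; the second one has a missing horsepower value.
df = pd.DataFrame({
    "name": ["ford torino", "amc rebel sst"],
    "horsepower": [140.0, None],
})

actions = list(doc_generator(df, "car-mpg-dataset"))
for action in actions:
    print(action["_index"], action["_source"])
```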


In [5]:
# For each date column that needs sanitizing, repeat the line below:
# df['goodDateField'] = df['PossiblyBlankDateField'].apply(safe_date)

# For each string column that needs sanitizing, repeat the line below:
# df['goodStringField'] = df['PossiblyBlankField'].apply(safe_value)
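Here is the pattern from the commented cell applied to a small made-up DataFrame. The column names are the placeholder names from those comments, not columns of the real dataset, and `safe_date`/`safe_value` are repeated so the snippet runs on its own.

```python
from datetime import datetime
import pandas as pd

# Copies of the notebook's helpers.
def safe_date(date_value):
    return pd.to_datetime(date_value) if not pd.isna(date_value) else datetime(1970, 1, 1, 0, 0)

def safe_value(field_val):
    return field_val if not pd.isna(field_val) else "Other"

# Hypothetical columns with one missing date (NaT) and one missing string (None).
df = pd.DataFrame({
    "PossiblyBlankDateField": pd.to_datetime(["2020-01-15", None]),
    "PossiblyBlankField": ["usa", None],
})

df["goodDateField"] = df["PossiblyBlankDateField"].apply(safe_date)
df["goodStringField"] = df["PossiblyBlankField"].apply(safe_value)
print(df[["goodDateField", "goodStringField"]])
```

Swapping in a fixed placeholder keeps every document's fields populated, at the cost of mixing fake values (1970-01-01, "Other") with real ones, so remember to filter them out when querying.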


For sample data, we'll use the `mpg` dataset from the seaborn-data repository on GitHub.

In [6]:
df = pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/mpg.csv')


In [7]:
df.head()


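|   | mpg | cylinders | displacement | horsepower | weight | acceleration | model_year | origin | name |
|---|-----|-----------|--------------|------------|--------|--------------|------------|--------|------|
| 0 | 18.0 | 8 | 307.0 | 130.0 | 3504 | 12.0 | 70 | usa | chevrolet chevelle malibu |
| 1 | 15.0 | 8 | 350.0 | 165.0 | 3693 | 11.5 | 70 | usa | buick skylark 320 |
| 2 | 18.0 | 8 | 318.0 | 150.0 | 3436 | 11.0 | 70 | usa | plymouth satellite |
| 3 | 16.0 | 8 | 304.0 | 150.0 | 3433 | 12.0 | 70 | usa | amc rebel sst |
| 4 | 17.0 | 8 | 302.0 | 140.0 | 3449 | 10.5 | 70 | usa | ford torino |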


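Before loading, we delete any existing copy of the index so that rerunning the notebook doesn't index the same rows twice. On the first run the index doesn't exist yet, so Elasticsearch answers with the 404 error shown below; passing `ignore=[400, 404]` tells the client to return that response instead of raising an exception.

In [8]: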
es.indices.delete(index='car-mpg-dataset', ignore=[400, 404])


{'error': {'root_cause': [{'type': 'index_not_found_exception',
    'reason': 'no such index [car-mpg-dataset]',
    'resource.type': 'index_or_alias',
    'resource.id': 'car-mpg-dataset',
    'index_uuid': '_na_',
    'index': 'car-mpg-dataset'}],
  'type': 'index_not_found_exception',
  'reason': 'no such index [car-mpg-dataset]',
  'resource.type': 'index_or_alias',
  'resource.id': 'car-mpg-dataset',
  'index_uuid': '_na_',
  'index': 'car-mpg-dataset'},
 'status': 404}
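Deleting the index is one way to keep reruns from creating duplicates. Another option, sketched below, is to set each action's `_id` from the DataFrame index so that reindexing overwrites existing documents instead of adding new ones. This assumes the row index is stable between runs (true when reloading the same CSV), and it won't remove documents for rows that have since disappeared. `doc_generator_with_ids` is a hypothetical variant, not part of the original notebook.

```python
import json
import pandas as pd

# Hypothetical variant of doc_generator that also sets "_id" from the row index,
# so indexing the same DataFrame twice overwrites documents rather than duplicating them.
def doc_generator_with_ids(df, index):
    for row_id, document in df.iterrows():
        yield {
            "_index": index,
            "_id": str(row_id),
            "_source": document.to_json(),
        }

# Two rows taken from the df.head() output above.
df = pd.DataFrame({
    "mpg": [18.0, 15.0],
    "name": ["chevrolet chevelle malibu", "buick skylark 320"],
})

actions = list(doc_generator_with_ids(df, "car-mpg-dataset"))
print([action["_id"] for action in actions])
```

In the notebook, it would be passed to `bulk` in place of `doc_generator`.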

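Finally, we stream the documents into the index with the `bulk` helper. It returns a tuple of the number of successfully indexed documents and a list of errors, so `(398, [])` means all 398 rows were indexed with no errors.

In [9]: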
bulk(es, doc_generator(df, 'car-mpg-dataset'), request_timeout=20)

(398, [])
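By default (`raise_on_error=True`), `bulk` raises `BulkIndexError` if any document fails, so the errors list in a returned tuple is empty. If you pass `raise_on_error=False` to collect failures instead, you need to inspect the tuple yourself. The sketch below does that; `check_bulk_result` is a hypothetical helper, and the expected count would normally be `len(df)`.

```python
# Hypothetical helper: validate the (success_count, errors) tuple returned by
# elasticsearch.helpers.bulk when called with raise_on_error=False.
def check_bulk_result(result, expected_docs):
    success_count, errors = result
    if errors:
        raise RuntimeError(
            f"{len(errors)} document(s) failed to index; first error: {errors[0]}"
        )
    if success_count != expected_docs:
        raise RuntimeError(f"indexed {success_count} documents, expected {expected_docs}")
    return success_count

# The result shown above, checked against the 398 documents it reports.
print(check_bulk_result((398, []), expected_docs=398))
```

In the notebook this would look like `check_bulk_result(bulk(es, doc_generator(df, 'car-mpg-dataset'), raise_on_error=False), len(df))`.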