In [1]:
import os

import eland as ed
import matplotlib.pyplot as plt
import pandas as pd

# import elasticsearch-py client
from elasticsearch import Elasticsearch

# Function for pretty-printing JSON
def json(raw):
    """Print ``raw`` to stdout as JSON, indented by two spaces with keys sorted.

    Note: this helper deliberately carries the same name as the standard
    library ``json`` module, so the serializer is imported locally under its
    own name instead of rebinding ``json`` inside the function.
    """
    from json import dumps

    rendered = dumps(raw, indent=2, sort_keys=True)
    print(rendered)

In [2]:
# Connect to an Elasticsearch instance using the official Elastic Python client
# (https://github.com/elastic/elasticsearch-py).
#
# Host and basic-auth credentials are read from the environment variables
# ES_HOST, ES_USER and ES_PASSWORD so that real secrets never have to be typed
# into (or committed with) the notebook. The fallbacks are the local
# development values this notebook originally used.
es = Elasticsearch(
    [os.environ.get("ES_HOST", "http://localhost:9200")],
    http_auth=(
        os.environ.get("ES_USER", "es_kbn"),
        os.environ.get("ES_PASSWORD", "changeme"),
    ),
)

# Print the cluster info (the same document served at the cluster root URL).
# Use this to make sure your Elasticsearch node/cluster responds to requests
# before continuing.
json(es.info())

{
  "cluster_name": "elasticsearch",
  "cluster_uuid": "WAdmDzUvSXOI0NSD4YJQag",
  "name": "DESKTOP-Q85BIOJ",
  "tagline": "You Know, for Search",
  "version": {
    "build_date": "2020-12-05T01:00:33.671820Z",
    "build_flavor": "default",
    "build_hash": "1c34507e66d7db1211f66f3513706fdf548736aa",
    "build_snapshot": false,
    "build_type": "zip",
    "lucene_version": "8.7.0",
    "minimum_index_compatibility_version": "6.0.0-beta1",
    "minimum_wire_compatibility_version": "6.8.0",
    "number": "7.10.1"
  }
}


In [3]:
# Read the invoices dataset from the local, semicolon-delimited CSV file
invoices_csv = "./invoices.csv"
pd_df = pd.read_csv(invoices_csv, sep=";", encoding="unicode_escape")

# Summarize columns, non-null counts and dtypes of the loaded frame
pd_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 13 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   invoice_id         541909 non-null  object 
 1   item_id            541909 non-null  int64  
 2   item_model         541909 non-null  object 
 3   item_name          541909 non-null  object 
 4   item_brand         541909 non-null  object 
 5   item_vendor        541909 non-null  object 
 6   order_qty          541909 non-null  int64  
 7   invoice_date       541909 non-null  object 
 8   unit_price         541909 non-null  float64
 9   customer_id        541909 non-null  int64  
 10  country_name       541909 non-null  object 
 11  country_latitude   541909 non-null  float64
 12  country_longitude  541909 non-null  float64
dtypes: float64(3), int64(3), object(7)
memory usage: 53.7+ MB


In [4]:
# Normalize the column names to snake_case FIRST, so that every step below can
# rely on the normalized names it refers to.
pd_df.columns = [name.lower().replace(" ", "_") for name in pd_df.columns]

# Convert the invoice date field from string to datetime.
pd_df["invoice_date"] = pd.to_datetime(pd_df["invoice_date"])

# Arrange prices for phones: scale every unit price by a factor of 10.
# NOTE: this rescales the current values each time it executes, so run this
# cell once per fresh load of the CSV.
pd_df["unit_price"] = pd_df["unit_price"] * 10.00

# Combine latitude and longitude into a single "lat,lon" string column that
# can be mapped as an Elasticsearch 'geo_point'. Built column-wise (stringify
# each column, then concatenate) rather than with a row-by-row apply, which is
# needlessly slow on a frame of this size.
pd_df["country_location"] = (
    pd_df["country_latitude"].map(str) + "," + pd_df["country_longitude"].map(str)
)

# Drop the separate coordinate columns in favor of 'country_location'.
pd_df = pd_df.drop(columns=["country_latitude", "country_longitude"])

pd_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 12 columns):
 #   Column            Non-Null Count   Dtype         
---  ------            --------------   -----         
 0   invoice_id        541909 non-null  object        
 1   item_id           541909 non-null  int64         
 2   item_model        541909 non-null  object        
 3   item_name         541909 non-null  object        
 4   item_brand        541909 non-null  object        
 5   item_vendor       541909 non-null  object        
 6   order_qty         541909 non-null  int64         
 7   invoice_date      541909 non-null  datetime64[ns]
 8   unit_price        541909 non-null  float64       
 9   customer_id       541909 non-null  int64         
 10  country_name      541909 non-null  object        
 11  country_location  541909 non-null  object        
dtypes: datetime64[ns](1), float64(1), int64(3), object(7)
memory usage: 49.6+ MB


In [5]:
# Elasticsearch field type for each column; passed as overrides so the index
# mapping is customized before any document is ingested.
invoice_field_types = {
    "invoice_id": "keyword",
    "item_id": "keyword",
    "item_model": "keyword",
    "item_name": "keyword",
    "item_brand": "keyword",
    "item_vendor": "keyword",
    "order_qty": "integer",
    "invoice_date": "date",
    "unit_price": "float",
    "customer_id": "keyword",
    "country_name": "keyword",
    "country_location": "geo_point",
}

# Ingest the pandas frame into Elasticsearch and get back an eland DataFrame
ed_df = ed.pandas_to_eland(
    pd_df=pd_df,
    es_client=es,
    es_dest_index="es-invoices",  # destination index for the data
    es_type_overrides=invoice_field_types,
    es_if_exists="replace",  # what to do when the index already exists
    es_refresh=True,  # wait for the data to be indexed before returning
)

ed_df.info()

<class 'eland.dataframe.DataFrame'>
Index: 541909 entries, 2000 to 541908
Data columns (total 12 columns):
 #   Column            Non-Null Count   Dtype         
---  ------            --------------   -----         
 0   country_location  541909 non-null  object        
 1   country_name      541909 non-null  object        
 2   customer_id       541909 non-null  object        
 3   invoice_date      541909 non-null  datetime64[ns]
 4   invoice_id        541909 non-null  object        
 5   item_brand        541909 non-null  object        
 6   item_id           541909 non-null  object        
 7   item_model        541909 non-null  object        
 8   item_name         541909 non-null  object        
 9   item_vendor       541909 non-null  object        
 10  order_qty         541909 non-null  int64         
 11  unit_price        541909 non-null  float64       
dtypes: datetime64[ns](1), float64(1), int64(1), object(9)
memory usage: 64.0 bytes
