In [1]:
import elasticsearch
from elasticsearch import Elasticsearch
import pandas as pd
import os 
import sys

In [2]:
import urllib3

# The client in this notebook connects with verify_certs=False, which makes
# urllib3 emit an InsecureRequestWarning on every request. Silence only that
# category instead of every urllib3 HTTPWarning, so unrelated warnings still show.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [21]:
# Placeholder string substituted wherever a value is missing.
NULL_TOKEN: str = "NULL"

I'm running this from Windows. I tried following [the docs here](https://www.elastic.co/guide/en/elasticsearch/reference/current/zip-windows.html) but had some trouble getting SSL verification to play ball. Life is too short for local SSL verification, so I'm going to run "insecurely": the client still connects over HTTPS (`use_ssl=True`) but skips certificate verification (`verify_certs=False`). This is for demo purposes only.

In [3]:
import getpass

from elasticsearch import Elasticsearch, RequestsHttpConnection

# Credentials are read from the environment (with an interactive prompt as a
# fallback) so that no password is stored in the notebook itself.
# NOTE(review): a password was previously committed in this cell; it should be
# rotated on the cluster.
ES_USER = os.environ.get('ES_USER', 'elastic')
ES_PASSWORD = os.environ.get('ES_PASSWORD') or getpass.getpass('Elasticsearch password: ')

# HTTPS to the local node, with certificate verification disabled (demo only).
es = Elasticsearch(
    ['localhost'],
    port=9200,
    connection_class=RequestsHttpConnection,
    http_auth=(ES_USER, ES_PASSWORD),
    use_ssl=True,
    verify_certs=False,
)

# Fail fast if the node cannot be reached with these settings.
assert es.ping(), 'Could not reach Elasticsearch on localhost:9200'



# Create index

In [4]:
# Create an empty index. `ignore=400` stops the client raising on an HTTP 400
# response (presumably "index already exists" on a re-run; confirm).
es.indices.create(index='my-first-index', ignore=400)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'my-first-index'}

Go to Kibana dev tools and type the following in the console
```
GET _cat/indices
```
to view all indices

In [5]:
# Fetch the alias info for every index; iterating the response yields the
# index names, which are printed one per line.
res = es.indices.get_alias(index='*')
for index_name in res:
    print(index_name)

.apm-custom-link
.kibana_security_session_1
.kibana_8.2.2_001
.apm-agent-configuration
.kibana-event-log-8.2.2-000001
.security-7
.kibana_task_manager_8.2.2_001
my-first-index




In [6]:
# Drop the demo index. A missing index is reported as HTTP 404, so that is the
# status to tolerate on re-runs. 401 (authentication failure) is no longer
# ignored, so bad credentials surface instead of being silently swallowed.
es.indices.delete(index='my-first-index', ignore=[400, 404])

{'acknowledged': True}

In [7]:
# List the remaining index names, one per line, to confirm the deletion.
res = es.indices.get_alias(index='*')
for index_name in res:
    print(index_name)

.apm-agent-configuration
.security-7
.kibana_security_session_1
.kibana_8.2.2_001
.kibana-event-log-8.2.2-000001
.apm-custom-link
.kibana_task_manager_8.2.2_001




And the index is now gone.

## Upload two sample JSON docs

In [8]:
# Two hand-written sample documents used to demonstrate indexing one
# document at a time.
docs = [
    dict(
        first_name='John',
        last_name='Doe',
        age=30,
        interests=['group theory', 'cats'],
    ),
    dict(
        first_name='Alice',
        last_name='Doe',
        age=29,
        interests=['category theory', 'dogs'],
    ),
]

In [9]:
# Create the `people` index. `ignore=400` stops the client raising on an HTTP
# 400 response (presumably "index already exists" on a re-run; confirm).
es.indices.create(index='people', ignore=400)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'people'}

In [10]:
# Index each sample document under a 1-based id. `doc_type` is not passed:
# mapping types are deprecated, and the 7.x client targets `_doc` by default.
# `res` keeps the response of the last call so it can be inspected afterwards.
for doc_id, doc in enumerate(docs, start=1):
    res = es.index(index='people', body=doc, id=doc_id)

In [11]:
# Display the response of the most recent `es.index` call (the last document).
res

{'_index': 'people',
 '_id': '2',
 '_version': 1,
 'result': 'created',
 '_shards': {'total': 2, 'successful': 1, 'failed': 0},
 '_seq_no': 1,
 '_primary_term': 1}

Go to the console and type 
```
GET people/_search
```
and you'll see the documents appear.

In [12]:
# Drop the `people` index. Tolerate 404 (index already gone) so the cell can be
# re-run. 401 is no longer ignored, so an authentication failure is not
# silently swallowed.
es.indices.delete(index='people', ignore=[400, 404])

{'acknowledged': True}

# Importing documents into elasticsearch

In [13]:
import pandas as pd
import json
from ast import literal_eval
from tqdm import tqdm
import datetime
import os
import sys
from elasticsearch import helpers

Data [comes from here](https://www.kaggle.com/datasets/shivamb/netflix-shows?resource=download).

In [14]:
# Load the Netflix titles dataset; the path is relative to the directory the
# notebook is run from.
df = pd.read_csv('data/netflix_titles.csv')

In [15]:
# Preview the first rows of the raw frame.
df.head()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,s1,Movie,Dick Johnson Is Dead,Kirsten Johnson,,United States,"September 25, 2021",2020,PG-13,90 min,Documentaries,"As her father nears the end of his life, filmm..."
1,s2,TV Show,Blood & Water,,"Ama Qamata, Khosi Ngema, Gail Mabalane, Thaban...",South Africa,"September 24, 2021",2021,TV-MA,2 Seasons,"International TV Shows, TV Dramas, TV Mysteries","After crossing paths at a party, a Cape Town t..."
2,s3,TV Show,Ganglands,Julien Leclercq,"Sami Bouajila, Tracy Gotoas, Samuel Jouy, Nabi...",,"September 24, 2021",2021,TV-MA,1 Season,"Crime TV Shows, International TV Shows, TV Act...",To protect his family from a powerful drug lor...
3,s4,TV Show,Jailbirds New Orleans,,,,"September 24, 2021",2021,TV-MA,1 Season,"Docuseries, Reality TV","Feuds, flirtations and toilet talk go down amo..."
4,s5,TV Show,Kota Factory,,"Mayur More, Jitendra Kumar, Ranjan Raj, Alam K...",India,"September 24, 2021",2021,TV-MA,2 Seasons,"International TV Shows, Romantic TV Shows, TV ...",In a city of coaching centers known to train I...


For every document we need an id. Let's see if `show_id` is a suitable candidate

In [16]:
# `show_id` can serve as the document id only if there are as many distinct
# values as there are rows.
assert df['show_id'].nunique() == len(df)

Yup.

In [17]:
# Column dtypes and non-null counts, to see which columns have missing values.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8807 entries, 0 to 8806
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   show_id       8807 non-null   object
 1   type          8807 non-null   object
 2   title         8807 non-null   object
 3   director      6173 non-null   object
 4   cast          7982 non-null   object
 5   country       7976 non-null   object
 6   date_added    8797 non-null   object
 7   release_year  8807 non-null   int64 
 8   rating        8803 non-null   object
 9   duration      8804 non-null   object
 10  listed_in     8807 non-null   object
 11  description   8807 non-null   object
dtypes: int64(1), object(11)
memory usage: 825.8+ KB


Fill with a null token where there are missing values

In [22]:
# New frame with every missing value replaced by NULL_TOKEN; `df` itself is
# left unchanged because `fillna` is not called in place.
df_impute = df.fillna(NULL_TOKEN)

## Prepare data for elasticsearch

In [19]:
# Despite the `df_` prefix, this is a plain list with one dict per row
# (column name -> value), not a DataFrame.
df_upload = df_impute.to_dict(orient='records')

In [20]:
# Inspect the first record to check its shape.
df_upload[0]

{'show_id': 's1',
 'type': 'Movie',
 'title': 'Dick Johnson Is Dead',
 'director': 'Kirsten Johnson',
 'cast': 'NULL',
 'country': 'United States',
 'date_added': 'September 25, 2021',
 'release_year': 2020,
 'rating': 'PG-13',
 'duration': '90 min',
 'listed_in': 'Documentaries',
 'description': 'As her father nears the end of his life, filmmaker Kirsten Johnson stages his death in inventive and comical ways to help them both face the inevitable.'}

Preparing the dataset as `records` results in a dict for every row of the dataframe, where the keys are the columns. This is the required shape for upload into elasticsearch.

In [29]:
def records(frame: "pd.DataFrame | list[dict]"):
    """Yield one bulk-upload action per row, targeting the `netflix` index.

    Args:
        frame: Either a list of row dicts (the output of
            `DataFrame.to_dict(orient='records')`) or a DataFrame, which is
            converted to that form first.

    Yields:
        dict: An action with `_index`, `_id` (the row's `show_id`) and a
        `_source` holding `title`, `director`, `description`, `duration` and
        `cast`. A field absent from the row is replaced by `NULL_TOKEN`.

    Raises:
        KeyError: If a row has no `show_id`.
    """
    if isinstance(frame, pd.DataFrame):
        # Iterating a DataFrame yields column names, not rows, so convert first.
        frame = frame.to_dict(orient='records')

    source_fields = ('title', 'director', 'description', 'duration', 'cast')
    for line in frame:
        yield {
            '_index': 'netflix',
            '_id': line['show_id'],
            # Every field reads its own column; `director` and `description`
            # previously copied the title by mistake.
            '_source': {field: line.get(field, NULL_TOKEN) for field in source_fields},
        }

Note that mapping types are [no longer a thing](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/removal-of-types.html) in elasticsearch. Let's upload with the `bulk` API.

In [30]:
# Stream the actions generated by `records` through the bulk helper and keep
# its return value in `res`. No `netflix` index was created beforehand in this
# notebook, so it is presumably auto-created here with inferred mappings.
res = helpers.bulk(es, records(df_upload))

In elasticsearch, 
```
GET _cat/indices
```
gives
```
yellow open netflix djGNQEBxTnaX9LIwZo48sA 1 1 8807 0 4.8mb 4.8mb
```

Also
```
GET netflix/_count
```
should show 8807 records.
```
GET netflix/_settings
```
should show 1 replica and 1 shard: the index was created implicitly by the bulk upload, so it got the default settings.
```
GET netflix/_mapping
```
will give the types inferred for each field. Every field should be `"type":"text"` (dynamic mapping also adds a `keyword` sub-field to each).

In [31]:
# Drop the auto-created `netflix` index before recreating it with explicit
# mappings. Tolerate 404 (index already gone) so the cell can be re-run. 401 is
# no longer ignored, so an authentication failure is not silently swallowed.
es.indices.delete(index='netflix', ignore=[400, 404])

{'acknowledged': True}

## Settings and mappings

State up-front the types of each field, to enforce types at upload.

In [36]:
# Index creation body: a single primary shard with no replicas, plus an
# explicit `text` mapping for each field that gets uploaded.
settings_and_mappings = dict(
    settings=dict(number_of_shards=1, number_of_replicas=0),
    mappings=dict(
        properties={
            field: {"type": "text"}
            for field in ("director", "duration", "description", "title", "cast")
        },
    ),
)

In [40]:
# Create `netflix` with the explicit settings and mappings; HTTP 400 and 404
# responses are ignored rather than raised.
# NOTE(review): ignoring 400 presumably also hides a rejected body, not just
# "index already exists"; check `res` if the mapping does not appear.
res = es.indices.create(index='netflix', ignore=[400, 404], body=settings_and_mappings)

If you take a look at `GET netflix/_mapping`, you should see the mappings above.

In [41]:
# Upload the same actions again, this time into the `netflix` index that was
# created with explicit settings and mappings.
res = helpers.bulk(es, records(df_upload))

In [39]:
# Clean up the `netflix` index. Tolerate 404 (index already gone) so the cell
# can be re-run. 401 is no longer ignored, so an authentication failure is not
# silently swallowed.
es.indices.delete(index='netflix', ignore=[400, 404])

{'acknowledged': True}