# Load Index into Elasticsearch

We are using Elasticsearch (ES) 6.3 with the ES-LTR (Learning to Rank) plugin. The LTR plugin was installed using the following commands:

    cd <elasticsearch_home>
    bin/elasticsearch-plugin install http://es-learn-to-rank.labs.o19s.com/ltr-1.1.0-es6.3.1.zip
    
We are using [elasticsearch-head](https://github.com/mobz/elasticsearch-head) as our browser client. To make it work, you need to relax some security measures by enabling cross-origin requests (CORS) with the following directives in `config/elasticsearch.yml`.

    http.cors.enabled: true
    http.cors.allow-origin: "*"

The first step is to set up an index `tmdbindex` and load the data into it.

In [1]:
import ast
import csv
import json
import os
import sqlite3

import requests

In [2]:
DATA_DIR = "../../data/tmdb-dataset"

MOVIES_DATA = os.path.join(DATA_DIR, "movies_metadata.csv")
LOOKUPS_DB = os.path.join(DATA_DIR, "lookups.db")

ES_URL = "http://localhost:9200"

## Create Index and Schema

In [3]:
def _stored_field(field_type, **extra):
    """Build one field mapping of the given type with "store" set to "true"."""
    return {"type": field_type, "store": "true", **extra}


# JSON content-type header sent with the index-creation request below.
headers = {"Content-Type": "application/json"}

# Single shard with one replica.
index_settings = {"index": {"number_of_shards": 1, "number_of_replicas": 1}}

# title/description are copied into *_tfidf twins that use the "classic"
# similarity; the remaining fields are plain stored values.
field_mappings = {
    "doc_id": _stored_field("keyword"),
    "title": _stored_field("text", copy_to="title_tfidf"),
    "description": _stored_field("text", copy_to="description_tfidf"),
    "title_tfidf": _stored_field("text", similarity="classic"),
    "description_tfidf": _stored_field("text", similarity="classic"),
    "popularity": _stored_field("double"),
    "release_dt": _stored_field("date"),
    "revenue": _stored_field("double"),
    "runtime": _stored_field("double"),
    "rating": _stored_field("double"),
    "keywords": _stored_field("keyword"),
    "genres": _stored_field("keyword"),
}

data = {
    "settings": index_settings,
    "mappings": {"doc": {"properties": field_mappings}},
}

# Create the index together with its mapping and show the server's reply.
resp = requests.put(ES_URL + "/tmdbindex", headers=headers, data=json.dumps(data))
print(resp.text)

{"acknowledged":true,"shards_acknowledged":true,"index":"tmdbindex"}


## Insert Records

In [4]:
def get_keywords(conn, movie_id):
    """Look up the keywords recorded for a movie in the lookups database.

    Args:
        conn: open sqlite3 connection holding a ``keywords`` table with
            ``mid`` and ``keywords`` columns.
        movie_id: value matched against the ``mid`` column.

    Returns:
        List of keyword strings obtained by splitting the first matching
        row's ``keywords`` value on ``|``. Empty list when there is no
        matching row or the stored value is NULL or empty.
    """
    cur = conn.cursor()
    try:
        cur.execute("select keywords from keywords where mid = ?", [movie_id])
        # only the first matching row is ever used
        row = cur.fetchone()
    finally:
        # release the cursor even when the query fails
        cur.close()
    if row is None or row[0] is None:
        return []
    # "".split("|") would yield [""], so empty tokens are dropped
    return [keyword for keyword in row[0].split("|") if keyword]


def filter_genres(conn, genres):
    """Keep only the genre names that exist in the ``genres`` lookup table.

    Each candidate is checked with its own query against the ``gname``
    column. Input order, including any duplicates, is preserved.

    Args:
        conn: open sqlite3 connection holding a ``genres`` table.
        genres: iterable of candidate genre names.

    Returns:
        List of the candidates for which a matching ``gname`` row exists.
    """
    cursor = conn.cursor()
    known = []
    for candidate in genres:
        cursor.execute("select gname from genres where gname = ?", [candidate])
        if cursor.fetchall():
            known.append(candidate)
    cursor.close()
    return known


def get_float(orig_value, default_value):
    """Convert a raw field value to a float, falling back to a default.

    Args:
        orig_value: raw value, normally a string; ``None`` and values that
            are already numeric are accepted as well.
        default_value: returned when ``orig_value`` is ``None``, blank, or
            cannot be parsed as a number.

    Returns:
        ``float(orig_value)`` when it parses, otherwise ``default_value``.
    """
    if orig_value is None:
        return default_value
    if isinstance(orig_value, str) and len(orig_value.strip()) == 0:
        return default_value
    try:
        return float(orig_value)
    except (TypeError, ValueError):
        # unparsable input falls back to the default instead of raising
        return default_value


def parse_genres(genre_json):
    """Extract the genre names from a serialized list of id/name dicts.

    The text is parsed as a Python literal first, which copes with
    single-quoted strings and with names that contain an apostrophe. If
    that fails, it falls back to swapping single quotes for double quotes
    and parsing the result as JSON.

    Args:
        genre_json: serialized list such as
            ``"[{'id': 18, 'name': 'Drama'}]"``; ``None`` or a blank string
            means no genres.

    Returns:
        List of the ``name`` values, in input order.

    Raises:
        ValueError: if the text parses neither as a Python literal nor as
            JSON after quote replacement (``json.JSONDecodeError`` is a
            subclass of ``ValueError``).
        KeyError: if an entry has no ``name`` key.
    """
    if genre_json is None or len(genre_json.strip()) == 0:
        return []
    try:
        idname_pairs = ast.literal_eval(genre_json)
    except (ValueError, SyntaxError):
        # e.g. JSON-only tokens such as null/true are not Python literals
        idname_pairs = json.loads(genre_json.replace("'", "\""))
    return [idname_pair["name"] for idname_pair in idname_pairs]

In [5]:
def add_record_to_es(es_url, doc_id, title, description, popularity, 
                     release_date, revenue, runtime, rating, keywords, genres,
                     should_commit=False):
    """Index one movie document into ``tmdbindex`` and optionally flush.

    Args:
        es_url: base URL of the Elasticsearch server.
        doc_id: integer movie id, used as the ``doc_id`` field (as a string)
            and as the document id in the request URL. When ``None`` no
            document is sent; a flush may still be issued.
        title, description, popularity, release_date, revenue, runtime,
            rating, keywords, genres: values copied into the document
            (``release_date`` is sent as the ``release_dt`` field).
        should_commit: when true, POST to ``/tmdbindex/_flush`` afterwards.

    Failed requests are reported with ``print`` rather than raised, so a
    single rejected document does not abort a long-running load.
    """
    # defined locally so the function does not depend on a variable created
    # by another notebook cell
    json_headers = {"Content-Type": "application/json"}
    if doc_id is not None:
        doc = {
            "doc_id": str(doc_id),
            "title": title,
            "description": description,
            "popularity": popularity,
            "release_dt": release_date,
            "revenue": revenue,
            "runtime": runtime,
            "rating": rating,
            "keywords": keywords,
            "genres": genres
        }
        resp = requests.put(es_url + "/tmdbindex/doc/{:d}".format(doc_id), 
                            headers=json_headers, data=json.dumps(doc))
        if not resp.ok:
            print("failed to index doc {:d}: HTTP {:d} {:s}".format(
                doc_id, resp.status_code, resp.text))
    if should_commit:
        flush_resp = requests.post(es_url + "/tmdbindex/_flush")
        if not flush_resp.ok:
            print("failed to flush tmdbindex: HTTP {:d} {:s}".format(
                flush_resp.status_code, flush_resp.text))


In [6]:
# Stream the movies CSV into Elasticsearch, sending one document per
# English-language row and flushing the index roughly every 1000 rows read.
conn = sqlite3.connect(LOOKUPS_DB)
i = 0  # rows read so far, including non-English rows that are skipped
should_commit = False
try:
    # newline="" lets the csv module handle line breaks embedded in quoted
    # fields itself. The encoding is set explicitly so decoding does not
    # depend on the platform default (assumes the CSV is UTF-8 -- TODO confirm).
    with open(MOVIES_DATA, "r", newline="", encoding="utf-8") as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if i % 1000 == 0:
                print("{:d} records ingested into Elasticsearch".format(i))
                # stays set until the next row that is actually indexed
                should_commit = True
            if row["original_language"] != "en":
                # only stick to english
                i += 1
                continue
            doc_id = int(row["id"])
            title = row["original_title"]
            description = row["overview"]
            popularity = get_float(row["popularity"], 0.0)
            # NOTE(review): an empty release_date is sent as "" -- confirm the
            # date mapping accepts it
            release_date = row["release_date"]
            revenue = get_float(row["revenue"], 0.0)
            runtime = get_float(row["runtime"], 0.0)
            rating = get_float(row["vote_average"], 0.0)
            # look up keywords
            keywords = get_keywords(conn, doc_id)
            # parse out genres, keeping only those present in the lookup table
            genres = filter_genres(conn, parse_genres(row["genres"]))
            # add record to Elasticsearch
            add_record_to_es(ES_URL, doc_id, title, description, popularity, 
                             release_date, revenue, runtime, rating, keywords, genres,
                             should_commit=should_commit)
            should_commit = False
            i += 1

    # doc_id is None here, so nothing is indexed; this only issues the final flush
    add_record_to_es(ES_URL, None, None, None, None, None, None, None, None, None, None, True)
    print("{:d} records ingested into Elasticsearch, COMPLETE".format(i))
finally:
    # close the lookup database even if the load fails part-way
    conn.close()

0 records ingested into Elasticsearch
1000 records ingested into Elasticsearch
2000 records ingested into Elasticsearch
3000 records ingested into Elasticsearch
4000 records ingested into Elasticsearch
5000 records ingested into Elasticsearch
6000 records ingested into Elasticsearch
7000 records ingested into Elasticsearch
8000 records ingested into Elasticsearch
9000 records ingested into Elasticsearch
10000 records ingested into Elasticsearch
11000 records ingested into Elasticsearch
12000 records ingested into Elasticsearch
13000 records ingested into Elasticsearch
14000 records ingested into Elasticsearch
15000 records ingested into Elasticsearch
16000 records ingested into Elasticsearch
17000 records ingested into Elasticsearch
18000 records ingested into Elasticsearch
19000 records ingested into Elasticsearch
20000 records ingested into Elasticsearch
21000 records ingested into Elasticsearch
22000 records ingested into Elasticsearch
23000 records ingested into Elasticsearch
24000