# Elasticsearch with Python 

## An Interactive Tech Talk

**1st November 2018**<br>
**Sean Kozer**

The purpose of this interactive tech talk is to familiarise you with Elasticsearch concepts and how to apply them in Python projects.

You will be searching, filtering and performing statistical analysis on movie data from IMDB.

### Connecting to Elasticsearch

The first step is to create a connection. We will be creating a persistent connection so we won't have to explicitly pass it into each method call.

The `"elasticsearch"` hostname is the name of the Elasticsearch service in `docker-compose.yml`.

In [None]:
from elasticsearch_dsl import connections

# Create the default connection to Elasticsearch
client = connections.create_connection(hosts=["elasticsearch"], timeout=20)

### Defining the schema

Elasticsearch is technically a schemaless datastore, but to avoid issues down the track, it's best to explicitly define a mapping.

Elasticsearch-DSL provides a convenient wrapper that bears some similarities to Django models.

We'll be using several different datatypes for our movie data.

- For string fields which require full-text search, we're going to use the `Text` type.
- For string fields which are only used for filtering and sorting, we're going to use the `Keyword` type.
- The other datatypes we'll use are `Integer` and `Date`, which are self-explanatory in this demo.

In [None]:
from elasticsearch_dsl import Date, Document, Integer, Keyword, Text

INDEX_NAME = "movies"

class Movie(Document):
    title = Text()
    overview = Text()
    genre = Keyword()
    release_date = Date()
    revenue = Integer()
    production_company = Keyword()

    class Index:
        name = INDEX_NAME
    
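    def get_display_name(self):
        # Some movies have no release date, so only append the year when it is known
        if self.release_date is None:
            return self.title
        year = self.release_date.strftime('%Y')
        return '{title} ({year})'.format(title=self.title, year=year)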
    
    def __repr__(self):
        return '<Movie: {}>'.format(self.get_display_name())

### Creating the index and mapping

Now that we have our mapping, it's time to push it to Elasticsearch.

In [None]:
# Create the mapping
Movie.init()

# Confirm the mapping exists
client.indices.get_mapping(index=INDEX_NAME)

### Indexing our first movie

Let's create a movie and push it to Elasticsearch. You'll notice the syntax here is very similar to Django models.

In [None]:
# Create an example movie
movie = Movie(
    meta={'id': 1},
    title="Example Movie",
    overview="This movie is about cats and dogs",
    genre=["Comedy", "Action"],
    release_date="2018-11-02",
    revenue=1000000,
    production_company="Pixar"
)
movie.save()

### Finding our first movie

**Success!** Let's search the index to try to find the movie we created.

Elasticsearch returns all results when no filters or queries are specified.

Because this is just a test movie, we'll delete it right after we print it.

**Note:** Searches return 10 results by default. You can request more by slicing the search object, for example `Movie.search()[0:50]`.

In [None]:
from elasticsearch_dsl import Search

# Fetch the first page of movies (up to 10) and delete them one by one
movies = Movie.search()
for movie in movies:
    print(movie)
    movie.delete()
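
As a rough illustration of the pagination note above, a slice on a search object corresponds to the `from` and `size` parameters of the search request. The helper below is hypothetical plain Python (it is not part of elasticsearch-dsl) and only sketches the arithmetic.

```python
def slice_to_pagination(page_slice):
    """Translate a Python slice into `from`/`size` values (hypothetical helper)."""
    start = page_slice.start or 0
    # Without an explicit stop, fall back to the default page size of 10
    stop = page_slice.stop if page_slice.stop is not None else start + 10
    return {"from": start, "size": max(0, stop - start)}


first_fifty = slice_to_pagination(slice(0, 50))
third_page = slice_to_pagination(slice(20, 30))
print(first_fifty)
print(third_page)
```

With elasticsearch-dsl itself, the equivalent of the first call would be `Movie.search()[0:50]`.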


### Use real data

Now that we're confident everything is working, let's index real movies!

In [None]:
import csv
import ast
from elasticsearch.helpers import bulk

MAX_ROWS = 1000
movies = []

with open("input/movies.csv", newline="", encoding="utf-8") as csvfile:
    reader = csv.DictReader(csvfile)
    for idx, row in enumerate(reader):
        if idx >= MAX_ROWS:
            break

        movie = Movie(
            meta={"id": row["imdb_id"]},
            title=row["title"],
            overview=row["overview"],
            genre=[g["name"] for g in ast.literal_eval(row["genres"])],
            release_date=row["release_date"] or None,
            revenue=row["revenue"] or None,
            production_company=[p["name"] for p in ast.literal_eval(row["production_companies"])],      
        )
        # Convert the movie to a dict with metadata so it can be bulk indexed
        movies.append(movie.to_dict(include_meta=True))
        
# Bulk index our movies     
res = bulk(client, movies)
print("{} movies successfully indexed. Failures: {}".format(*res))
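
The loader above relies on `ast.literal_eval` to unpack the `genres` and `production_companies` columns. The sketch below shows why, assuming each cell holds a Python-style list of dicts; the sample value is hypothetical and only modelled on the shape the loader expects.

```python
import ast

# Hypothetical cell value, modelled on the shape the loader above expects
raw_genres = "[{'id': 16, 'name': 'Animation'}, {'id': 35, 'name': 'Comedy'}]"

# json.loads would reject the single quotes; ast.literal_eval evaluates
# Python literals only, so it does not run arbitrary code
genre_names = [g["name"] for g in ast.literal_eval(raw_genres)]
print(genre_names)

# An empty list in the CSV simply yields no genres
no_genres = [g["name"] for g in ast.literal_eval("[]")]
print(no_genres)
```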

### Full text search


#### Tokenisation

In our mapping we defined the title as a `Text` datatype, which allows tokenisation and analysis at both index time and search time.

By default, it uses the `standard` analyzer, which has sensible but basic defaults.<br>
You can create custom analyzers in index settings and assign them to fields in the mapping.

For this demo, we'll keep it simple.
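
To make the idea of analysis concrete, here is a minimal plain-Python sketch. It is a simplification, not the real `standard` analyzer (which segments text using Unicode rules); the `simple_analyze` function is a hypothetical stand-in that lowercases and splits on non-word characters. The point is that the same analysis runs on the title at index time and on the query at search time, so their tokens can be compared.

```python
import re


def simple_analyze(text):
    """Rough stand-in for an analyzer: lowercase, then split on non-word characters."""
    return [token for token in re.split(r"\W+", text.lower()) if token]


# "Index time": the title is broken into tokens
title_tokens = simple_analyze("Three Colors: Blue")

# "Search time": the query goes through the same analysis
query_tokens = simple_analyze("BLUE")

# The document matches if any query token appears among the title tokens
is_match = any(token in title_tokens for token in query_tokens)
print(title_tokens, query_tokens, is_match)
```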

#### Scoring

When performing a full-text search, we use the `Match` query type, which scores results by keyword relevance.<br>
The relevance algorithm is complicated, but the default similarity is BM25, a ranking function that builds on the [TF-IDF](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) statistic.

Let's see this in action by searching for movie titles containing the word "Blue".

In [None]:
from elasticsearch_dsl.query import Match

# Find all movies with the word "blue" in the title
search = Movie.search().query(
    Match(title="blue")
)
list(search)
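
To give a feel for relevance scoring, the sketch below computes a toy TF-IDF score in plain Python. It is not Elasticsearch's actual scoring; the mini-corpus and the `tf_idf` function are assumptions made for illustration. It only shows the intuition that a term counts for more when it makes up a larger share of a short title.

```python
import math

# Hypothetical mini-corpus of already tokenised titles
titles = {
    "Blue": ["blue"],
    "Three Colors: Blue": ["three", "colors", "blue"],
    "Toy Story": ["toy", "story"],
    "Blue in the Face": ["blue", "in", "the", "face"],
}


def tf_idf(term, tokens, corpus):
    """Score one document for one term with a plain, smoothed TF-IDF formula."""
    # Term frequency: share of the title taken up by the term
    tf = tokens.count(term) / len(tokens)
    # Inverse document frequency: rarer terms weigh more
    docs_with_term = sum(term in doc for doc in corpus.values())
    idf = math.log(len(corpus) / (1 + docs_with_term)) + 1
    return tf * idf


scores = {title: tf_idf("blue", tokens, titles) for title, tokens in titles.items()}
ranking = sorted(scores, key=scores.get, reverse=True)
print(ranking)
```

In this toy corpus the one-word title ranks first because "blue" is the whole title, while the title that lacks the term scores zero.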

### Filtering

Elasticsearch comes with many types of filters.

The `Term` filter will return results with an exact match.

A good example is filtering movies by genre. Because we've defined genre as a `Keyword` datatype in the mapping, we can filter movies containing an exact genre.

**Note:** Movies with multiple genres (e.g. Action, Comedy) will be included if at least one of the genres matches.

As an example, let's find all comedy movies.

In [None]:
from elasticsearch_dsl.query import Term

# Find all comedy movies
search = Movie.search().filter(
    Term(genre="Comedy")
)
list(search)
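
The behaviour of an exact-match filter on a multi-valued field can be sketched in plain Python. The documents and the `term_filter` helper below are hypothetical; they only mimic the semantics described above, including the fact that keyword values are compared exactly, so a different letter case does not match.

```python
# Hypothetical documents; `genre` holds several exact keyword values
sample_movies = [
    {"title": "Movie A", "genre": ["Action", "Comedy"]},
    {"title": "Movie B", "genre": ["Drama"]},
    {"title": "Movie C", "genre": ["Comedy"]},
]


def term_filter(docs, field, value):
    """Keep documents where any value of `field` equals `value` exactly."""
    return [doc for doc in docs if value in doc[field]]


comedies = [doc["title"] for doc in term_filter(sample_movies, "genre", "Comedy")]

# Keyword values are not analysed, so the lowercase form finds nothing
lowercase_hits = term_filter(sample_movies, "genre", "comedy")
print(comedies, lowercase_hits)
```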

### What about other types of filters?

Sometimes we might want to filter non-string fields in more useful ways. For example, filtering for a range of values.

In this example, we'll use the `Range` filter to find all movies released in the year 1994.

**Note:** If this were a real application, it would be more performant to index `year` as its own field with an `Integer` datatype.

In [None]:
from elasticsearch_dsl.query import Range

# Find all movies released in 1994
search = Movie.search().filter(
    Range(release_date={"gte": "1994-01-01", "lt": "1995-01-01"})
)
list(search)
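
The `gte`/`lt` pair above describes a half-open interval: the lower bound is included and the upper bound is excluded. A small plain-Python sketch with hypothetical release dates shows which boundary values fall inside it.

```python
from datetime import date

# Hypothetical release dates sitting on either side of the boundaries
release_dates = {
    "Movie A": date(1993, 12, 31),
    "Movie B": date(1994, 1, 1),
    "Movie C": date(1994, 12, 31),
    "Movie D": date(1995, 1, 1),
}


def in_range(value, gte, lt):
    """Half-open interval check: gte <= value < lt."""
    return gte <= value < lt


released_1994 = [
    title
    for title, released in release_dates.items()
    if in_range(released, gte=date(1994, 1, 1), lt=date(1995, 1, 1))
]
print(released_1994)
```

Using `lt` with the first day of the next year avoids having to spell out the last moment of 31 December.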

### Aggregations

Searching and filtering movies has its uses, but what else can we learn from this dataset?<br>
Elasticsearch aggregations can be used for a broad range of applications, from statistical analysis to generating faceted navigation.

We'll look at both cases below.

In order to generate faceted navigation, we need to aggregate the data into buckets.

For example, to find out the 10 most common genres in our dataset, we would use a `Terms` aggregation on the `genre` field. This will return the name of the genre and the number of results it has.

In [None]:
from elasticsearch_dsl.aggs import Terms

# Return all movies
search = Movie.search()

# Find the most common genres
search.aggs.bucket("genres", Terms(field="genre"))
results = search.execute()

for genre in results.aggregations.genres.buckets:
    print("{} has {} movies".format(genre.key, genre.doc_count))
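
Conceptually, this bucketing is a count of documents per distinct value. The plain-Python sketch below reproduces the idea with `collections.Counter` on hypothetical data; it illustrates the result shape, not how Elasticsearch computes it.

```python
from collections import Counter

# Hypothetical genre lists, one per movie
sample_genres = [
    ["Action", "Comedy"],
    ["Drama"],
    ["Comedy"],
    ["Comedy", "Drama"],
]

# Each movie counts once towards every genre it carries
genre_counts = Counter(genre for genres in sample_genres for genre in genres)

# Keep the 10 largest buckets, ordered by document count
top_genres = genre_counts.most_common(10)
for key, doc_count in top_genres:
    print("{} has {} movies".format(key, doc_count))
```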


### Let's be data scientists

Sorting our dataset into buckets is cool, but let's take aggregations one step further.<br>
In Elasticsearch, aggregations can be nested, which means we can extract more information at the bucket level.

For example, let's apply the previous pattern to see which production companies produce the most films.

Now, for each production company, let's extract statistics on revenue such as total gross, top gross, etc. across all their films. We do this by extending the bucket `Terms` aggregation with a `Stats` metric aggregation.

In [None]:
from elasticsearch_dsl.aggs import Stats

# Return all movies
search = Movie.search()

# Find the most common production companies
search.aggs.bucket(
    "production_companies",
    Terms(field="production_company").metric(
        "revenue_stats",
        Stats(field="revenue")
    )
)
results = search.execute()

for production_company in results.aggregations.production_companies.buckets:
    # Extract data from the top level aggregation
    company_name = production_company.key
    num_movies = production_company.doc_count
    
    # Extract revenue statistics from the sub-aggregation
    revenue_stats = production_company.revenue_stats
    top_gross = int(revenue_stats.max)
    total_gross = int(revenue_stats.sum)
    
    print("{} has {} movies with top gross of ${:,} and total gross of ${:,}\n".format(
        company_name, num_movies, top_gross, total_gross
    ))
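
The nesting can be pictured as "group first, then summarise each group". The sketch below does this in plain Python on hypothetical revenue figures, producing the same five values a `Stats` aggregation reports (count, min, max, avg, sum) for each bucket.

```python
from collections import defaultdict

# Hypothetical (production_company, revenue) pairs
sample_revenues = [
    ("Studio A", 100),
    ("Studio B", 40),
    ("Studio A", 300),
    ("Studio A", 200),
    ("Studio B", 60),
]

# Outer step: bucket revenues by company
revenues_by_company = defaultdict(list)
for company, revenue in sample_revenues:
    revenues_by_company[company].append(revenue)

# Inner step: compute statistics inside each bucket
company_stats = {
    company: {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "avg": sum(values) / len(values),
        "sum": sum(values),
    }
    for company, values in revenues_by_company.items()
}
print(company_stats["Studio A"])
```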

### Other cool features

Elasticsearch has a whole bunch of cool features that I haven't covered in this tech talk but that are worth looking into.

- Suggesters (used for autocomplete)
- Custom analyzers, tokenizers and token filters (used for synonyms, typo tolerance, internationalisation)
- More Like This (MLT) queries (used for finding similar documents based on keywords)
- Kibana and its plugins (for time-series data monitoring and analysis)
- Curator (for maintaining time-series indices)

### Resources

- [Elasticsearch Reference](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html)
- [Qbox Blog](https://qbox.io/blog)
- [Elasticsearch Blog](https://www.elastic.co/blog)


### References

- [Python Elasticsearch Client (elasticsearch-py)](https://elasticsearch-py.readthedocs.io/en/master/)
- [Elasticsearch DSL (elasticsearch-dsl)](https://elasticsearch-dsl.readthedocs.io/en/latest/index.html)
- [IMDB Dataset](https://www.kaggle.com/rounakbanik/the-movies-dataset/)