In [1]:
import pandas as pd
import re
import json
import time
import requests
from requests.auth import HTTPBasicAuth
from elasticsearch import Elasticsearch
import elasticsearch
from elasticsearch.helpers import bulk

print(elasticsearch.__version__)

(7, 13, 3)


In [2]:
# !!! CUSTOMIZE THIS SECTION WITH YOUR CREDENTIALS !!!

USER = 'elastic'
PWD = 'YOUR_PWD_HERE'
index_name = 'books'
ES_ENDPOINT = 'https://localhost:9200'

path_to_ca_certificates = '/PATH/TO/elasticsearch-8.5.3/config/certs/http_ca.crt'

### Read data

In [3]:
df = pd.read_csv('books.csv')
df = df.set_index('id')
df.head()

Unnamed: 0_level_0,author_name,title,country,language,year
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
14602826,"Yearsley, Ann",Poems on several occasions,England,English,1786
14602830,"A, T.",A Satyr against Vertue. (A poem: supposed to b...,England,English,1679
14602831,"A, T.","The Aeronaut, a poem; founded almost entirely,...",Ireland,English,1816
14602832,"Albert, Prince Consort, consort of Victoria, Q...","The Prince Albert, a poem",Ireland,English,1868
14602833,"Anslow, Robert","The Defeat of the Spanish Armada, A.D. 1588. A...",England,English,1888


In [4]:
#transform dataframe into json format
docs = df.to_dict(orient='records')
doc_ids = df.index
print(doc_ids[0])
print(docs[0])

14602826
{'author_name': 'Yearsley, Ann', 'title': 'Poems on several occasions', 'country': 'England', 'language': 'English', 'year': 1786}



### Elasticsearch Python wrapper

In [5]:
def create_index(es, index_name, settings=None):
    """
    Create an Elasticsearch index
    @param es: an Elasticsearch object
    @param index_name: the name of the new index to be created
    @param settings: the index settings
    @return whether the index was created
    """
    is_created = False
    try:
        if es.indices.exists(index=[index_name]):
            es.indices.delete(index=[index_name], ignore=[404])
        es.indices.create(index=index_name, settings=settings)
        is_created = True
    except Exception as ex:
        print(str(ex))
    return is_created

In [6]:
# Index settings
settings_basic = {
        "number_of_shards": 4,
        "number_of_replicas": 2,
        "analysis": {
            "analyzer": {"std_english": {"type": "standard", "stopwords": "_english_" }}
        }
}

In [7]:
#connect to the local elasticsearch node and authenticate
es = Elasticsearch(hosts=[ES_ENDPOINT], ca_certs=path_to_ca_certificates, basic_auth=(USER, PWD))
#create an index
is_created = create_index(es, index_name, settings=settings_basic)
print(f'Index creation: {is_created}')

ConnectionError([Errno 2] No such file or directory) caused by: SSLError([Errno 2] No such file or directory)
Index creation: False


In [None]:
#loops over the first 10 documents
for i, doc in zip(doc_ids[0:10], docs[0:10]):
    #index the documents with corresponding ids
    res = es.index(index=index_name, id=i, document=doc)
    print(res)
# see also the bulk functions for importing under: elasticsearch.helpers

In [None]:
actions = [
  {
    "_index": index_name,
    "_id": doc_id,
    "_source": doc
  }
  for doc_id, doc in list(zip(doc_ids, docs))
]

# send actions in bulk (the API takes care of chunking them optimally)
bulk(es, actions)

In [None]:
# get settings info of the selected index
es.indices.get_settings(index=index_name)

In [None]:
# retrieve a document with a given ID
es.get(index=index_name, id=doc_ids[1])

In [None]:
# this is how you would delete the index
es.indices.delete(index=index_name, ignore=404)

### Elasticsearch with python cURL (Requests)

In [None]:
class Elastic:
    """
    A convenience object to send HTTP requests to Elasticsearch
    """
    def __init__(self, endpoint, username, password, path_to_ca_certificates):
        """
        @param endpoint: the URL of the Elasticsearch instance
        @param username: the Elasticsearch username 
        @param password: the Elasticsearch password
        """
        self.header = {'Content-Type': 'application/json', 'charset':'UTF-8'}
        #self.header={'Content-Type': '--data-binary application/x-ndjson'}
        self.endpoint = endpoint
        self.username = username
        self.password = password
        self.path_to_ca_certificates = path_to_ca_certificates
        self.methods_mapping = {'get': requests.get, 
                                'put':requests.put, 
                                'post':requests.post, 
                                'delete':requests.delete}
        
    def curl(self, method, handle, json=None):
        """
        Sends an HTTP request to the Elasticsearch instance
        @param method: can be 'get', 'put', 'post', 'delete'
        @param handle: the API handle to be appended to the Elasticsearch url
        @param json: the json payload of the HTTP request
        """
        http_method = self.methods_mapping[method.lower()]
        r = http_method(f'{self.endpoint}/{handle}', auth=HTTPBasicAuth(USER, PWD), 
                        headers=self.header, json=json,
                        verify = self.path_to_ca_certificates)
        return r

In [None]:
e = Elastic(ES_ENDPOINT, USER, PWD, path_to_ca_certificates)

In [None]:
# create another index jsut as an example. in the following, we will keep using the book index
# created using the Elasticsearch API

create_index_json={
  "mappings" : {
      "properties" : {
        "author_name" : {
          "type" : "text"
        },
        "country" : {
          "type" : "keyword"
        },
        "language" : {
          "type" : "keyword"
        },
        "title" : {
          "type" : "text"
        },
        "year" : {
          "type" : "long"
        }
      }
  },
  "settings": {
    "number_of_shards": 4, 
    "number_of_replicas": 2, 
    "index.max_result_window": 20000,
    "index" : {
        "similarity" : {
          "default" : {
            "type" : "BM25", "b": 0.75, "k1": 1.2
          }
        }
    },
    "analysis": {
      "analyzer": {
        "std_english": {"type": "standard", "stopwords": "_english_" }
      }
    }
  }
}

# create an index
r = e.curl('put', index_name, json=create_index_json)
r.json()

In [None]:
# get the index details and settings
r = e.curl('get', index_name)
r.json()

In [None]:
# deactivate refresh in preparation of data indexing
r = e.curl('put', 'books/_settings', {'index' : {'refresh_interval' : -1}})
r.json()

In [None]:
# index documents with their individual ids (use bulk below for speedup)
for doc_id, doc in list(zip(doc_ids, docs))[0:10]:
    r = e.curl('post', f'books/_doc/{doc_id}', json=doc)
r.json()

In [None]:
# bulk indexing (via official API)

#connect to the local elasticsearch node and authenticate
es = Elasticsearch([ES_ENDPOINT], ca_certs=path_to_ca_certificates, basic_auth=(USER, PWD))

actions = [
  {
    "_index": index_name,
    "_id": doc_id,
    "_source": doc
  }
  for doc_id, doc in list(zip(doc_ids, docs))
]

# send actions in bulk (the API takes care of chunking them optimally)
bulk(es, actions)

In [None]:
# reset the refresh interval to 2 seconds
r = e.curl('put', 'books/_settings', {'index' : {'refresh_interval' : '2s'}})
r.json()

In [None]:
r = e.curl('get', f'books/_doc/{doc_ids[42]}')
r.json()

In [None]:
# this is how you would delete the index
r = e.curl('delete', 'books')
r.json()

### Search queries [EXERCISES]

#### Empty query

In [None]:
# empty query
r = e.curl('get', f'books/_search')
r.json()

#### Aggregation query

In [None]:
""" 
EXERCISE A: 
execute an aggregation query to count the number of books writte in each country
"""

#### Full-text query

In [None]:
""" 
EXERCISE B: 
execute a full-text query for the query "love magic"
"""

#### Exact match query

In [None]:
""" 
EXERCISE C: 
execute an exact-match query for the query "magic love"
"""

#### Multi-field full-text query with field boosting

In [None]:
""" 
EXERCISE D: 
Execute a query that searches both on tile and author
Weights the importance of matches on the author field twice as much as matches on the title
Sse "shakespeare" as query term
"""

#### Combining different queries

In [None]:
"""
EXERCISE E:
execute a *single* boolean query returning books that:
- have the "queen mary" in the title and were written in England
- should NOT have been published in the range of years [1850-1913]
"""

#### Fuzzy queries

In [None]:
"""
EXERCISE F:
execute a fuzzy query for the query "comander" with at most 50 expansions 
and considering transpositions
"""

#### Get TF of terms in document

In [None]:
"""
EXERCISE G:
get the term frequencies (in the title) of document with id = 4200
consider only words with a minimum length of 4 and a minimum term frequency of 2
"""

### Solutions

In [None]:
# SOLUTION TO EXERCISE A

# aggregation query
query = {
    "aggregations": {
        "by_category": {
            "terms": {
                "field": "country",
                "size":100
            }
        }
    }
}
r = e.curl('get', f'books/_search', query)
r.json()

In [None]:
# SOLUTION TO EXERCISE B

# full-text query
query = {
    "query": {
        "match": {
            "title": {
                "query": "love magic"
            }
        }
    }
}
r = e.curl('get', f'books/_search', query)
r.json()

In [None]:
# SOLUTION TO EXERCISE C

# exact match query
query = {
  "query": {
    "match_phrase": {
      "title": {
        "query": "magic love"
      }
    }
  }
}
r = e.curl('get', f'books/_search', query)
r.json()

In [None]:
# SOLUTION TO EXERCISE D

# Full text query, multiple fields with boosting
query = {
"query": {
    "multi_match": {
            "query": "shakespeare",
            "fields": ["title", "author_name^2"],
            "type": "phrase"
        }
    }
}
r = e.curl('get', f'books/_search', query)
r.json()

In [None]:
# SOLUTION TO EXERCISE E

# One or more queries can be specified in each of the clauses
# All the clauses are optional
# MUST: A document must match all of the queries
# MUST_NOT: A document must not match any of the queries
# SHOULD: A document does not have to match the queries, but it is considered more relevant if it does
# FILTER: Filters with yes/no categories
query = {
  "size" : 100,
  "query": {
    "bool": {
      "must": [
        {
          "match": {
                "title": "queen mary"
          }
        },
        {
          "match": {
              "country": "England"
          }
        }
      ],
      "must_not": [
        {
          "range": {
            "year": {
                "gte": 1850,
                "lte": 1913
            }
          }
        }
      ]
    }
  }
}
r = e.curl('get', f'books/_search', query)
r.json()

In [None]:
# SOLUTION TO EXERCISE F

query ={
  "query": {
    "fuzzy": {
      "title": {
        "value": "comander",
        "fuzziness": "AUTO",
        "max_expansions": 50,
        "prefix_length": 0,
        "transpositions": True,
        "rewrite": "constant_score"
      }
    }
  }
}
r = e.curl('get', f'books/_search', query)
r.json()

In [None]:
# SOLUTION TO EXERCISE G

# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-termvectors.html
query = {
    "fields" : ["title"],
    "term_statistics" : True,
    "field_statistics" : True,
    "positions": True,
    "filter": {
        "min_word_length": 4,
        "min_term_freq" : 2
  }
}
#r = e.curl('get', f'books/_doc/{doc_ids[4200]}')
r = e.curl('get', f'books/_termvectors/{doc_ids[4200]}', query)
r.json()