# ElasticSearch Upload
---
- Connect to Bonsai ES client
- Create cluster / shards if non existant 
- Add alias to cluser 
- Connect to Firestore
- Extract SkiBoards from Firestore
- Format / Validate SkiBoard objects
- Upload data to ElasticSearch

In [1]:
# https://docs.bonsai.io/article/102-python
import io, sys, os, base64, re, logging
import json
import datetime

from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient
from elasticsearch.helpers import streaming_bulk

import firebase_admin
from firebase_admin import credentials, firestore

## Connect to Firestore

In [2]:
# Inititalise Firebase connection
try:
    firebase_cred = credentials.Certificate(sys.path[0] + '/../application/config/firebase_service_account_key.json')
    firebase_admin.initialize_app(firebase_cred)
except:
    firebase_admin.get_app()

In [3]:
db = firestore.client()

## Collect all SkiBoards from Firestore

In [5]:
docs = db.collection('Users').stream()
users = []

for doc in docs:
    doc_id = doc.id
    doc = doc.to_dict()

    user = {
        'id': doc_id
    }
    print('Extracting... {} {} ({})'.format(doc['fname'], doc['lname'], doc['email']))
    for key in doc:
        user[key] = doc[key]
        
    quiver = []
    collection_docs = db.collection('Users').document(doc_id).collection('Quiver').get()
    for collection in collection_docs:
        quiver_item = {
            'id': collection.id
        }
        collection = collection.to_dict()
        for detail in collection:
            quiver_item[detail] = collection[detail]
                
        quiver.append(quiver_item)
        
    user['quiver'] = quiver
    users.append(user)

Extracting... Admin User (lck3nny.dev@gmail.com)


In [6]:
print('Extracted {} Users\n'.format(len(users)))
print(users[0])

Extracted 1 Users

{'id': '7aLK5kOucZVIJ2EiXZlj', 'updated': DatetimeWithNanoseconds(2023, 4, 25, 20, 22, 36, 326366, tzinfo=datetime.timezone.utc), 'lname': 'User', 'permissions': ['admin'], 'email': 'lck3nny.dev@gmail.com', 'created': DatetimeWithNanoseconds(2023, 3, 5, 3, 46, 45, 958000, tzinfo=datetime.timezone.utc), 'fname': 'Admin', 'ski': False, 'snowboard': [True, 'goofy'], 'quiver': [{'id': 'M8KSbX3kKRGoLWmsc5Sb', 'skiboard': 'burton-custom-2022', 'added': DatetimeWithNanoseconds(2023, 5, 5, 1, 20, 7, 346431, tzinfo=datetime.timezone.utc), 'size': '154W'}]}


## Connect to ElasticSearch

In [7]:
es_endpoint = "https://gmcbdi8n2l:qd603uuo9b@skiboards-3495559625.us-west-2.bonsaisearch.net:443"
es_user = "gmcbdi8n2l"
es_password = "qd603uuo9b"

In [8]:
es_client = Elasticsearch([es_endpoint], basic_auth=(es_user, es_password))

In [9]:
new_index_name = "users-{}".format(datetime.date.today())
alias = "users"
print("New Index: " + new_index_name + " AKA: " + alias)

New Index: users-2023-05-27 AKA: users


In [10]:
idx_manager = IndicesClient(es_client)

In [11]:
# Create new index
try:
    idx_manager.create(index=new_index_name)
except RequestError:
    print("Error initializing index manager")

# Add alias to new index
try:
    idx_manager.put_alias(new_index_name, alias)
except:
    print("Error adding alias to cluster")

## Upload Firestore data to ElasticSearch

In [26]:
def generate_update_docs():    
    docs = db.collection('Users').stream()
    users = []

    for doc in docs:
        doc_id = doc.id
        doc = doc.to_dict()

        user = {
            'id': doc_id
        }
        '''
        print('Extracting... {} {} ({})'.format(doc['fname'], doc['lname'], doc['email']))
        for key in doc:
            user[key] = doc[key]

        quiver = []
        
        collection_docs = db.collection('Users').document(doc_id).collection('Quiver').get()
        for collection in collection_docs:
            quiver_item = {}
            collection = collection.to_dict()
            for detail in collection:
                quiver_item[detail] = collection[detail]

            quiver.append(quiver_item)

        user['quiver'] = quiver
        '''
        
        document = {
            '_type': 'document',
            '_id': doc_id,
            '_source': user
        }
        print("\nCompleted document definition")
        print(document)
        yield document


In [27]:
successes = 0
print("Processing update...")
print("New Index: " + new_index_name)
for ok, action in streaming_bulk(client=es_client, index=new_index_name, actions=generate_update_docs()):
    successes += ok

Processing update...
New Index: users-2023-05-27
Extracting... Admin User (lck3nny.dev@gmail.com)

Completed document definition
{'_type': 'document', '_id': '7aLK5kOucZVIJ2EiXZlj', '_source': {'id': '7aLK5kOucZVIJ2EiXZlj', 'updated': DatetimeWithNanoseconds(2023, 4, 25, 20, 22, 36, 326366, tzinfo=datetime.timezone.utc), 'lname': 'User', 'permissions': ['admin'], 'email': 'lck3nny.dev@gmail.com', 'created': DatetimeWithNanoseconds(2023, 3, 5, 3, 46, 45, 958000, tzinfo=datetime.timezone.utc), 'fname': 'Admin', 'ski': False, 'snowboard': [True, 'goofy']}}


BulkIndexError: ('1 document(s) failed to index.', [{'index': {'_index': 'users-2023-05-27', '_type': 'document', '_id': '7aLK5kOucZVIJ2EiXZlj', 'status': 400, 'error': {'type': 'illegal_argument_exception', 'reason': 'mapper [snowboard] cannot be changed from type [text] to [boolean]'}, 'data': {'id': '7aLK5kOucZVIJ2EiXZlj', 'updated': DatetimeWithNanoseconds(2023, 4, 25, 20, 22, 36, 326366, tzinfo=datetime.timezone.utc), 'lname': 'User', 'permissions': ['admin'], 'email': 'lck3nny.dev@gmail.com', 'created': DatetimeWithNanoseconds(2023, 3, 5, 3, 46, 45, 958000, tzinfo=datetime.timezone.utc), 'fname': 'Admin', 'ski': False, 'snowboard': [True, 'goofy']}}}])

----
----
                                                  B R E A K
----
----

## Search

In [147]:
query = "burton custom"

In [149]:
search_body = {
        "query": {
            "multi_match": {
                "query": query,
                "type": "bool_prefix",
                "fields": [
                    "brand",
                    "model",
                    "year"
                ]
            }
        }
    }
res = es_client.search(index=new_index_name, body=search_body)

RequestError: RequestError(400, 'search_phase_execution_exception', 'failed to create query: For input string: "burton custom"')

## Delete Index

In [150]:
try:
    idx_manager.delete(index=new_index_name)
except:
    print("No index found with alias: {}".format(new_index_name))