# Load Elastic

In this notebook, we're going to take the output from the [Transform Notebook](<./01 - transform.ipynb>) and load it into Elastic.

As always, we start with imports:

## Imports

In [13]:
from elasticsearch import Elasticsearch, helpers
import yaml
import json
import bucketstore
import randomname


## Setup

With the imports out of the way, we'll take care of a few basics. The cell below:
 1) Loads a file containing connection info for my DO Space and Elastic
 2) Uses the connection info to connect to the Space
 3) Loads the bucket that all of the data is stored in
 4) Connects to Elastic
 5) Sets some default variables

In [14]:
with open("secrets.yml", 'r') as ymlfile:
    cfg = yaml.safe_load(ymlfile)

bucketstore.login(
    access_key_id=cfg['spaces']['access'],
    secret_access_key=cfg['spaces']['secret'],
    region='nyc3',
    endpoint_url=cfg['spaces']['url']
)
bucket = bucketstore.get('wrathalake')

elastic = Elasticsearch(
    cloud_id=cfg['elastic']['cloudId'],
    api_key=cfg['elastic']['apiKey']
)

ridesKey = 'intermediate/rides.json'
dataStream = 'equilab-rides-dev'
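
The setup cell reads five keys from `secrets.yml`. As a minimal sketch, assuming the file has the layout implied by those lookups, a check like the following could fail fast with a readable message before any connection is attempted. The helper name `missing_config_keys` and the placeholder values are hypothetical and not part of the original notebook.

```python
# Keys read by the setup cell, grouped by top-level section of secrets.yml.
REQUIRED_KEYS = {
    'spaces': ['access', 'secret', 'url'],
    'elastic': ['cloudId', 'apiKey'],
}


def missing_config_keys(cfg):
    """Return dotted names of required keys absent from cfg (hypothetical helper)."""
    missing = []
    for section, keys in REQUIRED_KEYS.items():
        # Treat a missing or empty section as an empty mapping.
        section_cfg = cfg.get(section) or {}
        for key in keys:
            if key not in section_cfg:
                missing.append('{}.{}'.format(section, key))
    return missing


# Placeholder config standing in for yaml.safe_load(ymlfile); values are made up.
example_cfg = {
    'spaces': {'access': 'ACCESS_KEY', 'secret': 'SECRET_KEY'},
    'elastic': {'cloudId': 'CLOUD_ID', 'apiKey': 'API_KEY'},
}
print(missing_config_keys(example_cfg))
```

In the notebook, the same call could be made on `cfg` right after it is loaded, raising an error if the returned list is not empty.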

## Functions

Below we have the main processing function. It takes a ride's metadata entry, loads that ride's intermediate file from the bucket, and formats each point as a bulk action for Elasticsearch.

In [15]:
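def buildData(ride):
    """Build the list of bulk actions for one ride from its intermediate file."""
    # Load the ride's points from the bucket
    rideData = json.loads(bucket[ride['interFile']])
    # Label the ride with its date plus a randomly generated name
    rideName = '{}-{}'.format(ride['rideDate'], randomname.get_name())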

    elasticData = [
        {
            '_op_type': 'create',
            '_index': dataStream,
            '_source': {
                '@timestamp': point['time'],
                'index': point['index'],
                'rideName': rideName,
                'location': {
                    'lat': point['coords']['lat'],
                    'lon': point['coords']['long']
                },
                'elevation': point['elevation'],
                'timeDelta': point.get('timeDelta', None),
                'distance': point.get('distance', None),
                'speed': point.get('speed', None),
                'climb': point.get('climb', None),
                'drop': point.get('drop', None)
            }
        }
        for point in rideData
    ]
    
    return elasticData
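
To make the shape of the output concrete, here is a sketch of the action built for a single point. The sample point is invented for illustration, and `point_to_action` is a hypothetical helper that mirrors the dictionary inside `buildData` for one point instead of reading from the bucket. Bulk requests to a data stream only accept the `create` operation, which fits the `_op_type` used here.

```python
def point_to_action(point, ride_name, data_stream):
    """Build one bulk action for a single track point (hypothetical helper)."""
    return {
        '_op_type': 'create',
        '_index': data_stream,
        '_source': {
            '@timestamp': point['time'],
            'index': point['index'],
            'rideName': ride_name,
            # The source data uses 'long'; the Elasticsearch field is 'lon'.
            'location': {
                'lat': point['coords']['lat'],
                'lon': point['coords']['long'],
            },
            'elevation': point['elevation'],
            # Optional fields fall back to None when a point lacks them.
            'timeDelta': point.get('timeDelta'),
            'distance': point.get('distance'),
            'speed': point.get('speed'),
            'climb': point.get('climb'),
            'drop': point.get('drop'),
        },
    }


# Invented sample point; real points come from the intermediate JSON file.
sample_point = {
    'time': '2023-08-13T14:00:00Z',
    'index': 0,
    'coords': {'lat': 40.0, 'long': -75.0},
    'elevation': 120.5,
}
action = point_to_action(sample_point, '20230813-example-name', 'equilab-rides-dev')
print(action['_source']['location'])
print(action['_source']['speed'])
```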

In [16]:
rides = json.loads(bucket[ridesKey])


In [17]:
for ride in rides.keys():
    if not rides[ride]['inElastic']:
        print("Need to add data for ride {} into Elastic".format(ride))
        
        elasticData = buildData(rides[ride])
        
        try:
            # make the bulk call, and get a response
            response = helpers.bulk(elastic, elasticData)

            rides[ride]['inElastic'] = True
            print(" - RESPONSE:", response)
        except Exception as e:
            print(" - ERROR:", e)
    else:
        # Uncomment the below line to print the rides that have already been processed.
        #print("Already processed data for ride {}.".format(ride))
        pass

bucket[ridesKey] = json.dumps(rides)

Already processed data for ride raw/sources/equilab/training-2023-09-09.gpx.
Already processed data for ride raw/sources/equilab/training-2023-09-10.gpx.
Already processed data for ride raw/sources/equilab/training-2023-09-23.gpx.
Already processed data for ride raw/sources/equilab/training-2023-10-14.gpx.
Already processed data for ride raw/sources/equilab/training-2023-10-22.gpx.
Already processed data for ride raw/sources/equilab/training-2023-11-04.gpx.
Already processed data for ride raw/sources/equilab/training-2023-11-05.gpx.
Already processed data for ride raw/sources/equilab/training-2023-11-18.gpx.
Already processed data for ride raw/sources/equilab/training-2023-11-19.gpx.
Need to add data for ride raw/sources/equilab/training-2023-08-12.gpx into Elastic

RESPONSE: (7460, [])
Need to add data for ride raw/sources/equilab/training-2023-08-13.gpx into Elastic

RESPONSE: (6180, [])
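
With its default settings, `helpers.bulk` returns a tuple of the number of successful actions and a list of errors, which is what a response like `(7460, [])` shows. Below is a minimal sketch of checking that tuple against the number of actions sent, assuming the goal is to flag a ride as loaded only when every point was indexed. `bulk_fully_succeeded` is a hypothetical helper and not part of the `elasticsearch` library.

```python
def bulk_fully_succeeded(response, expected_count):
    """Return True if every action succeeded (hypothetical helper).

    `response` is the (success_count, errors) tuple returned by
    helpers.bulk when stats_only is left at its default of False.
    """
    success_count, errors = response
    return success_count == expected_count and not errors


# Tuples shaped like the bulk helper's return value; the second is invented.
print(bulk_fully_succeeded((7460, []), 7460))
print(bulk_fully_succeeded((7459, [{'create': {'status': 409}}]), 7460))
```

By default the helper raises an exception on failed actions, so a non-empty error list mainly matters when `raise_on_error=False` is passed.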


In [10]:
# Delete the Elasticsearch Data for re-processing
# elastic.indices.delete(index=dataStream)
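
If the data is deleted for re-processing, the `inElastic` flags stored in the rides file would presumably also need to be reset, since the load loop skips any ride already flagged. A minimal sketch under that assumption follows; `reset_in_elastic` is a hypothetical helper and the example entries are invented.

```python
def reset_in_elastic(rides):
    """Mark every ride as not yet loaded (hypothetical helper)."""
    for ride in rides.values():
        ride['inElastic'] = False
    return rides


# Invented entries; the real dictionary comes from intermediate/rides.json.
example_rides = {
    'raw/example-a.gpx': {'inElastic': True},
    'raw/example-b.gpx': {'inElastic': False},
}
reset_in_elastic(example_rides)
print([ride['inElastic'] for ride in example_rides.values()])
```

The updated dictionary would still need to be written back with `bucket[ridesKey] = json.dumps(rides)`, as the load cell does.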


In [18]:
# Show the keys of the last five rides in the rides file
list(rides.keys())[-5:]

['raw/sources/equilab/training-2023-11-05.gpx',
 'raw/sources/equilab/training-2023-11-18.gpx',
 'raw/sources/equilab/training-2023-11-19.gpx',
 'raw/sources/equilab/training-2023-08-12.gpx',
 'raw/sources/equilab/training-2023-08-13.gpx']

In [19]:
# Show the details for one of the rides above; by default this is the latest one
rides[list(rides.keys())[-1]]

{'rawFile': 'raw/sources/equilab/training-2023-08-13.gpx',
 'interFile': 'intermediate/sources/equilab/training-2023-08-13.json',
 'rideDate': '20230813',
 'totalTime': 6250.003,
 'totalDistance': 29337.12579091715,
 'totalClimb': 6869.750875999966,
 'averageSpeed': 3.200412187301101,
 'inElastic': True}