# This Notebook will help to get started with the Elasticsearch connector

In [6]:
import sys
import requests
import pandas as pd
import datetime
import json

In [7]:
print('Last run:', datetime.datetime.utcnow(), 'UTC')  # timezone can't be detected from browser

Last run: 2019-07-29 12:24:39.510683 UTC


In [8]:
# make sure ES is up and running
res = requests.get('http://elasticsearch:9200')
r=json.loads(res.content)
r

{'name': 'puSAOdA',
 'cluster_name': 'docker-cluster',
 'cluster_uuid': 'QOWGKg_3SQeeRys1ICaEvg',
 'version': {'number': '6.2.2',
  'build_hash': '10b1edd',
  'build_date': '2018-02-16T19:01:30.685723Z',
  'build_snapshot': False,
  'lucene_version': '7.2.1',
  'minimum_wire_compatibility_version': '5.6.0',
  'minimum_index_compatibility_version': '5.0.0'},
 'tagline': 'You Know, for Search'}

In [9]:
%%sh
whoami
# pip install elasticsearch
# already installed

jovyan


In [10]:
# connect to our cluster
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}])
es

<Elasticsearch([{'host': 'elasticsearch', 'port': 9200}])>

In [11]:
# Empty search to ensure it is working
res = es.search()
print(res["_shards"])
# res["hits"]["hits"][-1]


{'total': 6, 'successful': 6, 'skipped': 0, 'failed': 0}


Display our indices and document types saved in elasticsearch.

In [12]:
# the prefix "!" is the similar to "%%sh"-magic, but only valid in one line
!curl -XGET 'http://elasticsearch:9200/_cat/health?v'

epoch      timestamp cluster        status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1564403080 12:24:40  docker-cluster yellow          1         1      6   6    0    0        5             0                  -                 54.5%


In [13]:
# every index is green if it's healthy
!curl -XGET 'http://elasticsearch:9200/_cat/indices?v'

health status index               uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .kibana             RQxwCJ2UQkKjjBgKdTIL9g   1   0          2            1     11.5kb         11.5kb
yellow open   eu.dtz.data-2019.07 P6ue_VBAQiSH-LegMlydaA   5   1       4167            0    944.8kb        944.8kb


In [14]:
!curl -XGET 'http://elasticsearch:9200/logstash-2018.03.01/_mapping?pretty=true'

{
  "error" : {
    "root_cause" : [
      {
        "type" : "index_not_found_exception",
        "reason" : "no such index",
        "resource.type" : "index_or_alias",
        "resource.id" : "logstash-2018.03.01",
        "index_uuid" : "_na_",
        "index" : "logstash-2018.03.01"
      }
    ],
    "type" : "index_not_found_exception",
    "reason" : "no such index",
    "resource.type" : "index_or_alias",
    "resource.id" : "logstash-2018.03.01",
    "index_uuid" : "_na_",
    "index" : "logstash-2018.03.01"
  },
  "status" : 404
}


In [15]:
import subprocess
output = subprocess.check_output('curl -s -XGET "http://elasticsearch:9200/_all/_mapping"', shell=True)
output = json.loads(output)

indices = list(output.keys())
indices = [[index, typ] for index in indices for typ in output.get(index).get("mappings")]
list(indices)

[['.kibana', 'doc'], ['eu.dtz.data-2019.07', 'doc']]

In [16]:
# Problems with finding the document type for a given index pattern
#res = es.get(index="logstash-2018.02.07", id=1)
#print(res['_source'])


In [17]:
help(es.search)

Help on method search in module elasticsearch.client:

search(index=None, doc_type=None, body=None, params=None) method of elasticsearch.client.Elasticsearch instance
    Execute a search query and get back search hits that match the query.
    `<http://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html>`_
    
    :arg index: A comma-separated list of index names to search; use `_all`
        or empty string to perform the operation on all indices
    :arg doc_type: A comma-separated list of document types to search; leave
        empty to perform the operation on all types
    :arg body: The search definition using the Query DSL
    :arg _source: True or false to return the _source field or not, or a
        list of fields to return
    :arg _source_exclude: A list of fields to exclude from the returned
        _source field
    :arg _source_include: A list of fields to extract and return from the
        _source field
    :arg allow_no_indices: Whether to ign

In [18]:
# The results of metric testdata from the last 1 hour
body = {
  "size": 10,
  "query": {
    "bool": {
      "must": [
        {"range" : {
            "phenomenonTime" : {
                #"gte": "2018-02-20T09:08:34.230693+00:00", 
                "gte": "now-7d",
                "lte": "now", 
                "time_zone": "+01:00"
            }
        }},
        {"match": {
            "Datastream.name.keyword": {
                "query": "Airquality DS"
              }
        }}
      ]
    }
  }
}
res = es.search(index="logs*", body=body)
print("Got %d Hits:" % res['hits']['total'])
for hit in res['hits']['hits']:
    print("Timestamp: {}, \tmetric: {}, \tresult: {}\n".format(hit["_source"]["phenomenonTime"], 
            hit["_source"]["Datastream"]["name"], hit["_source"]["result"]))

Got 0 Hits:


# Visualize Data

In [19]:
import matplotlib.pyplot as plt
import numpy as np

In [20]:
# The results from the last 30 days, but the page size is limited to the size argument.
body = {
  "size": 10000,
  "query": {
    "bool": {
      "must": [
        {"range" : {
            "phenomenonTime" : {
                #"gte": "2018-02-20T09:08:34.230693+00:00", 
                "gte": "now-30d",
                "lte": "now", 
                "time_zone": "+01:00"
            }
        }},
        {"match": {
            "Datastream.name.keyword": {
                "query": "Current of the Panda Robot"
              }
        }}
      ]
    }
  }
}
res = es.search(index="eu.dtz.data-*", body=body)
print("Got %d Hits:" % res['hits']['total'])
res["hits"]["hits"][-1]

Got 634 Hits:


{'_index': 'eu.dtz.data-2019.07',
 '_type': 'doc',
 '_id': 'vVmwPWwBRoiMMqnEmgEN',
 '_score': 2.8039846,
 '_source': {'@version': '1',
  'host': 'iot86',
  'port': 57760,
  'path': 'datastack-adapter/adapter/datastore_adapter.py',
  'dayOfWeek': '1',
  'Datastream': {'@iot.id': 44,
   '@iot.selfLink': 'http://192.168.48.71:8082/v1.0/Datastreams(44)',
   'name': 'Current of the Panda Robot'},
  'stack_info': None,
  'tags': [],
  'resultTime': '2019-07-29T12:24:39.587527+00:00',
  'type': 'logstash',
  'message': '',
  'level': 'INFO',
  'hourOfDay': '12',
  'logger_name': 'datastore-adapter',
  '@timestamp': '2019-07-29T12:24:39.589Z',
  'phenomenonTime': '2019-07-29T12:24:39.587501+00:00',
  'result': 0.16171596689807538}}

In [21]:
print("Size of the hits: {:2f} kB".format(sys.getsizeof(res["hits"]["hits"])/1024))
len(res["hits"]["hits"]) # this request is limited to 10000 hits, as the the size may lead to inconviniences

Size of the hits: 5.367188 kB


634

In [22]:
# Initialize the scroll
page = es.search(
index = 'eu.dtz.data-*',
scroll = '2m',
size = 1000,
body = {
  "query": {
    "bool": {
      "must": [
        {"range" : {
            "phenomenonTime" : {
                #"gte": "2018-02-20T09:08:34.230693+00:00", 
                "gte": "now-30d",
                "lte": "now", 
                "time_zone": "+01:00"
            }
        }},
        {"match": {
            "Datastream.name.keyword": {
                "query": "Current of the Panda Robot"
              }
        }}
      ]
    }
  }
})

sid = page['_scroll_id']
scroll_size = page['hits']['total']

# Start scrolling and append data
data = list()
while (scroll_size > 0):
    print("Scrolling...")
    page = es.scroll(scroll_id = sid, scroll = '2m')
    # Update the scroll ID
    sid = page['_scroll_id']
    # Get the number of results that we returned in the last scroll
    scroll_size = len(page['hits']['hits'])
    print("appending array of size " + str(scroll_size))
    dataframe = [[row["_source"]["phenomenonTime"], row["_source"]["Datastream"]["name"], row["_source"]["result"]] for row in page['hits']['hits']]
    data += dataframe
    # Do something with the obtained page
print("Length of the resulting array:", len(data))

Scrolling...
appending array of size 0
Length of the resulting array: 0


In [23]:
data[0]

IndexError: list index out of range

In [None]:
df = pd.DataFrame(data, columns=["phenomenonTime", "name", "result"])
df.index = pd.to_datetime(df["phenomenonTime"])
df = df.drop("phenomenonTime", axis=1)
print(df.shape)
df.head()

In [None]:
df.to_csv("currentsPanda.csv")

In [None]:
%%sh
ls -l

In [None]:
print("Min: {}, max: {}, mu: {}, std: {}".format(
df["result"].min(), df["result"].max(), df["result"].mean(), df["result"].std()))
df["result"].plot.hist(bins=100)
plt.show()

In [None]:
# Timeline for the dataframe
df["result"].plot()
plt.show()

# Create a new Index from the Notebook

In [None]:
doc = {
    'author': 'kimchy',
    'text': 'Elasticsearch: cool. bonsai cool.',
    'timestamp': datetime.datetime.now(),
}
res = es.index(index="test-index", doc_type='tweet', id=1, body=doc)
#print(res['created'])

In [None]:
# the version iterates over the data
es.get(index="test-index", doc_type='tweet', id=1)

In [None]:
es.indices.refresh(index="test-index*")

# Connector to Spark

In [None]:
df.head(3)

In [None]:
import pyspark
sc = pyspark.SparkContext.getOrCreate()
sc


In [None]:
# create pyspark-dataFrame
sqlCtx = pyspark.SQLContext(sc)
sdf = sqlCtx.createDataFrame(df.astype(str))
sdf.show(5)

In [None]:
sdf.rdd.first()  # ["host"]

In [None]:
sdf.createOrReplaceTempView("tempTable")
res = sqlCtx.sql("""SELECT name, stddev(result) as std__of_result
            FROM tempTable
            GROUP BY name""")
res.show()

# Reading in Spark directly from Elasticsearch


In [None]:
import elasticsearch

In [None]:
help(elasticsearch)

In [None]:
df = sqlCtx.read.format("es")


In [None]:
!echo $PYSPARK_PYTHON

In [None]:
sc

# Sniffing

In [None]:
from elasticsearch import Elasticsearch

# you can specify to sniff on startup to inspect the cluster and load
# balance across all nodes
# you can also sniff periodically and/or after failure:
es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}],
          sniff_on_start=True,
          sniff_on_connection_fail=True,
          sniffer_timeout=60)

In [None]:
es