# Sisyphus

In [None]:
from IPython.display import Image
# Header image for the notebook; as the cell's last expression, the Image
# object is displayed inline by Jupyter.
image = "https://i0.wp.com/sisypheanhigh.com/foot/wp-content/uploads/2015/08/sisyphus-red.png"
Image(url=image, height=200, width=200)

In [None]:
from elasticsearch import Elasticsearch
from pandasticsearch import Select, DataFrame
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

%matplotlib inline

The Kibana web UI is at http://monster.us.cray.com:30601.

In [None]:
# Elasticsearch REST endpoint handed to the client constructor below.
hostname = "http://monster.us.cray.com:30200"

In [None]:
# Single client reused by every search in this notebook; http_compress
# enables gzip compression of the HTTP traffic to the cluster.
client = Elasticsearch(hostname, http_compress=True)
# Connectivity check, if needed: print(client.cluster.health())

In [None]:
import datetime,time
from pytz import timezone

In [None]:
def addSeconds(now, increment=30):
    """Return the timestamp ``increment`` seconds after ``now``.

    Args:
        now: Timestamp string in "%Y-%m-%d %H:%M:%S" format.
        increment: Number of seconds to add (default 30).

    Returns:
        The shifted timestamp, formatted the same way as ``now``.
    """
    fmt = "%Y-%m-%d %H:%M:%S"
    # Plain naive-datetime arithmetic on the wall-clock value. Converting
    # through an epoch timestamp in the local zone (mktime/fromtimestamp)
    # can shift the result around DST transitions.
    shifted = datetime.datetime.strptime(now, fmt) + datetime.timedelta(seconds=increment)
    return shifted.strftime(fmt)

In [None]:
# Beginning of the first query window, formatted "%Y-%m-%d %H:%M:%S".
start = "2019-08-08 09:10:00"

In [None]:
# End of the first query window: one addSeconds() step after start.
end = addSeconds(start)

In [None]:
#print("start date/time:" + start + " end date/time:" + end)

In [None]:
def myquery(start=None, end=None):
    """Build the Elasticsearch request body for per-30-second message counts.

    The query matches documents whose ``timereported`` lies in the half-open
    window ``[start, end)``. It aggregates them into a 30-second
    ``date_histogram`` (time zone America/Chicago, buckets with at least one
    document) under the aggregation name ``"2"``. ``size`` is 0, so no
    individual hits are returned.

    Args:
        start: Inclusive lower bound, e.g. "2019-08-08 09:10:00" (any value
            accepted by the range ``format`` below). Omitted when None.
        end: Exclusive upper bound, same format. Omitted when None.

    Returns:
        The request body as a JSON string.
    """
    import json  # local import keeps this cell self-contained

    time_range = {"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}
    if start is not None:
        time_range["gte"] = start
    if end is not None:
        # "lt", not "lte": successive windows share a boundary (one window's
        # end is the next one's start), so an inclusive upper bound would
        # count documents stamped exactly on the boundary twice.
        time_range["lt"] = end

    body = {
        "size": 0,
        "query": {
            "bool": {
                "must": [
                    {"match_all": {}},
                    {"range": {"timereported": time_range}},
                ],
                "must_not": [],
            }
        },
        "_source": {"excludes": []},
        "aggs": {
            "2": {
                "date_histogram": {
                    "field": "timereported",
                    "interval": "30s",
                    "time_zone": "America/Chicago",
                    "min_doc_count": 1,
                }
            }
        },
    }
    # json.dumps escapes the bound values; raw string interpolation did not.
    return json.dumps(body)

In [None]:
# Run the histogram query for the first window over all shasta-logs-* indices.
q = myquery(start, end)
resp = client.search(body=q, index="shasta-logs-*")
# Hit total, if needed: print("Number of responses: " + "{:,}".format(resp['hits']['total']))

In [None]:
# Buckets of the "2" date_histogram aggregation, one row per bucket.
table = resp['aggregations']['2']['buckets']
pd_table = pd.DataFrame.from_dict(table)
# Preview the first bucket when the window returned anything.
if not pd_table.empty:
    print(pd_table.head(1))

In [None]:
#pd_table['doc_count'].describe()

In [None]:
# Parse the bucket key strings (ISO-style timestamp with UTC offset) into
# timezone-aware datetimes.
pd_table['key_as_string'] = pd.to_datetime(
    pd_table['key_as_string'].astype(str),
    format='%Y-%m-%dT%H:%M:%S.%f%z',
)

In [None]:
# Number of non-null doc_count values, i.e. one per histogram bucket.
count = pd_table['doc_count'].count()
print("Number of entries: {}.".format(count))

In [None]:
# Draw at most 100 buckets at random for the initial plot.
sample = pd_table.sample(min(count, 100))

X = sample['key_as_string'].tolist()
Y = sample['doc_count'].tolist()

In [None]:
import time
import numpy as np
from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure
from bokeh.io import output_notebook, show, push_notebook
from bokeh.models import DatetimeTickFormatter
from bokeh.models.tools import HoverTool
from bokeh.models import BoxAnnotation
from math import pi

In [None]:
# Direct Bokeh's show() output into this notebook instead of a file.
output_notebook()

In [None]:
# Scatter plot of message counts per 30-second bucket; the streaming worker
# appends to `source` and pushes updates through `handle`.
p = figure(x_axis_type="datetime", plot_width=1200, plot_height=400)
source = ColumnDataSource(data={'datetime': X, 'count': Y})

pcolor = 'darkblue'

line = p.circle(x='datetime', y='count', legend="msg count", source=source,
                size=3, color=pcolor, alpha=0.6)

# Background bands: counts below 100, from 100 to 100,000, and above 100,000.
low_box = BoxAnnotation(top=100, fill_alpha=0.1, fill_color='wheat')
mid_box = BoxAnnotation(bottom=100, top=100000, fill_alpha=0.1, fill_color='yellowgreen')
high_box = BoxAnnotation(bottom=100000, fill_alpha=0.1, fill_color='crimson')

p.add_layout(low_box)
p.add_layout(mid_box)
p.add_layout(high_box)

# The histogram interval and axis label are 30 seconds, so the title says so too.
p.title.text = 'Message Counts per 30 seconds'
p.background_fill_color = "#f5f5f5"
p.grid.grid_line_color = "white"
p.yaxis.axis_label = 'Count'
p.xaxis.axis_label = 'timereported per 30 seconds'
p.xaxis.formatter = DatetimeTickFormatter(
    # Include the time at hour-level zoom; a date-only label renders every
    # tick within a day identically.
    hours=["%d %B %Y %H:%M"],
    days=["%d %B %Y"],
    months=["%d %B %Y"],
    years=["%d %B %Y"],
)
p.xaxis.major_label_orientation = pi/4

hover = HoverTool(
    tooltips=[
        ('Count', '@count{%d}'),
        ('timereported per 30 seconds', '@datetime{%Y-%m-%d %H:%M:%S.%3N}'),
    ],
    formatters={
        'count': 'printf',
        'datetime': 'datetime',
    },
    # display a tooltip whenever the cursor is vertically in line with a glyph
    mode='vline',
)
# A HoverTool has no effect until it is attached to the figure.
p.add_tools(hover)

handle = show(p, notebook_handle=True)

In [None]:
from threading import Thread

# The worker reads this global through a lambda on every iteration; setting
# it to True from another cell asks the worker to exit.
stop_threads = False

# Need to get the next 30 second increment to plot off the old value
def blocking_callback(id, stop):
    """Stream per-bucket message counts into the Bokeh plot until stopped.

    Each iteration advances the global ``start``/``end`` window by one
    ``addSeconds`` step and queries Elasticsearch for that window's histogram
    buckets. Any buckets returned are streamed into the global ``source``,
    and the notebook ``handle`` is refreshed.

    Args:
        id: Unused; kept so existing positional call sites keep working.
        stop: Zero-argument callable; the loop exits once it returns True.
    """
    global start, end
    global pcolor

    period = 0.1      # seconds between queries while catching up on history
    n_show = 100000   # rollover: maximum number of points kept in `source`

    while True:
        start = end
        window_start = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
        if window_start > datetime.datetime.now():
            # Caught up with the wall clock: slow down to one query per period.
            # NOTE(review): pcolor is only read when the glyph is created, so
            # reassigning it here does not recolor the existing plot.
            pcolor = 'darkgreen'
            if period < 30:
                print("resetting query timer to 30 secs...")
            period = 30

        end = addSeconds(start)
        q = myquery(start, end)
        resp = client.search(index="shasta-logs-*", body=q)
        buckets = resp['aggregations']['2']['buckets']
        pd_table = pd.DataFrame.from_dict(buckets)

        # An empty window (the result can be null) must still fall through to
        # the sleep and stop check below. Skipping them spins through future
        # windows without pause and never notices a stop request.
        if not pd_table.empty:
            times = pd.to_datetime(pd_table['key_as_string'].astype(str),
                                   format='%Y-%m-%dT%H:%M:%S.%f%z')
            new_data = {
                'datetime': times.tolist(),
                'count': pd_table['doc_count'].tolist(),
            }
            source.stream(new_data, n_show)
            push_notebook(handle=handle)

        time.sleep(period)

        if stop():
            print("exit")
            break


# The first argument is unused by blocking_callback.
thread = Thread(target=blocking_callback, args=(None, lambda: stop_threads))
thread.start()

In [None]:
# preceding streaming is not blocking
#for cnt in range(10):
#    print("Do this, while plot is still streaming", cnt)

In [None]:
# you might also want to stop the thread
# Ask the streaming worker to stop, then wait for it to wake from its sleep
# and exit before dropping the handle. Without the join, re-running the start
# cell (which resets stop_threads to False) while the worker is still sleeping
# would leave the old worker streaming alongside the new one.
stop_threads = True
thread.join(timeout=120)
if thread.is_alive():
    print("streaming thread did not stop within 120 seconds")
del thread