In [None]:
import numpy as np
import pandas as pd
import cufflinks as cf
import plotly
import plotly.offline as py
import plotly.graph_objs as go
from datetime import datetime  
from datetime import timedelta
from pmdarima.arima import auto_arima
from elasticsearch import Elasticsearch

In [None]:
# for offline visualization: render plotly figures inside the notebook.
# connected=True: per plotly docs, plotly.js is loaded from a CDN instead of
# being embedded in the notebook file.
py.init_notebook_mode(connected=True)
# make cufflinks' DataFrame.iplot() (used by the drawing helpers) work offline too
cf.go_offline()

In [None]:
# Shared Elasticsearch client used by every query helper in this notebook.
# No hosts are passed, so the client library's default host applies
# (presumably localhost:9200 — confirm for the installed elasticsearch-py).
# timeout=120 raises the request timeout for the heavy aggregations
# (presumably seconds — confirm).
es = Elasticsearch(timeout=120)

In [None]:
# parse elasticsearch aggregation buckets into a list of (action, count) pairs
def get_action_count(action_buckets, count_agg_name):
    """Flatten terms-aggregation buckets into (action, count) pairs.

    :param action_buckets: iterable of bucket dicts, each with a 'key' and a
        metric sub-aggregation named ``count_agg_name`` holding a 'value'.
    :param count_agg_name: name of the metric sub-aggregation to read.
    :return: list of (action, count) tuples, in bucket order.
    """
    return [(b['key'], b[count_agg_name]['value']) for b in action_buckets]

In [None]:
# retrieve gateway from action url
def get_gateway_from_action(action):
    """Extract the gateway name from an action URL.

    The action is split on '/'. When segment index 4 is the literal
    'gateway', the following segment is the gateway name. If that segment is
    empty (e.g. a doubled slash, '.../gateway//name'), the next segment is
    used instead.

    :param action: action URL string.
    :return: the gateway name, or None when the action has no gateway segment
        at the expected position.
    """
    parts = action.split('/')
    if len(parts) < 6 or parts[4] != 'gateway':
        return None
    gateway = parts[5]
    if gateway == '':
        # '.../gateway/' has nothing after the empty segment; return None
        # instead of raising IndexError on parts[6].
        gateway = parts[6] if len(parts) > 6 else None
    return gateway

In [None]:
# transform a list of (action, count) pairs into a {gateway: count} dict
def action_to_gateway_count(action_count):
    """Sum per-action counts into per-gateway totals.

    :param action_count: iterable of (action, count) pairs.
    :return: dict mapping gateway name -> summed count. Actions for which
        get_gateway_from_action returns None are skipped.
    """
    totals = {}
    for action_url, n in action_count:
        name = get_gateway_from_action(action_url)
        if name is None:
            continue
        totals[name] = totals.get(name, 0) + n
    return totals

In [None]:
# convert dictionary to (values, keys) tuples sorted by value, largest first
def dic2list_sorted_by_value(dic):
    """Return the dict's values and keys as parallel tuples, largest value first.

    Pairs are ordered by (value, key) descending, so ties on value are broken
    by key in descending order.

    :param dic: mapping whose values are mutually comparable.
    :return: ``(values, keys)`` — two tuples of equal length. An empty dict
        yields ``((), ())`` rather than raising ValueError.
    """
    if not dic:
        return (), ()
    ranked = sorted(zip(dic.values(), dic.keys()))[::-1]
    values, keys = zip(*ranked)
    return values, keys

In [None]:
# convert dictionary to (keys, values) tuples sorted by key
def dic2list_sorted_by_keys(dic):
    """Return the dict's keys and values as parallel tuples, ordered by key.

    NOTE: unlike dic2list_sorted_by_value, the result is ``(keys, values)``
    — keys first. The prediction helpers unpack it as ``keys, values``.

    :param dic: mapping with mutually comparable keys.
    :return: ``(keys, values)`` tuples of equal length; ``((), ())`` for an
        empty dict instead of raising ValueError.
    """
    if not dic:
        return (), ()
    keys, values = zip(*sorted(dic.items()))
    return keys, values

In [None]:
# draw a stacked bar chart using plot.ly
def draw_bar_chart(data, legends=None, colors=None, title=None, ylabel=None):
    """Draw one stacked bar trace per dict in ``data``.

    Bars are ordered by their total across all dicts, largest first.

    :param data: list of dicts mapping x-label -> bar height. A key missing
        from one of the dicts is drawn as 0 for that trace.
    :param legends: optional list of trace names, parallel to ``data``.
    :param colors: optional list of marker colors, parallel to ``data``.
    :param title: chart title.
    :param ylabel: y-axis title.
    """
    total = {}
    for series in data:
        for key, value in series.items():
            total[key] = total.get(key, 0) + value

    # dic2list_sorted_by_value cannot unpack an empty dict, so guard here
    ordered_keys = dic2list_sorted_by_value(total)[1] if total else ()

    plot = []
    for i, series in enumerate(data):
        plot.append(go.Bar(
            x=ordered_keys,
            # .get: tolerate dicts that do not share exactly the same keys
            y=[series.get(key, 0) for key in ordered_keys],
            # legends defaults to None; don't index into it in that case
            name=None if legends is None else legends[i],
            marker=dict(
                color=None if colors is None else colors[i]
            )
        ))

    layout = go.Layout(
        barmode='stack',
        title=title,
        yaxis=dict(title=ylabel),
        showlegend=True
    )

    fig = go.Figure(data=plot, layout=layout)
    py.iplot(fig, filename='stacked-bar')

In [None]:
# obtain all the available gateways
def get_all_gateways():
    """Return the distinct gateway names recorded in ``lognormal_index``.

    Gateways are the ``fields.text`` values of nested ``fields`` entries whose
    ``fields.key`` is ``mbaasgateway``. At most 1000 names are returned (the
    terms aggregation size).

    :return: list of gateway names, in terms-bucket order.
    """
    gateway_terms = {"terms": {"field": "fields.text", "size": 1000}}
    mbaasgateway_filter = {
        "filter": {"term": {"fields.key": "mbaasgateway"}},
        "aggs": {"gateways": gateway_terms},
    }
    query_body = {
        "size": 0,
        "aggs": {
            "fields": {
                "nested": {"path": "fields"},
                "aggs": {"filter_mbaasgateway": mbaasgateway_filter},
            }
        },
    }
    response = es.search(index='lognormal_index', body=query_body)
    buckets = response['aggregations']['fields']['filter_mbaasgateway']['gateways']['buckets']
    return [b['key'] for b in buckets]

In [None]:
# obtain all the actions of a gateway
def get_gateway_actions(gateway):
    """Return the distinct ``action`` values of documents served by ``gateway``.

    A document matches when one of its nested ``fields`` entries has
    ``fields.key`` matching 'mbaasgateway' and ``fields.text`` matching
    ``gateway``; both conditions are checked on the same nested entry.
    Matches are bucketed by ``action`` (at most 1000 buckets).

    :param gateway: gateway name, e.g. one returned by get_all_gateways().
    :return: list of action strings.
    """
    same_entry_conditions = [
        {"match": {"fields.key": "mbaasgateway"}},
        {"match": {"fields.text": gateway}},
    ]
    nested_query = {
        "nested": {
            "path": "fields",
            "query": {"bool": {"filter": {"bool": {"must": same_entry_conditions}}}},
        }
    }
    query_body = {
        "size": 0,
        "query": nested_query,
        "aggs": {"group_by_action": {"terms": {"field": "action", "size": 1000}}},
    }
    response = es.search(index='lognormal_index', body=query_body)
    return [b['key'] for b in response['aggregations']['group_by_action']['buckets']]

# Number of Requests per Organization

In [None]:
# return number of requests for all gateways
def get_all_request_counts(failed=False, start_date=None, end_date=None):
    """Count distinct request ids per gateway.

    Distinct ``requestid`` values are counted per ``action`` (up to 1000
    actions), then summed per gateway via action_to_gateway_count.

    :param failed: when True, only documents whose nested ``params.code``
        exists and is not "200.0" are counted.
    :param start_date: lower bound on ``create_date``, "MM/yyyy" format
        (None presumably leaves the bound open — confirm for the ES version).
    :param end_date: upper bound on ``create_date``, "MM/yyyy" format.
    :return: dict mapping gateway name -> request count.
    """
    search_body = {
      "size": 0,
      "query": {
        "bool": {
          "filter": {
            "range": {
              "create_date": {
                "gte": start_date,
                "lte": end_date,
                "format": "MM/yyyy"
              }
            }
          }
        }
      },
      "aggs": {
        "group_by_action": {
          "terms": {
            "field": "action",
            "size": 1000
          },
          "aggs": {
            "count_distinct_requestid": {
              "cardinality": {
                "field": "requestid"
              }
            }
          }
        }
      }
    }

    if failed:
        # failed = a response code is present and it is not "200.0"
        search_body['query']['bool']['must'] = [
            {
              "nested": {
                "path": "params",
                "query": {
                  "bool": {
                    "must_not": [
                      {
                        "term": {
                          "params.code": {
                            "value": "200.0"
                          }
                        }
                      }
                    ],
                    "must": [
                      {
                        "exists": {
                          "field": "params.code"
                        }
                      }
                    ]
                  }
                }
              }
            }
        ]

    result = es.search(index='lognormal_index', body=search_body)
    # use the shared bucket parser instead of re-implementing it inline
    action_count = get_action_count(
        result['aggregations']['group_by_action']['buckets'],
        'count_distinct_requestid',
    )
    return action_to_gateway_count(action_count)

In [277]:
# draw number of requests for all gateways
# possible modes: 'both' (successful and failed, stacked), 'failed',
# 'success' (or 'successful'), 'ratio'
def draw_request_counts(mode='both', start_date=None, end_date=None):
    """Bar chart of request counts per gateway.

    :param mode: 'both', 'failed', 'success' / 'successful', or 'ratio'
        (fraction of failed requests). Any other value draws nothing.
    :param start_date: optional "MM/yyyy" lower bound on create_date.
    :param end_date: optional "MM/yyyy" upper bound on create_date.
    """
    if mode not in ('both', 'failed', 'success', 'successful', 'ratio'):
        # nothing would be drawn; skip the two Elasticsearch queries as well
        return

    postfix = (' from %s' % start_date if start_date else '') + (' to %s' % end_date if end_date else '')

    total = pd.DataFrame.from_dict(
        get_all_request_counts(start_date=start_date, end_date=end_date),
        orient='index', columns=['count']
    ).sort_values(by='count', ascending=False)

    failed = pd.DataFrame.from_dict(
        get_all_request_counts(failed=True, start_date=start_date, end_date=end_date),
        orient='index', columns=['count']
    ).sort_values(by='count', ascending=False)

    if mode == 'failed':
        failed.rename(columns={'count': 'Unsuccessful'}).iplot(
            kind='bar',
            yTitle='Number of Requests',
            title='Number of unsuccessful requests' + postfix,
            colors=['crimson']
        )

    elif mode == 'ratio':
        ratio = failed.div(total, fill_value=0).sort_values(by='count', ascending=False)
        ratio.rename(columns={'count': 'Failure'}).iplot(
            kind='bar',
            title='Fraction of failed requests' + postfix,
            colors=['orangered']
        )

    else:
        successful = total.sub(failed, fill_value=0)

        # 'success' is the documented name (and the one draw_request_counts_by_gateway uses)
        if mode in ('success', 'successful'):
            successful.rename(columns={'count': 'Successful'}).iplot(
                kind='bar',
                yTitle='Number of Requests',
                title='Number of successful requests' + postfix,
                colors=['royalblue']
            )

        else:  # mode == 'both'
            successful.rename(
                columns={'count': 'Successful'}
            ).join(
                failed.rename(columns={'count': 'Unsuccessful'})
            ).iplot(
                kind='bar',
                barmode='stack',
                sortbars=True,
                yTitle='Number of Requests',
                title='Number of successful/unsuccessful requests' + postfix,
                colors=['royalblue', 'crimson']
            )

# Statistics of Execution Time per Organization

In [None]:
def get_runtime_stats(start_date=None, end_date=None):
    """Execution-time percentiles (1, 25, 50, 75, 99) per gateway.

    For each gateway (``fields.text`` of a nested ``fields`` entry whose key is
    ``mbaasgateway``), the query steps back to the parent document with
    ``reverse_nested``. It then aggregates ``fields.value`` over that
    document's ``executionTime`` entries.

    :param start_date: optional "MM/yyyy" lower bound on create_date.
    :param end_date: optional "MM/yyyy" upper bound on create_date.
    :return: dict mapping gateway name -> the percentile ``values`` dict
        returned by Elasticsearch (the drawing helpers read keys such as
        '50.0'). Gateways with no executionTime entry are omitted.
    """
    result = es.search(index='lognormal_index', body={
      "size": 0,
      "query": {
        "bool": {
          "filter": {
            "range": {
              "create_date": {
                "gte": start_date,
                "lte": end_date,
                "format": "MM/yyyy"
              }
            }
          }
        }
      },
      "aggs": {
        "fields": {
          "nested": {"path": "fields"},
          "aggs": {
            "filter_mbaasgateway": {
              "filter": {"term": {"fields.key": "mbaasgateway"}},
              "aggs": {
                "gateways": {
                  "terms": {
                    "field": "fields.text",
                    "size": 1000,
                    "collect_mode": "breadth_first"
                  },
                  "aggs": {
                    # back to the parent document, so its executionTime
                    # entry (a different nested element) can be reached
                    "reverse": {
                      "reverse_nested": {},
                      "aggs": {
                        "fields_agg": {
                          "nested": {"path": "fields"},
                          "aggs": {
                            "filter_execTime": {
                              "filter": {"term": {"fields.key": "executionTime"}},
                              "aggs": {
                                "quantiles": {
                                  "percentiles": {
                                    "field": "fields.value",
                                    "percents": [1, 25, 50, 75, 99]
                                  }
                                }
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    })

    runtime_stats = {}
    for bucket in result['aggregations']['fields']['filter_mbaasgateway']['gateways']['buckets']:
        exec_time = bucket['reverse']['fields_agg']['filter_execTime']
        if exec_time['doc_count'] > 0:
            runtime_stats[bucket['key']] = exec_time['quantiles']['values']

    return runtime_stats

In [None]:
def draw_runtime_stats(start_date=None, end_date=None):
    """Box plot of request execution time per gateway (organization).

    Each box is built from the percentile values returned by
    get_runtime_stats, not from raw samples.

    :param start_date: optional "MM/yyyy" lower bound on create_date.
    :param end_date: optional "MM/yyyy" upper bound on create_date.
    """
    # The median is listed twice, presumably so that with six points the box
    # median (mean of the 3rd and 4th point) equals the 50th percentile.
    # NOTE(review): plotly computes Q1/Q3 from these points, so the box edges
    # only approximate the 25th/75th percentiles.
    quantiles = ['1.0', '25.0', '50.0', '50.0', '75.0', '99.0']
    stats_by_gateway = get_runtime_stats(start_date, end_date)

    traces = [
        go.Box(y=[stats[q] for q in quantiles], name=gw, boxpoints=False)
        for gw, stats in stats_by_gateway.items()
    ]

    date_postfix = ''
    if start_date:
        date_postfix += ' from %s' % start_date
    if end_date:
        date_postfix += ' to %s' % end_date

    layout = go.Layout(
        title="Statistics of request execution time per organization" + date_postfix,
        yaxis=dict(title="Execution Time (ms)"),
        showlegend=False
    )
    py.iplot(go.Figure(data=traces, layout=layout))

# Number of Requests per Month

In [None]:
# return number of requests over time, optionally for a specified gateway
def get_request_counts_by_gateway(gateway=None, interval='month', failed=False, start_date=None, end_date=None):
    """Count distinct request ids per date-histogram bucket.

    :param gateway: optional gateway name; when truthy, only that gateway's
        actions (from get_gateway_actions) are counted.
    :param interval: date_histogram interval (the aggregation is named
        "monthly" regardless of this value).
    :param failed: when True, only documents whose nested ``params.code``
        exists and is not "200.0" are counted.
    :param start_date: optional "MM/yyyy" lower bound on create_date.
    :param end_date: optional "MM/yyyy" upper bound on create_date.
    :return: dict mapping bucket date ("yyyy-MM-dd") -> distinct request count.
    """
    filters = [
        {"range": {"create_date": {"gte": start_date, "lte": end_date, "format": "MM/yyyy"}}}
    ]
    if gateway:
        # restrict to the actions that belong to this gateway
        filters.append({"terms": {"action": get_gateway_actions(gateway)}})

    bool_query = {"filter": filters}
    if failed:
        bool_query["must"] = [{
            "nested": {
                "path": "params",
                "query": {
                    "bool": {
                        "must_not": [{"term": {"params.code": {"value": "200.0"}}}],
                        "must": [{"exists": {"field": "params.code"}}],
                    }
                },
            }
        }]

    search_body = {
        "size": 0,
        "query": {"bool": bool_query},
        "aggs": {
            "monthly": {
                "date_histogram": {"field": "create_date", "interval": interval, "format": "yyyy-MM-dd"},
                "aggs": {"request_count": {"cardinality": {"field": "requestid"}}},
            }
        },
    }
    result = es.search(index='lognormal_index', body=search_body)
    return {
        b['key_as_string']: b['request_count']['value']
        for b in result['aggregations']['monthly']['buckets']
    }

In [None]:
# draw number of requests over time, optionally for a specified gateway
# possible modes: 'both', 'failed', 'success' (or 'successful'), 'ratio'
def draw_request_counts_by_gateway(gateway=None, mode='both', interval='month', start_date=None, end_date=None):
    """Bar chart of request counts per date bucket.

    :param gateway: optional gateway name (None = all gateways).
    :param mode: 'both' (stacked successful/unsuccessful), 'failed',
        'success' / 'successful', or 'ratio' (fraction of failed requests).
        Any other value draws nothing.
    :param interval: date_histogram interval.
    :param start_date: optional "MM/yyyy" lower bound on create_date.
    :param end_date: optional "MM/yyyy" upper bound on create_date.
    """
    total = get_request_counts_by_gateway(gateway, start_date=start_date, end_date=end_date, interval=interval)
    failed = get_request_counts_by_gateway(gateway, failed=True, start_date=start_date, end_date=end_date, interval=interval)

    # every bucket in `total` needs a failed count, even when nothing failed
    for key in total:
        failed.setdefault(key, 0)

    successful = {key: total[key] - failed[key] for key in total}

    postfix = '' if gateway is None else ' for "%s"' % gateway

    if mode == 'failed':
        draw_bar_chart(
            title='Number of failed requests over time' + postfix,
            ylabel='Number of Requests',
            data=[failed],
            legends=['Failed'],
            colors=['crimson']
        )
    elif mode == 'ratio':
        # the histogram may contain buckets with 0 requests (gaps between
        # dates); report a 0 failure rate there instead of dividing by zero
        failure_ratio = {
            key: (failed[key] / total[key] if total[key] else 0)
            for key in total
        }
        draw_bar_chart(
            title='Fraction of failed requests over time' + postfix,
            data=[failure_ratio],
            legends=['Failure Rate'],
            colors=['orangered']
        )
    elif mode in ('success', 'successful'):
        draw_bar_chart(
            title='Number of successful requests over time' + postfix,
            ylabel='Number of Requests',
            data=[successful],
            legends=['Successful'],
            colors=['royalblue']
        )
    elif mode == 'both':
        draw_bar_chart(
            title='Number of requests over time' + postfix,
            ylabel='Number of Requests',
            data=[successful, failed],
            legends=['Successful', 'Unsuccessful'], colors=['royalblue', 'crimson']
        )

In [None]:
def get_runtime_stats_by_gateway(gateway=None, interval='month', start_date=None, end_date=None):
    """Execution-time percentiles (1, 25, 50, 75, 99) per date-histogram bucket.

    :param gateway: optional gateway name; when truthy, only that gateway's
        actions (from get_gateway_actions) are considered.
    :param interval: date_histogram interval.
    :param start_date: optional "MM/yyyy" lower bound on create_date.
    :param end_date: optional "MM/yyyy" upper bound on create_date.
    :return: list of (bucket date "yyyy-MM-dd", percentile ``values`` dict)
        tuples in bucket order. Buckets without an executionTime entry are
        skipped.
    """
    filters = [
        {"range": {"create_date": {"gte": start_date, "lte": end_date, "format": "MM/yyyy"}}}
    ]
    if gateway:
        filters.append({"terms": {"action": get_gateway_actions(gateway)}})

    exec_time_agg = {
        "nested": {"path": "fields"},
        "aggs": {
            "filter_execTime": {
                "filter": {"term": {"fields.key": "executionTime"}},
                "aggs": {
                    "quantiles": {
                        "percentiles": {"field": "fields.value", "percents": [1, 25, 50, 75, 99]}
                    }
                },
            }
        },
    }
    search_body = {
        "size": 0,
        "query": {"bool": {"filter": filters}},
        "aggs": {
            "monthly": {
                "date_histogram": {"field": "create_date", "interval": interval, "format": "yyyy-MM-dd"},
                "aggs": {"fields_agg": exec_time_agg},
            }
        },
    }

    result = es.search(index='lognormal_index', body=search_body)
    return [
        (b['key_as_string'], b['fields_agg']['filter_execTime']['quantiles']['values'])
        for b in result['aggregations']['monthly']['buckets']
        if b['fields_agg']['filter_execTime']['doc_count'] > 0
    ]

In [None]:
def draw_runtime_stats_by_gateway(gateway=None, mode='box', interval='month', start_date=None, end_date=None):
    """Plot request execution-time statistics over time.

    :param gateway: optional gateway name (None = all gateways).
    :param mode: 'box' — one box per date bucket, built from the percentiles
        of get_runtime_stats_by_gateway; 'bar' — the median per bucket, drawn
        as a line (go.Scatter). Any other value draws nothing.
    :param interval: date_histogram interval.
    :param start_date: optional "MM/yyyy" lower bound on create_date.
    :param end_date: optional "MM/yyyy" upper bound on create_date.
    """
    runtime_stats = get_runtime_stats_by_gateway(gateway, start_date=start_date, end_date=end_date, interval=interval)
    postfix = '' if gateway is None else ' for "%s"' % gateway

    if mode == 'box':
        # median listed twice, presumably so the six-point box median equals p50
        quantiles = ['1.0', '25.0', '50.0', '50.0', '75.0', '99.0']
        plot = [
            go.Box(y=[value[q] for q in quantiles], name=key, boxpoints=False)
            for key, value in runtime_stats
        ]
        layout = go.Layout(
            title="Statistics of request execution time over time" + postfix,
            yaxis=dict(title="Execution Time (ms)"),
            showlegend=False
        )

    elif mode == 'bar':
        x = [key for key, _ in runtime_stats]
        y = [value['50.0'] for _, value in runtime_stats]
        plot = [go.Scatter(x=x, y=y)]
        # avoid rendering "for 'None' requests" when no gateway is selected
        if gateway is None:
            title = "Median execution time for all requests"
        else:
            title = "Median execution time for '%s' requests" % gateway
        layout = go.Layout(
            title=title,
            yaxis=dict(title="Median Execution Time (ms)"),
            showlegend=False
        )
    else:
        return

    fig = go.Figure(data=plot, layout=layout)
    py.iplot(fig)

# Number of Users

In [None]:
def get_user_count_by_gateway(gateway=None):
    """Running total of per-day distinct user counts.

    NOTE(review): cumulative_sum adds up each day's distinct-user count, so a
    user active on several days is counted once per active day. The result
    is cumulative user-days, not cumulative distinct users — confirm intent.

    :param gateway: optional gateway name; when truthy, only that gateway's
        actions (from get_gateway_actions) are counted.
    :return: dict mapping day ("yyyy-MM-dd") -> running total.
    """
    # the aggregation is named "monthly" but its buckets are daily
    daily_users = {
        "date_histogram": {"field": "create_date", "interval": "day", "format": "yyyy-MM-dd"},
        "aggs": {
            "count_users": {"cardinality": {"field": "userid"}},
            "cumulative_users": {"cumulative_sum": {"buckets_path": "count_users"}},
        },
    }
    search_body = {"size": 0, "aggs": {"monthly": daily_users}}

    if gateway:
        search_body['query'] = {
            "bool": {"filter": {"terms": {"action": get_gateway_actions(gateway)}}}
        }

    result = es.search(index='lognormal_index', body=search_body)
    buckets = result['aggregations']['monthly']['buckets']
    return {b['key_as_string']: b['cumulative_users']['value'] for b in buckets}

In [None]:
def draw_user_count_prediction(gateway=None):
    """Fit a non-seasonal ARIMA to the user-count series and plot its forecast.

    The first 70% of the series is used to select and fit the model. The
    remaining 30% is forecast and overlaid on the observed values.

    :param gateway: optional gateway name (None = all gateways).
    """
    # parentheses keep the prefix when a gateway is given; without them the
    # conditional expression swallowed the whole concatenation
    title = 'Number of users over time' + ('' if gateway is None else ' for "%s"' % gateway)

    total = get_user_count_by_gateway(gateway)
    keys, values = dic2list_sorted_by_keys(total)
    data = pd.DataFrame(values, keys, columns=['Observed'])
    data.index = pd.to_datetime(data.index)

    train_idx = int(len(data) * 0.7)
    train = data.iloc[:train_idx]
    test = data.iloc[train_idx:]

    # Select the ARIMA order on the training window only, so the held-out
    # period does not leak into model selection. auto_arima returns a model
    # already fitted to `train`.
    stepwise_model = auto_arima(
        train, start_p=0, start_q=0,
        max_p=5, max_q=5, m=1,
        seasonal=False, trace=False,
        error_action='ignore',
        suppress_warnings=True,
        stepwise=True
    )

    forecast = stepwise_model.predict(n_periods=len(test))
    # np.asarray: align the forecast with the test dates by position,
    # regardless of the index type predict() returns
    future_forecast = pd.DataFrame(
        np.asarray(forecast), index=test.index, columns=['Predicted']
    ).clip(lower=0)  # counts cannot be negative

    pd.concat([data, future_forecast], axis=1).iplot(title=title)

# Predicting Request Trends

In [None]:
def draw_request_count_prediction(gateway=None, interval='day', start_date=None, end_date=None, seasonal_period=7):
    """Fit a seasonal ARIMA to the request-count series and plot its forecast.

    The first 60% of the series is used to select and fit the model. The
    remaining 40% is forecast and overlaid on the observed counts.

    :param gateway: optional gateway name (None = all gateways).
    :param interval: date_histogram interval of the series.
    :param start_date: optional "MM/yyyy" lower bound on create_date.
    :param end_date: optional "MM/yyyy" upper bound on create_date.
    :param seasonal_period: seasonal period ``m`` passed to auto_arima. The
        default 7 presumably models a weekly cycle in daily buckets — adjust
        it when ``interval`` is not 'day'.
    """
    postfix = '' if gateway is None else ' for "%s"' % gateway
    title = 'Number of requests over time' + postfix

    # a single query is enough for the observed series
    total = get_request_counts_by_gateway(gateway=gateway, interval=interval, start_date=start_date, end_date=end_date)
    keys, values = dic2list_sorted_by_keys(total)
    data = pd.DataFrame(values, keys, columns=['Observed'])
    data.index = pd.to_datetime(data.index)

    train_idx = int(len(data) * 0.6)
    train = data.iloc[:train_idx]
    test = data.iloc[train_idx:]

    # Select the ARIMA order on the training window only, so the held-out
    # period does not leak into model selection. auto_arima returns a model
    # already fitted to `train`.
    stepwise_model = auto_arima(
        train, start_p=0, start_q=0,
        max_p=5, max_q=5, m=seasonal_period,
        start_P=0, max_P=5, max_Q=5, seasonal=True,
        d=0, D=1, trace=False,
        error_action='ignore',
        suppress_warnings=True,
        stepwise=True
    )

    forecast = stepwise_model.predict(n_periods=len(test))
    # np.asarray: align the forecast with the test dates by position,
    # regardless of the index type predict() returns
    future_forecast = pd.DataFrame(
        np.asarray(forecast), index=test.index, columns=['Predicted']
    ).clip(lower=0)  # request counts cannot be negative

    pd.concat([data, future_forecast], axis=1).iplot(title=title)