# Retired Material from Edris's Analytics on Trace Data

When a particular avenue of testing is unproductive or otherwise abandoned, it is migrated to this workbook.

In [1]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, bulk
from datetime import datetime
from pathlib import Path
import csv
import functools
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.axes as ax
import timeit
import pyarrow.parquet as pq

# Enable verbose errors, to make debugging easier
%xmode Verbose

# Working directory of the notebook; later cells build data/report paths under it.
PROJECT_ROOT = Path.cwd()

Exception reporting mode: Verbose


## Testing and Custom Modules

As I'm building out functionality, I will first test it directly in this notebook and, as it becomes well-defined, I'll be factoring it out into independent modules. As those modules are added, they'll be included below and I'll be using the autoreload IPython extension so that I don't need to restart my IPython Kernel every time I update a module.

| File | Content |
| ---- | ------- |
| es_queries.py | ElasticSearch queries. |
| query_analysis.py | Analysis of ES queries. |

In [3]:
%load_ext autoreload
%autoreload 2

import src.es_queries as esq
import src.query_analysis as qa
import src.make_network as mn
import src.draw_network as dn

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Connecting to ElasticSearch

The login method takes either a username and password or a filename. If neither is provided, the filename defaults to `credentials.key`; however, all `.key` files are listed in `.gitignore` for security reasons. To use that file, recreate it after every fresh clone of this repo, with the username on the first line and the password on the second line.

In [4]:
# Open the ElasticSearch client used by every query in this notebook
# (called with no arguments, es_login falls back to credentials.key).
es = esq.es_login()

Connection Successful


## Getting Unique Pairs of PerfSonar nodes

My initial ElasticSearch queries used `trace_derived` and kept only pairs of PerfSonar nodes with more than $1$ hop and at least $1000$ records.

### Attempt 1

I aggregated `trace_derived` first by source and then, within each source, by destination. This quickly hit the bucket limit: I could pull only $60$ sources and $59$ destinations (when in reality there are over $400$ of each), for a total of $3,540$ pairs (when in reality there are closer to $35,000$).

In [5]:
# Attempt 1: nested sources -> destinations aggregation; truncated by the bucket limit.
unique_pairs = esq.get_unique_pairs(es)

In [6]:
# Flatten the nested aggregation into (source, destination) key tuples.
ps_pairs = [
    (src_bucket['key'], dest_bucket['key'])
    for src_bucket in unique_pairs['aggregations']['sources']['buckets']
    for dest_bucket in src_bucket['destinations']['buckets']
]

In [7]:
# Number of (source, destination) pairs recovered by the aggregation.
len(ps_pairs)

3540

### Attempt 2

I aggregated sources and destinations separately. This allowed me to get all sources and all destinations, but didn't give me information about which were connected.

### Attempt 3 (SUCCESS)

I replaced the aggregation with a query that pulls `trace_derived` in full. To do this, I used `scan` in place of `search`; `scan` returns a generator instead of a dictionary. I could then iterate through this generator to extract the source/destination pairs that meet my criteria and log those that do not.

In [8]:
# Attempt 3: full scan of trace_derived (a generator, consumed by qa.make_ps_pairs below).
td_scan = esq.trace_derived_scan(es)

In [9]:
# Reset qa's working variables (presumably results of earlier runs), then let qa
# build its set of valid perfSONAR pairs from the trace_derived scan.
qa.reset_vars()
qa.make_ps_pairs(td_scan)

Successfully identified 24503 perfSONAR pairs.


### Iterating over `ps_trace`

With the `qa` module now containing a set of valid `ps_pairs`, we can now pull data regarding those pairs from `ps_trace`. As `ps_trace` is huge, we only ever pull a small time period.

In [None]:
# Three-hour window of ps_trace: 08:00 to 11:00 on 2020-07-07.
start, end = (datetime(2020, 7, 7, hour).strftime("%Y-%m-%dT%H:%M:%S.000Z") for hour in (8, 11))
ps_trace = esq.ps_trace_scan(es, start, end)

## Stable Routes
### Attempt 1

Iterating over the generator, we consider only records for previously identified perfSONAR pairs, and keep a pair only if all of its records in the given time frame share a single `route-sha1`.

In [None]:
# Attempt 1: have qa keep only the pairs whose records in this window share a single
# route-sha1, then fetch them (used later in this notebook as `routes`).
qa.stable_routes(ps_trace)
routes = qa.get_stable_routes()

### Save Scan Results

Save a day's worth of data from `ps_trace` to a parquet file for each of seven days.

In [None]:
# Dump each of the seven days starting 2020-07-07 (midnight to midnight) to its own file.
for day_offset in range(7):
    start, end = (
        datetime(2020, 7, day, 0).strftime("%Y-%m-%dT%H:%M:%S.000Z")
        for day in (7 + day_offset, 8 + day_offset)
    )
    filename = f'ps_trace{day_offset}.pa'
    qa.save_data(esq.scan_gen(esq.ps_trace_scan(es, start, end)), filename)

### Attempt 2

Iterate over 7 days' worth of data. For each route, track whether it has changed and how long it kept its previous SHA.

## Edge Frequency Dataframe

I decided to go with a list, as it was pointless to convert back to a dataframe as below. 

In [None]:
# Retired: tabulate edge frequencies, most frequent first.
# NOTE(review): `edges` is not defined anywhere in this notebook. From the `.items()`
# call and the column names, it appears to be an edge -> count mapping from a removed
# cell, so this cell raises NameError as-is. TODO confirm.
edges_df = pd.DataFrame(list(edges.items()), columns=['Edge', 'Count'])
edges_df = edges_df.sort_values('Count', ascending=False)
edges_df.head()

# Testing

Below this point are disorganized snippets of code used while testing various features.

In [None]:
def try_eval_ps_trace():
    """Feed one hour of ps_trace (08:00-09:00 on 2020-07-07) to qa.route_changes."""
    window = [datetime(2020, 7, 7, hour).strftime("%Y-%m-%dT%H:%M:%S.000Z") for hour in (8, 9)]
    qa.route_changes(esq.ps_trace_scan(es, *window))

In [None]:
# Time a single run of try_eval_ps_trace (one hour of ps_trace through qa.route_changes).
print(timeit.timeit('try_eval_ps_trace()', setup="from __main__ import try_eval_ps_trace", number=1))
# qa.get_ps_pairs()

In [None]:
# A record of each route from qa.route_life: presumably maps each route to how long it
# stayed unchanged (the next cell counts routes per value). The original comment
# was cut off. The meaning of the argument 60 is defined in query_analysis. TODO confirm.
route_life = qa.route_life(60)

In [None]:
# Histogram of route_life values: value -> number of routes that have that value.
route_change = {}
for route in route_life:
    life = route_life[route]
    route_change[life] = route_change.get(life, 0) + 1

route_change

In [None]:
# Scatter each distinct route_life value (x) against how many routes had it (y).
fig, ax = plt.subplots(figsize=(15, 10))
ax.scatter(list(route_change), list(route_change.values()))
plt.show()

In [None]:
# Display the route-change results held by qa (presumably filled by qa.route_changes).
qa.get_route_changes()

In [None]:
# One-hour window of ps_trace (08:00-09:00 on 2020-07-07), wrapped by esq.scan_gen.
start, end = (datetime(2020, 7, 7, hour).strftime("%Y-%m-%dT%H:%M:%S.000Z") for hour in (8, 9))
ps_trace = esq.ps_trace_scan(es, start, end)
ps_gen = esq.scan_gen(ps_trace)

In [None]:
# Load the saved route-change table; later cells use its changetimes, dest, sha and src columns.
df = pd.read_parquet(str(PROJECT_ROOT / 'data' / 'route_changes.pa'))

In [None]:
# Average life of each path in hours: the 7-day (168 h) window split evenly across
# the number of entries in its changetimes list.
df['Life'] = 7 * 24 / df['changetimes'].str.len()
df = df.rename(columns={'changetimes': 'Paths'})
dfl = df.drop(columns=['dest', 'sha', 'src'])

In [None]:
# Count rows per distinct Life value; Life becomes the index.
dfl = dfl.groupby(['Life']).count()

In [None]:
# Move Life back from the index into a column so it can be plotted.
dfl = dfl.reset_index()

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(15, 10))
plt.rcParams.update({'font.size': 18})
# Paths per lifetime on a log y-axis, with the x-axis reversed so long-lived paths sit on the left.
dfl.plot(x='Life', y='Paths', logy=True, kind='scatter', ax=ax)
ax.set_xlabel('Life of a path in hours')
ax.set_ylabel(f"Paths count ({dfl['Paths'].sum()} total) - log scale")
ax.set_title('Over 7 days (168 hrs), how long does any path between PS nodes stays stable?')
ax.invert_xaxis()
plt.show()

In [None]:
# Copy of dfl with Paths rescaled to each lifetime's share of all paths.
dfn = dfl.assign(Paths=dfl['Paths'] / dfl['Paths'].sum())

In [None]:
# Side-by-side views of the normalised path-life distribution: linear axes on the
# left, log-log on the right. Two columns are required: with (1, 1) plt.subplots
# returns a single Axes and ax[0] / ax[1] raise TypeError.
fig, ax = plt.subplots(1, 2, figsize=(15, 10))
plt.rcParams.update({'font.size': 18})
dfn.plot(x = 'Life', y = 'Paths', kind = 'scatter', ax = ax[0])
dfn.plot(x = 'Life', y = 'Paths', kind = 'scatter', logx = True, logy = True, ax = ax[1])
ax[0].set_xlabel('Life of a path (hours)')
# dfn['Paths'] was divided by its total, so the y values are fractions, not counts.
ax[0].set_ylabel('Paths (fraction of total)')
ax[0].set_title('Over 7 days (168 hrs), how long does any path between PS nodes stays stable?')
ax[1].set_xlabel('Life of a path (hours) - log scale')
ax[1].set_ylabel('Paths (fraction of total) - log scale')
plt.show()

In [None]:
def chain(*iters):
    """Lazily yield every item of each iterable in *iters*, one iterable after another."""
    for source in iters:
        for item in source:
            yield item

In [None]:
# Collect every hop value from the seven daily ps_trace parquet files into one list.
# A plain list (rather than repeatedly nested generators) can be read more than once.
all_hops = []
for i in range(7):
    # Parenthesise the filename: `/` binds tighter than `+`, so without the
    # parentheses a str would be added to a Path object (TypeError).
    day_file = PROJECT_ROOT / 'data' / ('ps_trace' + str(i) + '.pa')
    pst = pd.read_parquet(str(day_file))
    # Flatten the per-trace hop lists into individual hop values.
    all_hops.extend(pd.core.common.flatten(pst['hops'].tolist()))

In [None]:
# Materialise all collected hop values into a single-column DataFrame.
# NOTE: if all_hops is a one-shot generator, this consumes it; rebuild it before re-running.
hops_df = pd.DataFrame(list(all_hops), columns=['Hops'])

In [None]:
# How often each hop value appears (value_counts sorts in descending order by default).
hop_counts = hops_df['Hops'].value_counts()

In [None]:
# Tabulate hop frequencies as two columns, Node (hop value) and Count, and show the top 100.
# rename_axis/reset_index(name=...) pins both column names explicitly, because the
# name value_counts gives its result differs between pandas versions ('count' in 2.x).
hop_c2 = hop_counts.rename_axis('Node').reset_index(name='Count')
hop_c2.head(n=100)

In [None]:
def record_count():
    """Stream one hour of ps_trace (08:00-09:00 on 2020-07-07) into test.csv.

    Prints a running total every 1000 rows and the final row count.
    """
    columns = ['src', 'dest', 'route-sha1', 'hops', 'timestamp']
    with open('test.csv', 'w', newline='') as out_file:
        writer = csv.DictWriter(out_file, fieldnames=columns)
        # Header row: each column name written under itself.
        writer.writeheader()

        window_start = datetime(year=2020, month=7, day=7, hour=8).strftime("%Y-%m-%dT%H:%M:%S.000Z")
        window_end = datetime(year=2020, month=7, day=7, hour=9).strftime("%Y-%m-%dT%H:%M:%S.000Z")
        source = esq.ps_trace_scan(es, window_start, window_end)
        written = 0
        for row in esq.scan_gen(source):
            writer.writerow(row)
            written += 1
            if written % 1000 == 0:
                print(written)
        print(written)

print(timeit.timeit('df = record_count()', setup="from __main__ import record_count", number=1))

In [None]:
def record_count():
    """Pull one hour of ps_trace (08:00-09:00 on 2020-07-07) and save it to data/test.pa.

    Prints a running total every 1000 records and the final record count.
    """
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    window_start = datetime(2020, 7, 7, 8).strftime(fmt)
    window_end = datetime(2020, 7, 7, 9).strftime(fmt)
    rows = []
    for row in esq.scan_gen(esq.ps_trace_scan(es, window_start, window_end)):
        rows.append(row)
        if len(rows) % 1000 == 0:
            print(len(rows))
    print(len(rows))
    frame = pd.DataFrame(rows)
    frame.to_parquet(str(PROJECT_ROOT / 'data' / 'test.pa'), engine='pyarrow')

print(timeit.timeit('df = record_count()', setup="from __main__ import record_count", number=1))

In [None]:
# Reload the test extract written by the parquet record_count (CSV variant kept for reference).
# df = pd.read_csv('test.csv')
df = pd.read_parquet(str(PROJECT_ROOT / 'data' / 'test.pa'), engine='pyarrow')

In [None]:
# Display the loaded extract.
df

In [None]:
# One week of ps_trace (08:00 on 2020-07-07 to 08:00 on 2020-07-14), fetching only the hops field.
start, end = (datetime(2020, 7, day, 8).strftime("%Y-%m-%dT%H:%M:%S.000Z") for day in (7, 14))
ps_trace = esq.ps_trace_scan(es, start, end, include=["hops"])

In [None]:
# Display the current df.
df

In [None]:
# IPython post-mortem debugger for the most recent exception.
%debug

### Evaluating a larger dataset

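Using a generator that spans a day of data, I can identify which previously stable routes changed their `route-sha1` while their `max_rtt` increased, and tally the edges that were added to and removed from each of those paths.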

In [None]:
# One day of ps_trace: 08:00 on 2020-07-08 to 08:00 on 2020-07-09.
start, end = (datetime(2020, 7, day, 8).strftime("%Y-%m-%dT%H:%M:%S.000Z") for day in (8, 9))
ps_trace = esq.ps_trace_scan(es, start, end)

In [None]:
# For every stable route (from `routes`) whose route-sha1 changed and whose max_rtt
# grew in this window, record which hop-to-hop edges were dropped or introduced:
#   changed[(src, dest)]        -> {'added': [...], 'removed': [...]} for the last such trace seen
#   removed_edges / added_edges -> {edge: {'count': n, 'time': [timestamps]}}
# Edges are stored in both directions, so (a, b) and (b, a) are tallied separately.


def _path_edges(path):
    """Return each adjacent hop pair of *path*, in path order and in both directions."""
    edges = []
    for s, d in zip(path, path[1:]):
        edges.append((s, d))
        edges.append((d, s))
    return edges


def _tally_edge(store, edge, timestamp):
    """Increment *edge*'s count in *store* and append the timestamp it was seen at."""
    entry = store.setdefault(edge, {'count': 0, 'time': []})
    entry['count'] += 1
    entry['time'].append(timestamp)


changed = {}
removed_edges = {}
added_edges = {}

for trace in ps_trace:
    trace = trace['_source']
    # Use .get so a trace missing a production flag is skipped instead of raising
    # KeyError outside the guarded field lookup below.
    if not trace.get('src_production') or not trace.get('dest_production'):
        continue
    try:
        src = trace['src']
        dest = trace['dest']
        sha = trace['route-sha1']
        hops = trace['hops']
        max_rtt = trace['max_rtt']
        looping = trace['looping']
        timestamp = trace['timestamp']
    except KeyError:
        # Traces missing any required field are ignored.
        continue

    pair = (src, dest)
    if pair not in routes:
        continue
    baseline = routes[pair]
    if not (sha != baseline['sha'] and max_rtt > baseline['max_rtt']):
        continue

    old_edges = _path_edges(baseline['hops'])
    new_edges = _path_edges(hops)
    # Sets make each membership test O(1). Iterating the lists keeps the original
    # order and any repeated edges (for example, from looping paths).
    old_lookup, new_lookup = set(old_edges), set(new_edges)
    removed = [edge for edge in old_edges if edge not in new_lookup]
    added = [edge for edge in new_edges if edge not in old_lookup]

    for edge in removed:
        _tally_edge(removed_edges, edge, timestamp)
    for edge in added:
        _tally_edge(added_edges, edge, timestamp)

    changed[pair] = {'added': added, 'removed': removed}

In [None]:
def plot_traceroute_changes(edges_impacted, *,
                            title='Edges removed from formerly stable paths',
                            ylabel='Number of paths an edge was formerly on',
                            xlabel='Edges formerly on paths'):
    """Bar-chart how many changed paths each edge was involved in.

    Parameters
    ----------
    edges_impacted : dict
        Maps an edge tuple to a dict with at least a ``'count'`` key, i.e. the
        shape of ``removed_edges`` / ``added_edges`` built earlier in this notebook.
    title, ylabel, xlabel : str, optional
        Axis text. The defaults describe removed edges, so pass new text when
        plotting ``added_edges``.
    """
    fig, ax = plt.subplots(figsize=(15, 7))
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel(xlabel)
    labels = [str(edge) for edge in edges_impacted]
    counts = [info['count'] for info in edges_impacted.values()]
    ax.bar(labels, counts)
    # Rotate only the edge labels; tick_params defaults to axis='both', which
    # also tilted the y-axis counts.
    ax.tick_params(axis='x', labelrotation=20)
    plt.show()

In [None]:
# Chart the edges that dropped out of formerly stable paths.
plot_traceroute_changes(removed_edges)