In [1]:
# Enables figures loading outside of browser.
# If not run, figures will load inline.
%matplotlib

import os
import math
import pandas as pd
import numpy as np
import matplotlib.dates as dates
import matplotlib.pyplot as plt
import matplotlib.ticker
import datetime
import collections
import itertools

from scipy import stats

# Depends on: pip install sklearn
from sklearn.model_selection import train_test_split

# Some matplotlib features are version dependent.
assert(matplotlib.__version__ >= '2.1.2')

# Depends on: pip install --upgrade google-cloud-bigquery
from google.cloud import bigquery

def run_query(query, project='mlab-sandbox'):
    client = bigquery.Client(project=project)
    job = client.query(query)

    results = collections.defaultdict(list)
    for row in job.result(timeout=3000):
        for key in row.keys():
            results[key].append(row.get(key))

    return pd.DataFrame(results)

def unlog(x, pos):
    """Formats the x axis for histograms taken on the log of values."""
    v = math.pow(10, x)
    frac, whole = math.modf(v)
    if frac > 0:
        return '%.1f' % v
    else:
        return '%d' % whole
    
    
def hist(vals, bin_count, log=True, cdf=False):
    """Produces hist or cdf values for smooth plots."""
    if log:
        r = [math.log10(x) for x in vals]
    else:
        r = vals
        
    m, bins = np.histogram(r, bin_count, normed=True)
    m = m.astype(float)

    tops = m
    if cdf:
        tops = np.cumsum(m)
        total = sum(m)
        tops = [float(t) / total for t in tops ]
    
    return tops, bins


logFormatter = matplotlib.ticker.FuncFormatter(unlog)

Using matplotlib backend: MacOSX


In [2]:
def plot_df(
    df, xname='', yname='',
    cname='', bins=None, cdf=False,
    fig_by='', axes_by='', group_by='',
    figsize=(6,8), axes=(1,1),
    label='{group}',
    xlabel='', ylabel='',
    xlim=(), ylim=(),
    fx=list, fy=list,
    xlog=False, ylog=False,
    suptitle='', title='', legend={}, figmap=None, log=None, fxn=None):
    """
    Args:
        df: pandas.DataFrame,
        xname: str, name of column to use as x-axis.
        yname: str, name of column to use as y-axis.
        cname: str, name of column to use as data source.
        cdf: bool,
        bins: int or callable,
        fig_by: str, name of column to split data into multiple figures.
        axes_by: str, name of column to arrange into a single panel.
        group_by: str, name of column to plot common split_by and group_by columns.
        figsize: (int, int), dimensions of figure.
        axes: (int, int), arrangement of axes within figure.
        label: str,
        xlabel: str,
        ylabel: str,
        xlim: (xmin, xmax),
        ylim: (ymin, ymax),
        fx: func,
        fy: func,
        xlog: bool,
        ylog: bool,
        suptitle: str,
        title: str,
        legend: **legend_args,
        figmap: returned from a previous run of plot_df, used to overlay values
            from multiple data frames. Must use the same fig_by, axes_by, and
            group_by values.
        log: bool,
        f: callable,
    Returns:
      dict of str to (figures, axes) tuples
    """
    def info(f):
        if log:
            print f

    if figmap is None:
        info('new figmap')
        figmap = {}
    scatter = None
    if (xname and yname):
        scatter = True
    if cname:
        scatter = False
    if scatter is None:
        raise Exception('Provide xname and yname or cname')
    
    for f in sorted(set(['default'] if not fig_by else df[fig_by])):
        if f in figmap:
            info('reusing figmap for %s' % f)
            fig, ax = figmap[f]
        else:
            fig = plt.figure(figsize=figsize)
            ax = fig.subplots(axes[0], axes[1], squeeze=False)
            info('saving figmap for %s' % f)
            figmap[f] = (fig, ax)
        ax_index = list(itertools.product(range(axes[0]), range(axes[1])))
        
        df_fig = df if f == 'default' else df[df[fig_by] == f]
        for p, a in enumerate(sorted(set(['default'] if not axes_by else df_fig[axes_by]))):

            df_axes = df_fig if a == 'default' else df_fig[df_fig[axes_by] == a]
            if p >= len(ax_index):
                print 'SKIPPING', p, f, a, 'too few axes positions'
                continue
                
            i, j = ax_index[p]
            for g in sorted(set(['default'] if not group_by else df_axes[group_by])):
                df_g = df_axes if g == 'default' else df_axes[df_axes[group_by] == g]

                if scatter:
                    x = fx(df_g[xname])
                    y = fy(df_g[yname])
                    l = label.format(figure=f, axis=a, group=g)
                    ax[i][j].scatter(x, y, s=1, label=l)
                else:
                    r = df_g[cname]
                    if bins is None:
                        size = int(math.sqrt(len(r)))
                    else:
                        size = bins(r)
                    if fxn:
                        fxn(r, figure=f, axis=a, group=g, size=size)
                    info("%s %s %s %s %s" % (f, a, g, size, len(r)))
                    h_tops, h_bins = hist(r, size, log=xlog , cdf=cdf)
                    l = label.format(figure=f, axis=a, group=g, size=size)
                    ax[i][j].plot(h_bins[:-1], h_tops, label=l)

            if i != len(ax)-1:
                ax[i][j].set_xticklabels([])

            if title:
                ax[i][j].set_title(title.format(figure=f, axis=a, group=g))
            if ylabel:
                ax[i][j].set_ylabel(ylabel.format(figure=f, axis=a, group=g))
            if xlabel:
                ax[i][j].set_xlabel(xlabel.format(figure=f, axis=a, group=g))

            if xlim:
                ax[i][j].set_xlim(xlim)
            if ylim:
                ax[i][j].set_ylim(ylim)

            ax[i][j].grid(color='#dddddd')
            ax[i][j].legend(fontsize='x-small', **legend)
            if scatter:
                ax[i][j].tick_params(axis='x', labelrotation=-90)
            if xlog:
                ax[i][j].xaxis.set_major_formatter(logFormatter)
            if ylog:
                ax[i][j].semilogy()

        if suptitle:
            fig.suptitle(suptitle.format(figure=f))
        fig.tight_layout(rect=[0, 0.03, 1, 0.95])

    return figmap

        
def plot_scatter(df, xname, yname, **kwargs):
    return plot_df(df, xname=xname, yname=yname, **kwargs)

    
def plot_hist(df, cname, bins=None, **kwargs):
    return plot_df(df, cname=cname, bins=bins, **kwargs)

# UPLINK UTILIZATION OVER TIME

In [133]:
df_disco_pct = run_query("""
#standardSQL
SELECT

  UPPER(REGEXP_EXTRACT(hostname, r'mlab1.([a-z]{3})[0-9]{2}.*')) AS metro,
  REGEXP_EXTRACT(hostname, r'mlab1.([a-z]{3}[0-9]{2}).*') AS site,
  FORMAT_TIMESTAMP("%Y-%m-%d", TIMESTAMP_TRUNC(sts, DAY)) AS day,
  UNIX_SECONDS(TIMESTAMP_TRUNC(sts, DAY)) AS ts,  
  0.8 * APPROX_QUANTILES(value, 101)[ORDINAL(50)] as bytes_50th,
  APPROX_QUANTILES(value, 101)[ORDINAL(90)] as bytes_90th

FROM (
  SELECT
    metric,
    REGEXP_EXTRACT(hostname, r'(mlab1.[a-z]{3}[0-9]{2}).*') AS hostname,
    sample.timestamp AS sts,
    sample.value AS value
  FROM
    `measurement-lab.base_tables.switch*`,
    UNNEST(sample) AS sample
  WHERE
    metric LIKE 'switch.octets.uplink.tx'
    AND REGEXP_CONTAINS(hostname, r"mlab1.(dfw|lga|nuq)\d\d")
  GROUP BY
    hostname, metric, sts, value
)
WHERE
  hostname IS NOT NULL
GROUP BY
  hostname, day, ts
ORDER BY
  hostname, day, ts
""")



In [137]:
plot_scatter(
    df_disco_pct, 'ts', 'bytes_50th',
    axes_by='metro', group_by='site', axes=(3, 1),
    suptitle='Daily Median Uplink Utilization',
    ylabel="Mbps",
    title='{axis}',
    xlim=(pd.to_datetime("2016-05-31"), pd.to_datetime("2018-08-01")),
    ylim=(1e4, 1e9),
    fx=lambda l: [pd.to_datetime(t, unit='s') for t in l],
    legend={'loc':3, 'ncol':7, 'columnspacing':1},
    ylog=True)

{'default': (<Figure size 600x800 with 3 Axes>,
  array([[<matplotlib.axes._subplots.AxesSubplot object at 0x7f8300c912d0>],
         [<matplotlib.axes._subplots.AxesSubplot object at 0x7f8301b1e050>],
         [<matplotlib.axes._subplots.AxesSubplot object at 0x7f8301b36310>]],
        dtype=object))}

# Daily DISCO discard ratios

In [138]:
df_disco_ratio = run_query("""
WITH measurementlab_switch_dedup AS (
  SELECT
    metric,
    REGEXP_EXTRACT(hostname, r'(mlab[1-4].[a-z]{3}[0-9]{2}).*') AS hostname,
    sample.timestamp AS sts,
    sample.value AS value
  FROM
    `measurement-lab.base_tables.switch*`,
    UNNEST(sample) AS sample
  WHERE
    (metric LIKE 'switch.discards.uplink.tx' OR metric LIKE 'switch.unicast.uplink.tx')
    AND REGEXP_CONTAINS(hostname, r"mlab1.(dfw|lga|nuq)\d\d")
  GROUP BY
    hostname, metric, sts, value
)

SELECT
  UPPER(REGEXP_EXTRACT(hostname, r'mlab1.([a-z]{3})[0-9]{2}.*')) AS metro,
  REGEXP_EXTRACT(hostname, r'mlab1.([a-z]{3}[0-9]{2}).*') AS site,
  hostname,
  day,
  ts,
  IF(total > 0, discards / total, 0) as ratio
FROM (
SELECT
  hostname,
  FORMAT_TIMESTAMP("%Y-%m-%d", TIMESTAMP_TRUNC(sts, DAY)) AS day,
  UNIX_SECONDS(TIMESTAMP_TRUNC(sts, DAY)) AS ts,
  SUM(IF(metric = "switch.discards.uplink.tx", value, 0)) AS discards,
  SUM(IF(metric = "switch.unicast.uplink.tx", value, 0)) AS total
FROM
  measurementlab_switch_dedup
WHERE
  hostname IS NOT NULL
GROUP BY
  hostname, day, ts
HAVING
  discards < total
ORDER BY
  hostname, day, ts
)
GROUP BY
  hostname, day, ts, ratio
HAVING
  ratio < 0.01
ORDER BY
  hostname, day, ts
""")



In [141]:
plot_scatter(
    df_disco_ratio, 'ts', 'ratio', axes_by='metro', group_by='site', axes=(3, 1),
    suptitle='Daily Packet Loss Ratio (discards / unicast)',
    ylabel="Ratio",
    title='{axis}',
    xlim=(pd.to_datetime("2016-05-31"), pd.to_datetime("2018-08-01")),
    ylim=(1e-6, 1e-1),
    fx=lambda l: [pd.to_datetime(t, unit='s') for t in l],
    legend={'loc':2},
    ylog=True)

{'default': (<Figure size 600x800 with 3 Axes>,
  array([[<matplotlib.axes._subplots.AxesSubplot object at 0x7f82e283ee90>],
         [<matplotlib.axes._subplots.AxesSubplot object at 0x7f8307a3cd10>],
         [<matplotlib.axes._subplots.AxesSubplot object at 0x7f830211d990>]],
        dtype=object))}

# NDT Median Download Rates

In [144]:
df_ndt_all = run_query("""
WITH mlab_ndt AS (
  SELECT
    UPPER(REGEXP_EXTRACT(connection_spec.server_hostname, r"([a-z]{3})[0-9]{2}")) as metro,
    REGEXP_EXTRACT(connection_spec.server_hostname, r"([a-z]{3}[0-9]{2})") as site,
    web100_log_entry.connection_spec.remote_ip as remote_ip,
    log_time,
    (8 * (web100_log_entry.snap.HCThruOctetsAcked / (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd))) AS download_mbps

  FROM
  
    `measurement-lab.base_tables.ndt*`
  WHERE

        REGEXP_CONTAINS(connection_spec.server_hostname, r"(lga|dfw|nuq)\d\d")
    AND web100_log_entry.snap.HCThruOctetsAcked >= 1000000
    AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) >= 9000000
    AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) < 600000000
    AND connection_spec.data_direction = 1
    AND web100_log_entry.connection_spec.remote_ip != "45.56.98.222"
    AND web100_log_entry.connection_spec.remote_ip != "2600:3c03::f03c:91ff:fe33:819"
    AND web100_log_entry.connection_spec.remote_ip != "35.225.75.192"
    AND web100_log_entry.connection_spec.remote_ip != "35.192.37.249"
    AND web100_log_entry.connection_spec.remote_ip != "35.193.254.117"
    AND anomalies.no_meta is not true
    
  
  GROUP BY
    connection_spec.server_hostname,
    log_time,
    web100_log_entry.connection_spec.remote_ip,
    web100_log_entry.connection_spec.local_ip,
    web100_log_entry.connection_spec.remote_port,
    web100_log_entry.connection_spec.local_port,
    download_mbps
)
    
SELECT
  metro,
  site,
  day,
 --  AVG(download_mbps) as download_mbps,
  APPROX_QUANTILES(download_mbps, 101)[ORDINAL(50)] as download_mbps,
  count(*) as count
FROM
(
  SELECT
    metro,
    site,
    TIMESTAMP_TRUNC(log_time, DAY) as day,
    -- APPROX_QUANTILES(download_mbps, 101)[ORDINAL(50)] as download_mbps
    MAX(download_mbps) as download_mbps
  FROM
    mlab_ndt

  GROUP BY
    metro, site, day, remote_ip
)

GROUP BY
  metro, site, day

ORDER BY
  day
""")




In [142]:
plot_scatter(
    df_ndt_all, 'day', 'download_mbps', axes_by='metro', group_by='site', axes=(3, 1),
    suptitle='Median NDT Download Rates',
    ylabel="Mbps",
    title='{axis}',
    xlim=(pd.to_datetime("2016-05-31"), pd.to_datetime("2018-08-01")),
    ylim=(0, 50),
    fx=lambda l: [pd.to_datetime(t) for t in l],
    legend={'loc':2})

# NDT Segs Retrans

In [13]:
# NOT ENOUGH HISTORICAL NDT DATA TO GET FULL TIMELINE.

In [146]:
df_ndt_retrans = run_query("""
WITH mlab_ndt AS (
  SELECT
    connection_spec.server_hostname as hostname,
    web100_log_entry.connection_spec.remote_ip as remote_ip,
    log_time,
    web100_log_entry.snap.SegsRetrans as SegsRetrans,
    web100_log_entry.snap.SegsOut as SegsOut

  FROM
    `measurement-lab.base_tables.ndt*`

  WHERE
        REGEXP_CONTAINS(connection_spec.server_hostname, r"(lga|dfw|nuq)\d\d")
    AND web100_log_entry.snap.HCThruOctetsAcked >= 1000000
    AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) >= 9000000
    AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) < 600000000
    AND connection_spec.data_direction = 1
    AND web100_log_entry.connection_spec.remote_ip != "45.56.98.222"
    AND web100_log_entry.connection_spec.remote_ip != "2600:3c03::f03c:91ff:fe33:819"
    AND web100_log_entry.connection_spec.remote_ip != "35.225.75.192"
    AND web100_log_entry.connection_spec.remote_ip != "35.192.37.249"
    AND web100_log_entry.connection_spec.remote_ip != "35.193.254.117"
    AND log_time >= TIMESTAMP("2016-06-01")
  
  GROUP BY
    connection_spec.server_hostname,
    log_time,
    web100_log_entry.connection_spec.remote_ip,
    web100_log_entry.connection_spec.local_ip,
    web100_log_entry.connection_spec.remote_port,
    web100_log_entry.connection_spec.local_port,
    SegsRetrans,
    SegsOut
)
    
SELECT
  UPPER(REGEXP_EXTRACT(hostname, r"([a-z]{3})[0-9]{2}")) as metro,
  REGEXP_EXTRACT(hostname, r"([a-z]{3}[0-9]{2})") as site,
  day,
  APPROX_QUANTILES(ratio, 101)[ORDINAL(50)] AS median_ratio,
  count(*) as count
FROM
(
  SELECT
    hostname,
    TIMESTAMP_TRUNC(log_time, DAY) as day,
    MAX(SAFE_DIVIDE(SegsRetrans, SegsOut)) as ratio

  FROM
    mlab_ndt

  GROUP BY
    hostname,
    day,
    remote_ip
)

GROUP BY
  hostname, day

ORDER BY
  day
""")



In [148]:
plot_scatter(
    df_ndt_retrans, 'day', 'median_ratio', axes_by='metro', group_by='site', axes=(3, 1),
    suptitle='Median NDT Retransmission Ratio - (SegsRetran / SegsOut)',
    ylabel="Ratio",
    title='{axis}',
    xlim=(pd.to_datetime("2016-05-31"), pd.to_datetime("2018-08-01")),
    ylim=(1e-6, 1e-1),
    fx=lambda l: [pd.to_datetime(t) for t in l],
    legend={'loc':2},
    ylog=True)

{'default': (<Figure size 600x800 with 3 Axes>,
  array([[<matplotlib.axes._subplots.AxesSubplot object at 0x7f82df2cb210>],
         [<matplotlib.axes._subplots.AxesSubplot object at 0x7f82ddf5b610>],
         [<matplotlib.axes._subplots.AxesSubplot object at 0x7f82de101f90>]],
        dtype=object))}

## COMBINED SegsRetrans & Switch Discards

sites = [
    ['dfw'],
    ['lga'],
    ['nuq'],
]

axes = [
    [None],
    [None],
    [None],
    [None],
    [None],
    [None],
]
def box(x, y, text):
    plt.text(x, y, text,
        bbox=dict(boxstyle="round",
              ec=(.5, 0.5, 1., 0.25),
              fc=(.5, 0.8, 1., 0.25),
        )
    )
print len(df_ndt_retrans)

prop_cycle = plt.rcParams['axes.prop_cycle']
colors = prop_cycle.by_key()['color']

fig = plt.figure(figsize=(6, 8))

for i, site_row in enumerate(sites):
    for j, site in enumerate(site_row):
        axes[i][j] = plt.subplot2grid((3, 1), (i, j))
        axes[i][j].set_ylabel('Ratio ' + site.upper())
        if i != len(sites)-1:
            axes[i][j].set_xticklabels([])

        c = 0
        for s in sorted(set(df_ndt_retrans['site'])):
            if site in s:
                ds = df_ndt_retrans[ (df_ndt_retrans['site'] == s) ]
                d = [pd.to_datetime(t) for t in ds['day']]
                axes[i][j].scatter(d, ds['median_ratio'], s=1, label=s, c=colors[c])
                c += 1

        axes[i][j].set_ylim(1e-6, 1e-1)
        axes[i][j].set_xlim(pd.to_datetime("2016-05-31"), pd.to_datetime("2018-08-01"))
        axes[i][j].tick_params(axis='x', labelrotation=-90)
        axes[i][j].grid(color='#dddddd')
        axes[i][j].legend(loc=2, fontsize='x-small')
        axes[i][j].semilogy()

for i, site_row in enumerate(sites):
    for j, site in enumerate(site_row):       

        if i != len(sites)-1:
            axes[i][j].set_xticklabels([])
        c = 0
        for h in set(df_disco_ratio['hostname']):
            if ('mlab1.' + site) in h:
                ds = df_disco_ratio[ (df_disco_ratio['hostname'] == h) ]
                d = [pd.to_datetime(t, unit='s') for t in ds['ts']]
                axes[i][j].scatter(d, ds['ratio'], s=1, label=h[6:11], c=colors[c])
                c += 1

box(pd.to_datetime("2016-10-30"), 5e-3, u"Segs Retransmit ↘")
box(pd.to_datetime("2016-10-30"), 9e-6, u"Switch Discards ↗")
                
fig.suptitle('Retrans & Switch Discard Rates')
fig.tight_layout(rect=[0, 0.03, 1, 0.95])

# NDT test distributions - Before & After



print df_ndt_dist.keys()
print len(df_ndt_dist)

def hist(vals, bin_count, log=True, cdf=False):
    """Produces hist or cdf values for smooth plots."""
    if log:
        r = [math.log10(x) for x in vals]
    else:
        r = vals
        
    m, bins = np.histogram(r, bin_count, normed=True)
    m = m.astype(float)

    tops = m
    if cdf:
        tops = np.cumsum(m)
        total = sum(m)
        tops = [float(t) / total for t in tops ]
    
    return tops, bins

seq = [(0, 0), (0, 1), (1, 0), (1, 1), (2, 0), (2, 1)]

for site in set([v[6:9] for v in set(df_ndt_dist['name'])]):
    fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(12, 6))
    for p, h in enumerate(sorted([h for h in set(df_ndt_dist['name']) if site in h])):
        before = None
        r_before = None

        for day in ['before-2w', 'after-2w']:
            ds = df_ndt_dist[ (df_ndt_dist['name'] == h) & (df_ndt_dist['period'] == day) ]
            r = ds['download_mbps']
            #print h, len(r)
            if not len(r):
                continue

            size = int(math.sqrt(len(r)))
            
            if day == 'after-2w':
                size = before                 # Test before vs after
                result = stats.ks_2samp(r, r_before)
                #if result.pvalue < 0.01:     # print 'diff', h, result  # Test itself.
                a, b = train_test_split(r, test_size=0.5)
                result = stats.ks_2samp(a, b)
                #if result.pvalue < 0.01:
                    #print 'same', h, result
                    #print '================================='

            else:
                before = size
                r_before = r
            
                
            #tops, bins = hist(r, int(1.8 * math.sqrt(len(r))), log=True , cdf=True)
            #tops, bins = hist(r, int(math.sqrt(len(r))), log=True , cdf=True)
            #print size, h, day
            #tops, bins = hist(r, size, log=True , cdf=True)
            tops, bins = hist(r, size, log=True , cdf=True)
            #tops, bins = hist(r, int(1.8 * math.sqrt(len(r))), log=False , cdf=True)           
            #tops, bins = hist(r, len(r), log=False , cdf=True)            
            

            #tops_a, bins_a = hist(a, int(1 * math.sqrt(len(a))), log=True, cdf=True)
            #tops_b, bins_b = hist(b, int(1 * math.sqrt(len(b))), log=True, cdf=True)
            if p > len(seq)-1:
                print 'skipping', h
                continue
            i, j = seq[p]
            #print h, len(bins), len(tops)
            axes[i, j].plot(bins[:-1], tops, label='cdf-'+h[6:11] + '-' + str(day))
            #axes[i, j].plot(bins_a[:-1], tops_a, label=h[6:11] + '-' + str(day)+'-a')
            #axes[i, j].plot(bins_b[:-1], tops_b, label=h[6:11] + '-' + str(day)+'-b')
            axes[i, j].set_title(h[6:11])
            #axes[i, j].set_xlim(-10, 1000)
            #axes[i, j].set_xlim(math.log10(.25), math.log10(1000))
            axes[i, j].set_xlim(math.log10(.1), math.log10(1000))
            axes[i, j].grid(color='#dddddd')
            axes[i, j].legend(loc=2, fontsize='x-small')
            #axes[i, j].set_ylim(-0.1, 1.1)
            axes[i, j].xaxis.set_major_formatter(logFormatter)
    fig.subplots_adjust(hspace=0.3, wspace=0.4)
    fig.suptitle('NDT Download Distributions')

In [153]:
df_ndt_variance = run_query("""
WITH mlab_ndt AS (
  SELECT
    connection_spec.server_hostname as server_hostname,
    log_time,
    web100_log_entry.connection_spec.remote_ip AS remote_ip,
    (8 * (web100_log_entry.snap.HCThruOctetsAcked / (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd))) AS download_mbps
  FROM
    `measurement-lab.base_tables.ndt*`
  WHERE

  (    TIMESTAMP_TRUNC(log_time, DAY) BETWEEN TIMESTAMP("2018-02-11") AND TIMESTAMP("2018-02-25")
    OR TIMESTAMP_TRUNC(log_time, DAY) BETWEEN TIMESTAMP("2018-03-11") AND TIMESTAMP("2018-03-25"))
  AND REGEXP_CONTAINS(connection_spec.server_hostname, r"mlab1.(dfw|lga|nuq)\d\d")
  AND web100_log_entry.snap.HCThruOctetsAcked >= 1000000
  AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) >= 9000000
  AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) < 600000000
  AND connection_spec.data_direction = 1
  
  GROUP BY
    server_hostname,
    log_time,
    web100_log_entry.connection_spec.remote_ip,
    web100_log_entry.connection_spec.local_ip,
    web100_log_entry.connection_spec.remote_port,
    web100_log_entry.connection_spec.local_port,
    download_mbps)


SELECT
  UPPER(REGEXP_EXTRACT(server_hostname, r'mlab[1-4].([a-z]{3})[0-9]{2}.*')) AS metro,
  REGEXP_EXTRACT(server_hostname, r'mlab[1-4].([a-z]{3}[0-9]{2}).*') AS site,
  REGEXP_EXTRACT(server_hostname, r'(mlab[1-4].[a-z]{3}[0-9]{2}).*') AS hostname,
  CASE
    WHEN TIMESTAMP_TRUNC(log_time, DAY) BETWEEN TIMESTAMP("2018-02-11") AND TIMESTAMP("2018-02-25") THEN 'before-2w'
    WHEN TIMESTAMP_TRUNC(log_time, DAY) BETWEEN TIMESTAMP("2018-03-11") AND TIMESTAMP("2018-03-25") THEN 'after-2w'
    ELSE 'what'
  END AS period,
  remote_ip,
  STDDEV(download_mbps) AS download_stddev,
  (STDDEV(download_mbps) / AVG(download_mbps)) AS download_cv,
  MAX(download_mbps) AS download_max,
  MIN(download_mbps) AS download_min,
  AVG(download_mbps) AS download_avg

FROM
  mlab_ndt
WHERE
  remote_ip IN(
    SELECT
      remote_ip
    FROM (
      SELECT
        remote_ip, count(*) as c1
      FROM
        mlab_ndt
      WHERE
        TIMESTAMP_TRUNC(log_time, DAY) BETWEEN TIMESTAMP("2018-02-11") AND TIMESTAMP("2018-02-25")
      GROUP BY
        remote_ip
      HAVING c1 > 5
    ) INNER JOIN (
      SELECT
        remote_ip AS remote_ip, count(*) as c2
      FROM
        mlab_ndt
      WHERE
        TIMESTAMP_TRUNC(log_time, DAY) BETWEEN TIMESTAMP("2018-03-11") AND TIMESTAMP("2018-03-25")
      GROUP BY
        remote_ip
      HAVING c2 > 5
    ) USING (remote_ip)) 
GROUP BY
  server_hostname,
  period,
  remote_ip
  --download_mbps

HAVING download_stddev is not NULL
""")



In [157]:
f = plot_hist(
    df_ndt_variance, 'download_max', lambda r: int(math.sqrt(len(r))),
    fig_by='metro', axes_by='site', group_by='period',
    suptitle='Distribution of NDT Downloads - MAX(per remote_ip)',
    label='{group} ({size})',
    title='{axis}', axes=(3, 2),
    xlim=(math.log10(.01), math.log10(1000)),
    cdf=False, xlog=True, figsize=(9, 7))

In [158]:
f = plot_hist(
    df_ndt_variance, 'download_avg', lambda r: int(math.sqrt(len(r))),
    fig_by='metro', axes_by='site', group_by='period',
    suptitle='Distribution of NDT Downloads - AVERAGE(per remote_ip)',
    label='{group} ({size})',
    title='{axis}', axes=(3, 2),
    xlim=(math.log10(.01), math.log10(1000)),
    cdf=False, xlog=True, figsize=(9, 7))


In [3]:
if False:
    df_test_counts = run_query("""
CREATE TEMPORARY FUNCTION
  timeBin(ts_usec INT64,
    size INT64) AS ( CAST(TRUNC(ts_usec / 1e6 / 10) * 10 AS INT64) );

WITH
  mlab_ndt_dedup AS (
  SELECT
    test_id,
    log_time,
    connection_spec.server_hostname AS hostname,
    web100_log_entry.snap.StartTimeStamp as StartTimeStamp,
    (8 * (web100_log_entry.snap.HCThruOctetsAcked / (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd))) AS download_mbps

  FROM
    `measurement-lab.base_tables.ndt*`
  WHERE
          (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) >= 9000000
      AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) < 600000000
      AND web100_log_entry.snap.HCThruOctetsAcked >= 1000000
      AND connection_spec.data_direction = 1
      AND REGEXP_CONTAINS(connection_spec.server_hostname, r"mlab1.(dfw02)")
      AND log_time BETWEEN TIMESTAMP("2017-06-01") AND TIMESTAMP("2018-08-01")

  GROUP BY
    test_id,
    log_time,
    hostname,
    StartTimeStamp,
    web100_log_entry.connection_spec.remote_ip,
    web100_log_entry.connection_spec.local_ip,
    web100_log_entry.connection_spec.remote_port,
    web100_log_entry.connection_spec.local_port,
    download_mbps)
    
  ,ndt_test_ids_with_discards AS (
  SELECT
    ndt.test_id as test_id,
    ndt.hostname as hostname,
    ndt.day as day,
    SUM(disco.discards) AS discards,
    ndt.download_mbps as download_mbps
  FROM (
    SELECT
      hostname,
      UNIX_SECONDS(sample.timestamp) - 10 AS tstart,
      UNIX_SECONDS(sample.timestamp) AS tend,
      sample.value AS discards
    FROM
      `measurement-lab.base_tables.switch*`,
      UNNEST(sample) AS sample
    WHERE
      metric LIKE 'switch.discards.uplink.tx'
      AND sample.timestamp BETWEEN TIMESTAMP("2017-06-01") AND TIMESTAMP("2018-08-01")
      AND REGEXP_CONTAINS(hostname, r"mlab1.(dfw02)")
    GROUP BY
      hostname,
      tstart,
      tend,
      discards
    HAVING
      discards > 0
  ) AS disco
  JOIN (
    SELECT
      test_id,
      connection_spec.server_hostname as hostname,
      TIMESTAMP_TRUNC(log_time, DAY) as day,
      timeBin(web100_log_entry.snap.StartTimeStamp, 10) AS tstart,
      timeBin(web100_log_entry.snap.StartTimeStamp, 10) + 20 AS tend,
      (8 * (web100_log_entry.snap.HCThruOctetsAcked / (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd))) AS download_mbps

  FROM
    `measurement-lab.base_tables.ndt*`
  WHERE
          (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) >= 9000000
      AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) < 600000000
      AND web100_log_entry.snap.HCThruOctetsAcked >= 1000000
      AND connection_spec.data_direction = 1
      AND REGEXP_CONTAINS(connection_spec.server_hostname, r"mlab1.(dfw02)")
      AND log_time BETWEEN TIMESTAMP("2017-06-01") AND TIMESTAMP("2018-08-01")
    GROUP BY
      test_id,
      hostname,
      day,
      tstart,
      tend,
      download_mbps ) AS ndt
  ON (disco.hostname = ndt.hostname
      AND (disco.tstart = ndt.tstart OR disco.tend = ndt.tend))
  GROUP BY
    day, hostname, test_id, download_mbps
  )


-- Split the two timebins into separate periods: before-2w and after-2w. Select clients (remote_ips) with more than 5 tests in both periods.
-- All tests from the before-2w period will have a test_id found in ndt_test_ids_with_discards.
SELECT
  day, metro, site, hostname, discards, COUNT(*) as count
FROM
(
  SELECT
    TIMESTAMP_TRUNC(log_time, DAY) as day,
    UPPER(REGEXP_EXTRACT(hostname, r'mlab[1-4].([a-z]{3})[0-9]{2}.*')) AS metro,
    REGEXP_EXTRACT(hostname, r'mlab[1-4].([a-z]{3}[0-9]{2}).*') AS site,
    REGEXP_EXTRACT(hostname, r'(mlab[1-4].[a-z]{3}[0-9]{2}).*') AS hostname,
    CASE
      WHEN test_id IN(SELECT test_id from ndt_test_ids_with_discards) THEN 'discards'
      WHEN test_id NOT IN(SELECT test_id from ndt_test_ids_with_discards) THEN 'without'
      ELSE 'what'
    END as discards

  FROM
    mlab_ndt_dedup
  )
GROUP BY
  day, metro, site, hostname, discards
""")



In [33]:
import time
def query(site):
    print 'running query', site, time.ctime()
    return """
CREATE TEMPORARY FUNCTION
  timeBin(ts_usec INT64,
    size INT64) AS ( CAST(TRUNC(ts_usec / 1e6 / 10) * 10 AS INT64) );

WITH
  mlab_ndt_dedup AS (
  SELECT
    test_id,
    log_time,
    connection_spec.server_hostname AS hostname,
    web100_log_entry.snap.StartTimeStamp as StartTimeStamp,
    (8 * (web100_log_entry.snap.HCThruOctetsAcked / (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd))) AS download_mbps

  FROM
    `measurement-lab.release.ndt_all`
  WHERE
          log_time BETWEEN TIMESTAMP("2016-06-01") AND TIMESTAMP("2018-08-01")
      AND (connection_spec.server_hostname = "mlab1."""+site+""".measurement-lab.org" OR connection_spec.server_hostname = "ndt.iupui.mlab1."""+site+""".measurement-lab.org")
      AND connection_spec.data_direction = 1
      AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) >= 9000000
      AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) < 600000000
      AND web100_log_entry.snap.HCThruOctetsAcked >= 1000000


  GROUP BY
    test_id,
    log_time,
    hostname,
    StartTimeStamp,
    web100_log_entry.connection_spec.remote_ip,
    web100_log_entry.connection_spec.local_ip,
    web100_log_entry.connection_spec.remote_port,
    web100_log_entry.connection_spec.local_port,
    download_mbps)
    
  ,ndt_test_ids_with_discards AS (
  SELECT
    ndt.test_id as test_id,
    ndt.hostname as hostname,
    ndt.day as day,
    SUM(disco.discards) AS discards,
    ndt.download_mbps as download_mbps
  FROM (
    SELECT
      hostname,
      UNIX_SECONDS(sample.timestamp) - 10 AS tstart,
      UNIX_SECONDS(sample.timestamp) AS tend,
      sample.value AS discards
    FROM
      `measurement-lab.base_tables.switch*`,
      UNNEST(sample) AS sample
    WHERE
      metric = 'switch.discards.uplink.tx'
      AND sample.timestamp BETWEEN TIMESTAMP("2016-06-01") AND TIMESTAMP("2018-08-01")
      AND hostname = "mlab1."""+site+""".measurement-lab.org"
    GROUP BY
      hostname,
      tstart,
      tend,
      discards
    HAVING
      discards > 0
  ) AS disco
  JOIN (
    SELECT
      test_id,
      REGEXP_EXTRACT(connection_spec.server_hostname, r"(mlab1."""+site+""".measurement-lab.org)") as hostname,
      TIMESTAMP_TRUNC(log_time, DAY) as day,
      timeBin(web100_log_entry.snap.StartTimeStamp, 10) AS tstart,
      timeBin(web100_log_entry.snap.StartTimeStamp, 10) + 20 AS tend,
      (8 * (web100_log_entry.snap.HCThruOctetsAcked / (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd))) AS download_mbps

  FROM
    `measurement-lab.release.ndt_all`
    --`measurement-lab.base_tables.ndt*`
  WHERE
          log_time BETWEEN TIMESTAMP("2016-06-01") AND TIMESTAMP("2018-08-01")
      AND connection_spec.data_direction = 1
      AND (connection_spec.server_hostname = "mlab1."""+site+""".measurement-lab.org" OR connection_spec.server_hostname = "ndt.iupui.mlab1."""+site+""".measurement-lab.org")
      AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) >= 9000000
      AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) < 600000000
      AND web100_log_entry.snap.HCThruOctetsAcked >= 1000000

    GROUP BY
      test_id,
      hostname,
      day,
      tstart,
      tend,
      download_mbps ) AS ndt
  ON (disco.hostname = ndt.hostname
      AND (disco.tstart = ndt.tstart OR disco.tend = ndt.tend))
  GROUP BY
    day, hostname, test_id, download_mbps
  )


SELECT
  day, metro, site, hostname, discards, COUNT(*) as count
FROM
(
  SELECT
    TIMESTAMP_TRUNC(log_time, DAY) as day,
    UPPER(REGEXP_EXTRACT(connection_spec.server_hostname, r'mlab[1-4].([a-z]{3})[0-9]{2}.*')) AS metro,
    REGEXP_EXTRACT(connection_spec.server_hostname, r'mlab[1-4].([a-z]{3}[0-9]{2}).*') AS site,
    REGEXP_EXTRACT(connection_spec.server_hostname, r'(mlab[1-4].[a-z]{3}[0-9]{2}).*') AS hostname,
    CASE
      WHEN test_id IN(SELECT test_id from ndt_test_ids_with_discards) THEN 'non-zero'
      ELSE 'zero'
--      WHEN test_id NOT IN(SELECT test_id from ndt_test_ids_with_discards) THEN 'without'
--      ELSE 'what'
    END as discards

  FROM
    --`measurement-lab.base_tables.ndt*`
    `measurement-lab.release.ndt_all`
  WHERE
        log_time BETWEEN TIMESTAMP("2016-06-01") AND TIMESTAMP("2018-08-01")
    AND connection_spec.data_direction = 1
    AND (connection_spec.server_hostname = "mlab1."""+site+""".measurement-lab.org" OR connection_spec.server_hostname = "ndt.iupui.mlab1."""+site+""".measurement-lab.org")
    AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) >= 9000000
    AND (web100_log_entry.snap.SndLimTimeRwin + web100_log_entry.snap.SndLimTimeCwnd + web100_log_entry.snap.SndLimTimeSnd) < 600000000
    AND web100_log_entry.snap.HCThruOctetsAcked >= 1000000

  )
GROUP BY
  day, metro, site, hostname, discards
"""
                           
df_test_counts = pd.concat([
    run_query(query("dfw01")),
    run_query(query("dfw02")),
    run_query(query("dfw03")),
    run_query(query("dfw04")),
    run_query(query("dfw05")),
    run_query(query("dfw06")),
    run_query(query("lga02")),
    run_query(query("lga03")),
    run_query(query("lga04")),
    run_query(query("lga05")),
    run_query(query("lga06")),
    run_query(query("lga07")),
])

running query dfw01 Tue Aug 28 23:26:57 2018
running query dfw02 Tue Aug 28 23:27:28 2018
running query dfw03 Tue Aug 28 23:30:34 2018
running query dfw04 Tue Aug 28 23:31:38 2018
running query dfw05 Tue Aug 28 23:34:11 2018
running query dfw06 Tue Aug 28 23:36:14 2018
running query lga02 Tue Aug 28 23:36:58 2018
running query lga03 Tue Aug 28 23:37:40 2018
running query lga04 Tue Aug 28 23:39:19 2018
running query lga05 Tue Aug 28 23:39:47 2018
running query lga06 Tue Aug 28 23:40:20 2018
running query lga07 Tue Aug 28 23:41:54 2018


In [35]:
# NOTE: does not preserve binsize across group_by. Each line re-calculates the bin size.
print df_test_counts.keys()
plot_scatter(
    df_test_counts, 'day', 'count',
    fig_by='metro', axes_by='site', group_by='discards',
    suptitle='NDT Test Counts (with or without discards)',
    label='{group}',
    title='{axis}',
    axes=(3, 2), figsize=(12, 10),
    ylim=(-200, 30000),
    xlim=(pd.to_datetime("2016-05-31"), pd.to_datetime("2018-08-01")),
    fx=lambda l: [pd.to_datetime(t) for t in l])

Index([u'count', u'day', u'discards', u'hostname', u'metro', u'site'], dtype='object')


{u'DFW': (<matplotlib.figure.Figure at 0x1a3d1289d0>,
  array([[<matplotlib.axes._subplots.AxesSubplot object at 0x1a3d12a190>,
          <matplotlib.axes._subplots.AxesSubplot object at 0x1a23d60590>],
         [<matplotlib.axes._subplots.AxesSubplot object at 0x1a3d9869d0>,
          <matplotlib.axes._subplots.AxesSubplot object at 0x1a3d9e12d0>],
         [<matplotlib.axes._subplots.AxesSubplot object at 0x1a3da30dd0>,
          <matplotlib.axes._subplots.AxesSubplot object at 0x1a3d8b4c90>]],
        dtype=object)),
 u'LGA': (<matplotlib.figure.Figure at 0x1a28746310>,
  array([[<matplotlib.axes._subplots.AxesSubplot object at 0x1a287ee850>,
          <matplotlib.axes._subplots.AxesSubplot object at 0x1a28834810>],
         [<matplotlib.axes._subplots.AxesSubplot object at 0x1a2885f1d0>,
          <matplotlib.axes._subplots.AxesSubplot object at 0x1a287e6790>],
         [<matplotlib.axes._subplots.AxesSubplot object at 0x1a288f8550>,
          <matplotlib.axes._subplots.AxesSubplot

In [24]:
# NOTE: does not preserve binsize across group_by. Each line re-calculates the bin size.
print df_test_counts.keys()
plot_scatter(
    df_test_counts_lga, 'day', 'count',
    fig_by='metro', group_by='discards',
    suptitle='NDT Test Counts (with or without discards)',
    label='{group}',
    title='{figure}',
    axes=(1, 1), figsize=(12, 10),
    ylim=(0, 30000),
    xlim=(pd.to_datetime("2016-05-31"), pd.to_datetime("2018-08-01")),
    fx=lambda l: [pd.to_datetime(t) for t in l])

Index([u'count', u'day', u'discards', u'hostname', u'metro', u'site'], dtype='object')


{u'DFW': (<matplotlib.figure.Figure at 0x1a3b3e9310>,
  array([[<matplotlib.axes._subplots.AxesSubplot object at 0x1a38c29550>]],
        dtype=object)),
 u'LGA': (<matplotlib.figure.Figure at 0x1a38bd2cd0>,
  array([[<matplotlib.axes._subplots.AxesSubplot object at 0x1a3b457050>]],
        dtype=object))}