# Tools for managing the master beacon annotations

See the README.md for setup.

NB: This notebook is unreliable unless cells are run one at a time.

In [1]:
import os
import math
import numpy as np
import pandas as pd
import matplotlib.dates as dates
import matplotlib.pyplot as plt
import matplotlib.ticker
import datetime
import collections
import pickle
import time
# Depends on: pip install --upgrade google-cloud-bigquery
from google.cloud import bigquery

# Some matplotlib features are version dependent.
# Compare numeric components rather than raw strings: as strings, '10.0.0'
# sorts before '2.1.2', which would reject a perfectly good install.
def _version_tuple(version, width=3):
    """Return the leading numeric components of a dotted version string.

    Non-digit suffixes (e.g. the 'rc1' in '3.0.0rc1') are ignored and missing
    components count as 0, so the result is always a tuple of `width` ints.
    """
    numbers = []
    for piece in version.split('.')[:width]:
        digits = ''
        for char in piece:
            if not char.isdigit():
                break
            digits += char
        numbers.append(int(digits) if digits else 0)
    numbers.extend([0] * (width - len(numbers)))
    return tuple(numbers)

assert _version_tuple(matplotlib.__version__) >= (2, 1, 2), \
    'matplotlib >= 2.1.2 is required, found %s' % matplotlib.__version__
print("Done")

Done


# Global flags

Invoke cells individually, or reorder them, to change the defaults.

In [2]:
# Newest partition_date included in the queries (substituted as {enddate}).
EndDate = "2018-05-13"
# BigQuery project used by default when creating the client.
project = "mlab-sandbox"
# Dataset holding the tables this notebook reads and writes.
dataset_id = "mattmathis"

In [3]:
# Run this cell to skip slow/expensive queries when incrementally
# rerunning cells.
DoQueries = False

In [4]:
# Run this cell to force the queries to be executed.
DoQueries = True

In [5]:
# Run this cell for interactive figures that pan and zoom.
interactive = True

In [6]:
# Run this cell for figures that load inline in the browser and are saved
# with the notebook (github etc).
interactive = False

In [7]:
def setupmatplotlib(force=None):
    global interactive
    if force == 'inline':
        %matplotlib inline
        return
    elif force == 'interactive':
        %matplotlib
        return
    elif force is not None:
        print 'Unknown option, using default'
    if interactive:
        print 'default interactive'
        %matplotlib
        return
    else:
        print 'default inline'
        %matplotlib inline
        return
setupmatplotlib()

default inline


In [8]:
# BigQuery interface
def expand_query(query, **kwargs):
    """expand_query: expands nested {parameter} substitutions.

    Substitution values come from the global DefaultArgs, overridden by any
    keyword arguments.  Values may themselves contain {parameter}
    references; these are resolved by formatting the query repeatedly.

    Returns the fully expanded query string.

    Raises:
        KeyError: a {parameter} has no value in DefaultArgs or kwargs.
        ValueError: braces remain after the last pass (nesting deeper than
            the limit, or a parameter that refers to itself).

    Stashes forensic output in globals (DebugQuery, NumberedQuery).
    """
    global DebugQuery     # For pasting into BQ, after the fact
    global NumberedQuery  # For grokking BQ error line numbers
    global DefaultArgs    # Default substitutions; kwargs take precedence

    # Only allow argument substitution 4 levels deep, because
    # accidental infinite recursion risks crashing the notebook.
    max_depth = 4
    args = DefaultArgs.copy()
    args.update(kwargs)
    for _ in range(max_depth):
        query = query.format(**args)
    if '{' in query:
        # Raising a bare string is itself a TypeError; use a real exception.
        raise ValueError("Unexpanded substitutions")

    # Leave crumbs in case we need a postmortem
    DebugQuery = query
    # Number from 1 so the listing lines up with BigQuery's [line:column]
    # error positions.
    NumberedQuery = "".join(
        "%3d %s\n" % (number, line)
        for number, line in enumerate(query.split('\n'), 1))

    return query

def run_query(query, project=project, otherindex=None, timeindex='partition_date', **kwargs):
    """ run_query
        Runs a query and returns the rows as a pandas DataFrame.

        Accepts nested {parameter} substitutions.

        Args:
            query: query template, expanded with expand_query().
            project: project passed to the BigQuery client; the default is
                the value of the global `project` when this cell was run.
            otherindex: if set, name of the result column used as the index.
            timeindex: name of the result column used as a DatetimeIndex
                when otherindex is not set; pass timeindex=None to get a
                raw DataFrame.
            **kwargs: extra substitutions for the query.

        Stashes forensic output in globals.
    """
    global NumberedQuery
    query = expand_query(query, **kwargs)

    # do the work
    client = bigquery.Client(project=project)
    job = client.query(query)  # All errors are delayed

    # Marshal the results column by column, catching async errors
    results = collections.defaultdict(list)
    try:
        for row in job.result(timeout=300):
            for key in row.keys():
                results[key].append(row.get(key))
    except:
        # Show the numbered query for the postmortem, then re-raise.
        print(NumberedQuery)
        raise

    if otherindex:
        # Index by the requested column (was a NameError on `timeined`).
        return pd.DataFrame(results, index=results[otherindex])
    if timeindex:
        return pd.DataFrame(results, index=pd.DatetimeIndex(results[timeindex]))
    # set timeindex=None to force a raw DataFrame
    return pd.DataFrame(results)

def write_query_table(query, otable,
                      project=project,
                      dataset_id=dataset_id,
                      **kwargs):
    """ write_query_table
        Runs a query and stores its result in table `otable` of dataset
        `dataset_id`, replacing any existing contents (WRITE_TRUNCATE).

        Accepts nested {parameter} substitutions.

        Stashes forensic output in globals.
    """
    global NumberedQuery
    sql = expand_query(query, **kwargs)

    # Configure the job to overwrite the destination table.
    client = bigquery.Client(project=project)
    config = bigquery.QueryJobConfig()
    config.destination = client.dataset(dataset_id).table(otable)
    config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    job = client.query(sql, location='US', job_config=config)

    # Wait for the job, catching async errors
    try:
        job.result()
        # NOTE(review): result() is expected to block until the job has
        # finished, in which case this loop never iterates -- confirm
        # before removing it.
        while not job.done():
            print('tick')
            time.sleep(5)
        assert job.state == 'DONE'
    except:
        print("Query Errored")
        print(NumberedQuery)
        raise
    print("Query completed")

# test code
if False:
    # Disabled smoke test: copy master_annotations into a scratch table.
    # The placeholder must be spelled {dataset_id} to match DefaultArgs.
    testQ="""
    SELECT *
    FROM `{dataset_id}.master_annotations`
    """
    write_query_table(testQ, otable='test_results2')

print("Done")

Done


In [9]:
# Queries for creating and reading table master_annotations, which is the
# core of the paper

# Query the list of beacons from the master_annotations table.
# {beacon_fields} is the SELECT list and {beacon_where} is spliced in after
# the IP exclusion list; both have defaults in DefaultArgs.
BeaconQ ="""
SELECT
  {beacon_fields}
FROM
  `{dataset_id}.master_annotations`
WHERE
  clientIP NOT IN (
    '45.56.98.222',
    '64.9.225.99',
    '64.9.225.190' ) # exclude eb, etc
    {beacon_where}
"""

# A useful diagnostic query: counts the rows produced by the {beacons}
# subquery (BeaconQ by default, see DefaultArgs).
CountQ="""
SELECT
    count(*) as CNT
from (
{beacons}
     )
"""
    
# New decomposable Master Beacon query
# RawStats is the SELECT list of the innermost subquery of MasterQ, which
# groups by clientIP and server_metro: test counts, first/last log_time,
# per-client-type counts, and byte/duration totals per direction.
RawStats="""
      web100_log_entry.connection_spec.remote_ip AS clientIP,
      IFNULL(substr(connection_spec.server_hostname, -25, 3), "UNK") AS server_metro,
      COUNT(*) AS num_tests,
      MIN( web100_log_entry.log_time ) AS first_time,
      MAX( web100_log_entry.log_time ) AS last_time,
      COUNTIF(connection_spec.data_direction = 0) AS num_upload,
      COUNTIF(connection_spec.data_direction = 1) AS num_download,
      COUNTIF(substr(connection_spec.client_application,1,3) = "cli") AS num_cli,
      COUNTIF(REGEXP_CONTAINS(connection_spec.client_application, "confine")) AS num_confine,
      COUNTIF(connection_spec.client_browser = "- (web100clt)") AS num_web100clt,
      COUNTIF(substr(connection_spec.client_browser,1,12) = "BTWebClient/") AS num_BTWebClient,
      COUNTIF(substr(connection_spec.client_browser,1,7) = "Chrome/") AS num_Chrome,
      COUNTIF(substr(connection_spec.client_browser,1,8) = "Firefox/") AS num_Firefox,
      COUNTIF(substr(connection_spec.client_browser,1,3) = "IE/") AS num_IE,
      SUM(IF(connection_spec.data_direction = 0, web100_log_entry.snap.HCDataOctetsIn, 0)) AS upload_bytes,
      SUM(IF(connection_spec.data_direction = 1, web100_log_entry.snap.HCDataOctetsOut, 0)) AS download_bytes,
      SUM(IF(connection_spec.data_direction = 0, web100_log_entry.snap.duration, 0)) AS upload_duration,
      SUM(IF(connection_spec.data_direction = 1, web100_log_entry.snap.duration, 0)) AS download_duration
"""

# MetroAgg is the SELECT list of the middle subquery of MasterQ, which
# groups by clientIP: it rolls the per-metro RawStats rows up into series_*
# totals and keeps the per-metro test counts in the metro_details array.
MetroAgg="""
    clientIP,
    ARRAY_AGG(CONCAT("'",server_metro,"':",CAST(num_tests AS STRING)) ORDER BY num_tests DESC) as metro_details,
    COUNT(DISTINCT server_metro) AS series_num_metros,
    MIN(first_time) AS series_start,
    MAX(last_time) AS series_end,
    SUM(num_tests) AS series_count,
    SUM(num_upload) AS series_uploads,
    SUM(num_download) AS series_downloads,
    SUM(num_cli) AS series_num_cli,
    SUM(num_confine) AS series_num_confine,
    SUM(num_web100clt) AS series_num_web100clt,
    SUM(num_BTWebClient) AS series_num_BTWebClient,
    SUM(num_Chrome) AS series_num_Chrome,
    SUM(num_Firefox) AS series_num_Firefox,
    SUM(num_IE) AS series_num_IE,
    SUM(upload_bytes) AS series_upload_bytes,
    SUM(download_bytes) AS series_download_bytes,
    SUM(upload_duration) AS series_upload_duration,
    SUM(download_duration) AS series_download_duration
"""

# SummaryStats is the outermost SELECT list of MasterQ: every MetroAgg
# column except the metro_details array, plus derived per-series values.
# The mean interval uses SAFE_DIVIDE so that a series with a single test
# (series_count - 1 = 0) yields NULL instead of a division-by-zero error.
# NOTE(review): the `# kbps` labels depend on the unit of snap.duration,
# which is not visible here; converting bytes to kilobits normally involves
# a factor of 8, whereas 125.0 looks like 1000/8 -- confirm the scaling.
SummaryStats="""
  * EXCEPT ( metro_details ),
  SAFE_DIVIDE(series_upload_bytes, series_upload_duration)*125.0 AS series_average_upload, # kbps
  SAFE_DIVIDE(series_download_bytes, series_download_duration)*125.0 AS series_average_download, # kbps
  TIMESTAMP_SECONDS(series_start) AS series_start_asc,
  TIMESTAMP_SECONDS(series_end) AS series_end_asc,
  ( series_end - series_start ) / 86400.0 AS series_elapsed_days,
  SAFE_DIVIDE( series_end - series_start, series_count - 1 ) / 3600.0 AS series_interval_hours,
  ARRAY_TO_STRING(metro_details, " ") AS series_metro_details
"""


# MasterQ builds the master beacon table from three nested SELECTs
# ({RawStats} per client and metro, {MetroAgg} per client, {SummaryStats}
# on top), joined with `{dataset_id}.clients_test_count` to obtain dailymax.
# Required substitutions, in addition to the DefaultArgs entries:
#   MIN_DURATION  minimum series length, in days
#   MAX_INTERVAL  maximum mean time between tests, in days
#   DAILY_MAX     exclusive upper bound on dailymax
# The interval filter uses SAFE_DIVIDE so that a single-test series
# (series_count - 1 = 0) yields NULL, which fails the filter, rather than
# depending on evaluation order to avoid a division-by-zero error.
MasterQ="""
#standardSQL
#
# Gather a master table summarizing all clients that run repeated tests
#
# Patch (sed etc) to replace NUM_TESTS
SELECT
    {SummaryStats}
FROM (
  SELECT
    {MetroAgg}
  FROM (
    SELECT
      {RawStats}
    FROM
      `measurement-lab.release.ndt_all`
    WHERE
        partition_date <= '{enddate}'
    GROUP BY
      clientIP,
      server_metro)
  GROUP BY
    clientIP )
  INNER JOIN (
      SELECT
          ClientIP, dailymax
      FROM
          `{dataset_id}.clients_test_count`
      )
  USING ( clientIP )
WHERE
#  series_count > NUM_TESTS OR
  ( series_end - series_start ) / 86400.0 > {MIN_DURATION}
    AND SAFE_DIVIDE( series_end - series_start, series_count - 1 ) < {MAX_INTERVAL}*24*60*60 # 2 weeks + fudge
    AND dailymax < {DAILY_MAX}
ORDER BY series_num_metros DESC
"""

# Default values for optional parameters.  expand_query() starts from a copy
# of this dict and lets keyword arguments override individual entries.
# EndDate and dataset_id are read when this cell runs, so rerun the cell
# after changing either of them.  (A module-level `global` statement has no
# effect, so none is used here.)
DefaultArgs = {
    # Fragments of the master beacon query
    'RawStats': RawStats,
    'MetroAgg': MetroAgg,
    'SummaryStats': SummaryStats,
    'enddate': EndDate,   # TODO, must enforce
    # Common fields used for parsing the master beacons data
    'beacons': BeaconQ,
    'beacon_fields': 'ClientIP',
    'beacon_where': '',
    'dataset_id': dataset_id,
}

In [10]:

# Find all clients that ran too many tests per day.
# For each clientIP, dailymax is the largest number of tests seen on a single
# partition_date; only clients with dailymax >= {dailylimit} are returned.
# {dailylimit} has no default and must be supplied by the caller.
AbusiveQ="""
SELECT
    *
FROM (
    SELECT
        clientIP,
        MAX(dailycnt) AS dailymax
    FROM (
        SELECT
            partition_date,
            web100_log_entry.connection_spec.remote_ip AS clientIP,
            count (*) as dailycnt
        FROM
           `measurement-lab.release.ndt_all`   
        WHERE
            partition_date <= '{enddate}'
        GROUP BY
            partition_date, clientIP
        )
    GROUP BY clientIP
    )
WHERE
    dailymax >= {dailylimit}
# ORDER BY dailymax DESC
"""

In [11]:
# Test code to look at abusive clients
if False:
    absusive_clients = run_query(AbusiveQ, timeindex=None, dailylimit = 100)
    print(len(absusive_clients))
    print(absusive_clients[0:10])   # first ten rows
    print(absusive_clients[-10:])   # last ten rows ([-10:-1] dropped the final one)

# This was useful for exploring the data
if False:
    write_query_table(AbusiveQ, otable='abusive_clients', dailylimit = 100)

print('Done')

Done


In [12]:
# DO the heavy lifting
# This produces the annotated master beacon table used in the MLab beacons paper
# Once the output table has been created, this query only needs to be rerun if
# somthing above changes.

if True:
    %time write_query_table(AbusiveQ, otable='clients_test_count', dailylimit = 0)    
    query = expand_query(MasterQ, MIN_DURATION=365, MAX_INTERVAL=20, DAILY_MAX=100)
    %time write_query_table(query, otable='master_annotations')

Query completed
CPU times: user 48 ms, sys: 16 ms, total: 64 ms
Wall time: 1min 2s
Query completed
CPU times: user 60 ms, sys: 16 ms, total: 76 ms
Wall time: 2min 17s
