From ea10212524fd5920dd935657d97d76ead7eecb3a Mon Sep 17 00:00:00 2001 From: Stephen Soltesz Date: Sun, 25 Aug 2013 13:53:12 -0400 Subject: [PATCH 01/14] basic setup for IP2ASN queries --- setup.sh | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100755 setup.sh diff --git a/setup.sh b/setup.sh new file mode 100755 index 0000000..e44167a --- /dev/null +++ b/setup.sh @@ -0,0 +1,75 @@ +#!/usr/bin/env bash + +function find_command () { + local command=$1 + if ! type -P $command &> /dev/null ; then + echo "ERROR: could no locate '$command' in current PATH" + echo "ERROR: either install $command, or update PATH" + exit 1 + fi +} + +find_command wget +find_command m4 + +set -x + +IP2ASNFILE=GeoIPASNum2 +if ! test -f $IP2ASNFILE.zip ; then + wget http://download.maxmind.com/download/geoip/database/asnum/$IP2ASNFILE.zip +fi +if ! test -f $IP2ASNFILE.csv ; then + unzip $IP2ASNFILE.zip + if test $? -ne 0 ; then + echo "Error: failed to unzip $IP2ASNFILE.zip" + exit 1 + fi +fi + +function generate_ispquery () { + local ISPNAME=$1 + AFTERFIRST= + rm -f $ISPNAME.input + + if ! test -f $ISPNAME.input ; then + FILTER_PREFIX="PARSE_IP(web100_log_entry.connection_spec.remote_ip) " + grep -i $ISPNAME $IP2ASNFILE.csv | \ + awk -F, '{print $1,$2}' | \ + while read IP_low IP_high ; do + if test -n "$AFTERFIRST" ; then echo " OR" ; fi + FILTER="$FILTER_PREFIX BETWEEN $IP_low AND $IP_high " + echo -n " $FILTER" + AFTERFIRST=1 + done > $ISPNAME.input + fi + + if ! test -f sql/$ISPNAME.lga01.sql ; then + m4 -DISP_FILTER_FILENAME=$ISPNAME.input sql/ndt-tmpl-generic.m4.sql > sql/$ISPNAME.lga01.sql + fi + if ! test -f sql/$ISPNAME.lga02.sql ; then + m4 -DISP_FILTER_FILENAME=$ISPNAME.input sql/ndt-tmpl-generic.m4.sql > sql/$ISPNAME.lga02.sql + fi + + QV=$HOME/source/m-lab.analysis/queryview/queryview.py + $QV --query $ISPNAME.lga01 \ + --count test_count \ + --timestamp day_timestamp \ + -l med_rate \ + -D DATETABLE=[m_lab.2013_08] \ + -D SERVERIPS="'74.63.50.19','74.63.50.32','74.63.50.47'" \ + --output graphs/$ISPNAME.lga01.png + + $QV --query $ISPNAME.lga02 \ + --count test_count \ + --timestamp day_timestamp \ + -l med_rate \ + -D DATETABLE=[m_lab.2013_08] \ + -D SERVERIPS="'38.106.70.147','38.106.70.160','38.106.70.173'" \ + --output graphs/$ISPNAME.lga02.png +} + +generate_ispquery comcast +generate_ispquery cablevision +generate_ispquery warner +generate_ispquery rcn +generate_ispquery verizon From 332621169f164bea07083628fe4794c075b4bb00 Mon Sep 17 00:00:00 2001 From: Stephen Soltesz Date: Sun, 25 Aug 2013 23:50:18 -0400 Subject: [PATCH 02/14] ndt download sql template --- sql/ndt-tmpl-generic.m4.sql | 53 +++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 sql/ndt-tmpl-generic.m4.sql diff --git a/sql/ndt-tmpl-generic.m4.sql b/sql/ndt-tmpl-generic.m4.sql new file mode 100644 index 0000000..b008e8e --- /dev/null +++ b/sql/ndt-tmpl-generic.m4.sql @@ -0,0 +1,53 @@ +SELECT + -- web100_log_entry.connection_spec.local_ip AS server_ip, + INTEGER(web100_log_entry.log_time/7200)*7200 AS day_timestamp, + COUNT(web100_log_entry.log_time) AS test_count, + -- RATE + NTH(90,QUANTILES(8*web100_log_entry.snap.HCThruOctetsAcked/( + web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd),101)) as quantile_90, + NTH(10,QUANTILES(8*web100_log_entry.snap.HCThruOctetsAcked/( + web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + 
web100_log_entry.snap.SndLimTimeSnd),101)) as quantile_10, + NTH(50,QUANTILES(8*web100_log_entry.snap.HCThruOctetsAcked/( + web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd),101)) as med_rate, + STDDEV(8*web100_log_entry.snap.HCThruOctetsAcked/( + web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd)) as std_rate, +FROM + DATETABLE +WHERE + IS_EXPLICITLY_DEFINED(project) + AND IS_EXPLICITLY_DEFINED(connection_spec.data_direction) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.is_last_entry) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.HCThruOctetsAcked) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.CongSignals) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.remote_ip) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.local_ip) + -- NDT download + AND project = 0 + AND connection_spec.data_direction = 1 + AND web100_log_entry.is_last_entry = True + AND web100_log_entry.snap.HCThruOctetsAcked >= 8192 + AND web100_log_entry.snap.HCThruOctetsAcked < 1000000000 + AND web100_log_entry.snap.CongSignals > 0 + AND (web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd) >= 9000000 + AND (web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd) < 3600000000 + AND web100_log_entry.snap.MinRTT < 1e7 + -- restrict to NY lga01 servers, and given ISP address ranges. + AND web100_log_entry.connection_spec.local_ip IN(SERVERIPS) + AND ( include(ISP_FILTER_FILENAME) ) +GROUP BY + day_timestamp +ORDER BY + day_timestamp; + From 56473bb15fd2c923ef8cae2b865a09ab3b3d4b0a Mon Sep 17 00:00:00 2001 From: Stephen Soltesz Date: Sun, 25 Aug 2013 23:51:40 -0400 Subject: [PATCH 03/14] add updated queryview & client config --- .client_secrets.json | 12 + queryview.py | 727 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 739 insertions(+) create mode 100644 .client_secrets.json create mode 100755 queryview.py diff --git a/.client_secrets.json b/.client_secrets.json new file mode 100644 index 0000000..85a5b61 --- /dev/null +++ b/.client_secrets.json @@ -0,0 +1,12 @@ +{ + "installed": { + "auth_uri":"https://accounts.google.com/o/oauth2/auth", + "client_secret":"xA4FouEl4U4KwQ638jOfCXYt", + "token_uri":"https://accounts.google.com/o/oauth2/token", + "client_email":"", + "redirect_uris": [ "urn:ietf:wg:oauth:2.0:oob","oob" ], + "client_x509_cert_url":"", + "client_id":"754187384106-b3fds86t81gp3pspfe70fnhqkdk3ratr.apps.googleusercontent.com", + "auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs" + } +} diff --git a/queryview.py b/queryview.py new file mode 100755 index 0000000..261965b --- /dev/null +++ b/queryview.py @@ -0,0 +1,727 @@ +#!/usr/bin/env python + +import csv +import os +import sys +import time +import subprocess +import statvfs + +import httplib2 +import logging +import pprint +import sys + +try: + import matplotlib + import pylab + import gflags + from apiclient.discovery import build + from apiclient.errors import HttpError + from oauth2client.client import AccessTokenRefreshError + from oauth2client.client import flow_from_clientsecrets + from oauth2client.file import Storage + from oauth2client.tools import run +except: + import traceback + print "Error: Failed to import a dependency:" + print "This is a common package in most Linux distros:" + print " python-matplotlib" + print " 
py-matplotlib" + print " python-gflags" + print " google-api-python-client" + print "Or, try: " + print " easy_install --upgrade google-api-python-client" + print " easy_install -U matplotlib" + traceback.print_exc() + sys.exit(1) + + +gflags.DEFINE_string('query', None, + "Required: name of SQL query in $PWD/sql/", + short_name='q') +gflags.DEFINE_string('csvfile', None, + "Optional: name of CSV file instead of --query", + short_name='c') +gflags.DEFINE_multistring('mergefiles', [], + "Optional: merge multiple CSV files", + short_name='m') +gflags.DEFINE_string('timestamp', None, + "Required: X-axis column name with timestamps.", + short_name='t') + +gflags.DEFINE_multistring('columns', [], + ("Required: Y-axis column name to plot as a line. "+ + "Can be specified multiple times. "+ + "To add an error bar to the line, add a second "+ + "column with a comma, such as: column1,column2. "), + short_name='l') + + +gflags.DEFINE_bool("refresh", False, + ("By default, query results are cached and reused "+ + "on subsequent calls. The cache file is refreshed"+ + " automatically when either the query file mtime "+ + "is greater or a day has passed from "+ + "the cache file's mtime."), + short_name='r') + +gflags.DEFINE_string("count_column", None, + ("Create a second plot below the main axis. Use "+ + "the given column name for values.")) +gflags.DEFINE_multistring("date_vline", [], + ("Draw a vertical line at given date: YYYY-MM-DD")) + +gflags.DEFINE_multistring("color_list", [], + "Colors are applied to lines in order specified.", + short_name='C') + +gflags.DEFINE_multistring("define_values", [], + ("Pseudo macros translate SQL query by replacing "+ + "all instances of KEY with VALUE."), + short_name='D') +gflags.DEFINE_string("output", None, + "Output filename. Used for plot image or merged csv.") + +gflags.DEFINE_float("ymax", None, "YMAX on graph") + +gflags.DEFINE_string("ylabel", None, "Y-Label on graph.") +gflags.DEFINE_string("title", None, "Title on graph.") + +gflags.DEFINE_bool("plot", True, + "Enable/disable plotting of the resulting data.") + +gflags.DEFINE_bool("verbose", False, + "Verbose mode: print extra details.", + short_name='v') +gflags.DEFINE_bool("api", True, + ("Use http API rather than 'bq' executable, say --noapi "+ + "to use 'bq' command-line tool instead.")) + +gflags.DEFINE_string('priority', 'INTERACTIVE', + 'Priority at which to run the query', + short_name = 'p') +gflags.RegisterValidator('priority', + lambda value: value in [ 'BATCH', 'INTERACTIVE' ], + message='--priority must be "BATCH" or "INTERACTIVE"') + +def cmd_exists(cmd): + """ Returns: bool, True if 'cmd' is in PATH, False otherwise.""" + return subprocess.call("type " + cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) == 0 + +def bigquery_exec(query_string, output_filename, options): + """ A wrapper for the 'bq' command line tool. + Args: + query_string - a bigquery-SQL query. Since this must be passed to + 'bq' via the command line, mind your use of quotes. + TODO: find a way to call the bq python function. + output_filename - filename to save results. If None, a temporary + file is used to save bq output, then deleted after + parsed. + verbose - if True, print bq command before executing. + Raises: + Exception - on command execution error. 
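+                  (raised when the 'bq' command exits with a non-zero status).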
+ Returns: + True on success + """ + verbose = options.verbose + if not cmd_exists("bq"): + print "Error: Could not find 'bq' (big query command line tool) in path" + print "Look here: https://code.google.com/p/google-bigquery-tools/" + sys.exit(1) + + # load query + if query_string is None: + cmd = """echo "count,ips\n80090,34745\n80091,34746" > %s """ % ( + output_filename) + else: + # TODO: find a way to call this via python + cmd = "bq -q --format=csv query --max_rows 100000000 \"%s\" > %s" % ( + query_string, output_filename) + + if verbose: print cmd + r = os.system(cmd) + if r != 0: + # NOTE: read content of outfile for error message. + msg = open(output_filename, 'r').read() + print query_string + os.unlink(output_filename) + raise Exception("Non-zero exit-status: %s" % msg) + + return True + +PROJECT_ID = 'measurement-lab' +DATASET_ID = 'm_lab' +FLOW_OBJ = flow_from_clientsecrets('.client_secrets.json', + scope='https://www.googleapis.com/auth/bigquery') + +def authorize_and_build_bigquery_service(): + storage = Storage('.bigquery_credentials.dat') + credentials = storage.get() + + if credentials is None or credentials.invalid: + credentials = run(FLOW_OBJ, storage) + + http = httplib2.Http() + + logging.info('Authorizing...') + http = credentials.authorize(http) + + logging.info('Building BQ service...') + return build('bigquery', 'v2', http = http) + +def write_reply_to_csv(query_reply, output_fd, header=True): + + header_cols = query_reply['schema']['fields'] + header_names = [ h['name'] for h in header_cols ] + writer = csv.DictWriter(output_fd, header_names) + + if header: + writer.writeheader() + + for row_vals in query_reply['rows']: + vals = [ v['v'] for v in row_vals['f'] ] + writer.writerow(dict(zip(header_names, vals))) + + output_fd.flush() + +def bigquery_api(query_string, output_filename, options): + BQ_SERVICE = authorize_and_build_bigquery_service() + + output_fd = open(output_filename, 'w') + + try: + logging.info('Running %s', query_string) + job_collection = BQ_SERVICE.jobs() + job_data = { + 'configuration': { + 'query': { + 'query': query_string, + 'priority': options.priority + } + } + } + + insert_response = job_collection.insert(projectId = PROJECT_ID, + body = job_data).execute() + + current_status = 'INVALID' + while current_status != 'DONE': + logging.info("sleeping 3") + time.sleep(3) + status = job_collection.get( + projectId = PROJECT_ID, + jobId = insert_response['jobReference']['jobId']).execute() + current_status = status['status']['state'] + logging.info(status['status']) + logging.info('%s', current_status) + + current_row = 0 + logging.info('getQueryResults %d', current_row) + query_reply = job_collection.getQueryResults( + projectId = PROJECT_ID, + jobId = insert_response['jobReference']['jobId'], + startIndex = current_row).execute() + + total_rows = int(query_reply['totalRows']) + show_header = True + while ('rows' in query_reply) and current_row < total_rows: + logging.info('Received rows from %d / %d [%.2f%%]', + current_row, + total_rows, + 100.0 * float(current_row) / float(total_rows)) + write_reply_to_csv(query_reply, output_fd, show_header) + show_header=False + current_row += len(query_reply['rows']) + logging.info('getQueryResults %d', current_row) + query_reply = job_collection.getQueryResults( + projectId = PROJECT_ID, + jobId = query_reply['jobReference']['jobId'], + startIndex = current_row).execute() + + except HttpError as err: + logging.error('Error running query: %s', pprint.pprint(err.content)) + sys.exit(1) + + except 
AccessTokenRefreshError: + logging.error('Credentials have been revoked or expired. Please re-run ' + 'the application to re-authorize.') + sys.exit(1) + + output_fd.close() + +def usage(): + return """ +Summary: + Queryview performs a BigQuery request, caches the result, and graphs the + result data. + + At a minimum, every row in the query result should include two columns: + a 'timestamp' column (x-axis), and + a 'data' column (y-axis). + All values are interpreted as floats. + + Queryview looks for SQL files in $PWD/sql/*.sql. Query files are specified + on the command line without the .sql extension (see EXAMPLES). + + Query results are cached in $PWD/cache/*.csv. Cache files will be reused + (rather than re-running the query) for one day, until the query file is + modified, or until you manually specifiy "--refresh". + + NOTE/TODO: + Queryview only uses your default bigquery data set. + +Examples: + ./queryview.py -q example -t day_timestamp -l avg + + ./queryview.py -q example -t day_timestamp -l avg -C red + + ./queryview.py -q example -t day_timestamp -l avg -C red \\ + --date_vline 2013-05-08 + + ./queryview.py -q example -t day_timestamp -l avg -C red \\ + --date_vline 2013-05-08 \\ + --count_column test_count + + ./queryview.py -q example -t day_timestamp -l avg -C red \\ + -l median -C blue \\ + -l quantile_90 -C green \\ + -l quantile_10 -C green \\ + --date_vline 2013-05-08 \\ + --count_column test_count \\ + --ylabel Milliseconds \\ + --title "web100.MinRTT on one Machine" \\ + --output minrtt_image.png """ + +def parse_args(): + try: + gflags.FLAGS(sys.argv) + except gflags.FlagsError, err: + print usage() + print '%s\nUsage: %s ARGS\n%s' % (err, sys.argv[0], gflags.FLAGS) + sys.exit(1) + + if len(sys.argv) == 1: + print usage() + print 'Usage: %s ARGS\n%s' % (sys.argv[0], gflags.FLAGS) + sys.exit(1) + + options = gflags.FLAGS + + if options.verbose: + level = logging.DEBUG + else: + level = logging.WARNING + + logging.basicConfig(format = '[%(asctime)s] %(levelname)s: %(message)s', + level = level) + + if (options.query is None and + options.csvfile is None and + len(options.mergefiles) == 0): + print "Error: Please provide --query , or" + print "Error: --csv , or" + print "Error: --merge " + sys.exit(1) + + if len(options.mergefiles) == 1: + print "Error: please specify at least 2 csv files to merge." + sys.exit(1) + + if options.timestamp is None: + print "Error: Please provide --timestamp " + sys.exit(1) + + if len(options.columns) == 0 and len(options.mergefiles) == 0: + print ("Error: Provide at least one line, --column ") + sys.exit(1) + + return (options, []) + +SQL_PATH="sql" +CSV_PATH="cache" +def get_filenames(query_name, define_values=[]): + if not os.path.exists(SQL_PATH): + os.makedirs(SQL_PATH) + if not os.path.exists(CSV_PATH): + os.makedirs(CSV_PATH) + extension = "" + + # NOTE: replace unfriendly characters + for val in define_values: + val = val.replace("=","_") + val = val.replace("/", "_") + val = val.replace("'","") + val = val.replace("(","_") + extension += "-"+val + + # NOTE: truncate extension to max filename length + max_len = os.statvfs(CSV_PATH)[statvfs.F_NAMEMAX] + extension = extension[:max_len-len(query_name)-4] + + query_filename = os.path.join(SQL_PATH, query_name+'.sql') + cache_filename = os.path.join(CSV_PATH, query_name+extension+'.csv') + return (query_filename, cache_filename) + +def read_csvfile(cache_file, return_dicts): + """ + Args: + cache_file - name of a csv file. 
+ return_dicts - if True, return a csv.DictReader(), + else plain csv.reader() + if csv.DictReader() then each row will be a dict, + with column names for keys and values, + if csv.reader() every row is a list of values. This + includes the row for column names. + Returns: + A csv_reader object, the type determined by return_dicts. + NOTE: All values are STRINGS. + """ + input_fd = open(cache_file,'r') + if return_dicts: + reader = csv.DictReader(input_fd) + else: + # NOTE: also returns headers as a row. + reader = csv.reader(input_fd) + return reader + +def read_file(filename): + """ If file does not exist, function exists. + Returns: + content of filename as string. + """ + try: + query_str = open(filename,'r').read() + except: + print "Error: could not read content of %s" % filename + sys.exit(1) + return query_str + +def process_macros(query, define_values): + for key_val in define_values: + key,val = key_val.split("=") + query = query.replace(key,val) + return query + +def ts2d(xl): + """convert a list of unix timestamps and to matplotlib numbers suitable + for plotting. + """ + return matplotlib.dates.epoch2num(xl) + +def plot_data(x_list, y_lists, y_errs, c_list, options): + + if len(x_list) == 0: + print "WARNING: zero-length data set." + print "WARNING: nothing to plot." + return + + # Constants + textsize = 9 + left, width = 0.1, 0.8 + axescolor = '#f6f6f6' # the axes background color + + # NOTE: default size fills everything + rect1 = [left, 0.1, width, 0.75] #left, bottom, width, height + if options.count_column: + # if the count image is included, then resize. + rect1 = [left, 0.25, width, 0.6] #left, bottom, width, height + rect2 = [left, 0.1, width, 0.15] + + fig = pylab.figure(figsize=(8,4), facecolor='white') + ax1 = fig.add_axes(rect1, axisbg=axescolor) + ax1.grid() + if options.ylabel: + ax1.set_ylabel(options.ylabel) + if options.title: + ax1.set_title(options.title) + + # NOTE: if color list is empty, fill it. 
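+    # Draw default colors from matplotlib's color cycle, one per --column,
+    # so each line still gets its own color when -C is omitted.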
+ if options.color_list == []: + for i in range(len(options.columns)): + options.color_list.append(ax1._get_lines.color_cycle.next()) + + if options.count_column: + # NOTE: if we have two axes, then 'sharex' and disable x labels on top + ax2 = fig.add_axes(rect2, axisbg=axescolor, sharex=ax1) + [ label.set_visible(False) for label in ax1.get_xticklabels() ] + for label in ax2.get_xticklabels(): + #label.set_rotation(10) + label.set_horizontalalignment('center') + ax2.set_ylabel("Count") + ax2.grid() + ylocator = matplotlib.ticker.MaxNLocator(5, prune='both') + ax2.yaxis.set_major_locator(ylocator) + else: + for label in ax1.get_xticklabels(): + #label.set_rotation(10) + label.set_horizontalalignment('center') + + date_ts = None + if options.date_vline: + for date_str in options.date_vline: + _tup = time.strptime(date_str, "%Y-%m-%d") + date_ts = int(time.mktime(_tup)) + ax1.axvline(ts2d(date_ts), color='brown', linewidth=2, + linestyle='--', label="Update") + + ymax=0 + split_column_names = map(split_column_name, options.columns) + for i,(y_col,y_err_col) in enumerate(split_column_names): + if options.verbose: print "Column:", y_col + + if options.ymax is not None: + ymax = options.ymax + else: + if len(y_errs[y_err_col]) > 0: + ymax = max(y_lists[y_col]+[ymax])+max(y_errs[y_err_col])*2 + else: + ymax = max(y_lists[y_col]+[ymax]) + + if len(y_errs[y_err_col]) == 0: + y_err=None + else: + y_err=y_errs[y_err_col] + + ax1.axis([ts2d(min(x_list)),ts2d(max(x_list)),0,ymax]) + + color = options.color_list[i%len(options.color_list)] + p, = ax1.plot_date(ts2d(x_list), y_lists[y_col], + xdate=True, ydate=False, marker='.', markersize=3, + color=color, linewidth=(1 if y_err is None else 1.5), + linestyle='-', figure=fig, label=y_col) + + if y_err is not None: + ax1.errorbar(ts2d(x_list), y_lists[y_col], + yerr=y_err, errorevery=4, + ecolor=color, + fmt=None, + figure=fig, + label=y_err_col) + + ncols = len(options.columns) + if options.date_vline: + ncols+=len(options.date_vline) + if ncols > 1: # if == 1, causes divide-by-zero error in library. + # NOTE: some versions support fontsize here, others don't + leg = ax1.legend(bbox_to_anchor=(0., 1.15, 1., .08), loc=1, + ncol=ncols, mode="expand", + borderaxespad=0.) # , fontsize=10) + # This always works. + for t in leg.get_texts(): + t.set_fontsize('small') + + ax = ax1 # by default, use the first axis + if options.count_column: + ax = ax2 # but if the count plot is enabled, use ax2 + ax.plot_date(ts2d(x_list), c_list, linestyle='-', + marker=None, drawstyle='steps-mid') + + # NOTE: when this is set earlier, somehow it's reset. + # TODO: may want to make the date format configurable. + xformatter = matplotlib.dates.DateFormatter("%b %d") + ax.xaxis.set_major_formatter(xformatter) + ax.xaxis.set_minor_formatter(xformatter) + + if options.output: + pylab.savefig(options.output) + else: + pylab.show() + +def split_column_name(column): + y_fields = column.split(",") + if len(y_fields) == 1: + y_col = column + y_err_col = None + elif len(y_fields) == 2: + (y_col,y_err_col) = y_fields + else: + print "Error: wrong column specification: %s" % column + print "Error: we only accept or ," + sys.exit(1) + return (y_col, y_err_col) + +ONE_DAY=(60*60*24) +def has_recent_mtime(filename, compare_to, threshold=ONE_DAY): + """ + Args: + filename - string, the file we're deciding has a recent mtime or not. + compare_to - string, compare filename's mtime to compare_to's mtime. 
+ threshold - seconds, window within which mtime is considered 'recent' + Returns: + True - if filename mtime is within 'threshold' of current time and + greater than compare_to's mtime. + False - otherwise. + """ + if not os.path.exists(filename) or not os.path.exists(compare_to): + return False + + s = os.stat(filename) + c = os.stat(compare_to) + + if s.st_mtime < c.st_mtime: + return False + + if s.st_mtime + threshold < time.time(): + return False + + return True + +def merge_rows(row_list, options): + """ Merges all DictReader() rows in row_list and return a single dict. It + is expected that options.timestamp is present in each row, and that the + value there is equal. If they are not all equal, an AssertionError is + raised. + + Args: + row_list - list of rows from DictReader().next() + options - result of parse_args() + + Raises: + AssertionError - if the timestamps for the row values do not match, an + assertion is raised. + Returns: + merged dict. + """ + ts=None + merge_dict = {} + for d in row_list: + if ts is None: + ts = d[options.timestamp] + else: + assert(ts == d[options.timestamp]) + merge_dict.update(d) + return merge_dict + +def merge_csvfiles(options): + + """ Think of this as a 'join' across options.mergefiles on equal values of + the column options.timestamp. This function takes each file in + options.mergefiles, reads them, and combines their columns in + options.output. The only common column should be options.timestamp. The + results are undefined if the mergefiles share other column names. + + Args: + options.mergefiles - list of csv filenames + options.output - filename of merged csv file from this operation + Returns: + bool - True if success + Raises: + AssertionError - if merging encounters an error. + """ + + records = {} + all_header_names = [] + records_list = [] + + # collect all header fields from mergefiles + for filename in options.mergefiles: + records = read_csvfile(filename, True) + records_list.append(records) + all_header_names += records.fieldnames + all_header_names = sorted(set(all_header_names)) + + # eliminate duplicate $header + output_fd = open(options.output,'w') + writer = csv.DictWriter(output_fd, all_header_names) + writer.writeheader() + + try: + # read all values until StopIteration is reached. + while True: + merge_list = [ records.next() for records in records_list ] + merge_dict = merge_rows(merge_list, options) + writer.writerow(merge_dict) + + except StopIteration: + pass + + output_fd.close() + return True + +def main(options): + """ + The primary modes of operation are: + * run query & save output, read cache, plot results + --query + * read cache or csv file, plot results + --csvfile + * merge csv files, noplot + --mergefiles --mergefiles --output + """ + + cache_file = None + if options.query: + # RUN QUERY + (query_file,cache_file) = get_filenames(options.query, + options.define_values) + query_str = process_macros(read_file(query_file), options.define_values) + + if options.refresh or not has_recent_mtime(cache_file, query_file): + if options.api: + bigquery_api(query_str, cache_file, options) + else: + bigquery_exec(query_str, cache_file, options) + + elif len(options.mergefiles) >= 2: + # HANDLE MERGE (if applicable) + success = merge_csvfiles(options) + return + elif options.csvfile is not None: + # USE EXPLICIT CSV FILE (instead of running a query first) + cache_file = options.csvfile + else: + print "Error: failed to identify operation." 
+ sys.exit(1) + + if not options.plot: + return + + # READ RECORDS + records = read_csvfile(cache_file, True) + + # INITIALIZE + x_list = [] + y_lists = {} + y_errs = {} + c_list = [] + split_column_names = map(split_column_name, options.columns) + for y_col,y_err_col in split_column_names: + y_lists[y_col] = [] + y_errs[y_err_col] = [] + + # SORT DATA + # TODO/NOTE: expects 'timestamp' values are sorted in ascending order + for row in records: + if options.verbose: print row + + try: + # NOTE: First convert all values in this row. + x = float(row[options.timestamp]) + if options.count_column: c = float(row[options.count_column]) + y = {col:None for col,err in split_column_names} + e = {err:None for col,err in split_column_names if err is not None} + for y_col,y_err_col in split_column_names: + y[y_col] = float(row[y_col]) + if y_err_col: e[y_err_col] = float(row[y_err_col]) + + # NOTE: only save values if all converted successfully + x_list.append(x) + if options.count_column: c_list.append(c) + for y_col,y_err_col in split_column_names: + y_lists[y_col].append(y[y_col]) + if y_err_col: y_errs[y_err_col].append(e[y_err_col]) + + except ValueError: + # NOTE: a conversion failed. and, b/c conversion & save are + # separate, the data saved so far is still valid. + continue + + # VISUALIZE + plot_data(x_list, y_lists, y_errs, c_list, options) + +if __name__ == "__main__": + (options, args) = parse_args() + main(options) From 4ab0485676212b5f753540cb78df48b20c9280e4 Mon Sep 17 00:00:00 2001 From: Stephen Soltesz Date: Sun, 25 Aug 2013 23:52:10 -0400 Subject: [PATCH 04/14] add readme for qv --- README.queryview | 54 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 README.queryview diff --git a/README.queryview b/README.queryview new file mode 100644 index 0000000..5113d7a --- /dev/null +++ b/README.queryview @@ -0,0 +1,54 @@ +QueryView +========= + +Queryview can help an analyst quickly perform BigQuery queries, cache the +result, and graph the data in the result for review and sharing. + +Installation +------------ + +The script is self-contained, so can be placed anywhere. Just be aware that it +reads SQL files and writes cache files from/to the $PWD. + +Queryview relies on two dependencies: + + * BigQuery command line tool `bq`: + + https://code.google.com/p/google-bigquery-tools/" + + * `matplotlib`: http://matplotlib.org/ + + Most Linux distributions have this available as a package as + 'python-matplotlib' or 'py-matplotlib', or similar. + +Usage +----- + +See the usage from: + + ./queryview.py --help + +Examples: +-------- + +You may immediately explore any web100 variable for NDT download tests using +the included template sql/ndt-tmpl-web100.sql. It expects three variables: + + * DATETABLE for the months of the query. + * ADDRESS for a single IP address of a host + * WEB100VAR the full path of the web100 variable in m_lab bigquery table. 
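+
+The invocation below plots the med_web100, quantile_10, and quantile_90
+columns of web100 SmoothedRTT for a single host, with a test-count subplot: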
+ + ./queryview.py -q ndt-tmpl-web100 \ + -D DATETABLE=[m_lab.2013_03],[m_lab.2013_04],[m_lab.2013_05] \ + -D ADDRESS=38.102.0.109 \ + -D WEB100VAR=web100_log_entry.snap.SmoothedRTT \ + -t day_timestamp \ + -l med_web100 -C blue \ + -l quantile_10 -C green \ + -l quantile_90 -C green \ + --count_column test_count --ymax 500 \ + --date_vline 2013-05-09 \ + --title 'Daily web100.SmoothedRTT for NDT on mlab3.sea01' \ + --ylabel Milliseconds \ + --output mlab3.sea01.SmoothedRTT.png + From f1e4815b8e94dd4705aa464db1d06445803098a2 Mon Sep 17 00:00:00 2001 From: Stephen Soltesz Date: Sun, 25 Aug 2013 23:56:27 -0400 Subject: [PATCH 05/14] big fixs, use local queryview --- setup.sh | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/setup.sh b/setup.sh index e44167a..61de32a 100755 --- a/setup.sh +++ b/setup.sh @@ -50,7 +50,8 @@ function generate_ispquery () { m4 -DISP_FILTER_FILENAME=$ISPNAME.input sql/ndt-tmpl-generic.m4.sql > sql/$ISPNAME.lga02.sql fi - QV=$HOME/source/m-lab.analysis/queryview/queryview.py + mkdir -p graphs + QV=./queryview.py $QV --query $ISPNAME.lga01 \ --count test_count \ --timestamp day_timestamp \ @@ -70,6 +71,6 @@ function generate_ispquery () { generate_ispquery comcast generate_ispquery cablevision -generate_ispquery warner -generate_ispquery rcn -generate_ispquery verizon +#generate_ispquery warner +#generate_ispquery rcn +#generate_ispquery verizon From d117a46a9d49a3cd2d0c14fadde112c761f8006d Mon Sep 17 00:00:00 2001 From: "soltesz@opentechinstitute.org" Date: Mon, 26 Aug 2013 18:07:33 -0400 Subject: [PATCH 06/14] work in progress --- TODO | 18 ++++++++++ sql/stage1-ndt.m4.sql | 43 ++++++++++++++++++++++++ sql/stage2-ndt.m4.sql | 23 +++++++++++++ stage1.sh | 76 +++++++++++++++++++++++++++++++++++++++++++ stage2.sh | 61 ++++++++++++++++++++++++++++++++++ 5 files changed, 221 insertions(+) create mode 100644 TODO create mode 100644 sql/stage1-ndt.m4.sql create mode 100644 sql/stage2-ndt.m4.sql create mode 100755 stage1.sh create mode 100755 stage2.sh diff --git a/TODO b/TODO new file mode 100644 index 0000000..ea8c98a --- /dev/null +++ b/TODO @@ -0,0 +1,18 @@ +Stage 1: +======== + + * create IP filters from ISPs in GeoIPASNum2 to get raw NDT results + +Stage 2 +======= + + * create filters from raw results of stage 1 (sip, cip, ts) to extract + (sip, cip, ts, test_id) + +Stage 3 +======= + + * create filters from raw results of stage 2 (sip, cip, ts, test_id) to extract + all hops for test_id (sip, cip, test_id, h1...hn) + + diff --git a/sql/stage1-ndt.m4.sql b/sql/stage1-ndt.m4.sql new file mode 100644 index 0000000..b20f274 --- /dev/null +++ b/sql/stage1-ndt.m4.sql @@ -0,0 +1,43 @@ +SELECT + INTEGER(web100_log_entry.log_time) AS day_timestamp, + web100_log_entry.connection_spec.local_ip AS server_ip, + web100_log_entry.connection_spec.remote_ip AS client_ip, + + -- RATE + 8*web100_log_entry.snap.HCThruOctetsAcked/( + web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd) AS raw_download_rate, + -- RETRANSMISSION + (web100_log_entry.snap.OctetsRetrans/web100_log_entry.snap.DataOctetsOut) AS raw_retrans, + -- TODO: maybe network or client or server-limited time ratios? 
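+  -- NOTE: the SndLimTime* counters are in microseconds, so
+  -- 8*HCThruOctetsAcked/time comes out in Mbit/s, and
+  -- OctetsRetrans/DataOctetsOut is the fraction of sent data retransmitted.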
+FROM + DATETABLE +WHERE + IS_EXPLICITLY_DEFINED(project) + AND IS_EXPLICITLY_DEFINED(connection_spec.data_direction) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.is_last_entry) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.HCThruOctetsAcked) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.CongSignals) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.remote_ip) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.local_ip) + -- NDT download + AND project = 0 + AND connection_spec.data_direction = 1 + AND web100_log_entry.is_last_entry = True + AND web100_log_entry.snap.HCThruOctetsAcked >= 8192 + AND web100_log_entry.snap.HCThruOctetsAcked < 1000000000 + AND web100_log_entry.snap.CongSignals > 0 + AND (web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd) >= 9000000 + AND (web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd) < 3600000000 + AND web100_log_entry.snap.MinRTT < 1e7 + -- restrict to NY lga01 servers, and given ISP address ranges. + AND web100_log_entry.connection_spec.local_ip IN(SERVERIPS) + AND ( include(ISP_FILTER_FILENAME) ) +ORDER BY + day_timestamp, server_ip, client_ip; + diff --git a/sql/stage2-ndt.m4.sql b/sql/stage2-ndt.m4.sql new file mode 100644 index 0000000..d11574c --- /dev/null +++ b/sql/stage2-ndt.m4.sql @@ -0,0 +1,23 @@ +SELECT + INTEGER(web100_log_entry.log_time) AS day_timestamp, + web100_log_entry.connection_spec.local_ip AS server_ip, + web100_log_entry.connection_spec.remote_ip AS client_ip, + test_id AS test_id +FROM + DATETABLE +WHERE + IS_EXPLICITLY_DEFINED(project) + AND IS_EXPLICITLY_DEFINED(connection_spec.data_direction) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.is_last_entry) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.HCThruOctetsAcked) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.CongSignals) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.remote_ip) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.local_ip) + -- traceroute traffic + AND project = 3 + -- restrict to NY servers, and given filters ranges. + AND web100_log_entry.connection_spec.local_ip IN(SERVERIPS) + AND ( include(STAGE2_FILTER_FILENAME) ) +ORDER BY + day_timestamp, server_ip, client_ip; + diff --git a/stage1.sh b/stage1.sh new file mode 100755 index 0000000..8d0172a --- /dev/null +++ b/stage1.sh @@ -0,0 +1,76 @@ +#!/usr/bin/env bash + +function find_command () { + local command=$1 + if ! type -P $command &> /dev/null ; then + echo "ERROR: could no locate '$command' in current PATH" + echo "ERROR: either install $command, or update PATH" + exit 1 + fi +} + +find_command wget +find_command m4 + + +IP2ASNFILE=GeoIPASNum2 +if ! test -f $IP2ASNFILE.zip ; then + wget http://download.maxmind.com/download/geoip/database/asnum/$IP2ASNFILE.zip +fi +if ! test -f $IP2ASNFILE.csv ; then + unzip $IP2ASNFILE.zip + if test $? -ne 0 ; then + echo "Error: failed to unzip $IP2ASNFILE.zip" + exit 1 + fi +fi +set -e +set -x + +function generate_ispquery () { + local ISPNAME=$1 + AFTERFIRST= + rm -f $ISPNAME.input + + if ! test -f $ISPNAME.input ; then + FILTER_PREFIX="PARSE_IP(web100_log_entry.connection_spec.remote_ip) " + grep -i $ISPNAME $IP2ASNFILE.csv | \ + awk -F, '{print $1,$2}' | \ + while read IP_low IP_high ; do + if test -n "$AFTERFIRST" ; then echo " OR" ; fi + FILTER="$FILTER_PREFIX BETWEEN $IP_low AND $IP_high " + echo -n " $FILTER" + AFTERFIRST=1 + done > $ISPNAME.input + fi + + if ! 
test -f sql/$ISPNAME.s1.lga01.sql ; then + m4 -DISP_FILTER_FILENAME=$ISPNAME.input \ + -DDATETABLE=[m_lab.2013_08] \ + -DSERVERIPS="'74.63.50.19','74.63.50.32','74.63.50.47'" \ + sql/stage1-ndt.m4.sql > sql/$ISPNAME.s1.lga01.sql + fi + if ! test -f sql/$ISPNAME.s1.lga02.sql ; then + m4 -DISP_FILTER_FILENAME=$ISPNAME.input \ + -DDATETABLE=[m_lab.2013_08] \ + -DSERVERIPS="'38.106.70.147','38.106.70.160','38.106.70.173'" \ + sql/stage1-ndt.m4.sql > sql/$ISPNAME.s1.lga02.sql + fi + + QV=./queryview.py + $QV --query $ISPNAME.s1.lga01 \ + --noplot \ + --timestamp day_timestamp \ + -l junk + + $QV --query $ISPNAME.s1.lga02 \ + --noplot \ + --timestamp day_timestamp \ + -l junk +} + +generate_ispquery comcast +generate_ispquery cablevision +#generate_ispquery warner +#generate_ispquery rcn +#generate_ispquery verizon diff --git a/stage2.sh b/stage2.sh new file mode 100755 index 0000000..109fa69 --- /dev/null +++ b/stage2.sh @@ -0,0 +1,61 @@ +#!/usr/bin/env bash + +set -e +set -x + +function generate_ispquery () { + local ISPNAME=$1 + AFTERFIRST= + stage1input=$ISPNAME.s1 + stage2output=$ISPNAME.s2 + stage2sql=$ISPNAME.s2 + + rm -f $stage2output + + if ! test -f $stage2output.lga01.input ; then + + FILTER_PREFIX="" + awk -F, '{print $1,$2,$3}' $stage1input | \ + while read ts server_ip client_ip ; do + if test -n "$AFTERFIRST" ; then echo " OR" ; fi + + FILTER="( $ts BETWEEN "$(( $ts-120))" AND "$(( $ts+120 ))" AND + connection_spec.client_ip='$client_ip' )" + echo -n " $FILTER" + AFTERFIRST=1 + done > $stage2output + fi + + # '38.106.70.146','38.106.70.151','38.106.70.172' + # '74.63.50.10','74.63.50.23','74.63.50.43' + + if ! test -f sql/$stage2sql.lga01.sql ; then + m4 -DISP_FILTER_FILENAME=$stage2output \ + -DDATETABLE=[m_lab.2013_08] \ + -DSERVERIPS="'74.63.50.19','74.63.50.32','74.63.50.47'" \ + sql/stage2-ndt.m4.sql > sql/$stage2sql.lga01.sql + fi + if ! test -f sql/$stage2sql.lga02.sql ; then + m4 -DISP_FILTER_FILENAME=$stage2output \ + -DDATETABLE=[m_lab.2013_08] \ + -DSERVERIPS="'38.106.70.147','38.106.70.160','38.106.70.173'" \ + sql/stage1-ndt.m4.sql > sql/$ISPNAME.s1.lga02.sql + fi + + QV=./queryview.py + $QV --query $ISPNAME.s1.lga01 \ + --noplot \ + --timestamp day_timestamp \ + -l junk + + $QV --query $ISPNAME.s1.lga02 \ + --noplot \ + --timestamp day_timestamp \ + -l junk +} + +generate_ispquery comcast +generate_ispquery cablevision +#generate_ispquery warner +#generate_ispquery rcn +#generate_ispquery verizon From 201daade403dc52d9fe79a2975ddbf1b39856883 Mon Sep 17 00:00:00 2001 From: "soltesz@opentechinstitute.org" Date: Mon, 26 Aug 2013 18:09:24 -0400 Subject: [PATCH 07/14] move templates to m4, to make qv cache filenames flat. add titles/ylabels --- setup.sh | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/setup.sh b/setup.sh index 61de32a..08a02ed 100755 --- a/setup.sh +++ b/setup.sh @@ -44,10 +44,16 @@ function generate_ispquery () { fi if ! test -f sql/$ISPNAME.lga01.sql ; then - m4 -DISP_FILTER_FILENAME=$ISPNAME.input sql/ndt-tmpl-generic.m4.sql > sql/$ISPNAME.lga01.sql + m4 -DISP_FILTER_FILENAME=$ISPNAME.input \ + -DDATETABLE=[m_lab.2013_08] \ + -DSERVERIPS="'74.63.50.19','74.63.50.32','74.63.50.47'" \ + sql/ndt-tmpl-generic.m4.sql > sql/$ISPNAME.lga01.sql fi if ! 
test -f sql/$ISPNAME.lga02.sql ; then - m4 -DISP_FILTER_FILENAME=$ISPNAME.input sql/ndt-tmpl-generic.m4.sql > sql/$ISPNAME.lga02.sql + m4 -DISP_FILTER_FILENAME=$ISPNAME.input \ + -DDATETABLE=[m_lab.2013_08] \ + -DSERVERIPS="'38.106.70.147','38.106.70.160','38.106.70.173'" \ + sql/ndt-tmpl-generic.m4.sql > sql/$ISPNAME.lga02.sql fi mkdir -p graphs @@ -56,16 +62,16 @@ function generate_ispquery () { --count test_count \ --timestamp day_timestamp \ -l med_rate \ - -D DATETABLE=[m_lab.2013_08] \ - -D SERVERIPS="'74.63.50.19','74.63.50.32','74.63.50.47'" \ + --ylabel "Mbps" \ + --title "LGA01 - Internap" \ --output graphs/$ISPNAME.lga01.png $QV --query $ISPNAME.lga02 \ --count test_count \ --timestamp day_timestamp \ -l med_rate \ - -D DATETABLE=[m_lab.2013_08] \ - -D SERVERIPS="'38.106.70.147','38.106.70.160','38.106.70.173'" \ + --ylabel "Mbps" \ + --title "LGA02 - Cogent" \ --output graphs/$ISPNAME.lga02.png } From 462c95f37be21c8a09222da9464f41ec8c7d4c22 Mon Sep 17 00:00:00 2001 From: "soltesz@opentechinstitute.org" Date: Tue, 27 Aug 2013 09:57:43 -0400 Subject: [PATCH 08/14] include additional queries to fetch per-isp, per-site, trace data --- TODO | 14 ++- queryview.py | 12 ++- setup.sh | 14 +-- sql/ndt-tmpl-generic.m4.sql | 53 ---------- sql/stage2-ndt.m4.sql | 23 ----- stage1.sh | 76 -------------- stage2.sh | 61 ----------- stages.sh | 176 ++++++++++++++++++++++++++++++++ {sql => tmpl}/stage1-ndt.m4.sql | 0 tmpl/stage2-ndt.m4.sql | 19 ++++ tmpl/stage3-ndt.m4.sql | 19 ++++ 11 files changed, 240 insertions(+), 227 deletions(-) delete mode 100644 sql/ndt-tmpl-generic.m4.sql delete mode 100644 sql/stage2-ndt.m4.sql delete mode 100755 stage1.sh delete mode 100755 stage2.sh create mode 100755 stages.sh rename {sql => tmpl}/stage1-ndt.m4.sql (100%) create mode 100644 tmpl/stage2-ndt.m4.sql create mode 100644 tmpl/stage3-ndt.m4.sql diff --git a/TODO b/TODO index ea8c98a..57f44bc 100644 --- a/TODO +++ b/TODO @@ -1,18 +1,26 @@ -Stage 1: -======== +Stage 1 +======= * create IP filters from ISPs in GeoIPASNum2 to get raw NDT results + * input: isp names, geoipasnum2, + * intermediate: stage1-per-isp-asn-filters + * output: stage1-per-isp-site-raw-downloads (sip, cip, ts, bw) Stage 2 ======= * create filters from raw results of stage 1 (sip, cip, ts) to extract (sip, cip, ts, test_id) + * input: stage1-per-isp-site-raw-downloads (which include isp name) + * intermediate: stage2-filters-from-stage1-per-isp-raw-download + * output: stage2-per-isp-site-testids (sip', cip, ts, test_id) Stage 3 ======= * create filters from raw results of stage 2 (sip, cip, ts, test_id) to extract all hops for test_id (sip, cip, test_id, h1...hn) - + * input: stage2-per-isp-site-testids + * intermediate: stage3-filters-from-stage2 + * output: stage3-per-isp-site-hops (sip', [h1, ... hn], cip, ts, test_id) diff --git a/queryview.py b/queryview.py index 261965b..55ac2b7 100755 --- a/queryview.py +++ b/queryview.py @@ -243,6 +243,7 @@ def bigquery_api(query_string, output_filename, options): except HttpError as err: logging.error('Error running query: %s', pprint.pprint(err.content)) + print 'Error running query: %s' % pprint.pprint(status['status']) sys.exit(1) except AccessTokenRefreshError: @@ -330,11 +331,11 @@ def parse_args(): print "Error: please specify at least 2 csv files to merge." 
sys.exit(1) - if options.timestamp is None: + if options.timestamp is None and options.plot: print "Error: Please provide --timestamp " sys.exit(1) - if len(options.columns) == 0 and len(options.mergefiles) == 0: + if len(options.columns) == 0 and len(options.mergefiles) == 0 and options.plot: print ("Error: Provide at least one line, --column ") sys.exit(1) @@ -361,8 +362,11 @@ def get_filenames(query_name, define_values=[]): max_len = os.statvfs(CSV_PATH)[statvfs.F_NAMEMAX] extension = extension[:max_len-len(query_name)-4] - query_filename = os.path.join(SQL_PATH, query_name+'.sql') - cache_filename = os.path.join(CSV_PATH, query_name+extension+'.csv') + query_filename = query_name + cache_filename = os.path.join(CSV_PATH, os.path.basename(query_name)+extension+'.csv') + if not os.path.exists(query_name): + query_filename = os.path.join(SQL_PATH, query_name+'.sql') + cache_filename = os.path.join(CSV_PATH, query_name+extension+'.csv') return (query_filename, cache_filename) def read_csvfile(cache_file, return_dicts): diff --git a/setup.sh b/setup.sh index 08a02ed..927d72b 100755 --- a/setup.sh +++ b/setup.sh @@ -29,9 +29,9 @@ fi function generate_ispquery () { local ISPNAME=$1 AFTERFIRST= - rm -f $ISPNAME.input + rm -f input/$ISPNAME.input - if ! test -f $ISPNAME.input ; then + if ! test -f input/$ISPNAME.input ; then FILTER_PREFIX="PARSE_IP(web100_log_entry.connection_spec.remote_ip) " grep -i $ISPNAME $IP2ASNFILE.csv | \ awk -F, '{print $1,$2}' | \ @@ -40,20 +40,20 @@ function generate_ispquery () { FILTER="$FILTER_PREFIX BETWEEN $IP_low AND $IP_high " echo -n " $FILTER" AFTERFIRST=1 - done > $ISPNAME.input + done > input/$ISPNAME.input fi if ! test -f sql/$ISPNAME.lga01.sql ; then - m4 -DISP_FILTER_FILENAME=$ISPNAME.input \ + m4 -DISP_FILTER_FILENAME=input/$ISPNAME.input \ -DDATETABLE=[m_lab.2013_08] \ -DSERVERIPS="'74.63.50.19','74.63.50.32','74.63.50.47'" \ - sql/ndt-tmpl-generic.m4.sql > sql/$ISPNAME.lga01.sql + tmpl/ndt-tmpl-generic.m4.sql > sql/$ISPNAME.lga01.sql fi if ! 
test -f sql/$ISPNAME.lga02.sql ; then - m4 -DISP_FILTER_FILENAME=$ISPNAME.input \ + m4 -DISP_FILTER_FILENAME=input/$ISPNAME.input \ -DDATETABLE=[m_lab.2013_08] \ -DSERVERIPS="'38.106.70.147','38.106.70.160','38.106.70.173'" \ - sql/ndt-tmpl-generic.m4.sql > sql/$ISPNAME.lga02.sql + tmpl/ndt-tmpl-generic.m4.sql > sql/$ISPNAME.lga02.sql fi mkdir -p graphs diff --git a/sql/ndt-tmpl-generic.m4.sql b/sql/ndt-tmpl-generic.m4.sql deleted file mode 100644 index b008e8e..0000000 --- a/sql/ndt-tmpl-generic.m4.sql +++ /dev/null @@ -1,53 +0,0 @@ -SELECT - -- web100_log_entry.connection_spec.local_ip AS server_ip, - INTEGER(web100_log_entry.log_time/7200)*7200 AS day_timestamp, - COUNT(web100_log_entry.log_time) AS test_count, - -- RATE - NTH(90,QUANTILES(8*web100_log_entry.snap.HCThruOctetsAcked/( - web100_log_entry.snap.SndLimTimeRwin + - web100_log_entry.snap.SndLimTimeCwnd + - web100_log_entry.snap.SndLimTimeSnd),101)) as quantile_90, - NTH(10,QUANTILES(8*web100_log_entry.snap.HCThruOctetsAcked/( - web100_log_entry.snap.SndLimTimeRwin + - web100_log_entry.snap.SndLimTimeCwnd + - web100_log_entry.snap.SndLimTimeSnd),101)) as quantile_10, - NTH(50,QUANTILES(8*web100_log_entry.snap.HCThruOctetsAcked/( - web100_log_entry.snap.SndLimTimeRwin + - web100_log_entry.snap.SndLimTimeCwnd + - web100_log_entry.snap.SndLimTimeSnd),101)) as med_rate, - STDDEV(8*web100_log_entry.snap.HCThruOctetsAcked/( - web100_log_entry.snap.SndLimTimeRwin + - web100_log_entry.snap.SndLimTimeCwnd + - web100_log_entry.snap.SndLimTimeSnd)) as std_rate, -FROM - DATETABLE -WHERE - IS_EXPLICITLY_DEFINED(project) - AND IS_EXPLICITLY_DEFINED(connection_spec.data_direction) - AND IS_EXPLICITLY_DEFINED(web100_log_entry.is_last_entry) - AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.HCThruOctetsAcked) - AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.CongSignals) - AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.remote_ip) - AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.local_ip) - -- NDT download - AND project = 0 - AND connection_spec.data_direction = 1 - AND web100_log_entry.is_last_entry = True - AND web100_log_entry.snap.HCThruOctetsAcked >= 8192 - AND web100_log_entry.snap.HCThruOctetsAcked < 1000000000 - AND web100_log_entry.snap.CongSignals > 0 - AND (web100_log_entry.snap.SndLimTimeRwin + - web100_log_entry.snap.SndLimTimeCwnd + - web100_log_entry.snap.SndLimTimeSnd) >= 9000000 - AND (web100_log_entry.snap.SndLimTimeRwin + - web100_log_entry.snap.SndLimTimeCwnd + - web100_log_entry.snap.SndLimTimeSnd) < 3600000000 - AND web100_log_entry.snap.MinRTT < 1e7 - -- restrict to NY lga01 servers, and given ISP address ranges. 
- AND web100_log_entry.connection_spec.local_ip IN(SERVERIPS) - AND ( include(ISP_FILTER_FILENAME) ) -GROUP BY - day_timestamp -ORDER BY - day_timestamp; - diff --git a/sql/stage2-ndt.m4.sql b/sql/stage2-ndt.m4.sql deleted file mode 100644 index d11574c..0000000 --- a/sql/stage2-ndt.m4.sql +++ /dev/null @@ -1,23 +0,0 @@ -SELECT - INTEGER(web100_log_entry.log_time) AS day_timestamp, - web100_log_entry.connection_spec.local_ip AS server_ip, - web100_log_entry.connection_spec.remote_ip AS client_ip, - test_id AS test_id -FROM - DATETABLE -WHERE - IS_EXPLICITLY_DEFINED(project) - AND IS_EXPLICITLY_DEFINED(connection_spec.data_direction) - AND IS_EXPLICITLY_DEFINED(web100_log_entry.is_last_entry) - AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.HCThruOctetsAcked) - AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.CongSignals) - AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.remote_ip) - AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.local_ip) - -- traceroute traffic - AND project = 3 - -- restrict to NY servers, and given filters ranges. - AND web100_log_entry.connection_spec.local_ip IN(SERVERIPS) - AND ( include(STAGE2_FILTER_FILENAME) ) -ORDER BY - day_timestamp, server_ip, client_ip; - diff --git a/stage1.sh b/stage1.sh deleted file mode 100755 index 8d0172a..0000000 --- a/stage1.sh +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env bash - -function find_command () { - local command=$1 - if ! type -P $command &> /dev/null ; then - echo "ERROR: could no locate '$command' in current PATH" - echo "ERROR: either install $command, or update PATH" - exit 1 - fi -} - -find_command wget -find_command m4 - - -IP2ASNFILE=GeoIPASNum2 -if ! test -f $IP2ASNFILE.zip ; then - wget http://download.maxmind.com/download/geoip/database/asnum/$IP2ASNFILE.zip -fi -if ! test -f $IP2ASNFILE.csv ; then - unzip $IP2ASNFILE.zip - if test $? -ne 0 ; then - echo "Error: failed to unzip $IP2ASNFILE.zip" - exit 1 - fi -fi -set -e -set -x - -function generate_ispquery () { - local ISPNAME=$1 - AFTERFIRST= - rm -f $ISPNAME.input - - if ! test -f $ISPNAME.input ; then - FILTER_PREFIX="PARSE_IP(web100_log_entry.connection_spec.remote_ip) " - grep -i $ISPNAME $IP2ASNFILE.csv | \ - awk -F, '{print $1,$2}' | \ - while read IP_low IP_high ; do - if test -n "$AFTERFIRST" ; then echo " OR" ; fi - FILTER="$FILTER_PREFIX BETWEEN $IP_low AND $IP_high " - echo -n " $FILTER" - AFTERFIRST=1 - done > $ISPNAME.input - fi - - if ! test -f sql/$ISPNAME.s1.lga01.sql ; then - m4 -DISP_FILTER_FILENAME=$ISPNAME.input \ - -DDATETABLE=[m_lab.2013_08] \ - -DSERVERIPS="'74.63.50.19','74.63.50.32','74.63.50.47'" \ - sql/stage1-ndt.m4.sql > sql/$ISPNAME.s1.lga01.sql - fi - if ! 
test -f sql/$ISPNAME.s1.lga02.sql ; then - m4 -DISP_FILTER_FILENAME=$ISPNAME.input \ - -DDATETABLE=[m_lab.2013_08] \ - -DSERVERIPS="'38.106.70.147','38.106.70.160','38.106.70.173'" \ - sql/stage1-ndt.m4.sql > sql/$ISPNAME.s1.lga02.sql - fi - - QV=./queryview.py - $QV --query $ISPNAME.s1.lga01 \ - --noplot \ - --timestamp day_timestamp \ - -l junk - - $QV --query $ISPNAME.s1.lga02 \ - --noplot \ - --timestamp day_timestamp \ - -l junk -} - -generate_ispquery comcast -generate_ispquery cablevision -#generate_ispquery warner -#generate_ispquery rcn -#generate_ispquery verizon diff --git a/stage2.sh b/stage2.sh deleted file mode 100755 index 109fa69..0000000 --- a/stage2.sh +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env bash - -set -e -set -x - -function generate_ispquery () { - local ISPNAME=$1 - AFTERFIRST= - stage1input=$ISPNAME.s1 - stage2output=$ISPNAME.s2 - stage2sql=$ISPNAME.s2 - - rm -f $stage2output - - if ! test -f $stage2output.lga01.input ; then - - FILTER_PREFIX="" - awk -F, '{print $1,$2,$3}' $stage1input | \ - while read ts server_ip client_ip ; do - if test -n "$AFTERFIRST" ; then echo " OR" ; fi - - FILTER="( $ts BETWEEN "$(( $ts-120))" AND "$(( $ts+120 ))" AND - connection_spec.client_ip='$client_ip' )" - echo -n " $FILTER" - AFTERFIRST=1 - done > $stage2output - fi - - # '38.106.70.146','38.106.70.151','38.106.70.172' - # '74.63.50.10','74.63.50.23','74.63.50.43' - - if ! test -f sql/$stage2sql.lga01.sql ; then - m4 -DISP_FILTER_FILENAME=$stage2output \ - -DDATETABLE=[m_lab.2013_08] \ - -DSERVERIPS="'74.63.50.19','74.63.50.32','74.63.50.47'" \ - sql/stage2-ndt.m4.sql > sql/$stage2sql.lga01.sql - fi - if ! test -f sql/$stage2sql.lga02.sql ; then - m4 -DISP_FILTER_FILENAME=$stage2output \ - -DDATETABLE=[m_lab.2013_08] \ - -DSERVERIPS="'38.106.70.147','38.106.70.160','38.106.70.173'" \ - sql/stage1-ndt.m4.sql > sql/$ISPNAME.s1.lga02.sql - fi - - QV=./queryview.py - $QV --query $ISPNAME.s1.lga01 \ - --noplot \ - --timestamp day_timestamp \ - -l junk - - $QV --query $ISPNAME.s1.lga02 \ - --noplot \ - --timestamp day_timestamp \ - -l junk -} - -generate_ispquery comcast -generate_ispquery cablevision -#generate_ispquery warner -#generate_ispquery rcn -#generate_ispquery verizon diff --git a/stages.sh b/stages.sh new file mode 100755 index 0000000..06d1b33 --- /dev/null +++ b/stages.sh @@ -0,0 +1,176 @@ +#!/usr/bin/env bash + +function find_command () { + local command=$1 + if ! type -P $command &> /dev/null ; then + echo "ERROR: could no locate '$command' in current PATH" + echo "ERROR: either install $command, or update PATH" + exit 1 + fi +} + +find_command wget +find_command m4 + + +IP2ASNFILE=GeoIPASNum2 +if ! test -f $IP2ASNFILE.zip ; then + wget http://download.maxmind.com/download/geoip/database/asnum/$IP2ASNFILE.zip +fi +if ! test -f $IP2ASNFILE.csv ; then + unzip $IP2ASNFILE.zip + if test $? -ne 0 ; then + echo "Error: failed to unzip $IP2ASNFILE.zip" + exit 1 + fi +fi +mkdir -p cache +mkdir -p input +mkdir -p sql + +set -e +set -x + +function handle_stage1_query () { + local ispname=$1 + local stage=$2 + local site=$3 + local iplist=$4 + + AFTERFIRST= + filtername=input/$stage.$ispname.$site.input + sqlname=$stage.$ispname.$site.sql + rm -f $filtername + + if ! 
test -f $filtername ; then + FILTER_PREFIX="PARSE_IP(web100_log_entry.connection_spec.remote_ip) " + grep -i $ispname $IP2ASNFILE.csv | \ + awk -F, '{print $1,$2}' | \ + while read IP_low IP_high ; do + if test -n "$AFTERFIRST" ; then echo " OR" ; fi + FILTER="$FILTER_PREFIX BETWEEN $IP_low AND $IP_high " + echo -n " $FILTER" + AFTERFIRST=1 + done > $filtername + fi + + if ! test -f sql/$sqlname ; then + m4 -DISP_FILTER_FILENAME=$filtername \ + -DDATETABLE=[m_lab.2013_08] \ + -DSERVERIPS="$iplist" \ + tmpl/stage1-ndt.m4.sql > sql/$sqlname + fi + + QV=./queryview.py + $QV --query sql/$sqlname --noplot + +} + +function handle_stage2_query () { + local ispname=$1 + local stage=$2 + local site=$3 + local iplist=$4 + + AFTERFIRST= + + inputcsv=cache/stage1.$ispname.$site.sql.csv + filtername=input/$stage.$ispname.$site.input + sqlname=$stage.$ispname.$site.sql + + rm -f $filtername + + if ! test -f $filtername ; then + + FILTER_PREFIX="" + grep -v day_timestamp $inputcsv | awk -F, '{print $1,$2,$3}' | \ + while read ts server_ip client_ip ; do + if test -z "$AFTERFIRST" ; then + echo "connection_spec.client_ip IN(" + fi + if test -n "$AFTERFIRST" ; then + echo "," ; + fi + + #FILTER="( $ts BETWEEN "$(( $ts-300))" AND "$(( $ts+300 ))" AND + # connection_spec.client_ip='$client_ip' )" + FILTER="'$client_ip'" + echo -n " $FILTER" + AFTERFIRST=1 + done > $filtername + echo ")" >> $filtername + fi + + if ! test -f sql/$sqlname ; then + m4 -DSTAGE2_FILTER_FILENAME=$filtername \ + -DDATETABLE=[m_lab.2013_08] \ + -DSERVERIPS="$iplist" \ + tmpl/stage2-ndt.m4.sql > sql/$sqlname + fi + + QV=./queryview.py + $QV -v --query sql/$sqlname --noplot + +} + +function handle_stage3_query () { + local ispname=$1 + local stage=$2 + local site=$3 + local iplist=$4 + + AFTERFIRST= + + inputcsv=cache/stage2.$ispname.$site.sql.csv + filtername=input/$stage.$ispname.$site.input + sqlname=$stage.$ispname.$site.sql + + rm -f $filtername + + if ! test -f $filtername ; then + + FILTER_PREFIX="" + grep -v day_timestamp $inputcsv | tr ' ' ' ' | sed -e 's/ $//g' | awk -F, '{print $1,$2,$3,$4}' | \ + while read ts server_ip client_ip test_id ; do + if test -z "$AFTERFIRST" ; then + echo "test_id IN(" + fi + if test -n "$AFTERFIRST" ; then + echo "," ; + fi + FILTER="'$test_id'" + echo -n " $FILTER" + AFTERFIRST=1 + done > $filtername + echo ")" >> $filtername + fi + + if ! 
test -f sql/$sqlname ; then + m4 -DSTAGE3_FILTER_FILENAME=$filtername \ + -DDATETABLE=[m_lab.2013_08] \ + -DSERVERIPS="$iplist" \ + tmpl/stage3-ndt.m4.sql > sql/$sqlname + fi + + QV=./queryview.py + $QV -v --query sql/$sqlname --noplot + +} + +# NDT server ip addrs +handle_stage1_query comcast stage1 lga01 "'74.63.50.19','74.63.50.32','74.63.50.47'" +#handle_stage1_query comcast stage1 lga02 "'38.106.70.147','38.106.70.160','38.106.70.173'" +#handle_stage1_query cablevision stage1 lga01 "'74.63.50.19','74.63.50.32','74.63.50.47'" +#handle_stage1_query cablevision stage1 lga02 "'38.106.70.147','38.106.70.160','38.106.70.173'" + +# NPAD (*not* NDT) server ip addrs +handle_stage2_query comcast stage2 lga01 "'74.63.50.10','74.63.50.23','74.63.50.43'" +#handle_stage2_query comcast stage2 lga02 "'38.106.70.146','38.106.70.151','38.106.70.172'" + +# NPAD (*not* NDT) server ip addrs +handle_stage3_query comcast stage3 lga01 "'74.63.50.10','74.63.50.23','74.63.50.43'" + + +#generate_ispquery warner +#generate_ispquery rcn +#generate_ispquery verizon diff --git a/sql/stage1-ndt.m4.sql b/tmpl/stage1-ndt.m4.sql similarity index 100% rename from sql/stage1-ndt.m4.sql rename to tmpl/stage1-ndt.m4.sql diff --git a/tmpl/stage2-ndt.m4.sql b/tmpl/stage2-ndt.m4.sql new file mode 100644 index 0000000..2556238 --- /dev/null +++ b/tmpl/stage2-ndt.m4.sql @@ -0,0 +1,19 @@ +SELECT + INTEGER(log_time) AS day_timestamp, + connection_spec.server_ip AS server_ip, + connection_spec.client_ip AS client_ip, + test_id AS test_id +FROM + DATETABLE +WHERE + IS_EXPLICITLY_DEFINED(project) + -- traceroute traffic + AND project = 3 + -- restrict to NY servers, and given filters ranges. + AND connection_spec.server_ip IN(SERVERIPS) + AND include(STAGE2_FILTER_FILENAME) +GROUP BY + day_timestamp, server_ip, client_ip, test_id +ORDER BY + day_timestamp, server_ip, client_ip, test_id; + diff --git a/tmpl/stage3-ndt.m4.sql b/tmpl/stage3-ndt.m4.sql new file mode 100644 index 0000000..2327360 --- /dev/null +++ b/tmpl/stage3-ndt.m4.sql @@ -0,0 +1,19 @@ +SELECT + INTEGER(log_time) AS day_timestamp, + connection_spec.server_ip AS server_ip, + connection_spec.client_ip AS client_ip, + paris_traceroute_hop.src_ip AS hop_src_ip, + paris_traceroute_hop.dest_ip AS hop_dest_ip, + test_id AS test_id +FROM + DATETABLE +WHERE + IS_EXPLICITLY_DEFINED(project) + -- traceroute traffic + AND project = 3 + -- restrict to NY servers, and given filters ranges. 
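+  -- (DATETABLE, SERVERIPS, and STAGE3_FILTER_FILENAME are substituted by m4
+  --  from stages.sh; this template is not run directly.)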
+ AND connection_spec.server_ip IN(SERVERIPS) + AND include(STAGE3_FILTER_FILENAME) +ORDER BY + day_timestamp, test_id, server_ip, client_ip; + From bebc307d94412bbe52570897996b12966b936cfe Mon Sep 17 00:00:00 2001 From: "soltesz@opentechinstitute.org" Date: Tue, 27 Aug 2013 09:58:11 -0400 Subject: [PATCH 09/14] move generic ndt template --- tmpl/ndt-tmpl-generic.m4.sql | 53 ++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 tmpl/ndt-tmpl-generic.m4.sql diff --git a/tmpl/ndt-tmpl-generic.m4.sql b/tmpl/ndt-tmpl-generic.m4.sql new file mode 100644 index 0000000..b008e8e --- /dev/null +++ b/tmpl/ndt-tmpl-generic.m4.sql @@ -0,0 +1,53 @@ +SELECT + -- web100_log_entry.connection_spec.local_ip AS server_ip, + INTEGER(web100_log_entry.log_time/7200)*7200 AS day_timestamp, + COUNT(web100_log_entry.log_time) AS test_count, + -- RATE + NTH(90,QUANTILES(8*web100_log_entry.snap.HCThruOctetsAcked/( + web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd),101)) as quantile_90, + NTH(10,QUANTILES(8*web100_log_entry.snap.HCThruOctetsAcked/( + web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd),101)) as quantile_10, + NTH(50,QUANTILES(8*web100_log_entry.snap.HCThruOctetsAcked/( + web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd),101)) as med_rate, + STDDEV(8*web100_log_entry.snap.HCThruOctetsAcked/( + web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd)) as std_rate, +FROM + DATETABLE +WHERE + IS_EXPLICITLY_DEFINED(project) + AND IS_EXPLICITLY_DEFINED(connection_spec.data_direction) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.is_last_entry) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.HCThruOctetsAcked) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.snap.CongSignals) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.remote_ip) + AND IS_EXPLICITLY_DEFINED(web100_log_entry.connection_spec.local_ip) + -- NDT download + AND project = 0 + AND connection_spec.data_direction = 1 + AND web100_log_entry.is_last_entry = True + AND web100_log_entry.snap.HCThruOctetsAcked >= 8192 + AND web100_log_entry.snap.HCThruOctetsAcked < 1000000000 + AND web100_log_entry.snap.CongSignals > 0 + AND (web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd) >= 9000000 + AND (web100_log_entry.snap.SndLimTimeRwin + + web100_log_entry.snap.SndLimTimeCwnd + + web100_log_entry.snap.SndLimTimeSnd) < 3600000000 + AND web100_log_entry.snap.MinRTT < 1e7 + -- restrict to NY lga01 servers, and given ISP address ranges. + AND web100_log_entry.connection_spec.local_ip IN(SERVERIPS) + AND ( include(ISP_FILTER_FILENAME) ) +GROUP BY + day_timestamp +ORDER BY + day_timestamp; + From d55aa369bd242a1f69c61210013f00f6554216da Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Tue, 27 Aug 2013 12:15:32 -0400 Subject: [PATCH 10/14] Calculate the hop matrix. --- hops.py | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100755 hops.py diff --git a/hops.py b/hops.py new file mode 100755 index 0000000..1c8a50a --- /dev/null +++ b/hops.py @@ -0,0 +1,97 @@ +#!/usr/bin/python +import socket +import struct + +# +# from the web! 
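+# iptoint() converts a dotted-quad IPv4 string into the integer form used by the
+# low/high range columns of GeoIPASNum2.csv, so ranges can be compared numerically
+# (equivalently: struct.unpack('!I', socket.inet_aton(ip))[0]).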
+# +def iptoint(ip): + return int(socket.inet_aton(ip).encode('hex'),16) + +def as_array(filename,skip=1): + ases = {} + f = open(filename, 'r') + skipped = 0 + counter = 0 + for line in f: + if (skipped < skip): + skipped += 1 + continue + s = line.split(',') + as_no = (s[2].split(' '))[0].lstrip("\"") + low = s[0] + high = s[1] + ases[counter] = as_no + "," + low + "," + high + counter += 1 + f.close() + return ases + +def lookup_as(ip, ases, cache): + if (ip in cache): + return cache[ip] + for as_and_range in ases: + s = ases[as_and_range].split(',') + low = int(s[1]) + high = int(s[2]) + if (low <= ip and ip <= high): + cache[ip] = s[0] + return s[0] + return "NO AS" + +def rate_array(filename, skip=1): + rate = {} + f = open(filename, 'r') + skipped = 0 + for line in f: + if (skipped < skip): + skipped += 1 + continue + s = line.split(',') + site = (s[1].rpartition('.'))[0] + client = s[2] + index = site + "," + client + bw = s[3] + rate[index] = bw + f.close() + return rate + +def hop_array(filename, rates, ases, skip=1): + hops = {} + as_cache = {} + f = open(filename, 'r') + skipped = 0 + for line in f: + if (skipped < skip): + skipped += 1 + continue + s = line.split(',') + site = (s[1].rpartition('.'))[0] + client = s[2] + print "Another data point ..." + hop_a = lookup_as(s[3], ases, as_cache) + hop_b = lookup_as(s[4], ases, as_cache) + rates_index = site + "," + client + if (hop_a not in hops): + hops[hop_a] = {} + if (hop_b not in hops[hop_a]): + hops[hop_a][hop_b] = "" + hops[hop_a][hop_b] += rates[rates_index] + "," + f.close() + return hops + +def write_hop_array(filename, hops): + f = open(filename, 'w') + for hop_a in hops.keys(): + for hop_b in hops[hop_a].keys(): + # + # The number of results reported is a + # little off because of fence-post + # issue with trailing , in the list + # + f.write(hop_a + "," + hop_b + "," + str(len(hops[hop_a][hop_b].split(','))) + ":" + hops[hop_a][hop_b] + "\n") + f.close() + +rate = rate_array("cache/stage1.comcast.lga01.sql.csv") +ases = as_array("GeoIPASNum2.csv", 0) +hops = hop_array("cache/stage3.comcast.lga01.sql.csv", rate, ases) +write_hop_array("cache/hops.csv", hops) From 035367b7da9d9484c6be719441f37efba0f11b57 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Tue, 27 Aug 2013 14:28:21 -0400 Subject: [PATCH 11/14] Do as calculation in a separate pass --- hops.py | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/hops.py b/hops.py index 1c8a50a..c4b6837 100755 --- a/hops.py +++ b/hops.py @@ -55,9 +55,8 @@ def rate_array(filename, skip=1): f.close() return rate -def hop_array(filename, rates, ases, skip=1): +def hop_array(filename, rates, skip=1): hops = {} - as_cache = {} f = open(filename, 'r') skipped = 0 for line in f: @@ -68,8 +67,10 @@ def hop_array(filename, rates, ases, skip=1): site = (s[1].rpartition('.'))[0] client = s[2] print "Another data point ..." - hop_a = lookup_as(s[3], ases, as_cache) - hop_b = lookup_as(s[4], ases, as_cache) + #hop_a = lookup_as(s[3], ases, as_cache) + #hop_b = lookup_as(s[4], ases, as_cache) + hop_a = s[3] + hop_b = s[4] rates_index = site + "," + client if (hop_a not in hops): hops[hop_a] = {} @@ -79,6 +80,21 @@ def hop_array(filename, rates, ases, skip=1): f.close() return hops +def asify_hop_array(hops, ases): + as_hops = {} + as_cache = {} + for hop_a in hops: + for hop_b in hops[hop_a]: + print "Hop pair ..." 
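+            # Re-key the per-hop rate lists by AS number; as_cache memoizes
+            # lookup_as() so each distinct hop IP is resolved only once.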
+ as_hop_a = lookup_as(hop_a, ases, as_cache) + as_hop_b = lookup_as(hop_b, ases, as_cache) + if as_hop_a not in as_hops: + as_hops[as_hop_a] = {} + if as_hop_b not in as_hops[as_hop_a]: + as_hops[as_hop_a][as_hop_b] = "" + as_hops[as_hop_a][as_hop_b] += hops[hop_a][hop_b] + "," + return as_hops + def write_hop_array(filename, hops): f = open(filename, 'w') for hop_a in hops.keys(): @@ -93,5 +109,6 @@ def write_hop_array(filename, hops): rate = rate_array("cache/stage1.comcast.lga01.sql.csv") ases = as_array("GeoIPASNum2.csv", 0) -hops = hop_array("cache/stage3.comcast.lga01.sql.csv", rate, ases) -write_hop_array("cache/hops.csv", hops) +hops = hop_array("cache/stage3.comcast.lga01.sql.csv", rate) +as_hops = asify_hop_array(hops, ases) +write_hop_array("cache/hops.csv", as_hops) From a26947ec50ff85e551fbc40c51cff2ca3d41121f Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Tue, 27 Aug 2013 15:00:44 -0400 Subject: [PATCH 12/14] Fix bug (iptoint before lookup) --- hops.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/hops.py b/hops.py index c4b6837..4c65a45 100755 --- a/hops.py +++ b/hops.py @@ -27,14 +27,15 @@ def as_array(filename,skip=1): return ases def lookup_as(ip, ases, cache): - if (ip in cache): + if (cache and ip in cache): return cache[ip] for as_and_range in ases: s = ases[as_and_range].split(',') low = int(s[1]) high = int(s[2]) if (low <= ip and ip <= high): - cache[ip] = s[0] + if cache: + cache[ip] = s[0] return s[0] return "NO AS" @@ -86,8 +87,8 @@ def asify_hop_array(hops, ases): for hop_a in hops: for hop_b in hops[hop_a]: print "Hop pair ..." - as_hop_a = lookup_as(hop_a, ases, as_cache) - as_hop_b = lookup_as(hop_b, ases, as_cache) + as_hop_a = lookup_as(iptoint(hop_a), ases, as_cache) + as_hop_b = lookup_as(iptoint(hop_b), ases, as_cache) if as_hop_a not in as_hops: as_hops[as_hop_a] = {} if as_hop_b not in as_hops[as_hop_a]: @@ -112,3 +113,4 @@ def write_hop_array(filename, hops): hops = hop_array("cache/stage3.comcast.lga01.sql.csv", rate) as_hops = asify_hop_array(hops, ases) write_hop_array("cache/hops.csv", as_hops) +#print lookup_as(iptoint("8.8.8.8"), ases, None) From ca2a29736aea0b5c6c9303d20c5593854743959f Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Tue, 27 Aug 2013 16:06:20 -0400 Subject: [PATCH 13/14] Make hops output pretty (by AS string) --- pretty_hops.sh | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100755 pretty_hops.sh diff --git a/pretty_hops.sh b/pretty_hops.sh new file mode 100755 index 0000000..16ebd8e --- /dev/null +++ b/pretty_hops.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +awk --field-separator=, ' + function to_as_string(as_no) + { + if (as_no == "NO AS") + return as_no + "grep " as_no " GeoIPASNum2.csv | head -n1 | sed \"s/^.*\\(AS.*$\\)/\\1/\"" | getline output + return output; + } + { + if ($1 != $2) { + print to_as_string($1) "(" $1 ")" "->" to_as_string($2) "(" $2 ")"; + } +}' cache/hops.csv + +#"s/^.*\(AS[0-9]\+\).*$/\1/" + From 92bd98e867b15d73072f547663e912a85c686549 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Tue, 27 Aug 2013 16:15:25 -0400 Subject: [PATCH 14/14] Get rid of extra , --- hops.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hops.py b/hops.py index 4c65a45..5f658a0 100755 --- a/hops.py +++ b/hops.py @@ -68,8 +68,6 @@ def hop_array(filename, rates, skip=1): site = (s[1].rpartition('.'))[0] client = s[2] print "Another data point ..." 
- #hop_a = lookup_as(s[3], ases, as_cache) - #hop_b = lookup_as(s[4], ases, as_cache) hop_a = s[3] hop_b = s[4] rates_index = site + "," + client @@ -93,7 +91,7 @@ def asify_hop_array(hops, ases): as_hops[as_hop_a] = {} if as_hop_b not in as_hops[as_hop_a]: as_hops[as_hop_a][as_hop_b] = "" - as_hops[as_hop_a][as_hop_b] += hops[hop_a][hop_b] + "," + as_hops[as_hop_a][as_hop_b] += hops[hop_a][hop_b] return as_hops def write_hop_array(filename, hops):
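After these patches, cache/hops.csv holds one line per AS pair in the form
AS_a,AS_b,count:rate,rate,... (with the trailing separator and off-by-one count
noted in write_hop_array's fence-post comment). Below is a minimal, hypothetical
post-processing sketch; summarize_hops.py and its median() helper are illustrative
assumptions rather than part of the patch series. It reduces each per-AS-pair rate
list to a test count and a median rate, skipping pairs whose endpoints map to the
same AS, in the spirit of the if ($1 != $2) check in pretty_hops.sh.

#!/usr/bin/python
# summarize_hops.py (hypothetical, not part of the patch series)
# Reduce each AS-pair rate list in cache/hops.csv to a test count and median rate.

def median(values):
    values = sorted(values)
    mid = len(values) // 2
    if len(values) % 2 == 1:
        return values[mid]
    return (values[mid - 1] + values[mid]) / 2.0

def summarize(filename):
    f = open(filename, 'r')
    for line in f:
        if not line.strip():
            continue
        as_a, as_b, rest = line.strip().split(',', 2)
        count, rates = rest.split(':', 1)
        # ignore empty fields produced by the trailing comma left by hop_array()
        samples = [float(r) for r in rates.split(',') if r]
        if samples and as_a != as_b:
            print as_a, "->", as_b, ":", len(samples), "tests, median rate", median(samples)
    f.close()

summarize("cache/hops.csv")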