In [1]:
import json
from pprint import pprint
import os
import unicodedata
import datetime
import pandas as pd
import numpy as np
from subprocess import call
import pickle
from datetime import datetime
import calendar
import threading
import thread
import itertools
import math
from bisect import bisect_left
from __future__ import print_function

## Define functions to get raw data from the archive

In [2]:
def findIndexClosestToTimestamp_mod(timestamp, dataPoints, collumThatIsNotNAN):
    '''
    Return the *position* (usable with .iloc) of the row of dataPoints whose index
    timestamp is closest to `timestamp`, preferring a row that is not NaN in the
    column `collumThatIsNotNAN`.

    dataPoints must be indexed by timestamps sorted in ascending order, because the
    search bisects the index. Of the two rows bracketing `timestamp`, the nearer one
    is taken; on a tie the later row wins. If that row is NaN in
    `collumThatIsNotNAN`, the search is handed over to findIndexClostestThatIsNotNAN.

    The bisection is O(log(len(dataPoints))); the cost of the NaN fallback depends
    on findIndexClostestThatIsNotNAN.

    Raises IndexError if dataPoints has no rows, and KeyError if the column is missing.
    '''
    timestamps_toSearch = dataPoints.index
    n_rows = len(timestamps_toSearch)
    if n_rows == 0:
        raise IndexError("findIndexClosestToTimestamp_mod: dataPoints has no rows")
    col_pos = dataPoints.columns.get_loc(collumThatIsNotNAN)
    # first position whose timestamp is >= the requested one
    pos = bisect_left(timestamps_toSearch, timestamp)
    if pos == n_rows:
        # past the last row: the last row is the nearest one
        pos = n_rows - 1
    elif pos > 0 and (timestamp - timestamps_toSearch[pos - 1]) < (timestamps_toSearch[pos] - timestamp):
        # the preceding row is strictly nearer than the one at/after `timestamp`
        pos -= 1
    # read a single cell instead of materialising dataPoints.values on every call
    if not pd.isnull(dataPoints.iat[pos, col_pos]):
        return pos
    return findIndexClostestThatIsNotNAN(pos, dataPoints, col_pos)

def readPacketloss(file_lock, base_data=pd.DataFrame(), appendix=""):
    '''
    Read a raw packet-loss JSON file into the column "packet_loss<appendix>".

    file_lock -- path of the JSON file: a list of records with the keys
                 'timestamp' ("YYYY-mm-dd HH:MM:SS"), 'summary_window' and 'value'
    base_data -- DataFrame indexed by epoch seconds; it is copied, never modified
    appendix  -- suffix appended to the column name
    Returns a new DataFrame. Values for timestamps already present overwrite that
    row; unseen timestamps are appended as new rows. Records with a non-zero
    'summary_window' (summaries) are skipped.
    '''
    data = base_data.copy(deep=True)
    column = "packet_loss" + appendix
    with open(file_lock) as data_file:
        data_raw = json.load(data_file)
    for dataPoint in data_raw:
        # skip summaries, keep only the raw measurements
        if dataPoint['summary_window'] != 0:
            continue
        # the naive timestamp string is interpreted as UTC by calendar.timegm
        timestamp = datetime.strptime(dataPoint['timestamp'], "%Y-%m-%d %H:%M:%S")
        timestamp_epoch = int(calendar.timegm(timestamp.utctimetuple()))
        # DataFrame.set_value was removed in pandas 1.0; .loc enlarges the frame in
        # the same way when the row or the column does not exist yet
        data.loc[timestamp_epoch, column] = dataPoint['value']
    return data

def readTimeErrorEstimates(file_lock, base_data=pd.DataFrame(), appendix=""):
    '''
    Read a raw time-error-estimates JSON file into the column "time_error_estimates<appendix>".

    file_lock -- path of the JSON file: a list of records with the keys
                 'timestamp' ("YYYY-mm-dd HH:MM:SS"), 'summary_window' and 'value'
    base_data -- DataFrame indexed by epoch seconds; it is copied, never modified
    appendix  -- suffix appended to the column name
    Returns a new DataFrame. Values for timestamps already present go into that
    row; unseen timestamps are appended as new rows. Records with a non-zero
    'summary_window' (summaries) are skipped.
    '''
    data = base_data.copy(deep=True)
    column = "time_error_estimates" + appendix
    with open(file_lock) as data_file:
        data_raw = json.load(data_file)
    for dataPoint in data_raw:
        # skip summaries, keep only the raw measurements
        if dataPoint['summary_window'] != 0:
            continue
        # the naive timestamp string is interpreted as UTC by calendar.timegm
        timestamp = datetime.strptime(dataPoint['timestamp'], "%Y-%m-%d %H:%M:%S")
        timestamp_epoch = int(calendar.timegm(timestamp.utctimetuple()))
        # DataFrame.set_value was removed in pandas 1.0; .loc enlarges the frame in
        # the same way when the row or the column does not exist yet
        data.loc[timestamp_epoch, column] = dataPoint['value']
    return data

def readTTL(file_lock, base_data=pd.DataFrame(), appendix=""):
    '''
    Read a raw TTL-histogram JSON file into the columns
    "ttl_avg<appendix>", "ttl_std<appendix>" and "ttl_median<appendix>".

    file_lock -- path of the JSON file: a list of records with the keys
                 'timestamp' ("YYYY-mm-dd HH:MM:SS"), 'summary_window' and 'value',
                 where 'value' is a histogram summarised by getAvgStdMedianFromHistogram
    base_data -- DataFrame indexed by epoch seconds; it is copied, never modified
    appendix  -- suffix appended to the column names
    Returns a new DataFrame. Unseen timestamps are appended as new rows. Records
    with a non-zero 'summary_window' (summaries) are skipped.
    '''
    data = base_data.copy(deep=True)
    avg_column = "ttl_avg" + appendix
    std_column = "ttl_std" + appendix
    median_column = "ttl_median" + appendix
    with open(file_lock) as data_file:
        data_raw = json.load(data_file)
    for dataPoint in data_raw:
        # skip summaries, keep only the raw measurements
        if dataPoint['summary_window'] != 0:
            continue
        # the naive timestamp string is interpreted as UTC by calendar.timegm
        timestamp = datetime.strptime(dataPoint['timestamp'], "%Y-%m-%d %H:%M:%S")
        timestamp_epoch = int(calendar.timegm(timestamp.utctimetuple()))
        avg, std, median = getAvgStdMedianFromHistogram(dataPoint['value'])
        # DataFrame.set_value was removed in pandas 1.0; .loc enlarges the frame in
        # the same way when the row or the column does not exist yet
        data.loc[timestamp_epoch, avg_column] = avg
        data.loc[timestamp_epoch, std_column] = std
        data.loc[timestamp_epoch, median_column] = median
    return data

def readThroughput(file_lock, base_data=pd.DataFrame(), appendix="", anchor_column=None):
    '''
    Read a raw throughput JSON file into the column "throughput_perfSonar<appendix>".

    Unlike the other readers, no new rows are created. Every raw sample is written
    into the existing row picked by findIndexClosestToTimestamp_mod for the column
    `anchor_column`, so that throughput lines up with the latency rows.

    file_lock     -- path of the JSON file: a list of records with the keys
                     'timestamp' ("YYYY-mm-dd HH:MM:SS"), 'summary_window' and 'value'
    base_data     -- DataFrame indexed by epoch seconds that already contains
                     `anchor_column`; it is copied, never modified
    appendix      -- suffix appended to the output column name
    anchor_column -- column used to choose the target row; defaults to
                     "delay_avg" + appendix, the column readOWD writes for that appendix
    Returns a new DataFrame whose index is sorted in ascending order. Records with
    a non-zero 'summary_window' (summaries) are skipped.
    '''
    if anchor_column is None:
        anchor_column = "delay_avg" + appendix
    column = "throughput_perfSonar" + appendix
    # The nearest-row search bisects the index, so it must be ascending. The other
    # readers append rows in file order, which is not guaranteed to be chronological.
    data = base_data.copy(deep=True).sort_index()
    with open(file_lock) as data_file:
        data_raw = json.load(data_file)
    for dataPoint in data_raw:
        # skip summaries, keep only the raw measurements
        if dataPoint['summary_window'] != 0:
            continue
        # the naive timestamp string is interpreted as UTC by calendar.timegm
        timestamp = datetime.strptime(dataPoint['timestamp'], "%Y-%m-%d %H:%M:%S")
        timestamp_epoch = int(calendar.timegm(timestamp.utctimetuple()))
        # put the sample into a nearby existing row, since we don't want NaN-only rows
        pos = findIndexClosestToTimestamp_mod(timestamp_epoch, data, anchor_column)
        target_timestamp = data.index[pos]
        # DataFrame.set_value was removed in pandas 1.0; .loc adds the column if needed
        data.loc[target_timestamp, column] = dataPoint['value']
    return data

def readOWD(file_lock, base_data=pd.DataFrame(), appendix=""):
    '''
    Read a raw one-way-delay histogram JSON file into the columns
    "delay_avg<appendix>", "delay_std<appendix>" and "delay_median<appendix>".

    file_lock -- path of the JSON file: a list of records with the keys
                 'timestamp' ("YYYY-mm-dd HH:MM:SS"), 'summary_window' and 'value',
                 where 'value' is a histogram summarised by getAvgStdMedianFromHistogram
    base_data -- DataFrame indexed by epoch seconds; it is copied, never modified
    appendix  -- suffix appended to the column names
    Returns a new DataFrame. Unseen timestamps are appended as new rows. Records
    with a non-zero 'summary_window' (summaries) are skipped.
    '''
    data = base_data.copy(deep=True)
    avg_column = "delay_avg" + appendix
    std_column = "delay_std" + appendix
    median_column = "delay_median" + appendix
    with open(file_lock) as data_file:
        data_raw = json.load(data_file)
    for dataPoint in data_raw:
        # skip summaries, keep only the raw measurements
        if dataPoint['summary_window'] != 0:
            continue
        # the naive timestamp string is interpreted as UTC by calendar.timegm
        timestamp = datetime.strptime(dataPoint['timestamp'], "%Y-%m-%d %H:%M:%S")
        timestamp_epoch = int(calendar.timegm(timestamp.utctimetuple()))
        avg, std, median = getAvgStdMedianFromHistogram(dataPoint['value'])
        # DataFrame.set_value was removed in pandas 1.0; .loc enlarges the frame in
        # the same way when the row or the column does not exist yet
        data.loc[timestamp_epoch, avg_column] = avg
        data.loc[timestamp_epoch, std_column] = std
        data.loc[timestamp_epoch, median_column] = median
    return data

def getAvgStdMedianFromHistogram(hitogram_dict):
    '''
    Summarise a histogram given as a {value: count} mapping.

    Keys and counts may be numbers or numeric strings (JSON object keys are always
    strings). Returns (weighted mean, weighted population standard deviation,
    weighted median). The median is the smallest value whose cumulative count
    reaches half of the total count.

    Raises ValueError for an empty histogram.
    '''
    if not hitogram_dict:
        raise ValueError("getAvgStdMedianFromHistogram: empty histogram")
    # Sort by value: the cumulative-count median is only valid on ordered values,
    # and dict order says nothing about value order. list(...) also makes this
    # work on Python 3, where dict.items() is a view.
    pairs = sorted((float(value), float(count)) for value, count in list(hitogram_dict.items()))
    values = np.array([value for value, _ in pairs])
    weights = np.array([count for _, count in pairs])
    avg = np.average(values, weights=weights)
    std = np.sqrt(np.average((values - avg) ** 2, weights=weights))
    cumulative = np.cumsum(weights)
    index = np.searchsorted(cumulative, cumulative[-1] / 2.0)
    median = values[index]
    return avg, std, median


def _stripUrlScheme(url):
    '''Return `url` without its leading "scheme://" part ("http://host" -> "host").'''
    return url.split("://", 1)[-1]


def _downloadAndReadDirection(src, dest, esmond_client, output_dir):
    '''
    Download the src -> dest perfSONAR measurements with esmond-ps-get-bulk and
    read the resulting JSON files into one DataFrame indexed by epoch seconds.
    '''
    # The --start-time handed to esmond-ps-get-bulk, and the date that appears in
    # the output file names for it (presumably derived by esmond from the start
    # time). The two must describe the same day.
    start_date = "September 11"
    start_date_tag = "2016-09-11"

    print("Getting raw data from archive: " + src['name'])
    for kind in ('latency', 'throughput'):
        # Argument list instead of a shell string: no quoting or injection issues
        # with the host names / IPs taken from the site dictionaries.
        command = [esmond_client,
                   "-u", src[kind + '_url'],
                   "-s", src[kind + '_IP'],
                   "-d", dest[kind + '_IP'],
                   "--start-time", start_date,
                   "--output-format=json",
                   "-D", output_dir]
        status = call(command)
        if status != 0:
            # Keep going, but make a failed or partial download visible instead of
            # silently reading whatever files are left in output_dir.
            print("Warning: esmond-ps-get-bulk exited with status " + str(status)
                  + " for " + src['name'] + " -> " + dest['name'] + " (" + kind + ")")

    # The file names end with today's date (assumed to be esmond's default end time).
    date_string = datetime.today().strftime('%Y-%m-%d')

    def archive_file(kind, event_type):
        # <output_dir>/<src host>_<dest host>_<event type>_<start date>_<today>.json
        name = (_stripUrlScheme(src[kind + '_url']) + "_" + _stripUrlScheme(dest[kind + '_url'])
                + "_" + event_type + "_" + start_date_tag + "_" + date_string + ".json")
        return os.path.join(output_dir, name)

    print("Reading raw data files")
    data = pd.DataFrame()
    data = readPacketloss(archive_file('latency', 'packet-loss-rate'), base_data=data)
    data = readTTL(archive_file('latency', 'histogram-ttl'), base_data=data)
    data = readTimeErrorEstimates(archive_file('latency', 'time-error-estimates'), base_data=data)
    data = readOWD(archive_file('latency', 'histogram-owdelay'), base_data=data)
    # throughput is attached to existing latency rows, so it must be read last
    data = readThroughput(archive_file('throughput', 'throughput'), base_data=data)
    return data


def getRawDataFromArchive(src, dest,
                          esmond_client="/home/hendrik/Downloads/esmond/esmond_client/clients/esmond-ps-get-bulk",
                          output_dir="/tmp"):
    '''
    Download and read the perfSONAR measurements between two sites in both directions.

    src, dest     -- site dicts with the keys 'name', 'latency_url', 'throughput_url',
                     'latency_IP' and 'throughput_IP'
    esmond_client -- path of the esmond-ps-get-bulk executable
    output_dir    -- directory esmond writes its JSON files to, and which they are read from

    Returns {'out': src -> dest data, 'in': dest -> src data}, both sorted by
    timestamp in descending order. The same dict is stored, while holding the
    module-level `lock`, in data_global[src['name']][dest['name']]. Both globals
    must be defined before this is called; it is run from several threads at once
    by the threaded connectors.
    '''
    data_out = _downloadAndReadDirection(src, dest, esmond_client, output_dir)
    data_in = _downloadAndReadDirection(dest, src, esmond_client, output_dir)
    print("Sorting raw data")
    data = {'out': data_out.sort_index(ascending=False),
            'in': data_in.sort_index(ascending=False)}
    with lock:
        data_global.setdefault(src['name'], {})[dest['name']] = data
        print("Finished with: " + src['name'] + " <-> " + dest['name'])
    return data


In [3]:
def starlikeConnector_linear(center_site_data, dest_data_list):
    '''
    Fetch the data between the center site and each destination sequentially:
    one pair is fully downloaded and read before the next one starts.
    '''
    pairs = ((center_site_data, peer) for peer in dest_data_list)
    for site_a, site_b in pairs:
        getRawDataFromArchive(site_a, site_b)
    
def starlikeConnector_threaded(site_data, dest_data_list):
    '''
    Fetch the data between `site_data` and every destination, using one background
    thread per pair that runs getRawDataFromArchive.

    Uses threading.Thread instead of the Python-2-only `thread` module.
    Returns the list of started threads so callers can join() them. Callers that
    ignore the return value behave as before.
    '''
    workers = []
    for dest_data in dest_data_list:
        worker = threading.Thread(target=getRawDataFromArchive, args=(site_data, dest_data))
        worker.start()
        workers.append(worker)
    return workers
        
def allConnections_threaded(sites_list):
    '''
    Fetch the data for every unordered pair of sites in `sites_list`, using one
    background thread per pair that runs getRawDataFromArchive.

    Returns the list of started threads so callers can join() them.
    '''
    workers = []
    # iterate the parameter itself (the old code referenced an undefined `site_list`)
    for site_a, site_b in itertools.combinations(sites_list, 2):
        worker = threading.Thread(target=getRawDataFromArchive, args=(site_a, site_b))
        worker.start()
        workers.append(worker)
    return workers

## Get raw data

In [4]:
# Central site and the destinations its data is fetched against.
src = {'name': 'CERN', 'latency_url': "http://perfsonar-lt.cern.ch", 'throughput_url': 'http://perfsonar-bw.cern.ch',
      'latency_IP': '128.142.223.247', 'throughput_IP': '128.142.223.246'}
dest1 = {'name': 'PIC', 'latency_url': "http://psl01.pic.es", 'throughput_url': 'http://psb01.pic.es',
      'latency_IP': '193.109.172.188', 'throughput_IP': '193.109.172.187'}
dest2 = {'name': 'RAL', 'latency_url': "http://lcgps01.gridpp.rl.ac.uk", 'throughput_url': 'http://lcgps02.gridpp.rl.ac.uk',
      'latency_IP': '130.246.176.109', 'throughput_IP': '130.246.176.110'}
dest3 = {'name': 'TRIUMF', 'latency_url': "http://ps-latency.lhcmon.triumf.ca", 'throughput_url': 'http://ps-bandwidth.lhcmon.triumf.ca',
      'latency_IP': '206.12.9.2', 'throughput_IP': '206.12.9.1'}
dest4 = {'name': 'KIT', 'latency_url': "http://perfsonar2-de-kit.gridka.de", 'throughput_url': 'http://perfsonar-de-kit.gridka.de',
      'latency_IP': '192.108.47.12', 'throughput_IP': '192.108.47.6'}
dest5 = {'name': 'CCIN2P3', 'latency_url': "http://ccperfsonar2.in2p3.fr", 'throughput_url': 'http://ccperfsonar1.in2p3.fr',
      'latency_IP': '193.48.99.76', 'throughput_IP': '193.48.99.77'}
dest6 = {'name': 'BNL', 'latency_url': "http://lhcperfmon.bnl.gov", 'throughput_url': 'http://lhcmon.bnl.gov',
      'latency_IP': '192.12.15.26', 'throughput_IP': '192.12.15.23'}
dest7 = {'name': 'CNAF', 'latency_url': "http://perfsonar-ow.cnaf.infn.it", 'throughput_url': 'http://perfsonar-ps.cnaf.infn.it',
      'latency_IP': '131.154.254.12', 'throughput_IP': '131.154.254.11'}
dest8 = {'name': 'JINR-T1', 'latency_url': "http://t1-pfsn2.jinr-t1.ru", 'throughput_url': 'http://t1-pfsn1.jinr-t1.ru',
      'latency_IP': '159.93.229.151', 'throughput_IP': '159.93.229.150'}
dests = [dest1, dest2, dest3, dest4, dest5, dest6, dest7, dest8]


# Shared state written by getRawDataFromArchive from the worker threads.
lock = threading.Lock()
data_global = {}
# Block until every download thread has finished, so the following cells (e.g.
# the pickle dump) see the complete data_global. `or []` makes this a no-op if
# the connector returns nothing instead of its threads.
for worker in starlikeConnector_threaded(src, dests) or []:
    worker.join()



Getting raw data from archive: CERNGetting raw data from archive: CERN
Getting raw data from archive: CERNGetting raw data from archive: CERNGetting raw data from archive: CERNGetting raw data from archive: CERNGetting raw data from archive: CERN
Getting raw data from archive: CERN





Reading raw data files
Reading raw data files
Reading raw data files
Reading raw data files
Reading raw data files
Reading raw data files
Reading raw data files
Reading raw data files
Getting raw data from archive: JINR-T1
Reading raw data files
Getting raw data from archive: RAL
Reading raw data files
Getting raw data from archive: CCIN2P3
Getting raw data from archive: TRIUMF
Getting raw data from archive: BNL
Getting raw data from archive: PIC
Getting raw data from archive: CNAF
Getting raw data from archive: KIT
Sorting raw data
Finished with: CERN <-> JINR-T1
Sorting raw data
Finished with: CERN <-> RAL
Reading raw data files
Reading raw data files
Reading raw data files
Reading raw data files
Read

## Save raw data

In [6]:
# Binary mode is required by pickle on Python 3; the with-block guarantees the
# file is flushed and closed before anything reads it back.
with open('raw_toolkit_CERN_to_RAL-PIC-TRIUMF-KIT-IN2P3-BNL-CNAF-JINR-T1_sorted.pkl', "wb") as pickle_file:
    pickle.dump(data_global, pickle_file)

In [5]:
# test which keys we have: one line per source site, listing every destination it has data for
for source_name in data_global:
    destinations = "".join(dest_name + " | " for dest_name in data_global[source_name])
    print(source_name + " <-> " + destinations + " ")

CERN <-> TRIUMF | RAL | PIC | JINR-T1 | KIT | BNL | CCIN2P3 | CNAF |  
