In [1]:
from dask.distributed import Client
from dask.delayed import delayed
import dask.dataframe as dd
import pandas as pd
from PySquashfsImage import SquashFsImage
import zipfile
import tempfile
import os
import shutil
from dask.distributed import get_worker
import re
import itertools
from dateutil.tz import gettz
from bs4 import BeautifulSoup
import dateutil
import datetime
import json
import numpy as np
from dateutil import parser as dateparser
import string
from itertools import zip_longest
from scipy.special import comb
import pytz

In [2]:
# Fixed UTC offsets for the timezone abbreviations that can appear in scraped
# timestamps; handed to dateutil's parser through its ``tzinfos=`` argument.
#
# The offsets are spelled out explicitly instead of going through
# ``gettz('Etc/GMT-8')`` and friends: the ``Etc/GMT±N`` zone names follow the
# POSIX sign convention, which is the opposite of everyday usage
# (``Etc/GMT-8`` is UTC+08:00, i.e. *not* Pacific Standard Time).
tzinfos = {
    'PST': datetime.timezone(datetime.timedelta(hours=-8), 'PST'),
    'PDT': datetime.timezone(datetime.timedelta(hours=-7), 'PDT'),
    # https://www.timeanddate.com/time/zones/est
    'EST': datetime.timezone(datetime.timedelta(hours=-5), 'EST'),
    # https://www.timeanddate.com/time/zones/edt
    'EDT': datetime.timezone(datetime.timedelta(hours=-4), 'EDT'),
    # https://www.timeanddate.com/time/zones/cet
    'CET': datetime.timezone(datetime.timedelta(hours=1), 'CET'),
}

def process_downdetector_file(filename, file_obj, metadata):
    """Extract the (timestamp, value) chart points embedded in a Down Detector page.

    The page is scanned for object literals of the form
    ``{ <key>: '<ISO timestamp with microseconds and +HH:MM offset>', <key>: <digits> },``.
    The quotes around the timestamp may each be preceded by a backslash, so
    both raw text and a stringified ``bytes`` payload are recognised.

    Args:
        filename: Name of the scraped file (not used for parsing; kept for
            interface compatibility).
        file_obj: Open file-like object; its ``read()`` result is passed
            through ``str()`` before scanning.
        metadata: Unused; kept for interface compatibility.

    Returns:
        pandas.DataFrame with one row per chart point and the columns
        ``event_time`` (parsed datetimes), ``status_code`` (integer value of
        the point) and three placeholder columns filled with ``None``.
        The frame is empty when the page contains no matching points.

    Raises:
        Whatever ``dateparser.parse`` raises for a timestamp that matches the
        pattern but is not a valid date (e.g. month 13).
    """
    html = str(file_obj.read())

    # One pattern captures both fields, so no second pass over the matched
    # text is needed. ``\\?`` makes the backslash before each quote optional.
    point_rgx = re.compile(
        r"\{\s\w+:\s+\\?'"
        r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2})"
        r"\\?',\s+\w+:\s+(\d+)\s+\},"
    )

    event_times, status_codes = [], []
    for point in point_rgx.finditer(html):
        event_times.append(dateparser.parse(point.group(1), tzinfos=tzinfos))
        # Numeric, like the values produced by the other parsers in this file.
        status_codes.append(int(point.group(2)))

    return pd.DataFrame({'event_time': event_times,
                         'status_code': status_codes,
                         'status_info_updated': None,  # TODO: Missing
                         'last_status_change': None,  # TODO: Missing
                         'last_w_service_disruption': None,  # TODO: Missing
                        })

In [3]:
class DRNParser:
    """Parse archived Down Right Now pages into availability time series.

    A page is expected to contain a chart URL whose ``chd=e:`` parameter holds
    two comma-separated series of two-character codes: the first positions
    each sample inside a 24-hour window, the second carries its status.
    NOTE(review): this looks like the Google Image Charts "extended" data
    encoding -- confirm against a sample page.
    """

    # Layout of the hour-directory name passed in as ``scrape_time``,
    # e.g. '20171114T000001'.
    SCRAPE_TIME_FORMAT = '%Y%m%dT%H%M%S'

    def time_since_timestart(self, elem):
        """Convert a fraction of a day into whole minutes (truncated)."""
        return int(1440 * elem)  # 24 * 60 = 1440

    def get_availability(self, code):
        """Map a chart status code to an availability score.

        Returns ``None`` for any code that is not one of the five known levels.
        """
        status = {
            83: 1.0,  # 'Up',
            66: 0.8,  # 'Recent Signs of Service Trouble',
            49: 0.6,  # 'Possible Service Trouble',
            33: 0.4,  # 'Likely Service Disruption',
            16: 0  # 'Confirmed Service Disruption'
        }
        return status.get(code)

    def decode64(self, raw_string):
        """Decode a string of two-character codes into fractions of 4095.

        Characters are taken in consecutive pairs; each pair is read as a
        two-digit base-64 number over the alphabet ``A-Z a-z 0-9 - .`` and
        divided by 4095. A trailing unpaired character is paired with ``'__'``.

        Raises:
            ValueError: if a character is not part of the alphabet.
        """
        allowed_characters = string.ascii_uppercase + string.ascii_lowercase + string.digits + '-.__'
        chr_groups = [
            (m, n or '__')
            for m, n
            in list(zip_longest(raw_string, raw_string[1:]))[::2]
        ]
        return [
            (allowed_characters.index(m) * 64 + allowed_characters.index(n)) / 4095
            for m, n
            in chr_groups
        ]

    def trans_service_status(self, series):
        """Translate decoded status fractions into availability scores."""
        return [self.get_availability(int(s * 100)) for s in series]

    # ONLY WORKS IF THE FILE IS SOURCED FROM CLOUD AVAILABILITY ARCHIVE -> filetime
    def trans_time_series(self, series, filetime):
        """Turn decoded chart positions into datetimes.

        The chart spans the 24 hours before ``filetime``. The first sample is
        pinned to the start of that window regardless of its decoded value;
        every other sample is placed ``int(1440 * fraction)`` minutes after it.
        """
        # turn the clock back by one day since the chart is 24 hours long
        timestart = filetime - datetime.timedelta(days=1)
        time_series_data = []
        for inx, elem in enumerate(series):
            if inx == 0:
                time_series_data.append(timestart)
            else:
                offset = datetime.timedelta(minutes=self.time_since_timestart(elem))
                time_series_data.append(timestart + offset)
        return time_series_data

    def _parse_scrape_time(self, scrape_time):
        """Parse the hour-directory name into a timezone-naive datetime.

        Names such as '20171114T103001' are parsed exactly. The previous
        approach overwrote character 11 (the tens-of-minutes digit) with ':'
        before handing the string to dateutil, which corrupted the minutes and
        seconds; it is kept only as a fallback for names that do not match
        ``SCRAPE_TIME_FORMAT``.

        NOTE(review): the result carries no timezone, exactly as before (the
        old code appended ' EST' and then sliced it off again before parsing).
        The data is said to have been scraped in N. Virginia -- confirm whether
        downstream code should localise these values.
        """
        try:
            return datetime.datetime.strptime(scrape_time, self.SCRAPE_TIME_FORMAT)
        except ValueError:
            chars = list(scrape_time)
            chars[8], chars[11] = ' ', ':'
            return dateparser.parse(''.join(chars), tzinfos=tzinfos)

    def process_file(self, filename, file_obj, scrape_time):
        """Parse one archived page into an availability time series.

        Args:
            filename: Name of the scraped file (not used for parsing).
            file_obj: Open file-like object holding the page.
            scrape_time: Name of the hour directory the file came from,
                e.g. '20171114T000001'; marks the end of the 24-hour chart.

        Returns:
            pandas.DataFrame with ``event_time`` and ``status_code`` columns.
            For an empty file a single placeholder row (current time, -1) is
            returned instead.

        Raises:
            Any exception raised while locating or decoding the chart data is
            re-raised after the page content has been printed for debugging.
        """
        html = str(file_obj.read())
        if html == '':
            # WARNING!!! IGNORING ERRORS. AFAIK, FEW EXIST. THIS DATA SHOULD BE DUPLICATED IN OTHER FILES.
            return pd.DataFrame({'event_time': [datetime.datetime.now()],
                                 'status_code': [-1]})

        try:
            series_tuple = re.findall("chd=e:([^&]{0,})", html)
            # Two series of character pairs plus one comma -> odd total length.
            if (len(series_tuple[0]) - 1) % 2 != 0:
                raise ValueError('Needs to be even')

            splits = series_tuple[0].split(',')
            enc_timeseries, enc_service_status = splits[0], splits[1]

            file_datetime = self._parse_scrape_time(scrape_time)
            timeseries = self.trans_time_series(self.decode64(enc_timeseries), file_datetime)
            service_status = self.trans_service_status(self.decode64(enc_service_status))
            # The page's "timestamp_N" fields are intentionally not parsed:
            # they were never part of the returned frame, so parsing them
            # could only add failures.
        except Exception:
            print(html)
            raise

        return pd.DataFrame({'event_time': timeseries,
                             'status_code': service_status})

In [4]:
class OutageReportParser:
    """Parser for archived Outage Report pages."""

    def process_outagereport(self, filename, file_obj):
        """Read the ``"chartData"`` array of a page into a DataFrame.

        Each datapoint's ``ts`` (epoch seconds) becomes a naive UTC datetime
        in ``event_time`` and its ``count`` becomes ``status_code``. A page
        without chart data yields an empty frame with the same two columns.
        """
        html = str(file_obj.read())
        if html is None:
            raise ValueError(f'ERROR: HTML IS NONE \n filename {filename} \t file_obj {file_obj} ')

        chart_match = re.search('"chartData":(.*?)],', html)
        if chart_match is None:
            datapoints = []
        else:
            # The lazy group stops just before the closing bracket that the
            # pattern consumes, so it has to be put back for valid JSON.
            datapoints = json.loads(chart_match.group(1) + ']')

        rows = [
            (datetime.datetime.utcfromtimestamp(point['ts']), point['count'])
            for point in datapoints
        ]

        return pd.DataFrame({'event_time': [when for when, _ in rows],
                             'status_code': [reports for _, reports in rows]})

In [5]:
# Prefix given to every temporary file/directory created while unpacking.
APP_TEMP_PREFIX = 'ctzph-01-'
# Splits an archive path on both directory separators and dots.
pathSplitRegex = re.compile('[./]')
# Single-row stand-in returned whenever a source yields no usable data; the
# -1 status code marks the row as a placeholder. The timestamp is taken once,
# when this cell runs.
PLACEHOLDER_DF = pd.DataFrame([
    dict(
        event_time=datetime.datetime.now(),
        status_code=np.int32(-1),
        vendor='',
        monitor='',
        location='',
    )
])

# (directory-name prefix, label used for both 'vendor' and 'monitor', org_type)
_SELF_REPORTED_SOURCES = (
    ('cloud-amazon-web-services', 'AWS', 'cloud'),
    ('cloud-google-apps', 'Google Apps', 'application'),
    ('cloud-google-cloud-platform', 'GCP', 'cloud'),
    ('cloud-microsoft-azure', 'Azure', 'cloud'),
    ('cloudflare-status', 'Cloudflare', 'cloud'),
    ('github-status', 'Github', 'application'),
    ('atlassian', 'Atlassian', 'application'),
    ('docker', 'Docker', 'application'),
    ('slack', 'Slack', 'application'),
    ('minecraft', 'Minecraft', 'application'),
    ('nintendo', 'Nintendo', 'application'),
    ('discord', 'Discord', 'application'),
)

# (directory-name prefix, monitor name) for monitors that only set 'monitor'.
_THIRD_PARTY_MONITORS = (
    ('downrightnow', 'Down Right Now'),
    ('outage', 'Outage Report'),
)

# Sources that are recognised but deliberately not processed.
_SKIPPED_SOURCES = ('cloud-apple-consumer', 'gpanel')

# (directory-name suffix, location) for the regional Down Detector sites.
_DOWNDETECTOR_LOCATIONS = (
    ('united-arab-emirates', 'UAE'),
    ('argentina', 'Argentina'),
    ('australia', 'Australia'),
    ('austria', 'Austria'),
    ('belgium', 'Belgium'),
    ('brazil', 'Brazil'),
    ('canada', 'Canada'),
    ('switzerland', 'Switzerland'),
    ('chile', 'Chile'),
    ('denmark', 'Denmark'),
    ('germany', 'Germany'),
    ('spain', 'Spain'),
    ('finland', 'Finland'),
    ('france', 'France'),
    ('great-britain', 'Great Britain'),
    ('india', 'India'),
    ('ireland', 'Ireland'),
    ('italy', 'Italy'),
    ('japan', 'Japan'),
    ('mexico', 'Mexico'),
    ('netherlands', 'Netherlands'),
    ('norway', 'Norway'),
    ('new-zealand', 'New Zealand'),
    ('poland', 'Poland'),
    ('portugal', 'Portugal'),
    ('russia', 'Russia'),
    ('singapore', 'Singapore'),
    ('sweden', 'Sweden'),
    ('south-africa', 'South Africa'),
)


def populateMetadataFromSourceDirName(metadata, sourceInfo):
    """Fill ``metadata`` in place from the name of an archive source directory.

    Depending on the source, the keys 'vendor', 'monitor', 'org_type' and
    'location' are set.

    Returns:
        True when the source should be processed, False for sources that are
        known but skipped (``metadata`` is left untouched for those).

    Raises:
        Exception: for a Down Detector directory with an unrecognised country
            suffix ('monitor' has already been set by then), or for a source
            name that matches no known prefix.
    """
    for prefix, label, org_type in _SELF_REPORTED_SOURCES:
        if sourceInfo.startswith(prefix):
            metadata['vendor'] = label
            metadata['monitor'] = label
            metadata['org_type'] = org_type
            return True

    if sourceInfo.startswith('downdetector'):
        metadata['monitor'] = 'Down Detector'
        # The bare directory name is the US site; regional sites carry a suffix.
        if sourceInfo == 'downdetector':
            metadata['location'] = 'USA'
            return True
        for suffix, location in _DOWNDETECTOR_LOCATIONS:
            if sourceInfo.endswith(suffix):
                metadata['location'] = location
                return True
        raise Exception('Unknown location')

    for prefix, monitor in _THIRD_PARTY_MONITORS:
        if sourceInfo.startswith(prefix):
            metadata['monitor'] = monitor
            return True

    if sourceInfo.startswith(_SKIPPED_SOURCES):
        return False

    raise Exception('Unknown data source: ' + sourceInfo)

def processHourDir(oneHourDirPath, metadata):
    """Parse every file of one extracted hour directory into a single DataFrame.

    The directory name is expected to look like '20171114T000001'; characters
    9-10 (the hour) are stored in ``metadata['hour']``. The parser is chosen by
    ``metadata['monitor']``; each parsed frame gets a 'vendor' column taken
    from the file name up to its first dot.

    Returns:
        The concatenated frames with 'monitor' and 'location' columns added,
        or ``PLACEHOLDER_DF`` when the monitor has no parser, the directory is
        empty, or the parsed frames contain no rows.
    """
    # e.g. ['', 'tmp', 'tmp9_ms31tl', '20171114T000001'] -> last element
    hour_dir_name = oneHourDirPath.split('/')[-1]
    metadata['hour'] = hour_dir_name[9:11]

    # One callable per supported monitor; a fresh parser object is created
    # for every file.
    parsers = {
        'Down Detector':
            lambda name, handle: process_downdetector_file(name, handle, metadata),
        'Down Right Now':
            lambda name, handle: DRNParser().process_file(name, handle, hour_dir_name),
        'Outage Report':
            lambda name, handle: OutageReportParser().process_outagereport(name, handle),
    }
    parse_one = parsers.get(metadata['monitor'])

    frames = []
    if parse_one is not None:
        for filename in os.listdir(oneHourDirPath):
            with open(os.path.join(oneHourDirPath, filename), 'r') as file_obj:
                frame = parse_one(filename, file_obj)
                frame['vendor'] = filename.split('.')[0]
                frames.append(frame)

    combined = pd.concat(frames) if frames else None
    if combined is None or len(combined) == 0:
        return PLACEHOLDER_DF

    combined['monitor'] = metadata['monitor']
    combined['location'] = metadata.get('location')
    return combined

def processDayZip(oneDayZipPath, metadata):
    """Extract one day's zip and parse each hour directory it contains.

    Args:
        oneDayZipPath: Path to a zip named like '.../20171114.zip'; the last
            two digits of the name are stored in ``metadata['day']``.
        metadata: Dict shared along the call chain; updated in place.

    Returns:
        list of DataFrames, one per entry of the extracted archive.
    """
    # ['', 'tmp', 'tmp9_ms31tl', '20171114', 'zip'] -> '20171114'[6:] -> day
    dayInfo = int(pathSplitRegex.split(oneDayZipPath)[-2][6:])
    metadata['day'] = dayInfo

    with tempfile.TemporaryDirectory(prefix=APP_TEMP_PREFIX) as dayTempDir:
        # Close the archive handle as soon as extraction is done; it used to
        # stay open until garbage collection.
        with zipfile.ZipFile(oneDayZipPath) as oneDayZip:
            oneDayZip.extractall(dayTempDir)

        # Everything must be parsed before the temporary directory is removed.
        return [
            processHourDir(os.path.join(dayTempDir, entry), metadata)
            for entry in os.listdir(dayTempDir)
        ]

def processMonthZip(squashFsImage, monthZipPath):
    """Parse one month archive from the SquashFS image into DataFrames.

    Args:
        squashFsImage: Opened image; ``root.select(path)`` must return an
            entry whose ``getContent()`` yields the zip bytes.
        monthZipPath: Path inside the image, e.g.
            '/cloud-amazon-web-services/2017/201711.zip'.

    Returns:
        Flat list of DataFrames (one per hour directory of every day in the
        month). An empty list is returned when the path cannot be parsed, the
        year is later than 2020, or the source is one that is skipped.
    """
    # Collecting info for the dataframe
    # ['', 'cloud-amazon-web-services', '2017', '201711', 'zip']
    try:
        monthZipPathSplits = pathSplitRegex.split(monthZipPath)
        sourceInfo = monthZipPathSplits[1]
        # outage.report has a period in the name, which produces one extra
        # split element and shifts the year/month positions by one.
        shift = 1 if sourceInfo.startswith('outage') else 0
        yearInfo = int(monthZipPathSplits[2 + shift])
        if yearInfo > 2020:
            return []
        monthInfo = int(monthZipPathSplits[3 + shift][4:])
    except Exception as e:
        # Report the actual error instance (previously the Exception class
        # itself was printed, hiding the cause).
        print(repr(e))
        print(monthZipPath)
        return []

    metadata = {
        'year': yearInfo,
        'month': monthInfo
    }

    if not populateMetadataFromSourceDirName(metadata, sourceInfo):
        return []

    monthZipHandle = squashFsImage.root.select(monthZipPath)
    with tempfile.TemporaryFile(prefix=APP_TEMP_PREFIX) as monthZipTempFile:
        monthZipTempFile.write(monthZipHandle.getContent())

        with tempfile.TemporaryDirectory(prefix=APP_TEMP_PREFIX) as monthTempDir:
            # Release the archive reader right after extraction.
            with zipfile.ZipFile(monthZipTempFile) as oneMonthZip:
                oneMonthZip.extractall(monthTempDir)

            dayZipPaths = [os.path.join(monthTempDir, entry) for entry in os.listdir(monthTempDir)]
            # Materialise the result while the extracted files still exist.
            return list(itertools.chain.from_iterable(
                processDayZip(oneDayZipPath, metadata) for oneDayZipPath in dayZipPaths
            ))

In [6]:
year_of_interest = 2020
archive_path = "/var/scratch/atlarge/traces/cloud-availability-sacheen-2020-05-11.sqsh"
# Install pysquashfs directly from git. The version on PyPI has a bug that hasn't been fixed in over 4 years.
image = SquashFsImage(archive_path)

# Collect the path of every non-folder entry whose path mentions the year of
# interest. Add a further condition here (e.g. 'amazon' in the path) to
# restrict the run to specific sources.
year_tag = str(year_of_interest)
zipfiles = [
    entry.getPath()
    for entry in image.root.findAll()
    if not entry.isFolder() and year_tag in entry.getPath()
]

# There is a zipfile for each month

In [7]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

In [8]:
# Resources requested for each SLURM job that hosts dask workers.
slurm_job_options = dict(
    cores=16,
    memory="64 GB",
    processes=16,
    local_directory="./scheduler_spill",
    scheduler_options={'dashboard_address': ':8787'},
    interface='ib0',
    walltime='02:00:00',
)
cluster = SLURMCluster(**slurm_job_options)
cluster.scale_up(10)
client = Client(cluster)

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


In [9]:
def metaProcessMonthZip(path):
    """Process one month archive and return its rows as a single DataFrame.

    The SquashFS image is opened inside the function, so each call works on
    its own handle. ``PLACEHOLDER_DF`` is returned when the month produced no
    frames (or something other than a list).
    """
    archive = SquashFsImage(archive_path)
    frames = processMonthZip(archive, path)
    if type(frames) == list and len(frames) != 0:
        return pd.concat(frames)
    return PLACEHOLDER_DF

# One lazy task per month archive; nothing is executed at this point.
tasks = list(map(delayed(metaProcessMonthZip), zipfiles))
# `float` replaces the `np.float` alias, which was deprecated in NumPy 1.20
# and removed in 1.24 (it was only ever an alias for the builtin).
df2 = dd.from_delayed(tasks, meta={
    'event_time': 'datetime64[ns]',
    'status_code': float,
    'vendor': str,
    'monitor': str,
    'location': str
}).drop_duplicates()
# df2.compute()

In [10]:
# fut = client.persist(df2)
# client.recreate_error_locally(fut)

# Wall-clock timing around the parquet export of df2; `start` and `end` are
# read by the following cell.
# NOTE(review): presumably this is where the lazy dask graph actually runs -- confirm.
start = datetime.datetime.now()
# df2.to_parquet('/var/scratch/stalluri/crowdsourced_failures_2020_with_outagereport')
df2.to_parquet('/var/scratch/stalluri/crowdsourced_failures_2020')
end = datetime.datetime.now()



In [11]:
# Elapsed wall-clock time of the export cell above (relies on its `start`/`end`).
print(end - start)

0:03:32.936628


In [12]:
# Ask the cluster to scale down all of its current workers now that the export is done.
await cluster.scale_down(cluster.workers)