In [None]:
# NOTE(review): unpinned install; consider pinning a dnspython version for reproducible runs.
%pip install dnspython
# Storage account name to probe; the probed host is f"{account}{endpoint}".
account = "toszrseus2018"

# optional params
# Host suffix appended to the account name to form the probed host name.
endpoint = ".blob.core.windows.net"
# During probing, open an incident after this many failed probes in a row.
consecutive_failure_threshold_to_create_incident = 10
# consecutive_success_threshold_to_mitigate_incident = 20
# During probing, open an incident once this many probes have failed in total.
accumulated_failure_threshold_to_create_incident = 60
# After probing, open an incident if failed/total probes is at least this fraction.
failure_rate_threshold_to_create_incident = 0.5
# Length of the probing window in seconds (also sizes the Kusto link's time filter).
time_to_probe_in_seconds = 60
# Timeout passed to each HEAD request, in seconds.
per_request_timeout_in_seconds = 10
# Minimum spacing between the starts of consecutive probes, in seconds.
per_request_interval_in_seconds = 1

# test only params
# Probe-loop state, exposed here presumably so a test run can pre-seed it — TODO confirm.
consecutive_failure = 0
consecutive_success = 0
# A truthy value here suppresses incident creation in the probe and final checks.
incident_id = None

In [None]:
# HTML snippets collected by later cells and joined into the incident summary.
summary = []
# Host name that is resolved and probed, e.g. "<account>.blob.core.windows.net" by default.
hostname = "{}{}".format(account, endpoint)
title = f"[ByteDance Connectivity Monitor][Data] account {account} encountered a connectivity issue"

# Utils

In [None]:
import os
# Log which node this run executes on. NODE_NAME is presumably only set inside AKS
# (get_job_link also handles non-AKS runs), so print None rather than raising KeyError.
print(os.environ.get('NODE_NAME'))

In [None]:
from xportal.utils import in_aks, get_template_path, get_nb_blob_id
from xportal import kusto
import datetime

def get_job_link():
    """Return ``(job_id, job_link)`` for the current notebook run.

    Inside AKS (``in_aks()`` is true), ``job_id`` is the part of
    ``get_nb_blob_id()`` before the first ``_``. ``job_link`` is the xportal
    report URL built from that id and ``get_template_path()``. Outside AKS both
    values are ``None``.

    The link (or ``None``) is printed so it appears in the run output.

    Returns:
        tuple: ``(job_id, job_link)``, each a string or ``None``.
    """
    job_id = None
    job_link = None
    if in_aks():
        job_id = get_nb_blob_id().split('_')[0]
        template_path = get_template_path()
        job_link = f"https://xportal.trafficmanager.net/xjupyterlitereport?path={job_id}_{template_path}.html"
    print(job_link)
    return job_id, job_link

job_id, job_link = get_job_link()
# Outside AKS there is no link, so the summary gets no job-log entry.
if job_link:
    summary.append('<a href="{}">Job Logs</a>'.format(job_link))

In [None]:
from xportal import kusto

# KQL listing this account's failed probes. The probe loop records StatusCode -1
# whenever no HTTP response was obtained, so "StatusCode <= 0" selects failures.
# The time window is the probe duration plus a 300 s margin. Rows are further
# narrowed to this job when a job id is known. Each clause sits on its own line,
# so an optional clause is never glued onto the previous one.
# NOTE(review): ago() is evaluated when the query is run, not when this link is
# built, so opening the link long after the incident may show no rows.
query_clauses = [
    'BDAccountProbeHistory',
    f'| where Timestamp >= ago({time_to_probe_in_seconds + 300}s)',
    f'| where Account == "{account}"',
    '| where StatusCode <= 0',
]
if job_id:
    query_clauses.append(f'| where JobId == "{job_id}"')
query = '\n'.join(query_clauses)

kusto_link = kusto.get_kusto_query_link('xlivesite', 'xlivesite', query)
summary.append(f'''<a href="{kusto_link}">KUSTO query for failed requests</a>''')
summary.append(f'''<a href="https://eng.ms/docs/cloud-ai-platform/azure-core/azure-storage/azure-storage-dev-mansah/xstore/front-end-layer/tsgs/networking/xfe_network_tsg/xfe_network_tsg">XFE Network TSG</a>''')

In [None]:
import datetime
import json
from xportal.utils import RestHelper, get_endpoint
from xportal.icm import IncidentCreationResult
from urllib.parse import urljoin

async def create_incident():
    """Create an IcM incident for this account and return the parsed result.

    POSTs to ``/api/v1/Icm/CreateIncident`` on ``get_endpoint()``. The body
    contains the notebook-level ``title``, severity 2, and the
    ``mdm://XStore/Triage`` routing id. When ``summary`` is non-empty, its
    fragments are joined with ``<br>`` into the body's ``Summary``.

    The correlation id is ``<account><UTC date as YYYYMMDD>``, so runs for the
    same account on the same UTC day send the same correlation id.

    Returns:
        IncidentCreationResult: wraps the service response.
    """
    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated since Python 3.12
    # and the probe loop already uses aware UTC timestamps.
    now = datetime.datetime.now(datetime.timezone.utc)
    body = {
        "Status": "Active",
        "Title": title,
        "Severity": 2,
        "RoutingId": "mdm://XStore/Triage",
        "CorrelationId": f"{account}{now.strftime('%Y%m%d')}",
    }
    if summary:
        body["Summary"] = "<br>".join(summary)
    result = await RestHelper.fetch_post(
        urljoin(get_endpoint(), "/api/v1/Icm/CreateIncident"),
        json.dumps(body, default=str),
    )
    return IncidentCreationResult(result)

In [None]:
import ssl
import requests
from requests.adapters import HTTPAdapter
from urllib3.poolmanager import PoolManager

class HostSSLContext(ssl.SSLContext):
    """Client TLS context that pins the SNI / verification host name.

    It is created with ``PROTOCOL_TLS_CLIENT``, so hostname checking and
    certificate verification are on. Any ``server_hostname`` passed to
    ``wrap_socket`` is replaced by the name given at construction. A connection
    opened to a raw IP therefore still presents and verifies the intended host.
    """

    def __new__(cls, hostname):
        ctx = super().__new__(cls, ssl.PROTOCOL_TLS_CLIENT)
        # Name forced into every wrap_socket() call below.
        ctx._hostname = hostname
        return ctx

    def wrap_socket(self, *args, **kwargs):
        """Same as ``SSLContext.wrap_socket`` but with ``server_hostname`` overridden."""
        return super().wrap_socket(*args, **{**kwargs, 'server_hostname': self._hostname})

class HostHeaderSSLAdapter(HTTPAdapter):
    """requests transport adapter that binds TLS to the request's ``Host`` header.

    Meant for requests whose URL names an IP address while the ``Host`` header
    carries the real server name, as the probe loop does. For such a request it:

    - re-initialises the pool manager with a ``HostSSLContext`` for that name,
      so the TLS handshake sends it as ``server_hostname``;
    - sets urllib3's ``assert_hostname`` pool option to that name, presumably so
      certificate hostname matching targets it rather than the IP in the URL.
    """
    def send(self, request, **kwargs):
        """Send ``request`` after configuring SNI / hostname checks from its Host header.

        Args:
            request: The prepared request to send.
            **kwargs: Passed through unchanged to ``HTTPAdapter.send``.

        Returns:
            Whatever ``HTTPAdapter.send`` returns for ``request``.
        """
        hostname = request.headers.get('Host', None)

        if hostname:
            # A fresh context and pool manager for every request that has a Host
            # header, reusing the adapter's existing pool size / blocking settings.
            # NOTE(review): the previous pool manager is replaced without being
            # cleared — probably harmless here since the probe sends
            # "Connection: close", but confirm.
            context = HostSSLContext(hostname)
            self.init_poolmanager(self._pool_connections, self._pool_maxsize, block=self._pool_block, ssl_context=context)

        connection_pool_kwargs = self.poolmanager.connection_pool_kw

        if hostname:
            connection_pool_kwargs['assert_hostname'] = hostname
        elif 'assert_hostname' in connection_pool_kwargs:
            # cleanup previous SSL host binding
            # NOTE(review): only assert_hostname is dropped. A HostSSLContext
            # installed by an earlier request stays on the pool manager and would
            # still force its server_hostname. Every request in this notebook sets
            # Host, so this branch is not exercised here.
            connection_pool_kwargs.pop('assert_hostname', None)

        return super(HostHeaderSSLAdapter, self).send(request, **kwargs)

In [None]:
import dns.resolver

def get_cname(qname, resolver=None):
    """Return the first CNAME target of ``qname`` as text, or ``None`` if it has none.

    Args:
        qname: Name to look up.
        resolver: Object with a dnspython-style ``resolve(qname, rdtype)`` method.
            Defaults to dnspython's shared default resolver, created on first use
            rather than at definition time.

    Returns:
        The first target's ``to_text()`` form, or ``None`` when the lookup raises
        ``NoAnswer`` or yields no records.

    Raises:
        Any other resolver error (e.g. ``NXDOMAIN``, timeouts) propagates.
    """
    if resolver is None:
        resolver = dns.resolver.get_default_resolver()
    try:
        cname_records = [rdata.target for rdata in resolver.resolve(qname, "CNAME")]
    except dns.resolver.NoAnswer:
        return None
    if cname_records:
        return cname_records[0].to_text()
    return None

def resolve_cname_recursively(domain, resolver=None, max_hops=16):
    """Follow the CNAME chain starting at ``domain`` and return the last name in it.

    Each CNAME discovered along the way is printed. If ``domain`` has no CNAME,
    ``domain`` itself is returned, so callers can resolve its A record directly
    instead of receiving ``None``.

    Args:
        domain: Name to start from.
        resolver: Passed to ``get_cname``; ``None`` means dnspython's default resolver.
        max_hops: Maximum number of CNAME records to follow.

    Raises:
        RuntimeError: If the chain is longer than ``max_hops`` (e.g. a CNAME loop),
            instead of looping forever.
    """
    current = domain
    hops = 0
    while True:
        next_cname = get_cname(current, resolver)
        if not next_cname:
            return current
        hops += 1
        if hops > max_hops:
            raise RuntimeError(f"CNAME chain for {domain} exceeded {max_hops} hops")
        print(next_cname)
        current = next_cname

def get_a(qname, resolver=None):
    """Return the first A-record address of ``qname``, or ``None`` if none are returned.

    Args:
        qname: Name to resolve.
        resolver: Object with a dnspython-style ``resolve(qname, rdtype)`` method.
            Defaults to dnspython's shared default resolver, the same one the
            module-level ``dns.resolver.resolve`` uses.

    Raises:
        Whatever the resolver raises (e.g. ``NoAnswer``, ``NXDOMAIN``).
    """
    if resolver is None:
        resolver = dns.resolver.get_default_resolver()
    a_records = [rdata.address for rdata in resolver.resolve(qname, "A")]
    if a_records:
        return a_records[0]
    return None

In [None]:
from functools import wraps
import asyncio

def exp_retry(retrycount, initial_backoff=10):
    """Decorator factory: retry an async callable with exponential backoff.

    The decorated coroutine function is awaited up to ``retrycount`` times. After
    each failed attempt except the last, it sleeps and then retries. A failed
    attempt is any ``Exception`` raised; the first wait is ``initial_backoff``
    seconds and each later wait doubles.

    Args:
        retrycount: Maximum number of attempts; must be >= 1.
        initial_backoff: Seconds to wait before the first retry.

    Raises:
        ValueError: At decoration time, if ``retrycount`` < 1. Without this check
            the wrapper would return ``None`` without ever calling the function.
        Exception: From the wrapper when every attempt fails. It is chained
            (``__cause__``) to the last underlying error.
    """
    if retrycount < 1:
        raise ValueError(f'retrycount must be >= 1, got {retrycount}')

    def decorator(f):
        @wraps(f)
        async def wrapped(*args, **kwargs):
            backoff = initial_backoff
            last_error = None
            for iteration in range(retrycount):
                if iteration:
                    print(f'Waiting for {backoff} seconds then retry...')
                    await asyncio.sleep(backoff)
                    # Double only after waiting, so the first retry waits initial_backoff.
                    backoff *= 2
                    print(f'Retrying for {f.__name__}, iteration: {iteration}')
                try:
                    return await f(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    print(f'{f.__name__} failed for the iteration {iteration} with {e}')
            print(f'Exceeded max retry count for {f.__name__}.')
            raise Exception(f'Exceeded max retry count for {f.__name__}.') from last_error
        return wrapped
    return decorator

# wrap all async operation with retry
@exp_retry(3)
async def with_retry(func, *args, **kwargs):
    """Await ``func(*args, **kwargs)``, retrying up to 3 attempts via ``exp_retry``."""
    return await func(*args, **kwargs)

# Probe

In [None]:
import datetime
import requests
from requests.exceptions import RequestException
import time
from xportal import icm
import uuid

# One session for the whole probe. Every https:// request goes through
# HostHeaderSSLAdapter, so a request addressed to a raw IP still uses the Host
# header's name for TLS.
session = requests.Session()
session.mount('https://', HostHeaderSSLAdapter())

start_time = time.perf_counter()
# final CNAME -> {resolved IP -> number of probe attempts that used it}
cnames = dict()
# One row per probe attempt. Field order matches the `columns` list in the Kusto
# ingestion cell.
ingest_data = []
total_probe = 0
# Failures over the whole window; unlike consecutive_failure, never reset on success.
failed_probe = 0
# probe till timer ends
while time.perf_counter() - start_time < time_to_probe_in_seconds:
    # Per-attempt defaults; these are what gets recorded if any step below raises.
    timestamp = datetime.datetime.now(datetime.timezone.utc)
    request_id = uuid.UUID(int=0)
    status_code = -1
    error_detail = ""
    per_request_start_time = time.perf_counter()
    url = ""
    final_cname = ""
    ip_address = ""

    try:
        # get IP
        # Walk the account host's CNAME chain, take the first A record of the final
        # name, and send the request straight to that IP.
        final_cname = resolve_cname_recursively(hostname)
        ip_address = get_a(final_cname)
        print(f"Final CNAME: {final_cname}, IP address: {ip_address}")
        url = f"https://{ip_address}/bdprobe?"
        
        # Tally which IPs each final CNAME resolved to during this window.
        if final_cname not in cnames:
            cnames[final_cname] = {ip_address: 1}
        else:
            if ip_address not in cnames[final_cname]:
                cnames[final_cname][ip_address] = 1
            else:
                cnames[final_cname][ip_address] += 1

        # send HEAD request
        # The Host header carries the account host name for the adapter.
        # "Connection: close" presumably makes each probe open a fresh connection.
        response = session.head(url, headers={"Connection": "close", "Host": hostname}, timeout=per_request_timeout_in_seconds)
        request_id = response.headers.get("x-ms-request-id", request_id)
        status_code = response.status_code
        
        # Any HTTP response counts as success, including 4xx/5xx (the check below is disabled).
        # if response.status_code >= 500:
        #     response.raise_for_status()
        print(f"HEAD request to {url} succeeded with status code {response.status_code}, {request_id}")
        consecutive_failure = 0
        if incident_id:
            # Only counted once an incident exists; the mitigation threshold that
            # would use it is commented out in the parameters cell.
            consecutive_success += 1
    except Exception as e:
        # Any failure in the try block lands here: DNS resolution errors as well as
        # request errors. The message says "HEAD request" in both cases.
        print(f"HEAD request failed: {e}")
        consecutive_failure += 1
        consecutive_success = 0
        failed_probe += 1
        error_detail = str(e)
    
    total_probe += 1
    
    # metric reporting
    # Elapsed seconds for this attempt, including DNS resolution.
    per_request_elapsed_time = time.perf_counter() - per_request_start_time
    ingest_data.append([timestamp, account, url, request_id, status_code, per_request_elapsed_time, error_detail, final_cname, ip_address, job_id])
    
    # incident handling
    # Once incident_id is set (truthy), no further incidents are created in this loop.
    # NOTE(review): an exception from create_incident() aborts this cell, so rows
    # collected so far are never ingested — consider catching and logging it.
    if not incident_id:
        if consecutive_failure >= consecutive_failure_threshold_to_create_incident or failed_probe >= accumulated_failure_threshold_to_create_incident:
            incident = await create_incident()
            incident_id = incident.incident_id
            print(f"Created incident {incident_id}")

    # don't send too fast
    # Sleep out the rest of the interval so attempts start at least
    # per_request_interval_in_seconds apart (elapsed time now includes any incident creation).
    per_request_elapsed_time = time.perf_counter() - per_request_start_time
    remaining_time = per_request_interval_in_seconds - per_request_elapsed_time
    if remaining_time > 0:
        time.sleep(remaining_time)

In [None]:
# Show how many probes hit each resolved IP, grouped by final CNAME.
print(f"{cnames}")

In [None]:
if total_probe == 0:
    raise Exception("Probe failed as no probe was done.")
elif not incident_id and failed_probe/total_probe >= failure_rate_threshold_to_create_incident:
    incident = await create_incident()
    incident_id = incident.incident_id
    print(f"Created incident {incident_id}")

In [None]:
from xportal import kusto

columns = ['Timestamp', 'Account', 'Endpoint', 'RequestId', 'StatusCode', 'Latency', 'ErrorDetail', 'CNAME', 'IP', 'JobId']
await with_retry(kusto.ingest, 'xlivesite', 'xlivesite', 'BDAccountProbeHistory', columns, ingest_data)