In [None]:
from datetime import datetime, timezone

from ipwhois import IPWhois
from ripe.atlas.cousteau import AtlasRequest, AtlasResultsRequest
from ripe.atlas.sagan import Result, DnsResult
STOPPED = 4


def get_measurements_collection_by_date_and_type(date, measurement_type):
    """Fetch the stopped measurements of one type that started on a given day.

    The day is the 24-hour window that begins at 00:00 UTC of ``date``.

    Args:
        date: Day to query, formatted as ``YYYY-MM-DD``.
        measurement_type: Value sent as the ``type`` filter (e.g. ``"dns"``).

    Returns:
        The ``results`` entry of the API response when the request succeeds,
        otherwise ``None``.

    Raises:
        ValueError: If ``date`` does not match ``%Y-%m-%d``.
    """
    seconds_per_day = 24 * 60 * 60
    # Anchor the day to UTC. A naive datetime's .timestamp() is interpreted in
    # the host's local time zone, which would shift the window from machine to
    # machine (and by an hour across DST changes).
    day_start = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    window_start = int(day_start.timestamp())
    filters = {
        "type": measurement_type,
        "status": STOPPED,
        "start_time__gte": window_start,
        "start_time__lt": window_start + seconds_per_day,
        "fields": "id,start_time,stop_time,af,query_argument,query_type,result",
    }
    is_success, response = AtlasRequest(
        url_path="/api/v2/measurements/").get(**filters)
    # NOTE(review): only the "results" of this single response are returned;
    # if the endpoint paginates, further pages are not fetched - confirm.
    return response["results"] if is_success else None


def get_measurement_results(measurement_id):
    """Download the results of a single measurement.

    Args:
        measurement_id: Identifier interpolated into the results URL path.

    Returns:
        The response payload when the request succeeds, otherwise ``None``.
    """
    request = AtlasRequest(
        url_path=f"/api/v2/measurements/{measurement_id}/results/")
    succeeded, payload = request.get()
    return payload if succeeded else None


def get_probes_geolocation_by_id_list(probe_id_list, batch_size=500):
    """Fetch probe records for a collection of probe ids, in batches.

    Ids are sent as a comma-separated ``id__in`` filter, ``batch_size`` ids
    per request. A batch whose request fails is skipped, so the returned list
    may cover only part of the requested ids.

    Args:
        probe_id_list: Iterable of probe ids. Each id is converted with
            ``str`` so integer ids (as found in API records) are accepted as
            well as strings.
        batch_size: Maximum number of ids per request (default 500).

    Returns:
        A list with the ``results`` entries of every successful response, in
        request order. Empty when no ids are given or every request fails.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    # str.join only accepts strings, so normalise the ids once up front.
    probe_ids = [str(probe_id) for probe_id in probe_id_list]

    probes = []
    for start in range(0, len(probe_ids), batch_size):
        batch = probe_ids[start:start + batch_size]
        is_success, response = AtlasRequest(
            url_path="/api/v2/probes/").get(id__in=",".join(batch))
        # NOTE(review): if the endpoint paginates, a single response may not
        # carry the whole batch - confirm against the API documentation.
        if is_success:
            probes.extend(response["results"])
        # Best effort: a failed batch is left out rather than aborting the run.
    return probes


def map_measurement_result_to_probe_info(parsed_result):
    """Reduce a parsed result to the probe id and its first response time.

    Args:
        parsed_result: Object exposing ``probe_id`` and ``responses``, where
            each response exposes ``response_time``.

    Returns:
        A dict with ``"probe_id"`` and ``"rtt"``. ``"rtt"`` is the
        ``response_time`` of the first response, or ``None`` when the result
        carries no responses.
    """
    responses = parsed_result.responses
    # Indexing an empty response list would raise IndexError; report a
    # missing round-trip time instead.
    first_response = responses[0] if responses else None
    return {
        "probe_id": parsed_result.probe_id,
        "rtt": first_response.response_time if first_response is not None else None,
    }


def create_measurement_hash(measurement_id):
    """Join a measurement's successful DNS results with their probe records.

    The measurement's results are fetched and parsed as ``DnsResult``;
    results flagged ``is_error`` are dropped. The remaining ones are mapped
    with ``map_measurement_result_to_probe_info`` and each mapping gets a
    ``"geolocation"`` entry holding the probe record whose ``"id"`` equals
    its ``"probe_id"``.

    Args:
        measurement_id: Identifier of the measurement to process.

    Returns:
        A list with one dict per non-error result, in result order. Each dict
        has the keys produced by ``map_measurement_result_to_probe_info``
        plus ``"geolocation"`` (``None`` when no matching probe record was
        returned). A list is used so that several results from the same
        probe, if present, are all kept. The list is empty when the results
        could not be fetched or there are none.
    """
    measurement_results = get_measurement_results(measurement_id)
    if not measurement_results:
        # The fetch returns None on failure; iterating it would raise.
        return []

    # Parse every raw result exactly once and reuse the parsed object for
    # both the error check and the mapping.
    parsed_results = [DnsResult(raw) for raw in measurement_results]
    probes_information = [
        map_measurement_result_to_probe_info(parsed)
        for parsed in parsed_results if not parsed.is_error]

    # De-duplicate while keeping first-seen order so each probe is requested
    # only once; ids are passed as strings, as the lookup helper expects.
    unique_probe_ids = list(dict.fromkeys(
        str(probe_obj["probe_id"]) for probe_obj in probes_information))
    probes_geolocation = get_probes_geolocation_by_id_list(unique_probe_ids)

    # Index by id for constant-time lookup; setdefault keeps the first record
    # should an id appear more than once.
    probe_by_id = {}
    for probe in probes_geolocation:
        probe_by_id.setdefault(probe["id"], probe)

    for probe_info in probes_information:
        probe_info["geolocation"] = probe_by_id.get(probe_info["probe_id"])
    return probes_information


def check_dns_measurements(date):
    """Print a summary of each stopped DNS measurement started on ``date``.

    For every measurement the id, start and stop time (converted with
    ``datetime.fromtimestamp``, i.e. shown in local time), the query and the
    results link are printed, then ``create_measurement_hash`` is invoked
    for it.

    Args:
        date: Day to inspect, formatted as ``YYYY-MM-DD``.

    Returns:
        None. When the measurement list cannot be fetched a failure message
        is printed and nothing else happens.
    """
    measurements = get_measurements_collection_by_date_and_type(date, "dns")
    if measurements is None:
        # The fetch helper signals a failed request with None; iterating it
        # would raise a TypeError.
        print('Failed to get the DNS measurements for date:', date)
        return

    for measurement in measurements:
        print('Measurement ID:', measurement.get("id"))
        print('Start Time:', datetime.fromtimestamp(
            measurement.get("start_time")))
        print('Stop Time:', datetime.fromtimestamp(
            measurement.get("stop_time")))
        print(
            f"Query: {measurement.get('af')} {measurement.get('query_argument')} {measurement.get('query_type')}")
        print('Results Link:', measurement.get("result"))
        print('----')
        # The per-measurement join is still triggered here; its return value
        # is currently not used by this report.
        create_measurement_hash(measurement.get("id"))
        print()


# Print the DNS measurement summary for 2022-05-01.
check_dns_measurements("2022-05-01")

In [2]:
from datetime import datetime, timedelta
from ripe.atlas.cousteau import AtlasRequest

def probes_ipv6_check(probe, start_date, finish_date):
    """Query the probe archive and keep the ASN fields of each record.

    Args:
        probe: Value sent as the ``probe`` filter.
        start_date: Value sent as the ``date__gte`` filter.
        finish_date: Value sent as the ``date__lte`` filter.

    Returns:
        On success, a list with one dict per entry of the response's
        ``results``, restricted to the keys ``id``, ``asn_v6``, ``asn_v4``
        and ``date``. ``None`` when the request fails.
    """
    query = dict(probe=probe, date__gte=start_date, date__lte=finish_date)
    succeeded, payload = AtlasRequest(
        url_path="/api/v2/probes/archive").get(**query)
    if not succeeded:
        return None

    # Project every archive entry onto the fields used downstream.
    kept_fields = ("id", "asn_v6", "asn_v4", "date")
    return [
        {field: entry[field] for field in kept_fields}
        for entry in payload["results"]
    ]
    
def get_probes_for_country(country_code):
    """Fetch the probe records filtered by country code.

    Args:
        country_code: Value sent as the ``country_code`` filter.

    Returns:
        The ``results`` entry of the response when the request succeeds,
        otherwise ``None``.
    """
    request = AtlasRequest(url_path="/api/v2/probes/")
    succeeded, payload = request.get(country_code=country_code)
    if not succeeded:
        return None
    return payload["results"]
    
def check_as_for_probes(country_code, start_date, finish_date):
    """Print, per day, the share of IPv6 ASes among a country's probes.

    For each day from ``start_date`` through ``finish_date`` (both included)
    the archive records of that day are split into two AS sets: the
    ``asn_v6`` of records that have one, and the ``asn_v4`` of records whose
    ``asn_v6`` is ``None``. The printed percentage is the size of the first
    set over the combined size of both.

    Args:
        country_code: Country whose probes are inspected.
        start_date: First day, formatted as ``YYYY-MM-DD``.
        finish_date: Last day, formatted as ``YYYY-MM-DD``.

    Returns:
        None. Output is printed. A failure message is printed instead when
        the probes or the archive records cannot be fetched, and a day with
        no AS data gets its own message instead of a percentage.

    Raises:
        ValueError: If a date does not match ``%Y-%m-%d``.
    """
    # Validate the dates before any request is made.
    start_datetime = datetime.strptime(start_date, "%Y-%m-%d")
    end_datetime = datetime.strptime(finish_date, "%Y-%m-%d")

    probes = get_probes_for_country(country_code)
    if probes is None:
        print(f"Failed to get the probes for {country_code}")
        return

    ids = [item["id"] for item in probes]
    results = probes_ipv6_check(ids, start_date, finish_date)
    if results is None:
        print(f"Failed to get the probe archive for {country_code}")
        return

    # Bucket the records by date once instead of rescanning the whole list
    # for every day in the range.
    records_by_day = {}
    for res in results:
        records_by_day.setdefault(res["date"], []).append(res)

    delta = end_datetime - start_datetime
    for i in range(delta.days + 1):
        current_day = (start_datetime + timedelta(days=i)).strftime("%Y%m%d")
        as_version_6 = set()
        as_version_4 = set()
        for res in records_by_day.get(current_day, ()):
            if res["asn_v6"] is None:
                # A record with no ASN at all must not count as an AS.
                if res["asn_v4"] is not None:
                    as_version_4.add(res["asn_v4"])
            else:
                as_version_6.add(res["asn_v6"])
        amount_as_ipv6 = len(as_version_6)
        amount_as_ipv4 = len(as_version_4)
        if amount_as_ipv6 + amount_as_ipv4 == 0:
            # Without any AS for the day the ratio is undefined (division by
            # zero); report the gap and move on.
            print(f"At date {current_day} no AS data is available for probes in {country_code}")
            continue
        percentage = (amount_as_ipv6 / (amount_as_ipv6 + amount_as_ipv4)) * 100
        print(f"At date {current_day} the percentage of IPv6 ASes in {country_code} is: {percentage}%")



# Print the daily IPv6 AS share for US probes, 2022-01-01 through 2023-01-01 (both days included).
check_as_for_probes("US", "2022-01-01", "2023-01-01")


    

At date 20220101 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220102 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220103 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220104 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220105 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220106 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220107 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220108 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220109 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220110 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220111 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220112 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220113 the percentage of IPv6 ASes in US is: 35.13513513513514%
At date 20220114 the percentage of IPv

In [None]:

from pprint import pprint
from ipwhois.experimental import bulk_lookup_rdap,get_bulk_asn_whois
# Sample IPv4 and IPv6 addresses to look up; the commented-out addresses in
# the list are currently excluded from the query.
ip_list = ['74.125.225.229', '2001:4860:4860::8888', '62.239.237.1', '2a00:2381:ffff::1', '210.107.73.73',
        #    '2001:240:10c:1::ca20:9d1d', '200.57.141.161', '2801:10:c000::', '196.11.240.215', '2001:43f8:7b0::', '133.1.2.5', '115.1.2.3'
           ]
# obj = IPWhois('["8.8.4.4", "1.1.1.1", "2c0f:fb50:4003::"]')
# Bulk ASN lookup for all addresses in one call; the RDAP variant below is
# kept commented out as an alternative.
# NOTE(review): the shape of `results` depends on ipwhois.experimental - confirm before post-processing it.
results= get_bulk_asn_whois(addresses=ip_list)
# results, stats = bulk_lookup_rdap(addresses=ip_list)
# Pretty-print whatever the lookup returned.
pprint(results)