Requirements:
- ripe.atlas.cousteau
- requests
- pyspark

In [None]:
"""
!pip install ripe.atlas.cousteau
"""
!pip install pyspark

In [None]:
from datetime import datetime
import json
import logging
from ripe.atlas.cousteau import MeasurementRequest, AtlasResultsRequest

# Write the collection log to a file, overwriting any log from a previous run.
logging.basicConfig(filename="data/measurement_collect.log", filemode='w',
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    datefmt='%d-%b-%y %H:%M:%S', level=logging.DEBUG)

# Filters handed to MeasurementRequest as keyword arguments:
# IPv4 ping measurements within 2021-01-01 .. 2021-01-02, 500 per page.
measurement_filters = {
    "start_time__gte": datetime(2021, 1, 1),
    "stop_time__lte": datetime(2021, 1, 2),
    "type": "ping",
    "af": 4,
    "page_size": 500
}

measurements = MeasurementRequest(**measurement_filters)

measurement_ids = []
results_list = []

# Fetch the results of every matching measurement and pool them in one list.
for msm in measurements:
    measurement_id = msm["id"]
    measurement_ids.append(measurement_id)
    is_success, results = AtlasResultsRequest(msm_id=measurement_id).create()
    logging.info("Result request is successful: %s", is_success)
    if is_success:
        results_list.extend(results)
        logging.info("Total measurements: %s, processed measurements: %s, Results size: %s, processed results: %s",
                     measurements.total_count, len(measurement_ids), len(results),
                     len(results_list))
    else:
        # Keep going, but leave a trace of which measurement could not be fetched.
        logging.warning("Result request failed for measurement %s: %s", measurement_id, results)

# Persist the pooled results; `with` guarantees the file is closed even on error.
with open("data/results_list-ipv4_ping-(21-1-1_21-1-2).json", "w") as results_file:
    json.dump(results_list, results_file)

# Persist the measurement ids (written as a JSON list despite the .txt suffix).
with open('data/measurement_ids.txt', 'w') as file_handle:
    json.dump(measurement_ids, file_handle)

logging.info("Measurements size: %s", measurements.total_count)
logging.info("Measurement IDs size: %s", len(measurement_ids))
logging.info("Results size: %s", len(results_list))
# Raise the root logger threshold to INFO for the rest of the notebook.
logging.getLogger().setLevel(logging.INFO)

In [None]:
import json
# Load the ping results saved by the collection cell back into memory.
file_name = "data/results_list-ipv4_ping-(21-1-1_21-1-2).json"

# The `with` block closes the file on exit, so no explicit close() is needed.
with open(file=file_name, mode="r") as file_handle:
    ping_results = json.load(file_handle)

# Number of loaded results.
print(len(ping_results))

In [None]:
# Pick a single result to inspect (index 1, i.e. the second entry of ping_results).
one_result = ping_results[1]

In [None]:
print(one_result)

In [None]:
import requests

def ip_map_city_request(ip_addr, timeout=10):
    """Look up the city of an IP address through the RIPE IPmap "best" locate API.

    Args:
        ip_addr: IP address to geolocate; it is formatted into the request URL.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``cityName`` of the returned ``location``, or ``None`` when the
        request fails, the body is not valid JSON, or no city is present.
    """
    url = "https://ipmap.ripe.net/api/v1/locate/{}/best".format(ip_addr)
    try:
        # Without a timeout a stalled connection would block the caller forever.
        response_json = requests.get(url, timeout=timeout).json()
    except (requests.RequestException, ValueError):
        # Network failure or non-JSON body: treat the address as "city unknown",
        # the same outcome as a response without a location.
        return None

    if not isinstance(response_json, dict):
        return None
    location = response_json.get('location')
    if not isinstance(location, dict):
        return None
    return location.get('cityName')



In [None]:
# Walk over every loaded ping result and geolocate both of its endpoints.
for ping_result in ping_results:
    # Results with no received replies, or with an average of -1, are only printed.
    if ping_result["rcvd"] == 0 or ping_result["avg"] == -1:
        print(ping_result)
        continue

    # One IPmap lookup per endpoint (source first, then destination).
    from_addr = ping_result["from"]
    from_addr_city = ip_map_city_request(from_addr)
    dst_addr = ping_result["dst_addr"]
    dst_addr_city = ip_map_city_request(dst_addr)
    rtt_list = ping_result["result"]


## Apache Spark section

In [13]:
from pyspark.sql import SparkSession

# Get (or create) a Spark session running locally under the "Ping measurement" app name.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Ping measurement")
    .getOrCreate()
)

In [14]:
# Read the saved ping results file with Spark's JSON reader.
# Note: this rebinds `ping_results`, which an earlier cell bound to the json.load() result.
ping_results = spark.read.json(path="data/results_list-ipv4_ping-(21-1-1_21-1-2).json")

In [None]:
# Row count of the loaded ping results.
print(ping_results.count())
# printSchema() prints the schema itself and returns None, so wrapping it in
# print() would append a stray "None" line to the output.
ping_results.printSchema()

In [15]:
# Keep only pings that received at least one reply and carry a real average (not -1).
has_replies = ping_results["rcvd"] != 0
has_average = ping_results["avg"] != -1
valued_ping_results = ping_results.where(has_replies & has_average)

In [None]:
valued_ping_results.count()

In [None]:
valued_ping_results.take(3)

In [None]:
valued_ping_results.show(n=3)

In [111]:
# Keep the endpoint addresses and RTT statistics; expose 'from' as 'from_addr'.
selected_ping_results = (
    valued_ping_results
    .select('from', 'dst_addr', 'avg', 'max', 'min')
    .withColumnRenamed('from', 'from_addr')
)

In [112]:
selected_ping_results.show(4)

+--------------+-------------+--------------+----------+----------+
|     from_addr|     dst_addr|           avg|       max|       min|
+--------------+-------------+--------------+----------+----------+
| 203.23.128.44|46.30.201.202|    279.860558|324.039382|234.906895|
|  50.38.42.218|46.30.201.202|135.6853056667|139.013417| 133.56025|
|  95.94.75.171|46.30.201.202|     41.985082| 43.064958| 41.283366|
|189.120.72.237|46.30.201.202|224.1408336667|226.516979|222.869584|
+--------------+-------------+--------------+----------+----------+
only showing top 4 rows



In [123]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import Row

# Wrap the IPmap lookup as a string-returning Spark UDF.
# Note: ip_map_city_request is invoked once per evaluated row value.
ip_geolocation_function_udf = udf(
    lambda ip_addr: ip_map_city_request(ip_addr),
    returnType=StringType(),
)

# Add the looked-up city of the source address as a new 'from_city' column.
from_city_ping_results = selected_ping_results.withColumn(
    'from_city',
    ip_geolocation_function_udf(selected_ping_results['from_addr']),
)

In [None]:
from_city_ping_results.show()

In [104]:
from pyspark.sql.functions import col, when

# Target cities paired with the country each one must belong to. Matching city
# and country together (rather than two independent isin() checks) keeps out
# same-named places in another listed country, e.g. a "London" in the United States.
city_country_pairs = [
    ('New York City', 'United States'),
    ('London', 'United Kingdom'),
    ('City of London', 'United Kingdom'),
    ('Beijing', 'China'),
    ('San Francisco', 'United States'),
    ('Sydney', 'Australia'),
    ('Cape Town', 'South Africa'),
    ('São Paulo', 'Brazil'),
]
# Kept for backward compatibility with cells that reference these lists.
cities = [city for city, _ in city_country_pairs]
countries = list(dict.fromkeys(country for _, country in city_country_pairs))

# OR together one (city AND country) condition per target pair.
city_in_country = None
for city, country in city_country_pairs:
    pair_match = (col('city_name') == city) & (col('country_name') == country)
    city_in_country = pair_match if city_in_country is None else (city_in_country | pair_match)

# Load the header-less geolocation dump, name its columns, keep only the target
# cities, and fold 'City of London' into 'London'.
geolocations_rdd = (
    spark.read.csv('data/geolocations-latest.out', header=False)
    .toDF("IP", "geolocation_ID", "city_name", "state_name", "country_name",
          "country_code_ISO2", "country_code_iso3", "latitude", "longitude", "score")
    .filter(city_in_country)
    .select('IP', 'city_name')
    .withColumn('city_name',
                when(col('city_name') == 'City of London', 'London').otherwise(col('city_name')))
    .rdd
)

In [105]:
geolocations_rdd.take(20)

[Row(IP='213.46.174.97/32', city_name='London'),
 Row(IP='184.150.181.166/32', city_name='New York City'),
 Row(IP='187.100.34.165/32', city_name='São Paulo'),
 Row(IP='63.218.12.146/32', city_name='London'),
 Row(IP='63.218.12.238/32', city_name='London'),
 Row(IP='2001:5a0:12:100::24/128', city_name='New York City'),
 Row(IP='63.218.12.246/32', city_name='London'),
 Row(IP='2600:9000:211b:0:c:5a04:e140:93a1/128', city_name='London'),
 Row(IP='5.57.81.128/32', city_name='London'),
 Row(IP='5.57.81.134/32', city_name='London'),
 Row(IP='5.57.81.150/32', city_name='London'),
 Row(IP='5.57.81.174/32', city_name='London'),
 Row(IP='5.57.81.195/32', city_name='London'),
 Row(IP='2600:9000:211b:1800:c:5a04:e140:93a1/128', city_name='London'),
 Row(IP='191.162.75.107/32', city_name='São Paulo'),
 Row(IP='2001:504:30::ba01:1170:1/128', city_name='San Francisco'),
 Row(IP='5.57.81.200/32', city_name='London'),
 Row(IP='2001:470:1f1d:27b:220:4aff:fec7:af98/128', city_name='London'),
 Row(IP='17

In [151]:
geolocations_rdd.count()

33435

In [22]:
import ipaddress

In [157]:
sample_ip = ipaddress.ip_address('194.176.70.2')
sample_ip

IPv4Address('194.176.70.2')

In [158]:
sample_ip_network = ipaddress.ip_network('194.176.70.2/32')
sample_ip_network

IPv4Network('194.176.70.2/32')

In [159]:
sample_ip in sample_ip_network

True

In [126]:
def _to_network_row(loc):
    """Turn a geolocation row into one whose IP string is parsed as an ip_network."""
    return Row(IP_network=ipaddress.ip_network(loc.IP), city_name=loc.city_name)


network_geolocations_rdd = geolocations_rdd.map(_to_network_row)

In [127]:
network_geolocations_rdd.take(10)

[Row(IP_network=IPv4Network('213.46.174.97/32'), city_name='London'),
 Row(IP_network=IPv4Network('184.150.181.166/32'), city_name='New York City'),
 Row(IP_network=IPv4Network('187.100.34.165/32'), city_name='São Paulo'),
 Row(IP_network=IPv4Network('63.218.12.146/32'), city_name='London'),
 Row(IP_network=IPv4Network('63.218.12.238/32'), city_name='London'),
 Row(IP_network=IPv6Network('2001:5a0:12:100::24/128'), city_name='New York City'),
 Row(IP_network=IPv4Network('63.218.12.246/32'), city_name='London'),
 Row(IP_network=IPv6Network('2600:9000:211b:0:c:5a04:e140:93a1/128'), city_name='London'),
 Row(IP_network=IPv4Network('5.57.81.128/32'), city_name='London'),
 Row(IP_network=IPv4Network('5.57.81.134/32'), city_name='London')]

In [138]:
# Collect the (IP_network, city_name) rows on the driver and broadcast the result
# so that functions running on the executors can read it through `.value`.
# NOTE(review): collectAsMap() presumably treats each two-field Row as a (key, value)
# pair, yielding {IP_network: city_name} — confirm against the PySpark documentation.
network_geolocations_broadcast = spark.sparkContext.broadcast(network_geolocations_rdd.collectAsMap())

In [115]:
selected_ping_results_rdd = selected_ping_results.rdd
selected_ping_results_rdd.take(5)

[Row(from_addr='203.23.128.44', dst_addr='46.30.201.202', avg=279.860558, max=324.039382, min=234.906895),
 Row(from_addr='50.38.42.218', dst_addr='46.30.201.202', avg=135.6853056667, max=139.013417, min=133.56025),
 Row(from_addr='95.94.75.171', dst_addr='46.30.201.202', avg=41.985082, max=43.064958, min=41.283366),
 Row(from_addr='189.120.72.237', dst_addr='46.30.201.202', avg=224.1408336667, max=226.516979, min=222.869584),
 Row(from_addr='95.94.91.161', dst_addr='46.30.201.202', avg=44.897552, max=46.112293, min=43.682811)]

In [132]:
# Parse both endpoint strings into ipaddress objects; RTT statistics pass through unchanged.
ip_ping_results = selected_ping_results_rdd.map(
    lambda ping: Row(
        from_IP_addr=ipaddress.ip_address(ping.from_addr),
        dst_IP_addr=ipaddress.ip_address(ping.dst_addr),
        avg=ping.avg,
        max=ping.max,
        min=ping.min,
    )
)

In [133]:
ip_ping_results.take(5)

[Row(from_IP_addr=IPv4Address('203.23.128.44'), dst_IP_addr=IPv4Address('46.30.201.202'), avg=279.860558, max=324.039382, min=234.906895),
 Row(from_IP_addr=IPv4Address('50.38.42.218'), dst_IP_addr=IPv4Address('46.30.201.202'), avg=135.6853056667, max=139.013417, min=133.56025),
 Row(from_IP_addr=IPv4Address('95.94.75.171'), dst_IP_addr=IPv4Address('46.30.201.202'), avg=41.985082, max=43.064958, min=41.283366),
 Row(from_IP_addr=IPv4Address('189.120.72.237'), dst_IP_addr=IPv4Address('46.30.201.202'), avg=224.1408336667, max=226.516979, min=222.869584),
 Row(from_IP_addr=IPv4Address('95.94.91.161'), dst_IP_addr=IPv4Address('46.30.201.202'), avg=44.897552, max=46.112293, min=43.682811)]

In [160]:
def find_city(ip_addr):
    """Return the city of the most specific broadcast network containing ``ip_addr``.

    Instead of testing the address against every broadcast network, the lookup
    walks from the address's own host network (/32 or /128) up through ever
    shorter prefixes and probes the broadcast dict for each one. That takes at
    most 33 (IPv4) or 129 (IPv6) dict lookups, and when networks overlap the
    longest matching prefix wins regardless of dict ordering.

    Args:
        ip_addr: An ``ipaddress`` address object (IPv4 or IPv6).

    Returns:
        The city name mapped to the matching network, or ``None`` if no
        broadcast network contains the address.
    """
    networks = network_geolocations_broadcast.value
    host_network = ipaddress.ip_network(ip_addr)
    # Networks of the other IP version never compare equal to these candidates,
    # so no explicit version check is needed.
    for prefix_len in range(host_network.max_prefixlen, -1, -1):
        candidate = host_network.supernet(new_prefix=prefix_len)
        if candidate in networks:
            return networks[candidate]
    return None

print(find_city(sample_ip))

London


In [None]:
# Map every ping to the city of its source address. The original expression was
# left unfinished (`row. `); use row.dst_IP_addr instead to resolve the destination.
ip_ping_results.map(lambda row: find_city(row.from_IP_addr))