In [1]:
# Imports
import os
import numpy as np
import pandas as pd
import sqlite3
import requests
from html.parser import HTMLParser
import pybgpstream
from datetime import datetime
import geopandas as gpd
import matplotlib.pyplot as plt
import geopandas as gpd
from shapely.geometry import Point, LineString
import warnings
from IPython.display import clear_output
import time
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, sum, collect_list
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import sqlite3
from pyspark.sql.functions import concat_ws
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType

# warnings.filterwarnings("ignore")

In [3]:
# Creating the database class 
class DBConnection:
    """Small wrapper around a SQLite connection that stores the ``au_systems`` table.

    The table maps an autonomous system number (``asn``) to its organization
    text and country code. Instances can also be used as context managers, in
    which case the connection is closed on exit.
    """

    # Columns of ``au_systems``. SQL identifiers cannot be bound as query
    # parameters, so lookups validate the requested column against this list.
    _COLUMNS = ('asn', 'organization', 'country')

    # Retry policy used by ``insert`` when the database file is locked.
    _MAX_RETRIES = 5
    _RETRY_DELAY_SECONDS = 1

    def __init__(self, dbname = 'data.db'):
        """Open (or create) the SQLite database file ``dbname``."""
        self.db_con = sqlite3.connect(dbname)
        self.db_cur = self.db_con.cursor()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def create_table(self):
        """Create the ``au_systems`` table if it does not exist yet."""
        self.db_cur.execute('''create table if not exists au_systems (
            asn integer PRIMARY KEY,
            organization text,
            country text
        )''')

    def insert(self, asn, org_info, country):
        """Insert one row; a row whose ``asn`` already exists is left untouched.

        The statement is retried a bounded number of times while the database
        is locked. Any other ``sqlite3.OperationalError`` (for example a
        missing table) is raised immediately instead of being retried forever.

        Raises:
            sqlite3.OperationalError: if the statement fails for a reason other
                than a lock, or the database is still locked after
                ``_MAX_RETRIES`` retries.
        """
        attempt = 0
        while True:
            try:
                self.db_cur.execute(
                    "INSERT OR IGNORE INTO au_systems VALUES (?, ?, ?)",
                    (asn, org_info, country),
                )
                return
            except sqlite3.OperationalError as e:
                is_locked = 'locked' in str(e).lower()
                if not is_locked or attempt >= self._MAX_RETRIES:
                    raise
                attempt += 1
                print("Database locked. Retrying in 1 second...")
                time.sleep(self._RETRY_DELAY_SECONDS)

    def _select_where(self, column, value):
        """Run ``select * ... where column = value`` with the value bound safely.

        The value is bound as text, which matches the quoted literal the query
        used to be built with (SQLite applies the column's affinity when
        comparing, so ``'8763'`` still matches the integer ``asn`` 8763).

        Raises:
            ValueError: if ``column`` is not a column of ``au_systems``.
        """
        if column not in self._COLUMNS:
            raise ValueError(
                f"Unknown column {column!r}; expected one of {self._COLUMNS}"
            )
        self.db_cur.execute(
            f"select * from au_systems where {column} = ?", (str(value),)
        )

    def find(self, column, value):
        """Return all rows whose ``column`` equals ``value`` (possibly empty list)."""
        self._select_where(column, value)
        return self.db_cur.fetchall()

    def find_one(self, column, value):
        """Return the first row whose ``column`` equals ``value``, or ``None``."""
        self._select_where(column, value)
        return self.db_cur.fetchone()

    def find_all(self):
        """Return every row of ``au_systems``."""
        self.db_cur.execute('select * from au_systems')
        return self.db_cur.fetchall()

    def commit(self):
        """Commit the current transaction."""
        self.db_con.commit()

    def close(self):
        """Close the underlying connection."""
        self.db_con.close()

### Create the database for ASNs 

Only run the cell below if you have not created this database yet, or if you want to update the existing one.

In [4]:
# # HTML Parser Class
# class HTMLFilter(HTMLParser):
#     text = ""
#     def handle_data(self, data):
#         self.text += data
        
# html_filter = HTMLFilter()

# # Download content from the resource
# url = "https://www.cidr-report.org/as2.0/autnums.html"
# r = requests.get(url, allow_redirects = True)
# html_filter.feed(r.content.decode("utf-8"))

# db = DBConnection()
# db.create_table()

# # Populate the database
# lst = html_filter.text.splitlines( )
# for i in range(14,len(lst)-8):
#     line = lst[i]
#     asn, org_info, country_code = int(line[2:8].strip()), line[8:-4].strip(), line[-2:]
#     db.insert(asn, org_info, country_code)

# db.commit()
# db.close()

In [7]:
# Process the data and translate the Country Codes for the ASNs
def preprocess(df, report_counts=True):
    """Translate each AS path into a path of country codes.

    The ``Path`` column is cast to an array of strings and a new
    ``country_path`` column is added, holding the country code of every ASN in
    the path that is present in the ``au_systems`` table and whose code is not
    one of the invalid placeholders (``EU``, ``ZZ``).

    The ASN table is read once on the driver and broadcast to the workers, so
    the UDF does not open a SQLite connection per row. Skipped ASNs are counted
    inside the UDF result and summed with a Spark aggregation; mutating driver
    variables from inside a UDF does not work, because the UDF runs lazily in
    worker processes.

    Args:
        df: Spark DataFrame with an array-typed ``Path`` column of ASNs.
        report_counts: when True (default), run one extra aggregation job over
            ``df`` to print how many ASNs were skipped. Pass False to avoid
            that additional pass over the data.

    Returns:
        The DataFrame with ``Path`` as ``array<string>`` and an added
        ``country_path`` column (``array<string>``).
    """
    # Define the set of invalid country codes
    invalid_country_codes = frozenset({'EU', 'ZZ'})

    # Load the ASN -> country mapping once and always release the connection.
    db = DBConnection()
    try:
        asn_to_country = {int(row[0]): row[2] for row in db.find_all()}
    finally:
        db.close()

    # getOrCreate() returns the session that is already running for ``df``.
    spark = SparkSession.builder.getOrCreate()
    lookup = spark.sparkContext.broadcast(asn_to_country)

    def process_path(path):
        """Return (country_path, invalid_count, unknown_count) for one AS path."""
        table = lookup.value
        country_path = []
        invalid = 0
        unknown = 0
        for asn in path or ():
            try:
                key = int(asn)
            except (TypeError, ValueError):
                # Entries that are not plain numbers cannot be in the table.
                key = None
            if key not in table:
                unknown += 1
                continue
            country = table[key]
            if country in invalid_country_codes:
                invalid += 1
            else:
                country_path.append(country)
        return country_path, invalid, unknown

    process_path_udf = udf(
        process_path,
        "struct<country_path:array<string>,invalid:int,unknown:int>",
    )

    # Native cast instead of a Python UDF round-trip to get array<string>
    df = df.withColumn("Path", F.col("Path").cast(ArrayType(StringType())))

    # Apply the UDF to process paths and translate ASNs to country codes
    df = df.withColumn("processed_path", process_path_udf(F.col("Path")))

    if report_counts:
        # The totals only exist once Spark has evaluated the UDF, so an action
        # is required here; F.sum yields null on an empty DataFrame.
        totals = df.agg(
            F.sum("processed_path.invalid").alias("invalid"),
            F.sum("processed_path.unknown").alias("unknown"),
        ).first()
        zz_eu_count = totals["invalid"] or 0
        unknown_count = totals["unknown"] or 0
        print(f'{zz_eu_count} cases of asns found that are invalid (ZZ or EU)')
        print(f'{unknown_count} cases of asns found that belong to unknown countries')

    # Keep only the translated path; the per-row counters are no longer needed
    df = df.withColumn("country_path", F.col("processed_path.country_path"))
    return df.drop("processed_path")

# Transform the data for more efficient storage
def transformation3(df_update):
    """Aggregate the per-announcement rows into one row per distinct ``Path``.

    Output columns, in order:
        Path: the grouping key.
        Frequency: number of non-null ``Time`` values seen for the path.
        country_paths: distinct ``country_path`` values seen for the path.
        Router_Collectors: distinct ``Router Collector Name`` values.
        path_length: number of elements in ``Path``.

    ``collect_set`` is used so duplicates are dropped while aggregating rather
    than collected in full and de-duplicated afterwards; the order of elements
    inside the two array columns is not guaranteed.
    """
    return (
        df_update.groupBy("Path")
        .agg(
            F.count("Time").alias("Frequency"),
            # Not recommended for very large groups: arrays grow with the data
            F.collect_set("country_path").alias("country_paths"),
            F.collect_set("Router Collector Name").alias("Router_Collectors"),
        )
        .withColumn("path_length", F.size("Path"))
    )

# Combine data of all days of belonging to the same year 
def combine_datasets_for_january(spark, year, day_range, mode="errorifexists"):
    """Union the daily parquet datasets of one year and save the result.

    Reads ``BGP_data_{year}_j{day}`` for ``day`` in ``1..day_range``, unions
    them and writes the result to ``combined_{year}`` as parquet.

    Args:
        spark: active SparkSession.
        year: label used in the input and output directory names.
        day_range: number of daily datasets to combine (must be >= 1).
        mode: Spark save mode for the output; the default keeps the original
            behaviour of failing when ``combined_{year}`` already exists. Pass
            ``"overwrite"`` to make re-runs replace it.

    Returns:
        The combined (lazy) DataFrame.

    Raises:
        ValueError: if ``day_range`` is smaller than 1.
    """
    if day_range < 1:
        raise ValueError(f"day_range must be at least 1, got {day_range}")

    combined_df = None

    # Loop through each requested day (1 .. day_range)
    for day in range(1, day_range + 1):
        daily_df = spark.read.parquet(f'BGP_data_{year}_j{day}')

        if combined_df is None:
            combined_df = daily_df
        else:
            # Match columns by name so a different column order in one of the
            # daily files cannot silently misalign the data.
            combined_df = combined_df.unionByName(daily_df)

    combined_df.write.format("parquet").mode(mode).save(f'combined_{year}')

    return combined_df

# Process the data
def data_process2(spark, year):
    """Run the full pipeline for one combined dataset.

    Reads ``combined_{year}``, applies ``preprocess`` and ``transformation3``,
    writes the result to ``second_transformed_{year}`` as parquet and returns
    the transformed DataFrame.
    """
    source_dir = f'combined_{year}'
    target_dir = f'second_transformed_{year}'

    combined = spark.read.parquet(source_dir)
    aggregated = transformation3(preprocess(combined))
    aggregated.write.format("parquet").save(target_dir)
    return aggregated
    

In [11]:
spark = (
    SparkSession.builder
    .appName("Combiner")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

try:
    # Combine the daily datasets of the given year into one parquet dataset
    combined_january_data = combine_datasets_for_january(spark, '2017', 7)

    # Show the combined DataFrame
    combined_january_data.show()
finally:
    # Always stop the session, even if the job fails; otherwise a later
    # getOrCreate() would silently reuse this session and its settings.
    spark.stop()


                                                                                

+-------+--------------------+---------------------+-------------------+
|Peer AS|                Path|Router Collector Name|               Time|
+-------+--------------------+---------------------+-------------------+
|   8763| [8763, 6939, 24441]|                rrc12|2017-02-01 11:09:24|
|   8763| [8763, 6939, 38235]|                rrc12|2017-02-01 11:09:24|
|   8763| [8763, 6939, 38235]|                rrc12|2017-02-01 11:09:24|
|   8763| [8763, 6939, 38235]|                rrc12|2017-02-01 11:09:24|
|  36351|[36351, 1299, 132...|      route-views.isc|2017-02-01 11:09:26|
|  36351|[36351, 1299, 132...|      route-views.isc|2017-02-01 11:09:26|
|  58511|[58511, 24218, 9902]|     route-views.linx|2017-02-01 11:09:27|
|  58511|[58511, 24218, 9902]|     route-views.linx|2017-02-01 11:09:27|
|  58511|[58511, 24218, 9902]|     route-views.linx|2017-02-01 11:09:27|
|  58511|[58511, 24218, 9902]|     route-views.linx|2017-02-01 11:09:27|
|  58511|[58511, 24218, 9902]|     route-views.linx

In [15]:
spark = (
    SparkSession.builder
    .appName("GROOT")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

try:
    # Translate, aggregate and store the combined dataset, then preview it
    df = data_process2(spark, '2017_feb')
    df.show()
finally:
    # Always stop the session, even if the job fails; otherwise a later
    # getOrCreate() would silently reuse this session and its settings.
    spark.stop()

0 cases of asns found that are invalid (ZZ or EU)
0 cases of asns found that belong to unknown countries


                                                                                

+--------------------+---------+--------------------+--------------------+-----------+
|                Path|Frequency|       country_paths|   Router_Collectors|path_length|
+--------------------+---------+--------------------+--------------------+-----------+
|[10026, 1299, 483...|       39|[[JP, SE, CN, TH,...|[route-views3, ro...|          6|
|[10026, 1299, 819...|        6|[[JP, SE, LV, LV,...|[route-views3, ro...|          5|
|[10026, 15412, 99...|        8|  [[JP, GB, KH, KH]]|      [route-views3]|          4|
|[10026, 174, 1299...|        3|[[JP, US, SE, SG,...|[route-views3, ro...|         18|
|[10026, 174, 3491...|      113|[[JP, US, US, TH,...|[route-views3, ro...|          6|
|[10026, 20804, 50...|        4|[[JP, PL, PL, PL,...|      [route-views3]|         12|
|[10026, 24218, 13...|        3|      [[JP, MY, KH]]|[route-views.sydn...|          3|
|[10026, 3257, 349...|       36|[[JP, US, US, TH,...|[route-views3, ro...|          6|
|[10026, 3257, 349...|       95|[[JP, US, U