In [1]:
import config
import pandas as pd
from sqlalchemy import text
import bgpkit
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from manage_storage import FileManager

In [2]:
# configs
base_dir = config.base_dir / 'routeviews'
assert base_dir.exists()
url_engine = config.make_engine('metadata')
output_engine = config.make_engine('routes')
route_collectors = ['route-views.jinx','route-views.napafrica', 'route-views.kixp', 'route-views.gixa']

In [3]:
# setup directory structure
fm = FileManager(base_dir)
# fm.make_directory_structure(2012, 2024)

In [9]:
# functions
def get_output_file_name(url):
    """Generates output file name from URL."""
    parts = url.split('/')
    collector_name = parts[3].split('.')[1]
    date_parts = parts[-1].split('.')[1]
    date = f"{date_parts[4:6]}_{date_parts[6:8]}_{date_parts[:4]}"
    output = f"route-views.{collector_name}_{date}"
    return output


def flatten_list_to_string(_list):
    """Flatten list to string."""
    if _list is None or len(_list) == 0:
        return None
    return ' '.join([str(elem) for elem in _list])

def parse_mrt_file(file_path):
    """Parse MRT file and return a list of dictionaries."""
    output_file_name = get_output_file_name(file_path)
    print(f">>> START: {output_file_name}")
    parser = bgpkit.Parser(url=file_path)
    elements = parser.parse_all()
    rc_name = output_file_name.split('_')[0]
    _df = pd.DataFrame(elements)
    _df['origin_asns'] = _df['origin_asns'].apply(flatten_list_to_string)
    _df['communities'] = _df['communities'].apply(flatten_list_to_string)
    _df['route_collector_name'] = rc_name
    _df['timestamp'] = pd.to_datetime(_df['timestamp'], unit='s')
    # pick only columns we need
    _df_db = _df[['timestamp', 'peer_ip','peer_asn','prefix', 'as_path', 'origin_asns', 'origin','communities','route_collector_name', ]]
    # save to db
    _date = output_file_name.split('_')[3] + "-" + output_file_name.split('_')[1] + "-" + output_file_name.split('_')[2]
    # _date = _date.split('_')[2] + "-" + _date.split('_')[0] + "-" + _date.split('_')[1]
    # print(_date)
    output_dir = fm.get_directory_name(_date)
    _df_db.to_sql('rv_ipv4', output_engine, if_exists='append', index=False)
    _df.to_parquet(f"{output_dir}/{output_file_name}.parquet")
    print(f">>> DONE: {output_file_name}")
    return len(_df)


def get_latest_daily_urls(route_collector_name: str, start_date: str, end_date: str, db_engine) -> list:
    query = text("""
        SELECT url
        FROM routeviews_urls
        WHERE route_collector_name = :route_collector_name
        AND timestamp IN (
            SELECT MAX(timestamp)
            FROM routeviews_urls
            WHERE route_collector_name = :route_collector_name
            AND DATE(timestamp) BETWEEN :start_date AND :end_date
            GROUP BY DATE(timestamp)
        )
    """)

    _df = pd.read_sql(
        query,
        db_engine,
        params={
            'route_collector_name': route_collector_name,
            'start_date': start_date,
            'end_date': end_date
        }
    )

    return _df['url'].tolist()


def generate_dates():
    """Generate dates.
    For every month, get the first and last day of the month, from 2010 to 2023.
    Return all dates as a list. There is no need for start and end dates.
    """
    date_list = []
    for year in tqdm(range(2010, 2024)):
        for month in range(1, 13):
            start_date = (pd.Timestamp(year, month, 1)).strftime("%Y-%m-%d")
            seventh_day = (pd.Timestamp(year, month, 7)).strftime("%Y-%m-%d")
            fourteenth_day = (pd.Timestamp(year, month, 14)).strftime("%Y-%m-%d")
            month_end = (pd.Timestamp(year, month, 1) + pd.offsets.MonthEnd(1)).strftime("%Y-%m-%d")
            date_list.append(start_date)
            date_list.append(seventh_day)
            date_list.append(fourteenth_day)
            date_list.append(month_end)
    return date_list

In [5]:
# get latest daily urls for jinx route collector
jinx_urls = get_latest_daily_urls('route-views.jinx', '2012-07-12', '2019-08-15', url_engine)
len(jinx_urls)

31

In [6]:
with ThreadPoolExecutor(max_workers=10) as executor:   
    futures = [executor.submit(parse_mrt_file, url) for url in jinx_urls[10:]]
    for future in as_completed(futures):
        try:
           _dum = future.result()
        except Exception as e:
            print(f"!!<< {e}")

>>> START: route-views.jinx_10_11_2012
>>> START: route-views.jinx_10_12_2012
>>> START: route-views.jinx_10_13_2012
>>> START: route-views.jinx_10_14_2012
>>> START: route-views.jinx_10_15_2012
>>> START: route-views.jinx_10_16_2012
>>> START: route-views.jinx_10_17_2012
>>> START: route-views.jinx_10_18_2012
>>> START: route-views.jinx_10_19_2012
>>> START: route-views.jinx_10_20_2012
2012-10-11
2012-10-12
2012-10-13
2012-10-16
2012-10-17
2012-10-14
2012-10-15
2012-10-18
2012-10-19
2012-10-20
>>> START: route-views.jinx_10_21_2012
!!<< (pymysql.err.OperationalError) (1050, "Table 'rv_ipv4' already exists")
[SQL: 
CREATE TABLE rv_ipv4 (
	timestamp DATETIME, 
	peer_ip TEXT, 
	peer_asn BIGINT, 
	prefix TEXT, 
	as_path TEXT, 
	origin_asns TEXT, 
	origin TEXT, 
	communities TEXT, 
	route_collector_name TEXT
)

]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
>>> START: route-views.jinx_10_22_2012
>>> START: route-views.jinx_10_23_2012
!!<< (pymysql.err.OperationalError) (105

In [7]:
# get latest urls for napafrica route collector
napafrica_urls = get_latest_daily_urls('route-views.napafrica', '2018-02-01', '2023-09-14', url_engine)
kixp_urls = get_latest_daily_urls('route-views.kixp', '2014-01-01', '2023-09-14', url_engine)
gixa_urls = get_latest_daily_urls('route-views.gixa', '2019-05-29', '2023-09-14', url_engine)
all_urls = napafrica_urls + kixp_urls + gixa_urls
len(all_urls)

6682

In [None]:
# download files
with ThreadPoolExecutor(max_workers=10) as executor:   
    futures = [executor.submit(parse_mrt_file, url) for url in kixp_urls]
    for future in as_completed(futures):
        try:
            _dum = future.result()
        except Exception as e:
            print(f"!!<< {e}")