# Measurement IP Data Downloader
__authors__: ufukbombar@gmail.com

__date__: 2024-11-24

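__goal__: This notebook downloads the IP routing information of one measurement (selected by its UUID) into binary files. Each routing pair is stored as 6 bytes: the first three octets of `probe_dst_prefix` followed by the first three octets of `reply_src_prefix`.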

__input__:
 - The measurement table as .feather file from `table_to_measurement_converter`.
 - The measurement_uuid given by the user.

__output__:
 - The binary data for each agent, written under the data directory (`../data/ip_data/`).

__notes__:
The only fields required to reconstruct the routing are ['probe_dst_prefix', 'reply_src_prefix'].

In [None]:
# Required packages
%pip install pandas requests python-dotenv httpx aiofiles dataclasses tqdm pyarrow

# External imports
import pandas as pd 
from dataclasses import dataclass
import json
from datetime import datetime
from tqdm import tqdm
import asyncio
import os
import ipaddress

# Internal imports
from config import config, Config
import common
import file as ff
import objects

In [None]:
# Feather file produced by table_downloader; it lists the measurement tables.
input_tables_name = (
    "../data/measurements/"
    "measurement-2024-11-18 22:44:16.feather"
)
# UUID of the measurement whose tables are downloaded below.
selected_measurement_uuid = "007046a9_518e_46cb_8e70_e598b8bce831"
selected_measurement_uuid = "007046a9_518e_46cb_8e70_e598b8bce831"

In [None]:
# Load the table listing produced by table_downloader and display it.
measurement_tables_df = pd.read_feather(path=input_tables_name)
measurement_tables_df

In [None]:
# Keep only cleaned "results" tables that belong to the selected measurement.
is_cleaned = measurement_tables_df['cleaned'].eq(True)
is_selected = measurement_tables_df['measurement_uuid'].eq(selected_measurement_uuid)
is_results = measurement_tables_df['type'].eq('results')
selected_measurements_df = measurement_tables_df[is_cleaned & is_selected & is_results]
selected_measurements_df

In [None]:
# Build one objects.Measurement per selected row; each column becomes a keyword argument.
selected_measurements_list = [
    objects.Measurement(**measurement_row)
    for _row_index, measurement_row in selected_measurements_df.iterrows()
]
print("There are {} table(s).".format(len(selected_measurements_list)))

In [None]:
async def get_number_of_row(m: objects.Measurement):
    """Return the number of rows stored in ``m.table_name``.

    Issues ``select count(*)`` with a CSV response format and converts the
    decoded response text to an ``int``.
    """
    count_query = f"select count(*) from {m.table_name}"
    csv_cfg = Config()
    csv_cfg.response_format = "CSV"
    response_bytes = await common.quick_query(csv_cfg, count_query)
    response_text = response_bytes.decode()
    return int(response_text)

async def get_results_table_optimized(m: objects.Measurement, num_rows: int, limit: int=1, pbar: tqdm=None, pbar_update=100):
    """Stream the routing pairs of one table and pack each into 6 bytes.

    Each CSV line ``"<probe_dst_prefix>","<reply_src_prefix>"`` is parsed as two
    IPv6 addresses. When both are IPv4-mapped, the first three octets of each
    IPv4 address are packed (3 + 3 = 6 bytes). Other lines are skipped.

    NOTE: writing the packed bytes to ``filepath`` is currently disabled, so the
    pairs are only counted.

    Args:
        m: measurement whose ``table_name`` is queried.
        num_rows: number of pairs the caller expects; returned unchanged.
        limit: maximum number of rows to query; 0 means no limit.
        pbar: optional progress bar, advanced by the number of packed pairs.
        pbar_update: how many packed pairs to accumulate before each
            ``pbar.update`` call (values below 1 are treated as 1).

    Returns:
        ``(True, num_pairs_expected, num_pairs_written)``. Failures are not
        reported through this tuple; they propagate as exceptions.

    Raises:
        Any exception from the query or the parsing, re-raised after a message
        naming the table is printed.
    """
    def from_csv_to_int(binary_data: str) -> tuple[bytes, bool]:
        # Parse one CSV line holding two quoted IPv6 addresses into 6 packed bytes.
        # Returns (b'', False) when the line is blank or either address is not IPv4-mapped.
        if not isinstance(binary_data, str):
            raise TypeError("Given wrong type!")

        line = binary_data.strip()  # tolerate trailing newlines / blank lines
        if not line:
            return b'', False

        first, second = line.split(',')
        # [1:-1] removes the double quotes around each CSV field.
        first_v4 = ipaddress.IPv6Address(first[1:-1]).ipv4_mapped
        second_v4 = ipaddress.IPv6Address(second[1:-1]).ipv4_mapped

        if first_v4 is None or second_v4 is None:
            return b'', False

        # Remove the last byte since it is a prefix. The routing information is
        # represented with 3 + 3 = 6 bytes.
        return first_v4.packed[:-1] + second_v4.packed[:-1], True

    # Get the default config, asking for CSV output.
    cfg = Config()
    cfg.response_format = "CSV"

    # Set the query string
    q = f"select probe_dst_prefix, reply_src_prefix from {m.table_name}{'' if limit == 0 else ' limit ' + str(limit)}"

    # TODO: re-enable writing packed pairs here (e.g. `with ff.BinaryFile(filepath) as bf`).
    # NOTE(review): ff.get_timestr() is evaluated per table, so tables of one measurement
    # may end up in different directories — confirm.
    filepath = f"../data/ip_data/{m.measurement_uuid}-{ff.get_timestr()}/{m.table_name}.bin"
    num_pairs_written = 0
    num_pairs_expected = num_rows

    update_every = max(1, pbar_update)
    pending_updates = 0  # pairs packed but not yet reported to pbar

    try:
        async for row in common.run_query_iter_lines(cfg, q):
            packed, ok = from_csv_to_int(row)
            if not ok:
                continue
            # TODO: bf.write(packed) once writing is re-enabled (6 bytes per pair).
            num_pairs_written += 1

            if pbar is not None:
                pending_updates += 1
                if pending_updates >= update_every:
                    pbar.update(pending_updates)
                    pending_updates = 0
    except Exception:
        print(f"Exception on {m.table_name}, exitting")
        raise
    finally:
        # Report any remaining progress, also when the download stopped early.
        if pbar is not None and pending_updates:
            pbar.update(pending_updates)

    return (True, num_pairs_expected, num_pairs_written)

async def get_results_table_optimized_from_list(m_list: list[objects.Measurement], limit: int=1):
    """Print how many rows (and MB) downloading every table in ``m_list`` would fetch.

    Row counts are fetched concurrently with ``get_number_of_row``. ``limit`` is
    the per-table row limit passed on to ``get_results_table_optimized`` (0 means
    no limit), so each table contributes ``min(limit, its row count)`` rows.

    NOTE: the download step (commented out at the end) is disabled, so this
    function currently only prints the estimate and returns ``None``.
    """
    num_rows_list = await asyncio.gather(*[get_number_of_row(m) for m in m_list])
    if limit == 0:
        rows_per_table = list(num_rows_list)
    else:
        # The limit applies to each table's query separately, and a table never
        # returns more rows than it holds.
        rows_per_table = [min(limit, num_rows) for num_rows in num_rows_list]
    num_rows_total = sum(rows_per_table)

    # One packed routing pair is 6 bytes. Rows whose addresses are not IPv4-mapped
    # are skipped during the download, so this is an upper bound.
    # NOTE(review): the +1 MB margin is kept from the original estimate — confirm intent.
    total_mb = round((6 * num_rows_total) / (1024*1024), ndigits=2) + 1
    print(f"Will download total of {num_rows_total} row(s) and {total_mb} MB.")

    # pbar = tqdm(total=num_rows_total, desc="Downloading", ncols=200)
    # r = await asyncio.gather(*[get_results_table_optimized(m, nr, limit=limit, pbar=pbar) for m, nr in zip(m_list, rows_per_table)])
    # pbar.close()
    # return r

# ok_expected_writted_list = await get_results_table_optimized_from_list(selected_measurements_list, limit=0)

# for m, ok_exp_writ in zip(selected_measurements_list, ok_expected_writted_list):
#     ok, expected, written = ok_exp_writ
#     print(f"For agent {m.agent_uuid} {written}/{expected} pairs are saved with {'success' if ok else 'error'}")

# await get_results_table_optimized(selected_measurements_list[0], 1, limit=1)

In [None]:
total_number_of_pairs = sum(await asyncio.gather(*[get_number_of_row(m) for m in selected_measurements_list]))
total_number_of_pairs

In [None]:
# Average number of routing pairs per selected table (0 when no table was selected,
# instead of raising ZeroDivisionError).
avg_number_of_pairs_per_table = (
    total_number_of_pairs // len(selected_measurements_list)
    if selected_measurements_list
    else 0
)
avg_number_of_pairs_per_table

# DO NOT RUN

In [None]:
# Test for ip conversion -> IT WORKS
def from_bin_to_ipv4(binary_data: bytes) -> tuple[ipaddress.IPv4Address, ipaddress.IPv4Address]:
    """Expand one 6-byte record into its two IPv4 prefixes.

    The record holds the first three octets of two IPv4 addresses; the missing
    last octet is restored as 0. Any bytes-like input (bytes, bytearray,
    memoryview) is accepted.

    Raises:
        ValueError: if ``binary_data`` is not exactly 6 bytes long.
    """
    if len(binary_data) != 6:
        raise ValueError(f"expected a 6-byte record, got {len(binary_data)} bytes")
    # ipaddress only treats real `bytes` of length 4 as a packed IPv4 address.
    first = ipaddress.ip_address(bytes(binary_data[:3]) + b'\x00')
    second = ipaddress.ip_address(bytes(binary_data[3:]) + b'\x00')
    return first, second

from_bin_to_ipv4(b'\x01\x00\xd5\xcb\xbe\xfa') # expected (IPv4Address('1.0.213.0'), IPv4Address('203.190.250.0'))

In [None]:
import asyncio
from tqdm.asyncio import tqdm

async def nested_task(task_id, total_steps):
    """Step a "Subtask <task_id>" progress bar through ``total_steps`` simulated async steps.

    ``leave=False`` makes tqdm clear the bar once the loop finishes.
    """
    step_bar = tqdm(range(total_steps), desc=f"Subtask {task_id}", leave=False, ncols=100)
    for _step in step_bar:
        await asyncio.sleep(0.1)  # stand-in for real async work

async def main_task(task_id, total_subtasks, subtask_steps):
    """Run ``total_subtasks`` subtasks sequentially under a "Main Task <task_id>" progress bar.

    Each iteration sleeps 0.2 s, then awaits ``nested_task`` with the subtask index
    as its id.
    """
    main_bar = tqdm(range(total_subtasks), desc=f"Main Task {task_id}", ncols=100)
    for subtask_index in main_bar:
        await asyncio.sleep(0.2)  # simulated work of the main task itself
        await nested_task(subtask_index, subtask_steps)

async def main():
    total_tasks = 3  # Total number of main tasks
    total_subtasks = 5  # Subtasks per main task
    subtask_steps = 4  # Steps per nested subtask

    tasks = []
    for i in range(total_tasks):
        tasks.append(main_task(i, total_subtasks, subtask_steps))

    # Run all tasks concurrently
    await asyncio.gather(*tasks)

# Run the main async function
await main()


In [None]:
# Hand-written sample payload: consecutive 16-byte IPv4-mapped IPv6 addresses,
# alternating ::ffff:1.2.241.0 and ::ffff:180.180.248.0. This is the 32-byte-per-row
# layout that bytes_consumer reads (presumably (probe_dst_prefix, reply_src_prefix) pairs — confirm).
b = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x01\x02\xf1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xb4\xb4\xf8\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x01\x02\xf1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xb4\xb4\xf8\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x01\x02\xf1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xb4\xb4\xf8\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x01\x02\xf1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xb4\xb4\xf8\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x01\x02\xf1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xb4\xb4\xf8\x00'

In [None]:
# A notebook cell displays only its last expression, so show the length and
# every 8th byte of the sample together instead of silently dropping len(b).
len(b), b[::8]

In [None]:
a = "aAbBcCdDeEfFgGhH"
row_size = 2

# Split `a` into consecutive row_size-long chunks. Stepping the offset by row_size
# (rather than looping len(a) times) avoids printing empty slices past the end.
chunks = [a[offset:offset + row_size] for offset in range(0, len(a), row_size)]
for chunk in chunks:
    print(chunk)

In [None]:
import io
import ipaddress

def bytes_consumer(buffer_bytes: bytes) -> io.BytesIO:
    """Compress raw 32-byte address pairs into 6-byte prefix pairs.

    ``buffer_bytes`` is read as consecutive 32-byte rows, each holding two
    16-byte IPv6 addresses. For each row whose two addresses are IPv4-mapped
    (``::ffff:a.b.c.d``), the first three octets of both IPv4 addresses are
    written out, giving 3 + 3 = 6 bytes per row. Trailing bytes that do not
    fill a whole row are ignored.

    Rows containing an address that is not IPv4-mapped are reported with a
    printed warning and skipped.

    Returns:
        A ``BytesIO`` positioned at 0 that holds the packed rows.
    """
    row_size = 32
    ipv6_size = 16
    # An IPv6 address is IPv4-mapped exactly when its first 12 bytes are ten zero
    # bytes followed by 0xff 0xff; the IPv4 address is then its last 4 bytes.
    # Checking the bytes directly avoids building two ipaddress objects per row.
    ipv4_mapped_prefix = b'\x00' * 10 + b'\xff\xff'
    v4_start = len(ipv4_mapped_prefix)
    num_pairs = len(buffer_bytes) // row_size  # each row is 32 bytes

    new_buffer = io.BytesIO()
    for pair_index in tqdm(range(num_pairs)):
        offset = pair_index * row_size
        first = buffer_bytes[offset:offset + ipv6_size]
        second = buffer_bytes[offset + ipv6_size:offset + row_size]

        if not (first.startswith(ipv4_mapped_prefix) and second.startswith(ipv4_mapped_prefix)):
            print("Invalid IPv6 Addresses, this might be a problem.")
            continue

        # Keep the first three octets of each IPv4 address; the last one is dropped.
        new_buffer.write(first[v4_start:v4_start + 3])
        new_buffer.write(second[v4_start:v4_start + 3])

    new_buffer.seek(0)
    return new_buffer


# The real average is 75_830_108 pairs per agent; benchmark on a smaller sample.
num_in_long_bytes = 1_000_000
long_bytes = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x01\x02\xf1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xb4\xb4\xf8\x00' * num_in_long_bytes
buf = bytes_consumer(long_bytes)
bytes_from_buffer = buf.read()

In [None]:
# Decode the first 32 packed 3-byte prefixes back into IPv4 addresses.
# Iterating a bytes slice yields ints (which ip_address would read as tiny
# addresses like 0.0.0.1), so each prefix is re-padded to 4 packed bytes instead.
ip_addresses = [
    ipaddress.ip_address(bytes_from_buffer[start:start + 3] + b'\x00')
    for start in range(0, min(len(bytes_from_buffer), 32 * 3), 3)
]
ip_addresses


In [None]:
test_cfg = Config()
test_cfg.response_format = "RowBinary"
await common.quick_query(test_cfg, 'with [1, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] as ii select ii')

In [None]:
# Display `a`; the only visible global binding is the sample string from the slicing demo.
a