In [5]:
# Extract the origin AS, provider AS, prefix, prefix length and IP version from raw BGP update data generated with bgpreader, e.g.:
# bgpreader -t updates -w 1704067200,1704067200 -A _32787_ >> raw_as13335_01_jan_2024.txt
# The input is parsed in chunks across multiple processes (concurrent.futures.ProcessPoolExecutor).

import pandas as pd
import csv
from concurrent.futures import ProcessPoolExecutor

def process_chunk(lines):
    """Parse a chunk of pipe-delimited lines into route records.

    A line is used only when it splits into more than 12 ``|``-separated
    fields and its first field is ``"U"``. From such a line the function
    reads field 1 (stored as ``update_type``), field 9 (the prefix) and
    field 11 (the whitespace-separated AS path).
    NOTE(review): these positions presumably follow bgpreader's ASCII elem
    layout - confirm against the bgpreader version that produced the data.

    Args:
        lines: Iterable of raw text lines.

    Returns:
        A list of rows, each
        ``[prefix, as_path, update_type, origin_as, provider, pfx_len, ip_version]``:

        * ``as_path`` is the AS path re-joined with single spaces.
        * ``origin_as`` is the last ASN of the path, or ``None`` when the
          path is empty.
        * ``provider`` is the ASN closest to the end of the path that
          differs from the origin, so origin prepending
          (``"A B C C C"``) yields ``B`` rather than ``C``. It is ``None``
          when no such ASN exists.
        * ``pfx_len`` is the integer after ``/`` in the prefix, or ``None``
          when the prefix has no ``/``.
        * ``ip_version`` is ``"IPv6"`` when the prefix contains ``:``,
          otherwise ``"IPv4"`` (this includes an empty prefix).

        Lines whose prefix length is not an integer are skipped instead of
        aborting the whole chunk.
    """
    chunk_data = []
    for line in lines:
        fields = line.strip().split('|')
        if len(fields) <= 12 or fields[0] != "U":
            continue

        prefix = fields[9]
        as_path = fields[11].split()
        update_type = fields[1]

        origin_as = as_path[-1] if as_path else None
        # Walk the path backwards past any repetitions of the origin
        # (AS-path prepending) to reach the first different ASN.
        # Detecting sibling ASes owned by the same organization is not
        # handled here.
        provider = next(
            (asn for asn in reversed(as_path) if asn != origin_as),
            None,
        )

        if '/' in prefix:
            try:
                pfx_len = int(prefix.split('/')[1])
            except ValueError:
                # Malformed prefix length: drop this line only.
                continue
        else:
            pfx_len = None
        ip_version = "IPv6" if ':' in prefix else "IPv4"

        chunk_data.append([
            prefix,
            ' '.join(as_path),
            update_type,
            origin_as,
            provider,
            pfx_len,
            ip_version,
        ])
    return chunk_data

def process_route_data_parallel(file_path, output_file, num_workers=8, chunk_size=100000):
    """Parse a raw route file in parallel and write the records to a CSV.

    The input is read line by line and grouped into chunks of
    ``chunk_size`` lines. Each chunk is handed to ``process_chunk`` in a
    worker process, and the returned rows are written to ``output_file`` in
    the same order as the chunks were read.

    Only a bounded number of chunks is in flight at any time, so memory use
    does not grow with the size of the input file. Rows are written with a
    single ``csv.writer``, which keeps integer prefix lengths as integers
    and uses one line terminator for the header and the data.

    Args:
        file_path: Path of the text file to read.
        output_file: Path of the CSV file to create (overwritten if it
            exists).
        num_workers: ``max_workers`` passed to ``ProcessPoolExecutor``.
        chunk_size: Number of input lines per submitted chunk.

    Raises:
        Whatever ``open`` raises for either path, and any exception raised
        inside a worker (re-raised by ``Future.result()``).
    """
    import os
    from collections import deque

    header = ['prefix', 'as_path', 'update_type', 'origin_as', 'provider', 'pfx_len', 'ip_version']

    # Upper bound on chunks that are submitted but not yet written out.
    # Two per worker keeps every worker busy while the oldest result is
    # being written.
    max_pending = 2 * (num_workers or 8)

    with open(file_path, 'r') as infile, \
            open(output_file, 'w', newline='') as csvfile, \
            ProcessPoolExecutor(max_workers=num_workers) as executor:
        writer = csv.writer(csvfile, lineterminator=os.linesep)
        writer.writerow(header)

        pending = deque()  # futures in submission order

        def write_oldest():
            # Blocks until the oldest chunk is parsed; draining from the
            # left preserves input order in the output.
            writer.writerows(pending.popleft().result())

        lines = []
        for line in infile:
            lines.append(line)
            if len(lines) >= chunk_size:
                pending.append(executor.submit(process_chunk, lines))
                lines = []
                if len(pending) >= max_pending:
                    write_oldest()

        # Submit the final, partially filled chunk.
        if lines:
            pending.append(executor.submit(process_chunk, lines))

        while pending:
            write_oldest()

    print(f"Data successfully saved to {output_file}")

# Example usage
# The __main__ guard is required because process_route_data_parallel starts
# worker processes: with the "spawn" start method each worker imports this
# module, and unguarded top-level code would re-run the whole pipeline there.
if __name__ == "__main__":
    file_path = 'raw_as1103_17_jan_2025.txt'
    output_file = 'raw_as1103_17_jan_2025.csv'
    process_route_data_parallel(file_path, output_file)

    # Reload the CSV that was just written.
    df = pd.read_csv(output_file, low_memory=False)

    # Remove duplicate rows in dataframe
    df = df.drop_duplicates()

    # Write unique rows into a file
    df.to_csv('unique_raw_as1103_17_jan_2025.csv', index=False)
    print("Completed.")

Data successfully saved to test.csv
Completed.
