In [16]:
"""
# BGP Live Stream Data Processing Notebook

## Description:
This Jupyter Notebook is designed to **subscribe to a real-time BGP (Border Gateway Protocol) feed** from RIPE NCC's RIS Live service (Routing Information Service), process the received
messages, and store them in a structured pandas DataFrame. It includes functionality to:

- **Start, pause, and stop** the BGP data feed using interactive buttons.
- Convert **hex-encoded fields** to **ASCII** for human-readable format.
- Extract **useful features** from timestamps for **machine learning** applications.
- Save processed data as a **CSV file** for further analysis.
- Build an interactive **Plotly Dash dashboard** from the processed data.

## Features:
- Real-time WebSocket connection to RIS Live BGP data stream.
- Data parsing and transformation for easy analysis.
- Machine learning-ready time-based feature extraction.
- User-controlled streaming via Jupyter widgets.
- Time-series preprocessing of the DataFrame to reveal trends in BGP updates over time.
- ASN contribution analysis to identify the major BGP route announcers.
- Anomaly detection: sudden spikes in withdrawals could indicate BGP attacks.
- Feature engineering for future ML models: `is_weekend`, `hour`, `announcement_count`, etc. are useful features.
- The main goals for the dashboard are:

    - Track routing changes over time (using the time-series chart).
    - Analyze Autonomous System Number (ASN) contributions (identify the major BGP peers).
    - Detect network anomalies (a spike in withdrawals may indicate a route hijack).

## Usage:
1. Run the notebook.
2. Use the **Start button** to begin collecting BGP messages.
3. Use the **Pause button** to temporarily freeze processing.
4. Use the **Stop button** to stop the WebSocket stream.
5. Access collected data using `df_messages`.
6. Save data locally using `df_messages.to_csv("output.csv", index=False)`.

## Requirements:
- Python 3.x
- Libraries: `websocket-client`, `pandas`, `ipywidgets`, `dash`, `plotly` (plus the standard-library modules `json`, `threading`, `time`)
- Jupyter Notebook environment

## Developer:
- **Author:** Shaji R. Nathan
- **Email:** shaji.nathan@ipinfusion.com
- **GitHub/Portfolio:** [Your GitHub or Portfolio Link]

## License:
© 2025 [Shaji Ravindra. Nathan]. All rights reserved.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by the Free Software Foundation,
either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU Affero General Public License for more details.

For details on GNU AGPLv3 licensing, please refer to https://www.gnu.org/licenses/agpl-3.0.html.

## Version:
- **Date:** February 9, 2025
- **Version:** 1.0.0

"""



## Border Gateway Protocol
Border Gateway Protocol (BGP) is the set of rules that determines how data is routed between networks on the internet. It is a fundamental part of the internet's global routing system, allowing independently operated networks to communicate with each other.

### How does BGP work?

BGP exchanges routing information between autonomous systems (ASes).

* ASes are networks managed by a single organization or service provider. 
* BGP creates network stability by finding new paths when one path fails. 
* BGP routing decisions are based on network policies and rules set by network administrators. 

### What does BGP do? 

BGP enables networks to send and receive routing information and allows network administrators to control traffic flow, which they use to optimize network performance, balance load, and manage costs.
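To make "routing decisions based on network policies" more concrete, here is a deliberately simplified sketch of the first few BGP best-path tie-breakers, roughly in the order RFC 4271 applies them: highest LOCAL_PREF, then shortest AS path, then lowest ORIGIN (IGP < EGP < INCOMPLETE), then lowest MED. Real routers apply further steps (eBGP over iBGP, IGP cost, router ID, and vendor-specific ones), and MED is normally compared only between routes from the same neighbouring AS. The `Route` class, `best_path` helper, and the sample routes are hypothetical.

```python
from dataclasses import dataclass

# Lower rank is preferred for the ORIGIN attribute.
ORIGIN_RANK = {"IGP": 0, "EGP": 1, "INCOMPLETE": 2}


@dataclass
class Route:
    """A hypothetical, stripped-down view of one candidate path to a prefix."""
    next_hop: str
    as_path: list
    local_pref: int = 100
    origin: str = "IGP"
    med: int = 0


def best_path(routes):
    """Pick a best route using a simplified subset of BGP tie-breakers.

    Order: highest LOCAL_PREF, shortest AS path, lowest ORIGIN, lowest MED.
    (Simplification: MED is compared across all routes here.)
    """
    return min(
        routes,
        key=lambda r: (-r.local_pref, len(r.as_path), ORIGIN_RANK[r.origin], r.med),
    )


candidates = [
    Route("192.0.2.1", [6939, 4657, 58621]),
    Route("192.0.2.2", [8218, 58621]),
    Route("192.0.2.3", [3356, 1299, 2914, 58621], local_pref=200),
]
print(best_path(candidates).next_hop)  # prints 192.0.2.3: higher LOCAL_PREF beats the longer path
```

Dropping the third route makes the two-hop path via AS8218 win, which shows how an operator's LOCAL_PREF policy can override the shortest-path default.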

In [17]:
import json
import websocket
import threading
import time
import pandas as pd
import ipywidgets as widgets
from IPython.display import display, clear_output

### Set up the global variables

In [19]:
# Flag that ensures only one WebSocket instance runs at a time
is_running = False
is_paused = False
message_count = 0

# Default message limit; the user can change it in the widget, since messages arrive very quickly

message_limit = 10
ws_instance = None

# List that stores the messages from the routing updates

data_list = []

# Handle to the WebSocket thread
ws_thread = None

# DataFrame that stores the collected messages
df_messages = pd.DataFrame()

### Input Widget for selecting number of messages to collect

In [20]:
# User input widget for number of messages
message_input = widgets.IntText(value=10, description="Messages:", continuous_update=False)
start_button = widgets.Button(description="Start", button_style='success')

# UI Controls
pause_button = widgets.ToggleButton(value=False, description="Pause Feed", button_style='warning')
stop_button = widgets.Button(description="Stop Feed", button_style='danger')

#### Functions that collect and process messages from the RIS Live feed and store them in a pandas DataFrame

In [21]:
def on_message(ws, message):
    """Handles incoming messages and stores them in a global DataFrame."""
    global is_paused, message_count, message_limit, data_list, ws_instance, is_running, df_messages

    try:
        if not message:
            print("Received empty message.")
            return

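        parsed = json.loads(message)

        # Keep only BGP data messages; skip ris_error and other non-data message types
        if parsed.get("type") != "ris_message":
            return

        # Pause handling: also leave the wait if the feed is stopped while paused,
        # otherwise the WebSocket thread would stay blocked here after stop_feed()
        while is_paused and is_running:
            time.sleep(0.5)

        # Ignore messages that arrive after the feed was stopped
        if not is_running:
            return

        # Store data in list
        data_list.append(parsed.get("data", {}))
        message_count += 1
        print(f"Received {message_count}/{message_limit}")

        # Stop WebSocket after reaching the limit
        if message_count >= message_limit:
            print("Message limit reached. Stopping WebSocket...")
            is_running = False  # Stop WebSocket thread
            update_dataframe()  # Save to global DataFrame
            ws.close()  # Close the connection this callback belongs to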

    except json.JSONDecodeError as e:
        print("Error decoding JSON:", e)

def update_dataframe():
    """Updates the global DataFrame with collected messages."""
    global df_messages
    df_messages = pd.DataFrame(data_list)  # Convert list to DataFrame
    print("Messages stored in 'df_messages'. You can access it anytime.")

def on_error(ws, error):
    """Handles WebSocket errors."""
    print("WebSocket Error:", error)

def on_close(ws, close_status_code, close_msg):
    """Handles WebSocket closure."""
    print("WebSocket closed:", close_status_code, close_msg)
    global is_running
    is_running = False  # Ensure the flag is reset when WebSocket stops

def on_open(ws):
    """Subscribes to the RIS Live stream."""
    params = {
        "moreSpecific": True,
        "host": "rrc21",
        "socketOptions": {
            "includeRaw": False
        }
    }
    ws.send(json.dumps({
        "type": "ris_subscribe",
        "data": params
    }))
    print("Subscription request sent.")

def run_websocket():
    """Runs the WebSocket connection in a separate thread."""
    global ws_instance, is_running
    ws_instance = websocket.WebSocketApp(
        "wss://ris-live.ripe.net/v1/ws/?client=py-example-1",
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )
    ws_instance.on_open = on_open
    ws_instance.run_forever()

def toggle_pause(change):
    """Pause or resume message processing."""
    global is_paused
    is_paused = change['new']
    print("Feed paused." if is_paused else "Feed resumed.")

def stop_feed(b):
    """Stops the WebSocket and clears data."""
    global is_running, ws_instance, message_count, data_list, ws_thread

    if not is_running:
        print("Feed is already stopped.")
        return

    print("Stopping feed...")
    is_running = False
    message_count = 0
    data_list = []

    if ws_instance:
        ws_instance.close()

    if ws_thread and ws_thread.is_alive():
        ws_thread.join(timeout=5)  # Wait for the thread, but never hang the notebook indefinitely

    print("Feed stopped.")

def start_feed(b):
    """Starts the WebSocket with user-defined message limit."""
    global is_running, message_limit, message_count, data_list, ws_thread

    if is_running:
        print("Feed is already running.")
        return

    message_limit = message_input.value  # Get user input
    message_count = 0
    data_list = []
    is_running = True

    print(f"Starting feed, collecting {message_limit} messages...")
    ws_thread = threading.Thread(target=run_websocket)
    ws_thread.daemon = True
    ws_thread.start()
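Because the callbacks above only run against a live connection, it can help to exercise the message-handling logic offline. The sketch below parses a hand-written message in the RIS Live envelope format (`{"type": "ris_message", "data": {...}}`) and keeps only `ris_message` payloads. The sample values are illustrative (modelled on the rows shown later, with a documentation prefix), not captured data, and `extract_update` is a hypothetical helper.

```python
import json

# Hand-written example in the RIS Live envelope format; values are illustrative only.
SAMPLE = json.dumps({
    "type": "ris_message",
    "data": {
        "timestamp": 1739261000.0,
        "peer": "37.49.232.7",
        "peer_asn": "8218",
        "host": "rrc21.ripe.net",
        "type": "UPDATE",
        "path": [8218, 4657, 58621],
        "announcements": [{"next_hop": "37.49.232.7", "prefixes": ["192.0.2.0/24"]}],
        "withdrawals": [],
    },
})


def extract_update(raw):
    """Return the 'data' payload of a ris_message, or None for anything else."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(parsed, dict) or parsed.get("type") != "ris_message":
        return None  # e.g. ris_error or other control messages
    return parsed.get("data", {})


record = extract_update(SAMPLE)
print(record["peer_asn"], record["announcements"][0]["prefixes"])
```

Filtering on the envelope `type` keeps error and control messages from ending up as empty rows in the DataFrame.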



### Message Event Handling for the UI

The event handlers make the UI interactive in Jupyter Notebook.
They let the user control the BGP feed dynamically.
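The pause logic in `on_message` polls `is_paused` with `time.sleep`. One alternative, sketched here as a standalone example rather than a drop-in change, is a `threading.Event`: the worker blocks in `Event.wait()` while the event is cleared and resumes as soon as it is set, with no polling interval. The names `running_event`, `worker`, and `processed` are hypothetical.

```python
import threading
import time

# Set = feed running, cleared = feed paused (hypothetical replacement for is_paused).
running_event = threading.Event()
running_event.set()
processed = []


def worker(items):
    """Process items, blocking (without polling) whenever the feed is paused."""
    for item in items:
        running_event.wait()  # returns immediately unless the event is cleared
        processed.append(item)


running_event.clear()  # "Pause" before the worker starts
t = threading.Thread(target=worker, args=(range(5),), daemon=True)
t.start()
time.sleep(0.2)
print("while paused:", len(processed))  # nothing has been processed yet
running_event.set()  # "Resume"
t.join(timeout=2)
print("after resume:", len(processed))
```

With this design, `toggle_pause` would call `running_event.clear()` to pause and `running_event.set()` to resume.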


In [22]:
# Add the event handlers
pause_button.observe(toggle_pause, names='value')
stop_button.on_click(stop_feed)
start_button.on_click(start_feed)

# Display UI
display(message_input, start_button, pause_button, stop_button)

print("\nOnce messages are collected, access them using: `df_messages`")



Once messages are collected, access them using: `df_messages`
Subscription request sent.
Received 1/100
Received 2/100
Received 3/100
...
Received 57/100

In [23]:
df_messages.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 13 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   timestamp      100 non-null    float64
 1   peer           100 non-null    object 
 2   peer_asn       100 non-null    object 
 3   id             100 non-null    object 
 4   host           100 non-null    object 
 5   type           100 non-null    object 
 6   path           100 non-null    object 
 7   community      100 non-null    object 
 8   origin         95 non-null     object 
 9   med            41 non-null     float64
 10  aggregator     14 non-null     object 
 11  announcements  100 non-null    object 
 12  withdrawals    100 non-null    object 
dtypes: float64(2), object(11)
memory usage: 10.3+ KB


In [24]:
df_messages

|    | timestamp | peer | peer_asn | id | host | type | path | community | origin | med | aggregator | announcements | withdrawals |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.739261e+09 | 2001:7f8:54:5::1 | 42064 | 2001:7f8:54:5::1-0194f405747e0000 | rrc21.ripe.net | UPDATE | [6939, 4657, 58621] | [[42064, 64611], [42064, 64649], [42064, 65011... | IGP | 0.0 | 58621:10.0.0.33 | [{'next_hop': '2001:7f8:54:5::13,fe80::4288:2f... | [] |
| 1 | 1.739261e+09 | 37.49.232.7 | 8218 | 37.49.232.7-0194f405747e0001 | rrc21.ripe.net | UPDATE | [8218, 8220, 14114, 6979, 6979, 6979, 6979, 69... | [[8218, 102], [8218, 20000], [8218, 20110]] | IGP | 999.0 | 6979:192.168.1.177 | [{'next_hop': '37.49.232.7', 'prefixes': ['169... | [] |
| 2 | 1.739261e+09 | 2001:7f8:54:5::7 | 8218 | 2001:7f8:54:5::7-0194f405747e0002 | rrc21.ripe.net | UPDATE | [8218, 4657, 58621] | [[8218, 102]] | IGP | 0.0 | 58621:10.0.0.33 | [{'next_hop': '2001:7f8:54:5::7,fe80::224:dc00... | [] |
| 3 | 1.739261e+09 | 2001:7f8:54:5::1 | 42064 | 2001:7f8:54:5::1-0194f405747e0003 | rrc21.ripe.net | UPDATE | [6939, 34927, 214810] | [[42064, 64611], [42064, 64649], [42064, 65011... | IGP | 0.0 |  | [{'next_hop': '2001:7f8:54:5::13,fe80::4288:2f... | [] |
| 4 | 1.739261e+09 | 37.49.232.7 | 8218 | 37.49.232.7-0194f405747e0004 | rrc21.ripe.net | UPDATE | [8218, 8220, 14114, 6979, 6979, 6979, 6979, 69... | [[8218, 102], [8218, 20000], [8218, 20113]] | IGP | 999.0 | 6979:192.168.1.177 | [{'next_hop': '37.49.232.7', 'prefixes': ['169... | [] |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 95 | 1.739261e+09 | 37.49.237.231 | 35426 | 37.49.237.231-0194f40578ac0029 | rrc21.ripe.net | UPDATE | [35426, 39801, 5511, 2914, 3223, 55933, 55933] | [[5511, 504], [5511, 666], [5511, 710], [5511,... | IGP |  |  | [{'next_hop': '37.49.237.231', 'prefixes': ['4... | [] |
| 96 | 1.739261e+09 | 37.49.237.231 | 35426 | 37.49.237.231-0194f40578ac002a | rrc21.ripe.net | UPDATE | [35426, 39801, 5511, 3356, 211181, 208293, 210... | [[5511, 502], [5511, 666], [5511, 710], [5511,... | IGP |  |  | [{'next_hop': '37.49.237.231', 'prefixes': ['8... | [] |
| 97 | 1.739261e+09 | 37.49.237.231 | 35426 | 37.49.237.231-0194f40578ac002b | rrc21.ripe.net | UPDATE | [35426, 33891, 47692, 44472] | [[33891, 5003], [33891, 9003], [33891, 13003],... | INCOMPLETE |  |  | [{'next_hop': '37.49.237.55', 'prefixes': ['86... | [] |
| 98 | 1.739261e+09 | 37.49.236.156 | 15547 | 37.49.236.156-0194f40578980000 | rrc21.ripe.net | UPDATE | [15547, 58453, 55685, 153076] | [] | IGP |  |  | [{'next_hop': '37.49.237.116', 'prefixes': ['1... | [] |


# Explanation of Each Field

| Field | Description |
|---|---|
| timestamp | Unix timestamp (in seconds) of the BGP message. |
| peer | IP address of the BGP peer that sent the message to the route collector. |
| peer_asn | Autonomous System Number (ASN) of that BGP peer. |
| id | Unique identifier of the message. |
| host | Route collector that received the message (here `rrc21.ripe.net`). |
| type | BGP message type, e.g. UPDATE, OPEN, NOTIFICATION, or KEEPALIVE. Announcements and withdrawals are both carried inside UPDATE messages. |
| path | AS path: the sequence of ASNs the route has traversed, with the origin AS last. |
| community | BGP community attributes attached to the route (used for traffic engineering and policy). |
| origin | ORIGIN attribute of the route: IGP, EGP, or INCOMPLETE. |
| med | Multi-Exit Discriminator, a hint to a neighbouring AS about the preferred entry point; present only when set. |
| aggregator | AGGREGATOR attribute (ASN:IP of the router that aggregated the route); present only when set. |
| announcements | List of `{next_hop, prefixes}` entries: the IP prefixes announced in this update, grouped by next hop. |
| withdrawals | List of IP prefixes withdrawn in this update. |
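Several of these fields can be turned into features directly. As an illustrative sketch (the helper `path_features` and the columns `origin_asn`, `path_length`, and `has_prepending` are hypothetical), the rightmost ASN in `path` is the origin AS, and a repeated ASN, as with 6979 in some rows above, indicates AS-path prepending. The sketch assumes `path` holds lists of integer ASNs; any nested lists (AS_SETs) are simply skipped.

```python
import pandas as pd


def path_features(path):
    """Derive simple features from an AS path (list of ASNs, origin last)."""
    asns = [a for a in path if isinstance(a, int)]  # skip AS_SET entries (nested lists)
    return {
        "origin_asn": asns[-1] if asns else None,
        "path_length": len(asns),
        "has_prepending": len(asns) != len(set(asns)),
    }


# Toy frame shaped like df_messages['path']; values adapted from the rows above.
df = pd.DataFrame({"path": [[6939, 4657, 58621], [8218, 8220, 14114, 6979, 6979, 6979]]})
features = pd.DataFrame([path_features(p) for p in df["path"]], index=df.index)
df = df.join(features)
print(df[["origin_asn", "path_length", "has_prepending"]])
```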

### Save the DataFrame to a CSV file for further analysis in subsequent sessions

In [25]:
df_messages.to_csv("./BGPRiSTrends.csv", index=False)

### Verify that the CSV file was saved in the working directory

In [26]:
import os
print(os.listdir()) 

['.ipynb_checkpoints', 'BGPRiS.csv', 'BGPRiS.ipynb', 'BGPRis2.ipynb', 'BGPRiSTrends.csv', 'BGPRisTrends.ipynb']
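When the CSV is read back in a later session, the list-valued columns (`path`, `community`, `announcements`, `withdrawals`) come back as their string representations, so checks such as `isinstance(x, list)` would silently count zero. A minimal sketch, assuming the file was written with `to_csv` as above, converts them back with `ast.literal_eval`. The in-memory buffer only stands in for `BGPRiSTrends.csv` so the example runs on its own; `LIST_COLUMNS` is a hypothetical name.

```python
import ast
import io

import pandas as pd

LIST_COLUMNS = ["path", "community", "announcements", "withdrawals"]

# Stand-in for df_messages; in the notebook you would read "./BGPRiSTrends.csv" instead.
original = pd.DataFrame([{
    "peer_asn": "8218",
    "path": [8218, 4657, 58621],
    "community": [[8218, 102]],
    "announcements": [{"next_hop": "37.49.232.7", "prefixes": ["192.0.2.0/24"]}],
    "withdrawals": [],
}])
buffer = io.StringIO()
original.to_csv(buffer, index=False)
buffer.seek(0)

reloaded = pd.read_csv(buffer, dtype={"peer_asn": str})
for col in LIST_COLUMNS:
    # to_csv stored each list as its Python repr; parse it back into a real list.
    reloaded[col] = reloaded[col].apply(ast.literal_eval)

print(type(reloaded.loc[0, "announcements"]), reloaded.loc[0, "path"])
```

`ast.literal_eval` only evaluates Python literals, so it is a safer choice than `eval` for parsing these strings.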


# Preprocess the data for visualization with Plotly

Why this preprocessing is useful:

* Time-series analysis reveals trends in BGP updates over time.
* ASN contribution analysis identifies the major BGP route announcers.
* Anomaly detection: sudden spikes in withdrawals could indicate BGP attacks.
* Feature engineering for future ML models: `is_weekend`, `hour`, `announcement_count`, etc. are useful features.
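As a sketch of the anomaly-detection idea, assuming per-message `datetime` and `withdrawal_count` columns like those built in the next cell, one simple approach is to sum withdrawals per time bucket and flag buckets whose z-score exceeds a threshold. The one-minute bucket, the threshold of 3, and the synthetic data are arbitrary choices, and `withdrawal_spikes` is a hypothetical helper. With n buckets a z-score can never exceed (n − 1)/√n, so the window needs enough buckets for the threshold to be reachable; a flagged spike is a prompt for investigation, not proof of a hijack.

```python
import pandas as pd

# Synthetic per-message data shaped like df_messages: one message per minute,
# with a burst of withdrawals in minute 17 (illustrative values only).
start = pd.Timestamp("2025-02-11 07:00:00")
msgs = pd.DataFrame({
    "datetime": [start + pd.Timedelta(minutes=m) for m in range(30)],
    "withdrawal_count": [40 if m == 17 else 2 for m in range(30)],
})


def withdrawal_spikes(df, freq="1min", threshold=3.0):
    """Return time buckets whose withdrawal total has a z-score above `threshold`."""
    per_bucket = df.set_index("datetime")["withdrawal_count"].resample(freq).sum()
    std = per_bucket.std()
    if pd.isna(std) or std == 0:  # single bucket or constant series: nothing stands out
        return per_bucket.iloc[0:0]
    z = (per_bucket - per_bucket.mean()) / std
    return per_bucket[z > threshold]


print(withdrawal_spikes(msgs))
```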

## Main Goals for the Dashboard

* #### Track Routing Changes Over Time (Using the Time-Series Chart).
* #### Analyze Autonomous System Number (ASN) Contributions (Identify the Major BGP Peers).
* #### Detect Network Anomalies (a Spike in Withdrawals May Indicate a Route Hijack).


In [32]:
import pandas as pd

# Convert the Unix timestamp (seconds) to a datetime column
df_messages['datetime'] = pd.to_datetime(df_messages['timestamp'], unit='s')

# Extract useful time-based features
df_messages['hour'] = df_messages['datetime'].dt.hour
df_messages['day'] = df_messages['datetime'].dt.date
df_messages['weekday'] = df_messages['datetime'].dt.weekday  # Monday=0 ... Sunday=6
df_messages['is_weekend'] = (df_messages['weekday'] >= 5).astype(int)  # 1 for Saturday/Sunday

# Count announcements and withdrawals.
# Note: each 'announcements' entry is a {next_hop, prefixes} group, so announcement_count
# counts next-hop groups, whereas withdrawal_count counts withdrawn prefixes.
df_messages['announcement_count'] = df_messages['announcements'].apply(lambda x: len(x) if isinstance(x, list) else 0)
df_messages['withdrawal_count'] = df_messages['withdrawals'].apply(lambda x: len(x) if isinstance(x, list) else 0)

# Top BGP peers by ASN (number of messages per peer ASN)
top_peers = df_messages['peer_asn'].value_counts().reset_index()
top_peers.columns = ['peer_asn', 'count']

# Display the processed DataFrame
df_messages.head()



|    | timestamp | peer | peer_asn | id | host | type | path | community | origin | med | aggregator | announcements | withdrawals | datetime | hour | day | weekday | is_weekend | announcement_count | withdrawal_count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1739261000.0 | 2001:7f8:54:5::1 | 42064 | 2001:7f8:54:5::1-0194f405747e0000 | rrc21.ripe.net | UPDATE | [6939, 4657, 58621] | [[42064, 64611], [42064, 64649], [42064, 65011... | IGP | 0.0 | 58621:10.0.0.33 | [{'next_hop': '2001:7f8:54:5::13,fe80::4288:2f... | [] | 2025-02-11 07:59:45.789999962 | 7 | 2025-02-11 | 1 | 0 | 1 | 0 |
| 1 | 1739261000.0 | 37.49.232.7 | 8218 | 37.49.232.7-0194f405747e0001 | rrc21.ripe.net | UPDATE | [8218, 8220, 14114, 6979, 6979, 6979, 6979, 69... | [[8218, 102], [8218, 20000], [8218, 20110]] | IGP | 999.0 | 6979:192.168.1.177 | [{'next_hop': '37.49.232.7', 'prefixes': ['169... | [] | 2025-02-11 07:59:45.789999962 | 7 | 2025-02-11 | 1 | 0 | 1 | 0 |
| 2 | 1739261000.0 | 2001:7f8:54:5::7 | 8218 | 2001:7f8:54:5::7-0194f405747e0002 | rrc21.ripe.net | UPDATE | [8218, 4657, 58621] | [[8218, 102]] | IGP | 0.0 | 58621:10.0.0.33 | [{'next_hop': '2001:7f8:54:5::7,fe80::224:dc00... | [] | 2025-02-11 07:59:45.789999962 | 7 | 2025-02-11 | 1 | 0 | 1 | 0 |
| 3 | 1739261000.0 | 2001:7f8:54:5::1 | 42064 | 2001:7f8:54:5::1-0194f405747e0003 | rrc21.ripe.net | UPDATE | [6939, 34927, 214810] | [[42064, 64611], [42064, 64649], [42064, 65011... | IGP | 0.0 |  | [{'next_hop': '2001:7f8:54:5::13,fe80::4288:2f... | [] | 2025-02-11 07:59:45.789999962 | 7 | 2025-02-11 | 1 | 0 | 1 | 0 |
| 4 | 1739261000.0 | 37.49.232.7 | 8218 | 37.49.232.7-0194f405747e0004 | rrc21.ripe.net | UPDATE | [8218, 8220, 14114, 6979, 6979, 6979, 6979, 69... | [[8218, 102], [8218, 20000], [8218, 20113]] | IGP | 999.0 | 6979:192.168.1.177 | [{'next_hop': '37.49.232.7', 'prefixes': ['169... | [] | 2025-02-11 07:59:45.789999962 | 7 | 2025-02-11 | 1 | 0 | 1 | 0 |
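Since each `announcements` entry groups prefixes by next hop, an update announcing many prefixes through one next hop yields an `announcement_count` of 1, while `withdrawal_count` counts individual prefixes. If prefix-level counts are wanted on both sides, a sketch like the following could be used; the helper `count_announced_prefixes` and the column `announced_prefix_count` are hypothetical, and the prefixes are documentation ranges.

```python
import pandas as pd


def count_announced_prefixes(announcements):
    """Total number of prefixes across all next-hop groups in one update."""
    if not isinstance(announcements, list):
        return 0
    return sum(len(group.get("prefixes", [])) for group in announcements)


# Toy rows shaped like df_messages['announcements'].
df = pd.DataFrame({"announcements": [
    [{"next_hop": "37.49.232.7", "prefixes": ["192.0.2.0/24", "198.51.100.0/24"]}],
    [],
]})
df["announced_prefix_count"] = df["announcements"].apply(count_announced_prefixes)
print(df["announced_prefix_count"].tolist())
```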


## Interactive Dashboard for BGP Data using Plotly Dash

* ### Time-series plots of announcements & withdrawals of network prefixes
* ### Top peer Autonomous System Numbers (networks) contributing the most messages to the BGP feed.
* ### BGP Routes/Routing Updates Over Time.
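The time-series figure in the dashboard cell groups by exact timestamp, so many messages that share a second collapse into one point. To chart routing updates over time at a fixed resolution, one option is to bucket messages with `resample`, sketched here with synthetic timestamps. The 10-second interval and the `messages` column name are arbitrary choices; a frame like `updates_per_bucket` could then be passed to `px.line` in the same way as the existing figure.

```python
import pandas as pd

# Synthetic message timestamps (Unix seconds), standing in for df_messages['timestamp'].
df = pd.DataFrame({
    "timestamp": [1739261000.0, 1739261001.5, 1739261004.0, 1739261012.0, 1739261025.0],
    "announcement_count": [1, 1, 2, 1, 0],
    "withdrawal_count": [0, 0, 0, 3, 1],
})
df["datetime"] = pd.to_datetime(df["timestamp"], unit="s")

# Sum counts and count messages in fixed 10-second buckets.
updates_per_bucket = (
    df.set_index("datetime")
    .resample("10s")
    .agg({"announcement_count": "sum", "withdrawal_count": "sum", "timestamp": "count"})
    .rename(columns={"timestamp": "messages"})
    .reset_index()
)
print(updates_per_bucket)
```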

In [33]:
import dash
from dash import dcc, html
import plotly.express as px

In [34]:
# Initialize Dash app
app = dash.Dash(__name__)

# Create Figures
fig_time_series = px.line(
    df_messages.groupby('datetime')[['announcement_count', 'withdrawal_count']].sum().reset_index(),
    x='datetime', y=['announcement_count', 'withdrawal_count'],
    title="BGP Announcements & Withdrawals Over Time",
    labels={'value': 'Count', 'datetime': 'Time'},
)

fig_asn_distribution = px.bar(
    top_peers.head(10),
    x='peer_asn', y='count',
    title="Top 10 BGP Peers (ASN Contribution)",
    labels={'peer_asn': 'ASN', 'count': 'Message Count'},
    color='count',
)

# Layout for Dash App
app.layout = html.Div(children=[
    html.H1("BGP Live Data Dashboard", style={'textAlign': 'center'}),
    
    dcc.Graph(id='time-series-plot', figure=fig_time_series),
    
    dcc.Graph(id='asn-distribution', figure=fig_asn_distribution),
])

# Run the app
if __name__ == '__main__':
    #app.run_server(debug=True, port=8050)
    app.run(jupyter_mode="external")


Dash app running on http://127.0.0.1:8050/
