In [53]:
import sys
import zmq
import json
import pandas as pd
import os
from pandas_gbq import read_gbq
import plotly.express as px
import plotly.offline as pyo
import plotly.io as pio

# Default Plotly renderer for inline display in this notebook.
# NOTE(review): nothing visible in this notebook calls fig.show(); the charts are
# written to HTML files via write_plot_to_html instead — confirm this is still needed.
pio.renderers.default = 'notebook'  # or 'jupyterlab' if using JupyterLab


## Helper functions (validator map, analytics, plotting)

In [57]:
# Validator public key (node_id) -> operator / home-domain label.
# Built from per-operator groups; the insertion order is identical to listing
# every key individually, so NODE_DOMAIN_MAP.keys() (used to build the Hubble
# query) enumerates the nodes in the same sequence.
NODE_DOMAIN_MAP = {
    node_id: operator
    for operator, node_ids in (
        ("Stellar Development Foundation", (
            "GABMKJM6I25XI4K7U6XWMULOUQIQ27BCTMLS6BYYSOWKTBUXVRJSXHYQ",
            "GCGB2S2KGYARPVIA37HYZXVRM2YZUEXA6S33ZU5BUDC6THSB62LZSTYH",
            "GCM6QMP3DLRPTAZW2UZPCPX2LF3SXWXKPMP3GKFZBDSF3QZGV2G5QSTK",
        )),
        ("SatoshiPay", (
            "GAK6Z5UVGUVSEK6PEOCAYJISTT5EJBB34PN3NOLEQG2SUKXRVV2F6HZY",
            "GBJQUIXUO4XSNPAUT6ODLZUJRV2NPXYASKUBY4G5MYP3M47PCVI55MNT",
            "GC5SXLNAM3C4NMGK2PXK4R34B5GNZ47FYQ24ZIBFDFOCU6D4KBN4POAE",
        )),
        ("LOBSTR", (
            "GCFONE23AB7Y6C5YZOMKUKGETPIAJA4QOYLS5VNS4JHBGKRZCPYHDLW7",
            "GCB2VSADESRV2DDTIVTFLBDI562K6KE3KMKILBHUHUWFXCUBHGQDI7VL",
            "GD5QWEVV4GZZTQP46BRXV5CUMMMLP4JTGFD7FWYJJWRL54CELY6JGQ63",
            "GA7TEPCBDQKI7JQLQ34ZURRMK44DVYCIGVXQQWNSWAEQR6KB4FMCBT7J",
            "GA5STBMV6QDXFDGD62MEHLLHZTPDI77U3PFOD2SELU5RJDHQWBR5NNK7",
        )),
        ("Blockdaemon Inc.", (
            "GAAV2GCVFLNN522ORUYFV33E76VPC22E72S75AQ6MBR5V45Z5DWVPWEU",
            "GAVXB7SBJRYHSG6KSQHY74N7JAFRL4PFVZCNWW2ARI6ZEKNBJSMSKW7C",
            "GAYXZ4PZ7P6QOX7EBHPIZXNWY4KCOBYWJCA4WKWRKC7XIUS3UJPT6EZ4",
        )),
        ("Public Node", (
            "GBLJNN3AVZZPG2FYAYTYQKECNWTQYYUUY2KVFN2OUKZKBULXIXBZ4FCT",
            "GCIXVKNFPKWVMKJKVK2V4NK7D4TC6W3BUMXSIJ365QUAXWBRPPJXIR2Z",
            "GCVJ4Z6TI6Z2SOGENSPXDQ2U4RKH3CNQKYUHNSSPYFPNWTLGS6EBH7I2",
        )),
        ("Franklin Templeton", (
            "GA7DV63PBUUWNUFAF4GAZVXU2OZMYRATDLKTC7VTCG7AU4XUPN5VRX4A",
            "GARYGQ5F2IJEBCZJCBNPWNWVDOFK7IBOHLJKKSG2TMHDQKEEC6P4PE4V",
            "GCMSM2VFZGRPTZKPH5OABHGH4F3AVS6XTNJXDGCZ3MKCOSUBH3FL6DOB",
        )),
        ("Whalestack LLC", (
            "GD6SZQV3WEJUH352NTVLKEV2JM2RH266VPEM7EH5QLLI7ZZAALMLNUVN",
            "GADLA6BJK6VK33EM2IDQM37L5KGVCY5MSHSHVJA4SCNGNUIEOTCR6J5T",
            "GAZ437J46SCFPZEDLVGDMKZPLFO77XJ4QVAURSJVRZK2T5S7XUFHXI2Z",
        )),
    )
    for node_id in node_ids
}

def get_longest_leaders(df, filter_last_five_min=True):
    """Measure how long each validator stayed the consecutive ledger closer.

    Ledgers are ordered by ``close_at``; every maximal run of consecutive
    ledgers closed by the same ``node_id`` is one leadership streak. A streak's
    length is the time between its first and last ledger, so a single-ledger
    streak counts as 0 seconds.

    Args:
        df: DataFrame with at least ``node_id`` and ``close_at`` columns.
            ``close_at`` is expected to be tz-naive UTC, as produced by the
            loaders in this notebook.
        filter_last_five_min: When True, only ledgers closed within the last
            five minutes (UTC) are considered.

    Returns:
        DataFrame with columns ``continuous_time`` (seconds, float),
        ``node_id``, ``validator_frequency`` (number of streaks of that length
        for that node) and ``home_domain``, sorted by ``continuous_time``
        descending. Empty (with those columns) when no ledgers qualify.
    """
    if filter_last_five_min:
        # close_at is naive UTC, so "now" must be UTC too; pd.Timestamp.now()
        # returns local wall-clock time and would shift the window by the
        # machine's UTC offset (emptying it or including hours of data).
        now_utc = pd.Timestamp.now(tz='UTC').tz_localize(None)
        recent = df[df['close_at'] >= now_utc - pd.Timedelta(minutes=5)]
    else:
        recent = df

    result_columns = ['continuous_time', 'node_id', 'validator_frequency', 'home_domain']
    if recent.empty:
        return pd.DataFrame(columns=result_columns)

    # Stable sort so ledgers with identical close_at keep a deterministic order.
    ordered = recent.sort_values('close_at', kind='mergesort')

    # Gaps-and-islands: a new streak starts whenever node_id differs from the
    # previous ledger's node_id; cumsum turns those boundaries into a streak id.
    streak_id = ordered['node_id'].ne(ordered['node_id'].shift()).cumsum()

    streaks = (
        ordered.assign(grp=streak_id)
        .groupby(['node_id', 'grp'])['close_at']
        .agg(start_time='min', end_time='max')
        .reset_index()
    )
    streaks['continuous_time'] = (streaks['end_time'] - streaks['start_time']).dt.total_seconds()

    result = (
        streaks.groupby(['continuous_time', 'node_id'])
        .size()
        .reset_index(name='validator_frequency')
        .sort_values(by='continuous_time', ascending=False)
    )
    result['home_domain'] = result['node_id'].map(NODE_DOMAIN_MAP)
    return result

def get_ledger_close_count_by_home_domain(df, filter_last_five_min=True):
    """Count closed ledgers per validator home domain, bucketed in time.

    Args:
        df: DataFrame with at least ``node_id`` and ``close_at`` columns.
            ``close_at`` is expected to be tz-naive UTC, as produced by the
            loaders in this notebook.
        filter_last_five_min: When True, keep only ledgers closed within the
            last five minutes (UTC) and bucket them per minute; otherwise
            bucket the whole frame per calendar day.

    Returns:
        DataFrame with columns ``home_domain``, then either ``minute``
        (``"%Y-%m-%d %H:%M"`` strings) or ``day`` (``datetime.date``), and
        ``count``. Nodes missing from ``NODE_DOMAIN_MAP`` get no home domain
        and are dropped by the groupby.
    """
    if filter_last_five_min:
        # close_at is naive UTC; pd.Timestamp.now() would be local wall-clock
        # time and shift the window by the machine's UTC offset.
        now_utc = pd.Timestamp.now(tz='UTC').tz_localize(None)
        filtered_df = df[df['close_at'] >= now_utc - pd.Timedelta(minutes=5)]
    else:
        filtered_df = df

    # .assign returns a new frame, so the caller's df is never mutated.
    mapped_df = filtered_df.assign(home_domain=filtered_df['node_id'].map(NODE_DOMAIN_MAP))
    if filter_last_five_min:
        bucket = 'minute'
        mapped_df = mapped_df.assign(
            minute=mapped_df['close_at'].dt.floor('min').dt.strftime('%Y-%m-%d %H:%M')
        )
    else:
        bucket = 'day'
        mapped_df = mapped_df.assign(day=mapped_df['close_at'].dt.date)
    return mapped_df.groupby(['home_domain', bucket]).size().reset_index(name='count')

def write_plot_to_html(fig, filename, append=False):
    """Write a Plotly figure as an HTML ``<div>`` fragment to ``<filename>.html``.

    The fragment references plotly.js from the CDN (``include_plotlyjs='cdn'``).

    Args:
        fig: Plotly figure to render.
        filename: Target path without the ``.html`` extension.
        append: When truthy, add the chart after any existing content of the
            file (creating the file if it does not exist); otherwise
            overwrite the file.
    """
    html_string = pyo.plot(fig, include_plotlyjs='cdn', output_type='div')

    # Mode "a" already creates a missing file, so no existence check is needed.
    # Encode explicitly as UTF-8: the platform default (e.g. cp1252 on Windows)
    # cannot represent every character a title or label may contain.
    mode = "a" if append else "w"
    with open(f"{filename}.html", mode, encoding="utf-8") as f:
        f.write(html_string)

def plot_continuous_leader_chart(df, title, filename):
    """Render a grouped bar chart of leadership-streak lengths and save it as HTML.

    x = ``continuous_time`` (seconds), y = ``validator_frequency``; bars are
    coloured by ``node_id`` and labelled with ``home_domain``. The output
    overwrites ``<filename>.html``.
    """
    axis_labels = {'continuous_time': 'Continuous Time (seconds)', 'validator_frequency': 'Frequency'}
    chart = px.bar(
        df,
        x='continuous_time',
        y='validator_frequency',
        color='node_id',
        title=title,
        labels=axis_labels,
        text='home_domain',
    )
    # Place each node's bars side by side rather than stacked.
    chart.update_layout(barmode='group')
    write_plot_to_html(chart, filename)

def plot_validator_frequency_chart(df, title, filename, x='', y='', labels=None, append=True):
    """Plot ledger-close counts as bars stacked by home domain and write them to HTML.

    Args:
        df: DataFrame with a ``home_domain`` column plus the ``x`` and ``y`` columns.
        title: Chart title.
        filename: Output path without the ``.html`` extension.
        x: Column for the x axis (e.g. ``"minute"`` or ``"day"``); every
            distinct value gets its own tick label.
        y: Column for the bar height (e.g. ``"count"``).
        labels: Optional mapping of column name -> axis label.
        append: When True (default), append the chart to ``<filename>.html``
            instead of overwriting it. Previously this argument was ignored
            and the chart was always appended.
    """
    fig = px.bar(
        df,
        x=x,
        y=y,
        color='home_domain',
        title=title,
        # A fresh dict per call instead of a shared mutable default.
        labels=labels or {},
        barmode='stack'
    )

    # One tick per distinct x value so every minute/day is labelled; using the
    # x column itself (instead of hard-coding 'minute'/'day') works for any x.
    ticks = df[x].drop_duplicates()
    fig.update_xaxes(tickmode='array', tickvals=ticks, ticktext=ticks)

    write_plot_to_html(fig, filename, append=append)

## Data Sources

In [58]:
def get_full_data_from_hubble(start='2024-01-01 00:00:00 UTC', end='2025-01-01 00:00:00 UTC', project_id='crypto-stellar'):
    """Fetch ``node_id`` / ``close_at`` for every ledger closed by a known validator.

    Queries ``crypto-stellar.crypto_stellar.history_ledgers`` on BigQuery for
    ledgers whose ``closed_at`` lies in ``[start, end]`` (inclusive, as with
    SQL ``BETWEEN``) and whose ``node_id`` is a key of ``NODE_DOMAIN_MAP``.

    Args:
        start: Range start; anything ``pd.Timestamp`` accepts. Naive values are
            treated as UTC, aware values are converted to UTC.
        end: Range end, same rules as ``start``. The defaults cover calendar
            year 2024 and reproduce the original hard-coded query exactly.
        project_id: Google Cloud project passed to ``read_gbq`` to run the query.

    Returns:
        DataFrame with columns ``node_id`` and ``close_at`` (tz-naive, UTC wall time).
    """
    def utc_literal(value):
        # Normalising through pd.Timestamp validates the input and keeps
        # arbitrary text out of the SQL string.
        ts = pd.Timestamp(value)
        ts = ts.tz_localize('UTC') if ts.tz is None else ts.tz_convert('UTC')
        return ts.strftime('%Y-%m-%d %H:%M:%S UTC')

    nodes_string = ",".join(f"'{node_id}'" for node_id in NODE_DOMAIN_MAP)
    query = f"""
      SELECT
        hl.node_id as node_id,
        hl.closed_at AS close_at
      FROM crypto-stellar.crypto_stellar.history_ledgers AS hl
      WHERE hl.closed_at BETWEEN '{utc_literal(start)}' AND '{utc_literal(end)}'
      AND hl.node_id in ({nodes_string})
    """
    full_df = read_gbq(query, project_id=project_id)
    # Drop the timezone but keep the UTC wall time, so close_at compares
    # directly with the naive-UTC timestamps built from the live stream.
    full_df['close_at'] = full_df['close_at'].dt.tz_localize(None)
    return full_df

def get_full_data_from_CSV_files():
    """Load the full ledger history from local CSV files (not implemented yet).

    Presumably meant as the offline replacement for ``get_full_data_from_hubble``
    (see the "TODO: Replace with CSV" note where that loader is called), returning
    the same ``node_id`` / ``close_at`` frame.

    Raises:
        NotImplementedError: always, until the CSV loader is written. This
            replaces a silent ``None`` return that would only fail later.
    """
    # TODO: implement CSV loading.
    raise NotImplementedError(
        "get_full_data_from_CSV_files is not implemented yet; use get_full_data_from_hubble()"
    )

def get_live_data_socket_from_stream(endpoint="tcp://127.0.0.1:5555", topic=""):
    """Open a ZeroMQ SUB socket connected to the validator-info pipeline.

    Args:
        endpoint: ZeroMQ address of the publisher (defaults to the local pipeline).
        topic: Subscription prefix filter; the empty string subscribes to every message.

    Returns:
        A connected ``zmq.SUB`` socket. The caller owns it and should call
        ``socket.close()`` when finished.
    """
    # Socket to talk to server
    context = zmq.Context()
    socket = context.socket(zmq.SUB)

    print("Collecting validator info from pipeline ...")
    socket.connect(endpoint)
    socket.subscribe(topic)
    return socket

## Full-history analytics

In [25]:
# Pull the full ledger history from Hubble (BigQuery) and chart it.
# The leader chart overwrites full_history.html; the daily-count chart is appended to it.
full_df = get_full_data_from_hubble() ## TODO: Replace with CSV

MIN_CONTINUOUS_SECONDS = 20  # only chart leadership streaks at least this long

longest_leaders_result_df = get_longest_leaders(full_df, filter_last_five_min=False)
long_leaders_df = longest_leaders_result_df[longest_leaders_result_df["continuous_time"] >= MIN_CONTINUOUS_SECONDS]
# plot_chart is not defined in this notebook (NameError on a fresh kernel);
# the leader-chart helper is plot_continuous_leader_chart.
plot_continuous_leader_chart(long_leaders_df, title=f"Nodes leading for greater than equal to {MIN_CONTINUOUS_SECONDS} seconds", filename="full_history")

validator_count_by_home_domain_df = get_ledger_close_count_by_home_domain(full_df, filter_last_five_min=False)
plot_validator_frequency_chart(validator_count_by_home_domain_df, title="Validator count by day", filename="full_history", x="day", y="count", labels={"day": "Timestamp by day", "count": "Closing validator count"})

Downloading: 100%|█████████████████████████████████████████████████████████████████|


## Live-data analytics

In [None]:
# Stream ledger-close messages from the local pipeline and refresh the live
# charts after each one. Interrupt the kernel to stop the loop.
LIVE_WINDOW = pd.Timedelta(minutes=5)  # history kept in memory; the live charts cover the last 5 minutes

cur_data_vals = []
socket = get_live_data_socket_from_stream()
try:
    while True:
        message = socket.recv()
        json_object = json.loads(message)
        json_formatted_str = json.dumps(json_object, indent=2)
        print(f"Validator info:\n\n{json_formatted_str}")

        # Drop messages older than LIVE_WINDOW before adding the new one;
        # otherwise the list (and the DataFrame rebuilt from it every
        # iteration) grows without bound. close_time is Unix epoch seconds,
        # so compare it against a UTC epoch cutoff. The newest message is
        # always kept, so cur_df is never empty.
        cutoff = (pd.Timestamp.now(tz='UTC') - LIVE_WINDOW).timestamp()
        cur_data_vals = [v for v in cur_data_vals if v.get('close_time', 0) >= cutoff]
        cur_data_vals.append(json_object)

        cur_df = pd.DataFrame(cur_data_vals).rename(columns={'close_time': 'close_at'})
        cur_df['close_at'] = pd.to_datetime(cur_df['close_at'], unit='s')

        # plot_chart is not defined in this notebook; use the leader-chart helper.
        cur_res_df = get_longest_leaders(cur_df)
        plot_continuous_leader_chart(cur_res_df, title="Nodes leading for last 5 minutes", filename="live_data")

        cur_res_df = get_ledger_close_count_by_home_domain(cur_df)
        plot_validator_frequency_chart(cur_res_df, title="Validator count by minute", filename="live_data", x="minute", y="count", labels={"minute": "Timestamp by minutes", "count": "Closing validator count"})
        print("Update live data chart")
finally:
    # Release the subscriber when the loop is interrupted so re-running the
    # cell does not leave an extra connection to the pipeline open.
    socket.close()


Collecting validator info from pipeline ...
Validator info:

{
  "sequence_number": 54051454,
  "node_id": "GBJQUIXUO4XSNPAUT6ODLZUJRV2NPXYASKUBY4G5MYP3M47PCVI55MNT",
  "signature": "+yTl+k0cFX1qN9ThkdY5XYLVyMLSMudiiJFJElyX2x2GZtjG2r6HASuCHXB9kR7RAYaZh+AP+ypI4esC+ij4Cg==",
  "name": "SatoshiPay",
  "close_time": 1729545928,
  "operations": {
    "total": 670,
    "categories": {
      "Account Creation": 1,
      "Claimable Balances": 96,
      "Offers and AMMs": 200,
      "Other": 7,
      "Payments": 335,
      "Sponsorship": 2,
      "Trust": 29
    }
  },
  "network": "Public Global Stellar Network ; September 2015"
}
Update live data chart
Validator info:

{
  "sequence_number": 54051455,
  "node_id": "GADLA6BJK6VK33EM2IDQM37L5KGVCY5MSHSHVJA4SCNGNUIEOTCR6J5T",
  "signature": "ZLnNHgADsNZjL7KwYgEg0tISsG6nB9KLAEsBiBXfvEZOFx15mN+TW0/AlAdSS9uigJt3e5f+1YnQeTEFFs1pCQ==",
  "name": "Whalestack LLC",
  "close_time": 1729545934,
  "operations": {
    "total": 961,
    "categories": {
    