# Imports

In [1]:
import os
import polars as pl

import datetime
from datetime import date, time, timedelta, datetime

import re
import pickle
import json

import pandas_market_calendars as mcal

import importlib
import config
importlib.reload(config)

# import time
from time import sleep

from IPython.display import display, Markdown

from bokeh.plotting import figure, show
from bokeh.models import ColumnDataSource, CustomJSTickFormatter, DatetimeTickFormatter, DatetimeTicker, HoverTool,CustomJSHover, LinearAxis, Range1d
from bokeh.io import output_notebook
from bokeh.io import output_file, save, show, reset_output

# Function Definitions

In [2]:
def extract_date_from_filename_datetime(filename):
    """Parse the first run of eight digits in *filename* as a ``YYYYMMDD`` date.

    Returns a ``datetime`` at midnight of that day, or ``None`` when the
    name contains no run of eight consecutive digits.
    """
    match = re.search(r'\d{8}', filename)
    if match is None:
        return None
    return datetime.strptime(match.group(0), '%Y%m%d')

def int_to_time(time_int):
    """Convert an ``HHMMSS`` value (int or digit string) to a ``datetime.time``.

    The value is left-padded with zeros to six characters, so ``93000``
    is read as 09:30:00.
    """
    digits = str(time_int).zfill(6)
    hh, mm, ss = digits[:2], digits[2:4], digits[4:]
    return time(hour=int(hh), minute=int(mm), second=int(ss))

def make_df_from_high_quote_count(file_path, threshold):
    """Load one headerless HFT1 CSV file and keep its high-quote-count rows.

    The calendar date is taken from the eight-digit date in *file_path* and
    combined with each row's HHMMSS ``Event_Date_Time`` value. Rows with a
    null ``Dollars`` are dropped, only rows whose ``NumQuotes`` is strictly
    greater than *threshold* are kept, a ``Trade_Size`` column
    (``Dollars / NumTrades``) is added, and the result is sorted by
    ``Event_Date_Time``.
    """
    file_date = extract_date_from_filename_datetime(file_path)

    # Expressions are built up front; the pipeline below applies them in order.
    event_as_time = (
        pl.col('Event_Date_Time')
        .map_elements(int_to_time, return_dtype=pl.Time)
        .alias('Event_Date_Time')
    )
    event_as_datetime = (
        pl.col('Event_Date_Time')
        .map_elements(lambda t: datetime.combine(file_date, t), return_dtype=pl.Datetime)
        .alias('Event_Date_Time')
    )
    trade_size = (pl.col('Dollars') / pl.col('NumTrades')).alias('Trade_Size')

    return (
        pl.read_csv(file_path, has_header=False)
        # The file has no header row, so polars names the columns column_1..column_8.
        .rename({
            'column_1': 'Symbol',
            'column_2': 'T1',
            'column_3': 'Event_Date_Time',
            'column_4': 'NumQuotes',
            'column_5': 'BBO',
            'column_6': 'NumTrades',
            'column_7': 'Dollars',
            'column_8': 'T3'
        })
        .with_columns(event_as_time)
        .with_columns(event_as_datetime)
        .drop_nulls(subset=['Dollars'])
        .filter(pl.col('NumQuotes') > threshold)
        .with_columns(trade_size)
        .sort('Event_Date_Time')
    )

def load_data(file_path):
    """Read the parquet file at *file_path*, or return ``None`` if it does not exist."""
    if not os.path.exists(file_path):
        return None
    return pl.read_parquet(file_path)

def save_data(df, file_path):
    """Write *df* to *file_path* by calling its ``write_parquet`` method."""
    df.write_parquet(file_path)

# def get_price_data_for_symbol(client, symbol, start_date, end_date):
#     # print(f"Getting price data for symbol: {symbol} from {start_date} to {end_date}")
#     resp = client.get_price_history(
#         symbol=symbol,
#         period_type=client.PriceHistory.PeriodType.DAY,
#         period=client.PriceHistory.Period.ONE_DAY,
#         frequency_type=client.PriceHistory.FrequencyType.MINUTE,
#         frequency=client.PriceHistory.Frequency.EVERY_FIVE_MINUTES,
#         start_datetime=start_date,
#         end_datetime=end_date,
#         need_previous_close='True'
#     )

#     assert resp.status_code == httpx.codes.OK
#     history = resp.json()
#     candles = history.get('candles', [])

#     # Convert `datetime` values and write back into the `candles` list of dictionaries
#     for candle in candles:
#         dt_object = datetime.fromtimestamp(candle['datetime'] / 1000)
#         # candle['Event_Date_Time'] = dt_object.isoformat()
#         candle['Event_Date_Time'] = dt_object
#         candle['Symbol'] = symbol  # Add symbol to each candle

#     # Create a polars DataFrame from the candles list
#     price_data_df = pl.DataFrame(candles)
#     # return price_data_df

def update_burst_data_df(concatenated_df ,directory_path, threshold, start_date):
    """Append the burst rows of every eligible HFT1 CSV file to *concatenated_df*.

    A file is eligible when its name ends in ``.csv``, contains ``_hft1``,
    starts with a ``YYYYMMDD`` date followed by ``_``, and that date is on or
    after *start_date* and is not today's date (today's file is still being
    written, so it is skipped). Files are processed in filename order so the
    result is deterministic.

    Args:
        concatenated_df: Existing polars DataFrame to extend. ``None`` or a
            zero-column frame is treated as "no history yet".
        directory_path: Directory holding the CSV files; the combined frame is
            also saved there as ``burst_data_processed.parquet``.
        threshold: ``NumQuotes`` threshold passed to
            ``make_df_from_high_quote_count``.
        start_date: Earliest file date to include, either a ``'YYYYMMDD'``
            string or a ``date``/``datetime`` object.

    Returns:
        The combined DataFrame, or ``None`` when there was neither an input
        frame nor any new rows (nothing is saved in that case).
    """
    # Set path to store processed dataframe
    file_path = os.path.join(directory_path, 'burst_data_processed.parquet')

    # Compare dates as zero-padded YYYYMMDD strings. The previous code compared
    # a datetime to a string for the "not today" check (always unequal, so
    # today's file was never skipped) and raised TypeError when start_date was
    # a datetime.
    if isinstance(start_date, str):
        start_key = start_date
    else:
        start_key = start_date.strftime('%Y%m%d')
    today_key = datetime.today().strftime('%Y%m%d')

    selected_files = []
    for f in sorted(os.listdir(directory_path)):
        if not (f.endswith('.csv') and '_hft1' in f):
            continue
        try:
            # Re-format the parsed date so the string comparison is always on
            # a normalised eight-digit key.
            file_key = datetime.strptime(f.split('_')[0], '%Y%m%d').strftime('%Y%m%d')
        except ValueError:
            # Skip files that do not match the expected date format
            continue
        if file_key >= start_key and file_key != today_key:
            selected_files.append(f)

    # Collect the frames and concatenate once instead of re-copying the
    # accumulated frame on every iteration.
    has_history = concatenated_df is not None and concatenated_df.width > 0
    frames = [concatenated_df] if has_history else []
    for filename in selected_files:
        df = make_df_from_high_quote_count(os.path.join(directory_path, filename), threshold)
        if df.height > 0:
            frames.append(df)

    # NOTE(review): rows are appended unconditionally, so files already present
    # in concatenated_df are added again - confirm callers pass a frame that
    # does not yet contain the selected dates.
    if frames:
        concatenated_df = pl.concat(frames)

    if concatenated_df is not None:
        save_data(concatenated_df, file_path)
    return concatenated_df

# def update_price_data_df(all_price_data_df, directory_path, burst_dataframe, client, start_date, end_date):
#     # Set path to store processed dataframe
#     file_path = os.path.join(directory_path, 'price_data_processed.parquet')

#     # Extract a list of unique symbols from the burst_data_df
#     unique_symbols = burst_dataframe.select(pl.col("Symbol").unique()).to_series().to_list()
#     # Extract the unique symbols from all_price_data_df
#     existing_symbols = all_price_data_df.select(pl.col("Symbol").unique()).to_series().to_list()

#         # Check if all_price_data_df is empty
#     if all_price_data_df.is_empty():
#         # If empty, fetch price data for all symbols from start_date to end_date
#         for symbol in unique_symbols:
#             symbol_price_data_df = get_price_data_for_symbol(client, symbol, start_date, end_date)
#             if symbol_price_data_df.height != 0:
#                 all_price_data_df = pl.concat([all_price_data_df, symbol_price_data_df])
#     else:
#        # For all the symbols that we already have price dat for check to see if we have the latest data for the symbol. If we do not have the latest data for the symbol then update the price data for the symbol
#         max_dates_df = all_price_data_df.group_by("Symbol").agg(pl.col("Event_Date_Time").max().alias("Max_Event_Date_Time"))
#         update_target_df = max_dates_df.filter(pl.col("Max_Event_Date_Time") < end_date)
#         update_list = update_target_df.select(pl.col("Symbol")).to_series().to_list()
#         print(f'update_list length {len(update_list)}')
#         # Add symbols that are not in the filtered DataFrame (i.e., new symbols)
#         new_symbols = [symbol for symbol in unique_symbols if symbol not in existing_symbols]
#         print(f'new update_symbols length {len(new_symbols)}')
#         new_symbols = [symbol for symbol in unique_symbols if symbol not in existing_symbols]
#         update_list.extend(new_symbols)
#         print(f'total update_list length {len(update_list)}')
#         i=len(update_list)
#         #Process each unique symbol by checking if the symbol is already in the price data dataframe and if so updateing the price data for that symbol fro current end date of data to end_date
#         for symbol in update_list:
#             i=i-1
#             print(f'Currentlly processing symbol {symbol} remaining symbols {i}')
#             #Filter the price data dataframe for the symbol
#             symbol_price_data_df = all_price_data_df.filter(pl.col("Symbol") == symbol)
#             #If the symbol is not in the price data dataframe then get the price data for the symbol from the current end date of the data to the end date
#             if symbol_price_data_df.height == 0:
#                 print(f"Getting price data for new symbol: {symbol} from {start_date} to {end_date}")
#                 symbol_price_data_df = get_price_data_for_symbol(client, symbol, start_date, end_date)
#             #If the symbol is in the price data dataframe then get the price data for the symbol from the current end date of the data to the end date
#             else:
#                 symbol_start_date = symbol_price_data_df['Event_Date_Time'].max()
#                 print(f"{symbol} Symbol start date: {symbol_start_date} type: {type(symbol_start_date)} end_date: {end_date} type: {type(end_date)} remaining symbols: {i}")
#                 # symbol_start_date = datetime.strptime(symbol_start_date, '%Y-%m-%dT%H:%M:%S')
#                 # print(f"Symbol start date (after): {symbol_start_date} type: {type(symbol_start_date)}")
                
#                 # Check if symbol_start_date is less than end_date
#                 if symbol_start_date < end_date:
#                     print(f"Getting price data for symbol: {symbol} from {symbol_start_date} to {end_date}")
#                     symbol_price_data_df = get_price_data_for_symbol(client, symbol, symbol_start_date, end_date)
                
#             #Concatenate the symbol price data to the all price data dataframe
#             if symbol_price_data_df.height != 0:
#                 all_price_data_df = pl.concat([all_price_data_df, symbol_price_data_df])

#     save_data(all_price_data_df, file_path) 
#     return all_price_data_df

def generate_alert_strings(full_dataframe,todays_dataframe):
    """Build one alert message per symbol present in *todays_dataframe*.

    Symbols are ordered by their earliest ``Event_Date_Time`` today. Each
    message reports the first row for that symbol today (quote changes,
    trades, event time, ``Dollars``), when the symbol last appeared in
    *full_dataframe* (taken from the last matching row there), and how the
    ``Dollars`` value ranks against the symbol's earlier rows.

    Returns:
        ``(alert_strings, sorted_symbols)`` - two parallel lists.

    Raises:
        ValueError: if a symbol from the grouping has no matching rows.
    """
    # Earliest event per symbol today, used only to order the symbols.
    first_seen = todays_dataframe.group_by('Symbol').agg(
        pl.col('Event_Date_Time').min().alias('First_Appearance')
    )
    sorted_symbols = first_seen.sort('First_Appearance')['Symbol'].to_list()

    print(sorted_symbols)

    alert_strings = []

    for symbol in sorted_symbols:
        todays_symbol_rows = todays_dataframe.filter(pl.col('Symbol') == symbol)
        if todays_symbol_rows.height <= 0:
            raise ValueError(f"No rows found for symbol {symbol}")

        # Everything reported about "today" comes from the symbol's first row.
        first_row = todays_symbol_rows.head(1)
        num_quotes = first_row['NumQuotes'].item()
        num_trades = first_row['NumTrades'].item()
        event_date_time = first_row['Event_Date_Time'].item()
        # The "trade size" quoted in the message is the row's Dollars value.
        trade_size = first_row['Dollars'].item()

        # Previous occurrence: last matching row of the historical frame.
        all_symbol_rows = full_dataframe.filter(pl.col('Symbol') == symbol)
        if all_symbol_rows.height > 0:
            prev_event_date_time = all_symbol_rows.tail(1)['Event_Date_Time'].item()
        else:
            prev_event_date_time = None

        if prev_event_date_time is None:
            history_note = " This is the first appearance of symbol."
        else:
            interval = (event_date_time - prev_event_date_time).days+1
            history_note = f" Last appeared on {prev_event_date_time}, {interval} days ago."

        # Historical occurrences plus today's; rank 1 means no historical row
        # has a Dollars value at or above today's.
        n_appearances = all_symbol_rows.select(pl.col('Dollars')).count().item() + 1
        smaller_count = (
            all_symbol_rows.filter(pl.col('Dollars') < trade_size)
            .select(pl.col('Dollars'))
            .count()
            .item()
        )
        trade_size_rank = n_appearances - smaller_count

        headline = f"HQC on ${symbol} with {num_quotes} quote changes resulting in {num_trades} trades  @ {event_date_time}."
        size_note = f" Trade size is ${trade_size:,.2f} @ rank {trade_size_rank} for Size out of {n_appearances} occurrences."
        alert_strings.append(headline + history_note + size_note)

    return alert_strings,sorted_symbols

def split_message(message, max_length=140):
    """Split *message* on whitespace into pieces of at most *max_length* characters.

    A message that already fits is returned unchanged as a single-element
    list. Otherwise words are packed greedily, joined by single spaces. A
    single word longer than *max_length* is never broken; it is emitted as its
    own (over-long) piece.

    Args:
        message: Text to split.
        max_length: Maximum length of each returned piece.

    Returns:
        List of pieces in original word order; never contains empty strings
        when splitting occurs.
    """
    if len(message) <= max_length:
        return [message]

    pieces = []
    current_words = []
    current_len = 0  # length of ' '.join(current_words)

    for word in message.split():
        # A separating space is only needed when the piece already has a word.
        added = len(word) + (1 if current_words else 0)
        if current_words and current_len + added > max_length:
            pieces.append(' '.join(current_words))
            current_words = [word]
            current_len = len(word)
        else:
            # Either the word fits exactly/with room, or the piece is empty
            # (an over-long word still has to go somewhere).
            current_words.append(word)
            current_len += added

    if current_words:
        pieces.append(' '.join(current_words))

    return pieces

In [19]:
print(f"Current working directory: {os.getcwd()}")

# Read the clock once so that every string and path below refers to the same
# day, even if this cell happens to run across midnight.
_today = datetime.today()

# Compact date used in the file names, e.g. 20250725
today_str = _today.strftime('%Y%m%d')
print(f"Today's date string: {today_str}")

# 'YYYY/MM/' directory prefix under which the processed parquet file is stored
today_str1 = _today.strftime('%Y/%m/')
print(f"Today's date string in year/month/day format: {today_str1}{today_str}")

# Raw CSV for today, the processed parquet, and the pickle holding the last read offset
csv_path = f"./Code/Data/{today_str}_hft1.csv"
print(f"Today's csv_path: {csv_path}")
parquet_path = f"./Processed_Data/{today_str1}{today_str}_hft1.parquet"
print(f"Today's parquet_path: {parquet_path}")
last_size_path = f"./Code/Data/{today_str}_hft1_lastsize.pkl"

Current working directory: c:\Users\vande\Desktop\Projects\Ki2_Alerts\Code
Today's date string: 20250725
Today's date string in year/month/day format: 2025/07/20250725
Today's csv_path: ./Code/Data/20250725_hft1.csv
Today's parquet_path: ./Processed_Data/2025/07/20250725_hft1.parquet


In [27]:
# Load today's processed burst data; load_data returns None when the parquet
# file does not exist yet.
df=load_data(parquet_path)
# Keep only rows with NumQuotes greater than 3500; fall back to an empty
# frame when nothing could be loaded.
if df is None:
    df = pl.DataFrame() 
else:
    df = df.filter(pl.col('NumQuotes') > 3500)
#     # sort top 5 by dollars
#     # df = df.sort('Dollars', descending=True).head(5)
#     # sort by unique symbols with most occurrences
#     # df = df.group_by('Symbol').agg(pl.col('Dollars').sum().alias('Total_Dollars')).sort('Total_Dollars', descending=True).head(5)   
df

# NOTE(review): df1 (the historical frame passed below) is not assigned in
# this cell; it must already exist from an earlier cell run. The commented
# load below looks like its intended source - confirm the path before
# re-enabling it.
# df1=load_data('./Processed_Data/burst_data_processed.parquet')
# df1

# Build the alert messages for today's rows against the historical frame.
# NOTE(review): when df is the empty fallback frame it has no 'Symbol' column,
# which generate_alert_strings groups by - presumably this call fails then.
generate_alert_strings(df1, df)

['INTC', 'PHIO', 'QQQ', 'IWM', 'QS', 'ASTS', 'QBTS', 'IOVA', 'MU', 'OPEN', 'FLG', 'IREN', 'ACHR', 'CMCSA', 'APA', 'SPY', 'NVDA', 'FAST', 'DHI', 'KMI', 'XLK', 'SOXL', 'ATAI', 'AAPL', 'XLI', 'TQQQ', 'PTC', 'IVZ', 'LBRT', 'YINN', 'IGV', 'LIDR', 'NEM', 'MBLY', 'POR', 'BBWI', 'OKLO', 'USO', 'XLE', 'UCO', 'UPST', 'SMCI', 'ENVX', 'HIMS', 'HAL', 'MARA', 'BILI', 'CSX', 'GLXY', 'STZ', 'STAA', 'MSFT', 'OPCH', 'BKR', 'RCL', 'PRTA', 'NVDL', 'XLY']


(['HQC on $INTC with 3702 quote changes resulting in 1353 trades  @ 2025-07-25 07:00:00. Last appeared on 2025-07-24 16:05:33, 1 days ago. Trade size is $4,202,784.00 @ rank 2196 for Size out of 24210 occurrences.',
  'HQC on $PHIO with 4192 quote changes resulting in 962 trades  @ 2025-07-25 07:57:00. Last appeared on 2025-05-14 12:34:58, 72 days ago. Trade size is $467,537.00 @ rank 79 for Size out of 213 occurrences.',
  'HQC on $QQQ with 4947 quote changes resulting in 316 trades  @ 2025-07-25 09:30:00. Last appeared on 2025-07-24 16:01:00, 1 days ago. Trade size is $64,284,132.00 @ rank 935 for Size out of 2156456 occurrences.',
  'HQC on $IWM with 4854 quote changes resulting in 508 trades  @ 2025-07-25 09:30:00. Last appeared on 2025-07-24 16:00:00, 1 days ago. Trade size is $27,099,153.00 @ rank 888 for Size out of 508917 occurrences.',
  'HQC on $QS with 3772 quote changes resulting in 1569 trades  @ 2025-07-25 09:30:06. Last appeared on 2025-07-24 15:57:34, 1 days ago. Trade 

In [25]:
# Display the current contents of df as the cell output.
df


Symbol,T1,Event_Date_Time,NumQuotes,BBO,NumTrades,Dollars,T3,Trade_Size
str,str,datetime[μs],i64,i64,i64,f64,str,f64
"""INTC""","""0""",2025-07-25 07:00:00,3702,1421,1353,4.202784e6,,3106.27051
"""PHIO""","""0""",2025-07-25 07:57:00,4192,1538,962,467537.0,,486.005198
"""PHIO""","""0""",2025-07-25 07:57:01,3624,1264,778,303421.0,,390.001285
"""PHIO""","""0""",2025-07-25 07:57:29,3543,1369,894,502219.0,,561.766219
"""QQQ""","""0""",2025-07-25 09:30:00,4947,940,316,6.4284132e7,,203430.797468
…,…,…,…,…,…,…,…,…
"""LIDR""","""0""",2025-07-25 11:15:19,3549,1359,697,967068.0,,1387.472023
"""NEM""","""0""",2025-07-25 11:15:40,4991,1026,1175,5.291722e6,,4503.593191
"""NVDA""","""0""",2025-07-25 11:17:00,3833,1001,894,1.510261e7,,16893.299776
"""LIDR""","""0""",2025-07-25 11:27:47,4075,1304,999,1.317375e6,,1318.693694


In [None]:
def monitor_hft1_stream(most_recent_file,output_file,topic,client,last_size):
    """Process rows appended to the HFT1 CSV since the last call and publish them.

    New rows are read from *most_recent_file* starting at the saved offset,
    enriched with a live quote (price, last size, total volume) and a
    ``Trade_Size`` value, appended to *output_file* as one JSON object per
    line, and published on the ``"hft1_data"`` topic. An events-per-second
    figure for the batch is then published on *topic*.

    Relies on three module-level names: ``formatted_date`` (part of the
    progress-file name), ``c`` (quote client providing ``get_quote``) and
    ``current_time`` (time of the last detected file change; created here if
    it does not exist yet).

    Args:
        most_recent_file: Path of the CSV being appended to; its name must
            contain the trading date as eight digits.
        output_file: Path of the JSON-lines file to append to.
        topic: Topic on which the events-per-second value is published.
        client: Object with a ``publish(topic, payload)`` method.
        last_size: Offset already processed; overridden by the value stored in
            the progress pickle when that file exists.

    Returns:
        The offset up to which the file is now considered processed.
    """
    # csv is not among this notebook's top-level imports, so import it here.
    import csv

    global current_time,formatted_date,c

    # Load the last size from a pickle file if it exists.
    # NOTE: unpickling is acceptable only because this file is written by this
    # function itself; never point it at untrusted data.
    progress_file= f'./Processed_data/{formatted_date}_HFT1_last_size.pkl'
    if os.path.exists(progress_file):
        with open(progress_file, 'rb') as f:
            last_size = pickle.load(f)

    # Check if the file has changed
    current_size = os.stat(most_recent_file).st_size
    min_time=None
    row_dict = {}

    columns = ['Symbol', 'T1', 'Event_Date_Time', 'NumQuotes', 'BBO', 'NumTrades', 'Dollars', 'T3']
    # Every field up to and including 'Dollars' is read below.
    required_fields = columns.index('Dollars') + 1

    if current_size != last_size:
        current_time = datetime.now()
        # The file has changed, open it and read from the point where we left off
        with open(most_recent_file, 'r') as f:
            f.seek(last_size)
            new_lines = f.readlines()

        # The trading date is the same for every row, so resolve it once.
        temp_date = extract_date_from_filename_datetime(most_recent_file)

        # Process the new lines into JSON and write them to a new file
        with open(output_file, 'a') as f1:
            for line in csv.reader(new_lines):
                # Blank or truncated rows would otherwise raise KeyError below.
                if len(line) < required_fields:
                    continue

                row_dict = dict(zip(columns, line))
                row_dict['Trade_Size'] = None

                # HHMMSS -> time -> datetime on the file's date -> ISO 8601 string
                event_dt = datetime.combine(temp_date, int_to_time(row_dict['Event_Date_Time']))
                row_dict['Event_Date_Time'] = event_dt.isoformat()

                if min_time is None:
                    min_time = event_dt

                try:
                    r = c.get_quote(row_dict['Symbol'] ,fields=[c.Quote.Fields.QUOTE,c.Quote.Fields.REGULAR,c.Quote.Fields.EXTENDED])
                    # Explicit check instead of `assert`, which is stripped under -O.
                    r.raise_for_status()
                    if r.status_code != 200:
                        raise RuntimeError(f"unexpected status code {r.status_code}")
                    new_data = r.json()
                    row_dict['Price'] = new_data[row_dict['Symbol']]['regular']['regularMarketLastPrice']
                    row_dict['Size'] = new_data[row_dict['Symbol']]['regular']['regularMarketLastSize']
                    row_dict['Volume'] = new_data[row_dict['Symbol']]['quote']['totalVolume']

                except Exception as e:
                    # NOTE(review): the offset below is still advanced to
                    # current_size, so this row and the rest of the batch are
                    # never retried - confirm that dropping them is intended.
                    print(f"An error occurred: {e}")
                    break

                try:
                    row_dict['Trade_Size'] = float(row_dict['Dollars']) / float(row_dict['NumTrades'])
                except ZeroDivisionError:
                    row_dict['Trade_Size'] = 0
                except ValueError:
                    # Empty or non-numeric Dollars/NumTrades: leave Trade_Size as None.
                    row_dict['Trade_Size'] = None

                # Write the dictionary as a JSON object to the file and publish it
                row_dict_str = json.dumps(row_dict)
                f1.write(row_dict_str+ '\n')
                client.publish("hft1_data", row_dict_str)

        # Update the last size and persist it
        last_size = current_size
        with open(progress_file, 'wb') as f:
            pickle.dump(last_size, f)

        # No usable row in this batch: nothing to report.
        if 'Event_Date_Time' not in row_dict:
            return  last_size

        # Events per second over the span covered by this batch
        max_time = datetime.fromisoformat(row_dict['Event_Date_Time'])
        time_diff = max_time - min_time
        num_lines = len(new_lines)
        try:
            eps = num_lines / time_diff.total_seconds()
        except ZeroDivisionError:
            eps = 0
        min_time=None

        client.publish(topic, f"{eps}")

    # Check if 120 seconds has elapsed since the last time current_time was updated.
    # current_time may not exist yet if no change has ever been seen.
    try:
        idle_for = datetime.now() - current_time
    except NameError:
        current_time = datetime.now()
        idle_for = timedelta(0)
    if idle_for >= timedelta(seconds=120):
        print("120 seconds has elapsed since the last update of current_time.")
        current_time = datetime.now()  # Update current_time

    return last_size

In [None]:
import polars as pl
import os
import io
import pickle
from datetime import datetime

def extract_date_from_filename_datetime(filename):
    """Return the first eight-digit run in *filename* parsed as ``%Y%m%d``.

    Yields ``None`` when no eight consecutive digits are present.
    """
    import re

    found = re.search(r'\d{8}', filename)
    return datetime.strptime(found.group(0), '%Y%m%d') if found else None

def int_to_time(time_int):
    """Turn an ``HHMMSS`` value (int or digit string) into a ``datetime.time``.

    Values shorter than six digits are zero-padded on the left first.
    """
    from datetime import time

    padded = str(time_int).zfill(6)
    hour, minute, second = (int(part) for part in (padded[:2], padded[2:4], padded[4:]))
    return time(hour=hour, minute=minute, second=second)

def monitor_and_update_hft1_parquet(csv_path, parquet_path, last_size_path, schema):
    """Append rows newly written to an HFT1 CSV file to its parquet copy.

    Only the bytes after the offset stored in *last_size_path* are read. They
    are parsed with *schema*, ``Event_Date_Time`` (HHMMSS) is combined with the
    date found in the CSV file name, ``Trade_Size`` (``Dollars / NumTrades``)
    is added, rows with a null ``Dollars`` are dropped, and the result is
    appended to the parquet file.

    Args:
        csv_path: Headerless CSV being appended to; its name must contain the
            date as eight digits.
        parquet_path: Parquet file to create or extend.
        last_size_path: Pickle file holding the byte offset already processed.
        schema: Column name -> polars dtype mapping for the CSV.

    Returns:
        The full DataFrame now stored in the parquet file; when there is
        nothing new, the existing parquet contents, or an empty frame built
        from *schema* if no parquet file exists yet.
    """
    # Load last read offset.
    # NOTE: unpickling is acceptable only because this file is written by this
    # function itself; never point it at untrusted data.
    last_size = 0
    if os.path.exists(last_size_path):
        with open(last_size_path, 'rb') as f:
            last_size = pickle.load(f)

    # Get current file size
    current_size = os.stat(csv_path).st_size

    # If no new data, just return the current parquet DataFrame (if exists)
    if current_size == last_size and os.path.exists(parquet_path):
        return pl.read_parquet(parquet_path)

    # Read the new bytes. Binary mode keeps the offset arithmetic exact, and
    # the next offset is derived from what was actually read rather than from
    # the earlier stat(): the file may grow between the two calls, and using
    # the stale size made those extra rows get read again (and duplicated) on
    # the next call.
    with open(csv_path, 'rb') as f:
        f.seek(last_size)
        new_bytes = f.read()

    # If nothing new, return current parquet DataFrame
    if not new_bytes:
        if os.path.exists(parquet_path):
            return pl.read_parquet(parquet_path)
        else:
            return pl.DataFrame(schema=schema)

    new_offset = last_size + len(new_bytes)

    # Create DataFrame from the new rows
    new_df = pl.read_csv(
        source=io.BytesIO(new_bytes),
        has_header=False,
        schema=schema
    )

    # Extract date from filename
    temp_date = extract_date_from_filename_datetime(csv_path)

    # Convert Event_Date_Time from HHMMSS text to time, then to datetime
    new_df = new_df.with_columns([
        pl.col('Event_Date_Time').map_elements(lambda x: int_to_time(int(x)), return_dtype=pl.Time).alias('Event_Date_Time')
    ])
    new_df = new_df.with_columns([
        pl.col('Event_Date_Time').map_elements(lambda x: datetime.combine(temp_date, x), return_dtype=pl.Datetime).alias('Event_Date_Time')
    ])

    # Calculate Trade_Size
    new_df = new_df.with_columns([
        (pl.col('Dollars') / pl.col('NumTrades')).alias('Trade_Size')
    ])

    # Drop rows with a null 'Dollars'
    new_df = new_df.drop_nulls(subset=['Dollars'])

    # Sort by Event_Date_Time
    new_df = new_df.sort('Event_Date_Time')

    # If parquet file exists, append new data
    if os.path.exists(parquet_path):
        old_df = pl.read_parquet(parquet_path)
        full_df = pl.concat([old_df, new_df])
    else:
        full_df = new_df

    # Save updated DataFrame to parquet
    full_df.write_parquet(parquet_path)

    # Record the offset only after the parquet write succeeded, so a failed
    # write is retried on the next call.
    with open(last_size_path, 'wb') as f:
        pickle.dump(new_offset, f)

    return full_df

# Column names and polars dtypes of the headerless HFT1 CSV, in file order.
# Event_Date_Time is read as text here; monitor_and_update_hft1_parquet
# converts it to a datetime afterwards.
schema = {
    "Symbol": pl.Utf8,
    "T1": pl.Utf8,
    "Event_Date_Time": pl.Utf8,
    "NumQuotes": pl.Int64,
    "BBO": pl.Int64,
    "NumTrades": pl.Int64,
    "Dollars": pl.Float64,
    "T3": pl.Utf8
}
# One-off run against the 2025-07-22 file; the paths are hard-coded and
# relative to the current working directory.
df = monitor_and_update_hft1_parquet(
    csv_path="./Data/20250722_hft1.csv",
    parquet_path="./Processed_Data/20250722_hft1.parquet",
    last_size_path="./Data/20250722_hft1_lastsize.pkl",
    schema=schema
)


Symbol,T1,Event_Date_Time,NumQuotes,BBO,NumTrades,Dollars,T3,Trade_Size
str,str,datetime[μs],i64,i64,i64,f64,str,f64
"""OSCR""","""0""",2025-07-22 06:00:00,1118,502,761,884167.0,,1161.848883
"""OPEN""","""0""",2025-07-22 07:00:00,2411,1093,681,959758.0,,1409.33627
"""IVF""","""0""",2025-07-22 07:00:04,1014,278,289,166448.0,,575.944637
"""IVF""","""0""",2025-07-22 07:00:05,1509,534,486,144934.0,,298.218107
"""IVF""","""0""",2025-07-22 07:00:06,1131,463,468,105815.0,,226.100427
…,…,…,…,…,…,…,…,…
"""QQQ""","""0""",2025-07-22 16:04:31,1506,309,15,722136.0,,48142.4
"""ENPH""","""0""",2025-07-22 16:05:00,1357,299,572,816241.0,,1426.994755
"""SOXS""","""0""",2025-07-22 16:08:17,1135,399,261,410666.0,,1573.43295
"""SOXS""","""0""",2025-07-22 16:10:00,1387,355,104,383240.0,,3685.0


# Parameter Initialization

In [3]:
# Set the start date for reading hft1 burst files.
# data_start_date is the 'YYYYMMDD' string; start_date is the same day as a datetime.
data_start_date= '20240417'
start_date = datetime.strptime(data_start_date, '%Y%m%d')

# Get yesterday's date
yesterday = datetime.now() - timedelta(days=1)

# Create a calendar for NYSE
nyse = mcal.get_calendar('NYSE')
# Get the market schedule for the window from 5 days before yesterday up to yesterday
schedule = nyse.schedule(start_date=yesterday - timedelta(days=5), end_date=yesterday)

# Find the most recent trading day: the last row of the schedule, which ends at
# yesterday, so today is never selected.
previous_date = schedule.iloc[-1].name.strftime('%Y%m%d')
# End of data is 16:00:00 on that day.
data_end_date = datetime.strptime(previous_date, '%Y%m%d').replace(hour=16, minute=0, second=0)
print(f"The most recent NYSE open day is: {previous_date}")

# Set the end date to the most recent complete trading day
end_date=data_end_date 

# Set the directory path where the CSV files are located. This application is intended to run in the same directory as the burst monitor and to use
# the same directory structure. The directory path is relative to the directory that the application is running in.

directory_path = './Data/'

# Set the file paths for the processed hft1 burst data and price data
file_path = os.path.join(directory_path, 'burst_data_processed.parquet')
# price_data_file_path = os.path.join(directory_path, 'price_data_processed_5.parquet')
price_data_file_path = os.path.join(directory_path, 'price_data_processed.parquet')

# NumQuotes thresholds: 0 presumably keeps every burst row when building the
# history; 3500 matches the value the live monitoring cell uses for alerts.
high_quote_threshold = 0
active_high_quote_threshold = 3500
# Snapshot of the data directory contents at the time this cell runs.
files = os.listdir(directory_path)

The most recent NYSE open day is: 20250721


In [4]:
# Display the directory listing captured in files as the cell output.
files

[]

In [None]:
# This cell monitors the incoming data files, detects when the most recent
# file changes, and emits an alert message for each symbol the first time it
# appears today. A dataframe containing all of the previous data must already
# have been generated, because the messages are built against that history.
#
# It is designed to run after the Burst Monitor has been started for the day
# and has created its initial files for that day.
#
# The loop runs until terminated by a keyboard interrupt from the user, at
# which point a daily wrap-up is printed.

# Set the directory path where the CSV files are located. This application is
# intended to run in the same directory as the burst monitor and to use the
# same directory structure, so the path is relative to the working directory.
active_high_quote_threshold = 3500
directory_path = './Data/'

# Keep the list of already-published messages across re-runs of this cell so
# that restarting the monitor does not publish the same message twice.
try:
    len(published_msg)
except NameError:
    published_msg = []

# Identify the file being written today: the most recently modified
# '*_hft1*.csv' file in the directory.
files = os.listdir(directory_path)
filtered_files = [f for f in files if f.endswith('.csv') and '_hft1' in f]
if not filtered_files:
    raise FileNotFoundError(
        f"No '_hft1' CSV files found in {directory_path!r}; start the Burst Monitor before running this cell."
    )
filtered_files = sorted(filtered_files, key=lambda f: os.path.getmtime(os.path.join(directory_path, f)), reverse=True)
most_recent_file=os.path.join(directory_path, filtered_files[0])

# Load the processed hft1 burst history
file_path = os.path.join(directory_path, 'burst_data_processed.parquet')
concatenated_df=load_data(file_path)
if concatenated_df is None:
    raise FileNotFoundError(
        f"Processed burst history not found at {file_path!r}; generate it before starting the monitor."
    )
# Keep only history rows with NumQuotes greater than the active_high_quote_threshold
concatenated_df = concatenated_df.filter(pl.col('NumQuotes') > active_high_quote_threshold)

temp=concatenated_df.sort(by='Event_Date_Time', descending=False)

# Remove rows that occurred today from the history, to prevent duplicate
# messages when the monitor is started during the trading day.
today = datetime.today().date()
today_date = today.strftime('%Y%m%d')
temp = temp.filter(pl.col("Event_Date_Time").dt.date() != today)

# temp now holds the complete history. From here on today's file is re-read
# whenever it changes and messages are generated for symbols not seen yet.

# Record the modification time BEFORE reading, so a write that lands while the
# file is being processed is still detected on the next pass.
last_modified = os.path.getmtime(most_recent_file)

# First pass over today's file
todays_events = make_df_from_high_quote_count(most_recent_file, active_high_quote_threshold)
if not todays_events.is_empty():
    msg_text, symbols = generate_alert_strings(temp,todays_events)
    for msg in msg_text:
        if msg not in published_msg:
            display(Markdown(msg))
            # send_slack_message('#burst_report',msg,slack_api_token)
            # response = send_twitter_message(msg,service_name='KII')
            # response = send_twitter_message(msg,service_name='HFT')
            published_msg.append(msg)
else:
    symbols = []
    total_events = 0

# Number of qualifying events seen so far today
current_length=len(todays_events)

try:
    while True:
        current_modified = os.path.getmtime(most_recent_file)
        if current_modified != last_modified:
            # Remember the timestamp that was actually observed; re-reading
            # it after processing could hide a change made in the meantime.
            last_modified = current_modified
            todays_events = make_df_from_high_quote_count(most_recent_file,active_high_quote_threshold)
            if len(todays_events) != current_length:
                # There are new events. msg_text and new_symbols are parallel
                # lists; publish only the messages whose symbol has not been
                # seen yet today.
                msg_text,new_symbols=generate_alert_strings(temp,todays_events)
                already_seen = set(symbols)
                for symbol, msg in zip(new_symbols, msg_text):
                    if symbol in already_seen:
                        continue
                    display(Markdown(msg))
                    # send_slack_message('#burst_report',msg,slack_api_token)
                    # response = send_twitter_message(msg,service_name='KII')
                    # response = send_twitter_message(msg,service_name='HFT')
                    published_msg.append(msg)
                symbols = new_symbols
                current_length=len(todays_events)

        sleep(10)

except KeyboardInterrupt:
    print('Keyboard interrupt detected. Exiting...')
    total_events = len(todays_events)
    # Events per symbol, most active first
    total_events_symbols=todays_events['Symbol'].value_counts()
    total_events_symbols =  total_events_symbols.sort(by='count', descending=True)
    # Create the event_list_msg as '$SYMBOL-count' pairs
    event_list_msg = ' '.join([f'${row[0]}-{row[1]}' for row in total_events_symbols.iter_rows()])

    final_msg = f'Daily Wrap up for {today} - {total_events} events.'# Threshold - {high_quote_threshold}
    # message = event_list_msg
    # pieces = split_message(message, max_length=279)
    print(f'Daily Wrap - Tickers: {event_list_msg}')

    print(f'Daily Wrap - Final Message: {final_msg}')
# #             # send_slack_message('#burst_report',piece,slack_api_token)
# #             # response = send_twitter_message(item,service_name='KII')
# #             # response = send_twitter_message(item,service_name='HFT')
# #         # send_slack_message('#burst_report',final_msg,slack_api_token)
# #         # response = send_twitter_message(final_msg,service_name='KII')
# #         # response = send_twitter_message(final_msg,service_name='HFT')