In [1]:
import sys
import os

# Adds the parent directory of the notebook to the path
PROJECT_ROOT = os.path.abspath("..")  # adjust path as needed
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)


In [2]:
import sys
import os
sys.path.append(os.path.abspath(".."))  # Adjust if notebook is elsewhere


In [5]:
import pandas as pd
from utils.helpers import create_ticker_batches
from stream.simulated_stream import SimulatedStream



df = pd.read_csv("../data/SampleCRSPData.csv")  # adjust if needed

batches = create_ticker_batches(df, batch_days=5)
df_batches = pd.DataFrame(batches)

stream = SimulatedStream(df_batches, delay=0).stream()

for batch in stream:
    print(f"Ticker: {batch['ticker']}")
    print(batch['data'][['date', 'PRC', 'VOL']].head())
    break  # remove break to keep streaming


Ticker: AAPL
        date    PRC       VOL
0 2009-01-02  90.75  28014289
1 2009-01-05  94.58  41844475
2 2009-01-06  93.02  47875392
3 2009-01-07  91.01  27744415
4 2009-01-08  92.70  24662123


In [6]:
import stream.simulated_stream as ss
print(dir(ss))


['BaseStream', 'SimulatedStream', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'pd', 'time']


In [7]:
import sys
import time

for batch in stream:
    print(f"Ticker: {batch['ticker']}", flush=True)
    print(batch['data'][['date', 'PRC', 'VOL']].head(), flush=True)
    time.sleep(20)


Ticker: AAPL
        date    PRC       VOL
0 2009-01-09  90.58  20110229
1 2009-01-12  88.66  22611994
2 2009-01-13  87.71  29403654
3 2009-01-14  85.33  39488108
4 2009-01-15  83.38  54368846


KeyboardInterrupt: 

In [25]:
import pandas as pd
import time
from IPython.display import clear_output, display
from stream.stream_manager import get_stream
from utils.helpers import create_ticker_batches



# Step 1: Load raw CRSP data
df = pd.read_csv("../data/SampleCRSPData.csv")

# Step 2: Preprocess into 5-day batches per TICKER
batches = create_ticker_batches(df, batch_days=5)
df_batches = pd.DataFrame(batches)

# Step 3: Get the stream using stream_manager
stream = get_stream(source_type="simulated", df=df_batches, delay=2)

# Step 4: Stream and display each batch
for batch in stream:
    clear_output(wait=True)
    print(f"🟢 Ticker: {batch['ticker']}")
    display(batch['data'][['date', 'PRC', 'VOL']])
    time.sleep(2)  # Set to 2 sec for testing, change to 20 in prod


🟢 Ticker: AAPL


Unnamed: 0,date,PRC,VOL
0,2009-01-09,90.58,20110229
1,2009-01-12,88.66,22611994
2,2009-01-13,87.71,29403654
3,2009-01-14,85.33,39488108
4,2009-01-15,83.38,54368846


KeyboardInterrupt: 

In [10]:
!pip install yfinance

from stream.stream_manager import get_stream
from utils.helpers import fetch_latest_price  

stream = get_stream(
    source_type="api",
    fetch_fn=lambda: fetch_latest_price("AAPL", interval="1m"),
    polling_interval=60  # poll every 60 seconds
)

from IPython.display import clear_output, display

for row in stream:
    clear_output(wait=True)
    print("📈 New Price Update")
    display(row[["Open", "High", "Low", "Close", "Volume"]])


📈 New Price Update


Price   Ticker
Open    AAPL      201.639999
High    AAPL      201.639999
Low     AAPL      201.639999
Close   AAPL      201.639999
Volume  AAPL        0.000000
Name: 2025-04-03 19:41:00+00:00, dtype: float64

KeyboardInterrupt: 

In [20]:
import time
from collections import deque
from IPython.display import clear_output, display
from stream.stream_manager import get_stream
from utils.helpers import fetch_latest_price  # Your helper for yfinance API
from drift.kl_divergence import detect_drift_kl  # KL divergence function

# Get the live API stream (or simulated, if desired)
stream = get_stream(
    source_type="api",
    fetch_fn=lambda: fetch_latest_price("AAPL", interval="1m"),
    polling_interval=60
)

ref_window_size = 5 
curr_window_size = 2  

W_ref = deque(maxlen=ref_window_size)
W_curr = deque(maxlen=curr_window_size)

for row in stream:
    clear_output(wait=True)

    close_price = row["Close"]
    
    W_curr.append(close_price)

    if len(W_ref) < ref_window_size:
        W_ref.append(close_price)
        print("Building reference window...")
        display(row[["Open", "High", "Low", "Close", "Volume"]])
        continue

    if len(W_curr) == curr_window_size:
        drift, kl_score = detect_drift_kl(
            list(W_curr),
            list(W_ref),
            bins=10,          
            threshold=0.1     
        )
        if drift:
            print(f"🚨 Drift detected! KL divergence: {kl_score:.4f}")
        else:
            print(f"No drift detected. KL divergence: {kl_score:.4f}")
        
        
        display(row[["Open", "High", "Low", "Close", "Volume"]])
        

        W_ref.clear()
        W_ref.extend(W_curr)
        W_curr.clear()
    
    time.sleep(0.1)  


KeyError: 'PRC'

In [34]:
import time
from collections import deque
from IPython.display import clear_output, display
from stream.stream_manager import get_stream
from utils.helpers import fetch_latest_price
from drift.divergence import kl_divergence_from_samples,jensen_shannon_divergence_from_samples,hellinger_distance_from_samples,categorize_drift


#stream = get_stream(
    #source_type="api",
    #fetch_fn=lambda: fetch_latest_price("AAPL", interval="1m"),
    #polling_interval=60  
#)
df = pd.read_csv("../data/SampleCRSPData.csv")

# Step 2: Preprocess into 5-day batches per TICKER
batches = create_ticker_batches(df, batch_days=5)
df_batches = pd.DataFrame(batches)
stream = get_stream(source_type="simulated", df=df_batches, delay=5)



ref_window_size = 5  
curr_window_size = 3

W_ref = deque(maxlen=ref_window_size)
W_curr = deque(maxlen=curr_window_size)

for batch in stream:
    clear_output(wait=True)
    print(f"🟢 Ticker: {batch['ticker']}")

    volume = batch['data']["VOL"]
    

    W_curr.append(volume)
    
    if len(W_ref) < ref_window_size:
        W_ref.append(volume)
        print("Building reference window...")
        display(batch['data'][["date", "VOL"]])
        continue

    if len(W_curr) == curr_window_size:
        kl_score = kl_divergence_from_samples(list(W_curr), list(W_ref), bins=10)
        js_score = jensen_shannon_divergence_from_samples(list(W_curr), list(W_ref), bins=10)
        hellinger_score = hellinger_distance_from_samples(list(W_curr), list(W_ref), bins=10)
    
        drift_kl = categorize_drift(kl_score, method='kl')
        drift_js = categorize_drift(js_score, method='js')
        drift_hellinger = categorize_drift(hellinger_score, method='hellinger')
    
        print(f"KL divergence: {kl_score:.4f} => Drift type: {drift_kl}")
        print(f"JS divergence: {js_score:.4f} => Drift type: {drift_js}")
        print(f"Hellinger distance: {hellinger_score:.4f} => Drift type: {drift_hellinger}")


        
        W_ref.clear()
        W_ref.extend(W_curr)
        W_curr.clear()
    
    time.sleep(0.5)  


🟢 Ticker: AAPL
KL divergence: 0.5731 => Drift type: gradual
JS divergence: 0.1323 => Drift type: stable
Hellinger distance: 0.3783 => Drift type: stable


KeyboardInterrupt: 

In [27]:
print(df.columns.tolist())


['PERMNO', 'date', 'TICKER', 'COMNAM', 'ASKHI', 'PRC', 'VOL', 'RET', 'SHROUT', 'NUMTRD', 'RETX']


In [28]:
print(batches[0]["data"].columns.tolist())


['PERMNO', 'date', 'TICKER', 'COMNAM', 'ASKHI', 'PRC', 'VOL', 'RET', 'SHROUT', 'NUMTRD', 'RETX']


In [35]:
import time
from collections import deque
from IPython.display import clear_output, display
from stream.stream_manager import get_stream
from utils.helpers import create_ticker_batches
from drift.divergence import (
    kl_divergence_from_samples,
    jensen_shannon_divergence_from_samples,
    hellinger_distance_from_samples,
    categorize_drift
)

df = pd.read_csv("../data/SampleCRSPData.csv")

batches = create_ticker_batches(df, batch_days=5)
df_batches = pd.DataFrame(batches)

stream = get_stream(source_type="simulated", df=df_batches, delay=2)

ref_window_size = 10
curr_window_size = 2

W_ref = deque(maxlen=ref_window_size)
W_curr = deque(maxlen=curr_window_size)


for batch in stream:
    clear_output(wait=True)
    print(f"🟢 Ticker: {batch['ticker']}")
    
    price_series = batch['data']["PRC"]
    value = price_series.mean() if hasattr(price_series, 'mean') else price_series
    
    W_curr.append(value)

    if len(W_ref) < ref_window_size:
        W_ref.append(value)
        print("Building reference window...")
        display(batch['data'][["date", "PRC", "VOL"]])
        continue

    if len(W_curr) == curr_window_size:
        kl_score = kl_divergence_from_samples(list(W_curr), list(W_ref), bins=10)
        js_score = jensen_shannon_divergence_from_samples(list(W_curr), list(W_ref), bins=10)
        hellinger_score = hellinger_distance_from_samples(list(W_curr), list(W_ref), bins=10)
    
        drift_kl = categorize_drift(kl_score, method='kl')
        drift_js = categorize_drift(js_score, method='js')
        drift_hellinger = categorize_drift(hellinger_score, method='hellinger')
    
        print(f"KL divergence: {kl_score:.4f} => Drift type: {drift_kl}")
        print(f"JS divergence: {js_score:.4f} => Drift type: {drift_js}")
        print(f"Hellinger distance: {hellinger_score:.4f} => Drift type: {drift_hellinger}")
    
        display(batch['data'][["date", "PRC", "VOL"]])
        
        for sample in list(W_curr):
            W_ref.append(sample)

        W_curr.clear()
    
    time.sleep(0.5)


🟢 Ticker: AAPL
KL divergence: 1.0601 => Drift type: abrupt
JS divergence: 0.2934 => Drift type: stable
Hellinger distance: 0.6236 => Drift type: gradual


Unnamed: 0,date,PRC,VOL
0,2010-11-12,308.03,29405801
1,2010-11-15,307.035,15057055
2,2010-11-16,301.59,24340322
3,2010-11-17,300.5,17841578
4,2010-11-18,308.42999,18422776


KeyboardInterrupt: 