# Library Import

In [1]:
import pandas as pd 
import yfinance as yf
import polygon 
import numpy as np
from dotenv import load_dotenv
import os
import requests
import duckdb

In [3]:
import psycopg2
# Load environment variables
load_dotenv()

# Get the postgres url
postgres_url = os.environ.get('POSTGRES_URL')
user = os.environ.get('POSTGRES_USER')
password = os.environ.get('POSTGRES_PASSWORD')
intervals = [1, 3, 5, 8, 13]
# Connect to the postgres database
try:
    conn = psycopg2.connect(postgres_url)
    cursor = conn.cursor()
    print("Connected to the timescaledb database")
except Exception as e:
    print(f"Error connecting to the database: {e}")
    conn = None
    cursor = None
    

Connected to the timescaledb database


# Performance Comparison

## Postgres

In [11]:
# Connect to TimescaleDB
conn = psycopg2.connect(postgres_url)
cursor = conn.cursor()

# Query TimescaleDB
query = "SELECT * FROM raw ORDER BY symbol, date DESC"
cursor.execute(query)

# Fetch results
%time results = cursor.fetchall()

CPU times: user 14.9 s, sys: 4.77 s, total: 19.6 s
Wall time: 22.5 s


## DuckDB

In [14]:
## Connect Polar to Timescale Postgres
duck_query = duckdb.sql(f"""
    -- INSTALL postgres_scanner;
    -- LOAD postgres_scanner;

    SELECT * FROM postgres_scan(
        'host=localhost port=5432 user={user} password={password} dbname=condvest',
        'public', 'raw'
    ) ORDER BY symbol, date DESC;
""")

%time duck_df = duck_query.df()

CPU times: user 11.2 s, sys: 9.59 s, total: 20.7 s
Wall time: 20 s


In [5]:
query = f"""
WITH raw_data AS (
    SELECT * FROM postgres_scan(
        'host=localhost port=5432 user={user} password={password} dbname=condvest',
        'public', 'raw' 
    )
),
ranked AS (
    SELECT *,
        row_number() OVER (PARTITION BY symbol ORDER BY date) as rn
    FROM raw_data
),
grouped AS (
    SELECT *,
        (rn - 1) / 3 as group_id
    FROM ranked
)
SELECT 
    symbol,
    min(date) as date,
    first(open) as open,
    max(high) as high,
    min(low) as low,
    last(close) as close,
    sum(volume) as volume
FROM grouped
GROUP BY symbol, group_id
ORDER BY symbol, date;
"""

%time duckdb_result = duckdb.sql(query)

CPU times: user 1.13 ms, sys: 1.25 ms, total: 2.38 ms
Wall time: 30 ms


## Polar

In [40]:
import polars as pl

# Load from Timescale using ConnectorX
polars_df = pl.read_database_uri(
    "SELECT date, symbol, open, high, low, close, volume FROM raw ORDER BY symbol ASC, date ASC",
    uri=postgres_url
)

In [None]:
# Ensure datetime column is properly cast
polars_df = polars_df.with_columns([
    pl.col("date").cast(pl.Datetime("us"))
])

# Resample to 3-day OHLCV candles using Polars' groupby_dynamic
resampled_3d_df = (
    polars_df.group_by_dynamic(
        index_column="date",
        every="3d",
        by="symbol",
        closed="left",
        period="3d"
    )
    .agg([
        pl.col("open").first().alias("open"),
        pl.col("high").max().alias("high"),
        pl.col("low").min().alias("low"),
        pl.col("close").last().alias("close"),
        pl.col("volume").sum().alias("volume")
    ])
    .sort(["symbol", "date"])
)


In [None]:
resampled_3d_df

# Analytics Modules

##  1. DuckDB + Polar Add Indicator

In [None]:
import duckdb
import polars as pl
import time

# Step 1: Connect and load Postgres data into DuckDB
con = duckdb.connect()

combined_results = []
intervals = [1,3,5,8,13]
for interval in intervals:
    start_time = time.time()

    query = f"""
    WITH raw_data AS (
        SELECT * FROM postgres_scan(
            'host=localhost port=5432 user={user} password={password} dbname=condvest',
            'public', 'raw'
        )
    ),
    ranked AS (
        SELECT *,
            row_number() OVER (PARTITION BY symbol ORDER BY date) as rn
        FROM raw_data
    ),
    grouped AS (
        SELECT *,
            (rn - 1) / {interval} as group_id
        FROM ranked
    )
    SELECT 
        symbol,
        min(date) as date,
        first(open) as open,
        max(high) as high,
        min(low) as low,
        last(close) as close,
        sum(volume) as volume,
        '{interval}'::INT as interval
    FROM grouped
    GROUP BY symbol, group_id
    ORDER BY symbol, date;
    """
    
    df = duckdb.sql(query)
    print(f"DuckDB query execution time: {time.time() - start_time:.2f} seconds")
    combined_results.append(df.df())
    print(f"Duck to dataframe conversion time: {time.time() - start_time:.2f} seconds")

# Step 2: Convert to Polars DataFrame
start_time = time.time()
pl_resampled_df = pl.from_pandas(pd.concat(combined_results))
print(f"Pandas to Polars conversion time: {time.time() - start_time:.2f} seconds")

# Step 3: Convert to Polars and add indicators
def add_indicators(df: pl.DataFrame) -> pl.DataFrame:
    df = df.sort("date")
    
    # Step 1: Compute EMAs
    df = df.with_columns([
        pl.col("close").ewm_mean(span=8).alias("EMA_8"),
        pl.col("close").ewm_mean(span=13).alias("EMA_13"),
        pl.col("close").ewm_mean(span=21).alias("EMA_21"),
        pl.col("close").ewm_mean(span=144).alias("EMA_144"),
        pl.col("close").ewm_mean(span=169).alias("EMA_169"),
        pl.col("close").ewm_mean(span=55).alias("EMA_55"),
        pl.col("close").ewm_mean(span=89).alias("EMA_89"),
    ])
    
    # Step 2: Compute MACD and RSI using already-created columns
    df = df.with_columns([
        (pl.col("EMA_13") - pl.col("EMA_21")).alias("macd_fast"),
        (pl.col("EMA_55") - pl.col("EMA_89")).alias("macd_slow"),
    ])
    
    return df

start_time = time.time()
df_with_indicators = pl_resampled_df.group_by("symbol", maintain_order=True).map_groups(add_indicators)
print(f"Indicator calculation time: {time.time() - start_time:.2f} seconds")

print("\nFirst 10 rows of result:")
print(df_with_indicators.head(10))

## 2. Add Alerts

In [5]:
import polars as pl
import numpy as np
from typing import List

# Define intervals and rolling window
intervals = [1, 3, 5, 8, 13]
rolling_window = 50

# Step 1: Add velocity alerts
def add_velocity_alert(df: pl.DataFrame) -> pl.DataFrame:
    """
    Add velocity alerts based on the relationship between price and various EMAs.
    """
    df = df.with_columns([
        pl.when(
            (pl.col("close") > pl.col("open")) & 
            (pl.col("close") > pl.max_horizontal("EMA_8", "EMA_13")) & 
            (pl.col("close") > pl.max_horizontal("EMA_144", "EMA_169")) &
            (pl.min_horizontal("EMA_8", "EMA_13") > pl.max_horizontal("EMA_144", "EMA_169"))
        ).then(pl.lit("velocity_maintained"))
        .when(
            (pl.col("close") < pl.col("EMA_13")) & 
            (pl.col("close") > pl.col("EMA_169"))
        ).then(pl.lit("velocity_weak"))
        .when(
            (pl.col("close") < pl.col("EMA_13")) & 
            (pl.col("close") < pl.col("EMA_169"))
        ).then(pl.lit("velocity_loss"))
        .otherwise(pl.lit("velocity_negotiating"))
        .alias("velocity_status")
    ])
    return df

# Step 2: Add acceleration/deceleration alerts
def add_accel_decel_alert(df: pl.DataFrame, interval: int) -> pl.DataFrame:
    """
    Add acceleration/deceleration alerts based on EMA relationships and velocity status history.
    """
    window_dict = {
        1: 28, 3: 20, 5: 20, 8: 14, 13: 14
    }
    obs_window = window_dict.get(interval, 7)
    
    # First get velocity status
    df = add_velocity_alert(df)
    
    # Count velocity statuses in the observation window
    df = df.with_columns([
        pl.col("velocity_status").map_elements(
            lambda s: 1 if s in ["velocity_loss", "velocity_weak", "velocity_negotiating"] else 0,
            return_dtype=pl.Int32
        ).alias("loss_flag"),
        pl.col("velocity_status").map_elements(
            lambda s: 1 if s == "velocity_maintained" else 0,
            return_dtype=pl.Int32
        ).alias("maintain_flag")
    ])
    
    df = df.with_columns([
        pl.col("loss_flag").rolling_sum(window_size=obs_window).alias("count_velocity_loss"),
        pl.col("maintain_flag").rolling_sum(window_size=obs_window).alias("count_velocity_maintained")
    ])
    
    # Add acceleration/deceleration signals
    df = df.with_columns([
        pl.when(
            (pl.max_horizontal("EMA_144", "EMA_169") <= pl.max_horizontal("EMA_8", "EMA_13")) &
            (pl.col("open") < pl.col("close")) &
            (pl.col("count_velocity_loss") > pl.col("count_velocity_maintained"))
        ).then(pl.lit("accelerated"))
        .when(
            (pl.col("close") < pl.min_horizontal("EMA_8", "EMA_13")) &
            (pl.col("count_velocity_maintained") < pl.col("count_velocity_loss"))
        ).then(pl.lit("decelerated"))
        .otherwise(None).alias("momentum_signal")
    ])
    
    # Create alert
    momentum_alerts = df.filter(pl.col("momentum_signal").is_not_null())
    momentum_alerts = momentum_alerts.with_columns([
        pl.lit("momentum_alert").alias("alert_type"),
        pl.col("momentum_signal").alias("signal"),
        pl.lit(interval).alias("interval")
    ])
    
    return momentum_alerts.select("symbol", "date", "interval", "alert_type", "signal")

# Step 3: Add EMA touch alerts
def add_ema_touch_alert(df: pl.DataFrame, interval: int) -> pl.DataFrame:
    """
    Add alerts for when price touches or comes close to important EMAs.
    """
    tolerance_dict = {
        1: 0.002, 3: 0.02, 5: 0.05, 8: 0.07, 13: 0.1
    }
    tolerance = tolerance_dict.get(interval, 0.02)
    
    # Calculate tolerance bands around EMAs
    df = df.with_columns([
        pl.min_horizontal(
            pl.col("EMA_144"), pl.col("EMA_169")
        ).fill_null(pl.col("EMA_13")).alias("long_term_min"),
        
        pl.max_horizontal(
            pl.col("EMA_144"), pl.col("EMA_169")
        ).fill_null(pl.col("EMA_13")).alias("long_term_max"),
        
        pl.min_horizontal(
            pl.col("EMA_8"), pl.col("EMA_13")
        ).alias("short_term_min"),
        
        pl.max_horizontal(
            pl.col("EMA_8"), pl.col("EMA_13")
        ).alias("short_term_max")
    ])
    
    # Calculate tolerance bands
    df = df.with_columns([
        (pl.col("long_term_min") * (1 - tolerance)).alias("lower_bound"),
        (pl.col("long_term_max") * (1 + tolerance)).alias("upper_bound")
    ])
    
    # Detect touches
    df = df.with_columns([
        pl.when(
            ((pl.col("low") <= pl.col("upper_bound")) & (pl.col("low") >= pl.col("lower_bound"))) |
            ((pl.col("EMA_13") <= pl.col("upper_bound")) & (pl.col("EMA_13") >= pl.col("lower_bound"))) |
            ((pl.col("EMA_8") <= pl.col("upper_bound")) & (pl.col("EMA_8") >= pl.col("lower_bound")))
        ).then(
            pl.when(
                (pl.col("short_term_min") > pl.col("long_term_max")) &
                (pl.min_horizontal(pl.col("close"), pl.col("open")) > pl.col("long_term_min"))
            ).then(pl.lit("support"))
            .when(
                (pl.col("short_term_max") < pl.col("long_term_max")) &
                (pl.col("close") < pl.col("long_term_max"))
            ).then(pl.lit("resistance"))
            .otherwise(pl.lit("neutral"))
        ).otherwise(None).alias("ema_touch_type")
    ])
    
    # Filter for touches and create alert
    ema_touch_alerts = df.filter(pl.col("ema_touch_type").is_not_null())
    ema_touch_alerts = ema_touch_alerts.with_columns([
        pl.lit("ema_touch").alias("alert_type"),
        pl.col("ema_touch_type").alias("signal"),
        pl.lit(interval).alias("interval")
    ])
    
    return ema_touch_alerts.select("symbol", "date", "interval", "alert_type", "signal")

# Step 4: Process all intervals and combine alerts
all_alerts = []
df_with_indicators = df_with_indicators.filter(pl.col("date").dt.replace_time_zone("America/Edmonton") >= pd.to_datetime("2020-01-01").tz_localize("America/Edmonton"))

for interval in intervals:
    df_interval = df_with_indicators.filter(pl.col("interval") == interval)
    
    # Skip empty DataFrames
    if df_interval.height == 0:
        continue
        
    # Add velocity alerts
    velocity_df = add_velocity_alert(df_interval)
    velocity_alerts = velocity_df.with_columns([
        pl.lit("velocity_alert").alias("alert_type"),
        pl.col("velocity_status").alias("signal"),
        pl.lit(interval).alias("interval")
    ]).select("symbol", "date", "interval", "alert_type", "signal")
    
    # Add momentum alerts
    momentum_alerts = add_accel_decel_alert(df_interval, interval)
    
    # Add EMA touch alerts
    ema_touch_alerts = add_ema_touch_alert(df_interval, interval)
    
    # Combine all alerts for this interval
    all_alerts.extend([
        velocity_alerts,
        momentum_alerts,
        ema_touch_alerts
    ])

# Step 5: Combine all alerts into final DataFrame
if all_alerts:
    alert_df = pl.concat(all_alerts)
else:
    # Return empty DataFrame with correct schema if no alerts
    alert_df = pl.DataFrame({
        "symbol": [],
        "date": [],
        "interval": [],
        "alert_type": [],
        "signal": []
    })

# Print results for verification
print("\nFirst 10 rows of alerts:")
print(alert_df.head(10))

# Filter specific alert types for analysis
momentum_df = alert_df.filter(pl.col("alert_type") == "momentum_alert")
ema_df = alert_df.filter(pl.col("alert_type") == "ema_touch")

print("\nFirst 5 momentum alerts:")
print(momentum_df.head())

print("\nFirst 5 EMA touch alerts:")
print(ema_df.head())

## 3. Add Signals

In [11]:
# Step 1: Initialize interval weights
distinct_intervals = alert_df.get_column("interval").unique().sort()
interval_weights = {interval: weight for weight, interval in enumerate(distinct_intervals, 1)}
print("Interval weights:", interval_weights)

# Step 2: Process momentum alerts for micro intervals
momentum_data = alert_df.filter(pl.col("alert_type") == "momentum_alert")
momentum_results = momentum_data.group_by(["symbol", "interval"]).agg([
    pl.when(pl.col("signal") == "accelerated").then(1).otherwise(0).sum().alias("momentum_alert_accelerated"),
    pl.when(pl.col("signal") == "decelerated").then(1).otherwise(0).sum().alias("momentum_alert_decelerated")
])
print("\nMomentum results:")
print(momentum_results.head())

# Step 3: Process EMA touch alerts
ema_data = alert_df.filter(pl.col("alert_type") == "ema_touch")
ema_results = ema_data.group_by(["symbol", "interval"]).agg([
    pl.when(pl.col("signal") == "resistance").then(1).otherwise(0).sum().alias("touch_type_resistance"),
    pl.when(pl.col("signal") == "support").then(1).otherwise(0).sum().alias("touch_type_support"),
    pl.len().alias("count")
])
print("\nEMA touch results:")
print(ema_results.head())

# Step 4: Join momentum and EMA results
micro_results = momentum_results.join(
    ema_results,
    on=["symbol", "interval"],
    how="full"
).fill_null(0)
print("\nCombined micro results:")
print(micro_results.head())

# Step 5: Apply interval weighting to micro results
micro_results = micro_results.with_columns([
    pl.col("interval").map_elements(lambda x: interval_weights.get(x, 0), return_dtype=pl.Int64).alias("interval_weight")
])
print("\nMicro results with weights:")
print(micro_results.head())

# Step 6: Calculate weighted values for micro results
micro_results = micro_results.with_columns([
    (pl.col("momentum_alert_accelerated") * pl.col("interval_weight")).alias("weighted_momentum_alert_accelerated"),
    (pl.col("momentum_alert_decelerated") * pl.col("interval_weight")).alias("weighted_momentum_alert_decelerated"),
    (pl.col("touch_type_resistance") * pl.col("interval_weight")).alias("weighted_touch_type_resistance"),
    (pl.col("touch_type_support") * pl.col("interval_weight")).alias("weighted_touch_type_support")
])
print("\nMicro results with weighted values:")
print(micro_results.head())

# Step 7: Process velocity alerts for macro intervals
velocity_data = alert_df.filter(pl.col("alert_type") == "velocity_alert")
macro_results = velocity_data.group_by(["symbol", "interval"]).agg([
    pl.when(pl.col("signal") == "velocity_maintained").then(1).otherwise(0).sum().alias("velocity_maintained"),
    pl.when(pl.col("signal") == "velocity_weak").then(1).otherwise(0).sum().alias("velocity_weak"),
    pl.when(pl.col("signal") == "velocity_loss").then(1).otherwise(0).sum().alias("velocity_loss")
])
print("\nMacro results:")
print(macro_results.head())

# Step 8: Apply interval weighting to macro results
macro_results = macro_results.with_columns([
    pl.col("interval").map_elements(lambda x: interval_weights.get(x, 0), return_dtype=pl.Int64).alias("interval_weight")
])
print("\nMacro results with weights:")
print(macro_results.head())

# Step 9: Calculate weighted values for macro results
macro_results = macro_results.with_columns([
    (pl.col("velocity_maintained") * pl.col("interval_weight")).alias("weighted_velocity_maintained"),
    (pl.col("velocity_weak") * pl.col("interval_weight")).alias("weighted_velocity_weak"),
    (pl.col("velocity_loss") * pl.col("interval_weight")).alias("weighted_velocity_loss")
])
print("\nMacro results with weighted values:")
print(macro_results.head())

# Step 10: Filter for specific stock categories
short_acc_equ = micro_results.filter(
    (pl.col("weighted_momentum_alert_accelerated") > 1) &
    (pl.col("weighted_momentum_alert_decelerated") < 1) &
    (pl.col("interval") <= 3)
).get_column("symbol")

lng_acc_equ = micro_results.filter(
    (pl.col("weighted_momentum_alert_accelerated") > 1) &
    (pl.col("weighted_momentum_alert_decelerated") < 1) &
    (pl.col("interval") == 5)
).get_column("symbol")

lng_main_acc_equ = micro_results.filter(
    (pl.col("weighted_touch_type_support") > 1) &
    (pl.col("weighted_touch_type_resistance") < 1) &
    (pl.col("count") >= 1) &
    (pl.col("interval") == 5)
).get_column("symbol")

maintained_stocks = macro_results.filter(
    (pl.col("weighted_velocity_maintained") > 0) &
    (pl.col("weighted_velocity_weak") == 0) &
    (pl.col("weighted_velocity_loss") == 0) &
    (pl.col("interval") >= 8)
).get_column("symbol")

print("\nShort accelerating stocks:", short_acc_equ.to_list())
print("\nLong accelerating stocks:", lng_acc_equ.to_list())
print("\nLong accumulating stocks:", lng_main_acc_equ.to_list())
print("\nVelocity maintained stocks:", maintained_stocks.to_list())

# Step 11: Group results by date and create final DataFrame
grouped_data = alert_df.group_by("date")
results = []

for date, group in grouped_data:
    micro_data = group.filter(pl.col("interval") <= 5)
    macro_data = group.filter(pl.col("interval") >= 8)
    
    date_results = {
        "date": date,
        "accelerating": short_acc_equ.to_list(),
        "long_accelerating": lng_acc_equ.to_list(),
        "long_accumulating": lng_main_acc_equ.to_list(),
        "velocity_maintained": maintained_stocks.to_list()
    }
    results.append(date_results)

final_results = pl.DataFrame(results)
print("\nFinal results:")
print(final_results)

## Data Loading Module

In [13]:
class DataLoader:
    def __init__(self, user: str, password: str, intervals: List[int]):
        self.user = user
        self.password = password
        self.con = duckdb.connect()
        self.intervals = intervals

    def _get_resampled_data(self, interval: int) -> pd.DataFrame:
        query = f"""
        WITH raw_data AS (
            SELECT * FROM postgres_scan(
                'host=localhost port=5432 user={self.user} password={self.password} dbname=condvest',
                'public', 'raw'
            )
        ),
        ranked AS (
            SELECT *,
                row_number() OVER (PARTITION BY symbol ORDER BY date) as rn
            FROM raw_data
        ),
        grouped AS (
            SELECT *,
                (rn - 1) / {interval} as group_id
            FROM ranked
        )
        SELECT 
            symbol,
            min(date) as date,
            first(open) as open,
            max(high) as high,
            min(low) as low,
            last(close) as close,
            sum(volume) as volume,
            '{interval}'::INT as interval
        FROM grouped
        GROUP BY symbol, group_id
        ORDER BY symbol, date;
        """
        
        return self.con.sql(query).df()

    def load_data(self) -> pl.DataFrame:
        start_time = time.time()
        
        # Load and resample data for each interval
        combined_results = []
        for interval in self.intervals:
            interval_start = time.time()
            df = self._get_resampled_data(interval)
            print(f"DuckDB query execution time for interval {interval}: {time.time() - interval_start:.2f} seconds")
            combined_results.append(df)
            print(f"Duck to dataframe conversion time: {time.time() - interval_start:.2f} seconds")

        # Convert to Polars DataFrame
        pl_resampled_df = pl.from_pandas(pd.concat(combined_results))
        print(f"Pandas to Polars conversion time: {time.time() - start_time:.2f} seconds")

        return pl_resampled_df


In [None]:
# Load data Class
data_loader = DataLoader(user=user, password=password, intervals=intervals)
df = data_loader.load_data()

## Indicator Calculator Module

In [17]:
class IndicatorCalculator:
    def __init__(self):
        pass

    def add_indicators(self, df: pl.DataFrame) -> pl.DataFrame:
        df = df.sort("date")
        
        # Compute EMAs
        df = df.with_columns([
            pl.col("close").ewm_mean(span=8).alias("EMA_8"),
            pl.col("close").ewm_mean(span=13).alias("EMA_13"),
            pl.col("close").ewm_mean(span=21).alias("EMA_21"),
            pl.col("close").ewm_mean(span=144).alias("EMA_144"),
            pl.col("close").ewm_mean(span=169).alias("EMA_169"),
            pl.col("close").ewm_mean(span=55).alias("EMA_55"),
            pl.col("close").ewm_mean(span=89).alias("EMA_89"),
        ])
        
        # Compute MACD
        df = df.with_columns([
            (pl.col("EMA_13") - pl.col("EMA_21")).alias("macd_fast"),
            (pl.col("EMA_55") - pl.col("EMA_89")).alias("macd_slow"),
        ])
        
        return df

    def calculate_indicators(self, df: pl.DataFrame) -> pl.DataFrame:
        start_time = time.time()
        
        # Add indicators
        df_with_indicators = df.group_by("symbol", maintain_order=True).map_groups(self.add_indicators)
        print(f"Indicator calculation time: {time.time() - start_time:.2f} seconds")

        return df_with_indicators


In [None]:
# Calculate indicators
indicator_calculator = IndicatorCalculator()
results = indicator_calculator.calculate_indicators(df)
print("\nFirst 10 rows of result:")
print(results.head(10))

## Add Alerts Module

In [18]:
import polars as pl
import numpy as np
from typing import List

class TrendAlertProcessor:
    """
    TrendAlertProcessor using Polars for efficient processing of financial time series data.
    Incorporates advanced trend detection algorithms from the dictionary-based implementation.
    """
    def __init__(self, df: pl.DataFrame, intervals: List[int]):
        self.df = df
        self.intervals = intervals
        self.rolling_window = 50
    
    def _add_velocity_alert(self, df: pl.DataFrame) -> pl.DataFrame:
        """
        Add velocity alerts based on the relationship between price and various EMAs.
        Similar to velocity_alert_dict in the original implementation.
        """
        # Add velocity status
        df = df.with_columns([
            pl.when(
                (pl.col("close") > pl.col("open")) & 
                (pl.col("close") > pl.max_horizontal("EMA_8", "EMA_13")) & 
                (pl.col("close") > pl.max_horizontal("EMA_144", "EMA_169")) &
                (pl.min_horizontal("EMA_8", "EMA_13") > pl.max_horizontal("EMA_144", "EMA_169"))
            ).then(pl.lit("velocity_maintained"))
            .when(
                (pl.col("close") < pl.col("EMA_13")) & 
                (pl.col("close") > pl.col("EMA_169"))
            ).then(pl.lit("velocity_weak"))
            .when(
                (pl.col("close") < pl.col("EMA_13")) & 
                (pl.col("close") < pl.col("EMA_169"))
            ).then(pl.lit("velocity_loss"))
            .otherwise(pl.lit("velocity_negotiating"))
            .alias("velocity_status")
        ])
        
        return df
    
    def _add_accel_decel_alert(self, df: pl.DataFrame, interval: int) -> pl.DataFrame:
        """
        Add acceleration/deceleration alerts based on EMA relationships and velocity status history.
        """
        window_dict = {
            1: 28, 3: 20, 5: 20, 8: 14, 13: 14
        }
        obs_window = window_dict.get(interval, 7)
        
        # First get velocity status
        df = self._add_velocity_alert(df)
        
        # Count velocity statuses in the observation window
        df = df.with_columns([
            pl.col("velocity_status").map_elements(
                lambda s: 1 if s in ["velocity_loss", "velocity_weak", "velocity_negotiating"] else 0,
                return_dtype=pl.Int32
            ).alias("loss_flag"),
            pl.col("velocity_status").map_elements(
                lambda s: 1 if s == "velocity_maintained" else 0,
                return_dtype=pl.Int32
            ).alias("maintain_flag")
        ])
        
        df = df.with_columns([
            pl.col("loss_flag").rolling_sum(window_size=obs_window).alias("count_velocity_loss"),
            pl.col("maintain_flag").rolling_sum(window_size=obs_window).alias("count_velocity_maintained")
        ])
        
        # Add acceleration/deceleration signals
        df = df.with_columns([
            pl.when(
                (pl.max_horizontal("EMA_144", "EMA_169") <= pl.max_horizontal("EMA_8", "EMA_13")) &
                (pl.col("open") < pl.col("close")) &
                (pl.col("count_velocity_loss") > pl.col("count_velocity_maintained"))
            ).then(pl.lit("accelerated"))
            .when(
                (pl.col("close") < pl.min_horizontal("EMA_8", "EMA_13")) &
                (pl.col("count_velocity_maintained") < pl.col("count_velocity_loss"))
            ).then(pl.lit("decelerated"))
            .otherwise(None).alias("momentum_signal")
        ])
        
        # Create alert
        momentum_alerts = df.filter(pl.col("momentum_signal").is_not_null())
        momentum_alerts = momentum_alerts.with_columns([
            pl.lit("momentum_alert").alias("alert_type"),
            pl.col("momentum_signal").alias("signal"),
            pl.lit(interval).alias("interval")
        ])
        
        return momentum_alerts.select("symbol", "date", "interval", "alert_type", "signal")
    
    def _add_ema_touch_alert(self, df: pl.DataFrame, interval: int) -> pl.DataFrame:
        """
        Add alerts for when price touches or comes close to important EMAs.
        """
        tolerance_dict = {
            1: 0.002, 3: 0.02, 5: 0.05, 8: 0.07, 13: 0.1
        }
        tolerance = tolerance_dict.get(interval, 0.02)
        
        # Calculate tolerance bands around EMAs
        df = df.with_columns([
            pl.min_horizontal(
                pl.col("EMA_144"), pl.col("EMA_169")
            ).fill_null(pl.col("EMA_13")).alias("long_term_min"),
            
            pl.max_horizontal(
                pl.col("EMA_144"), pl.col("EMA_169")
            ).fill_null(pl.col("EMA_13")).alias("long_term_max"),
            
            pl.min_horizontal(
                pl.col("EMA_8"), pl.col("EMA_13")
            ).alias("short_term_min"),
            
            pl.max_horizontal(
                pl.col("EMA_8"), pl.col("EMA_13")
            ).alias("short_term_max")
        ])
        
        # Calculate tolerance bands
        df = df.with_columns([
            (pl.col("long_term_min") * (1 - tolerance)).alias("lower_bound"),
            (pl.col("long_term_max") * (1 + tolerance)).alias("upper_bound")
        ])
        
        # Detect touches
        df = df.with_columns([
            pl.when(
                ((pl.col("low") <= pl.col("upper_bound")) & (pl.col("low") >= pl.col("lower_bound"))) |
                ((pl.col("EMA_13") <= pl.col("upper_bound")) & (pl.col("EMA_13") >= pl.col("lower_bound"))) |
                ((pl.col("EMA_8") <= pl.col("upper_bound")) & (pl.col("EMA_8") >= pl.col("lower_bound")))
            ).then(
                pl.when(
                    (pl.col("short_term_min") > pl.col("long_term_max")) &
                    (pl.min_horizontal(pl.col("close"), pl.col("open")) > pl.col("long_term_min"))
                ).then(pl.lit("support"))
                .when(
                    (pl.col("short_term_max") < pl.col("long_term_max")) &
                    (pl.col("close") < pl.col("long_term_max"))
                ).then(pl.lit("resistance"))
                .otherwise(pl.lit("neutral"))
            ).otherwise(None).alias("ema_touch_type")
        ])
        
        # Filter for touches and create alert
        ema_touch_alerts = df.filter(pl.col("ema_touch_type").is_not_null())
        ema_touch_alerts = ema_touch_alerts.with_columns([
            pl.lit("ema_touch").alias("alert_type"),
            pl.col("ema_touch_type").alias("signal"),
            pl.lit(interval).alias("interval")
        ])
        
        return ema_touch_alerts.select("symbol", "date", "interval", "alert_type", "signal")
    
    def apply(self) -> pl.DataFrame:
        """
        Apply all alert detection algorithms and return a combined DataFrame of alerts.
        """
        all_alerts = []
        
        for interval in self.intervals:
            df_interval = self.df.filter(pl.col("interval") == interval)
            
            # No empty DataFrames
            if df_interval.height == 0:
                continue
                
            # Add velocity alerts
            velocity_df = self._add_velocity_alert(df_interval)
            velocity_alerts = velocity_df.with_columns([
                pl.lit("velocity_alert").alias("alert_type"),
                pl.col("velocity_status").alias("signal"),
                pl.lit(interval).alias("interval")
            ]).select("symbol", "date", "interval", "alert_type", "signal")
            
            # Add momentum alerts
            momentum_alerts = self._add_accel_decel_alert(df_interval, interval)
            
            # Add EMA touch alerts
            ema_touch_alerts = self._add_ema_touch_alert(df_interval, interval)
            
            # Combine all alerts for this interval
            all_alerts.extend([
                velocity_alerts,
                momentum_alerts,
                ema_touch_alerts
            ])
        
        # Combine all alerts into a single DataFrame
        if all_alerts:
            return pl.concat(all_alerts)
        else:
            # Return empty DataFrame with correct schema if no alerts
            return pl.DataFrame({
                "symbol": [],
                "date": [],
                "interval": [],
                "alert_type": [],
                "signal": []
            })

## Add Signal Module

In [19]:
class StockCandidatesProcessor:
    """
    A class to process and analyze stock candidates using Polars for efficient data processing.
    Analyzes stocks based on various criteria and alerts from different time intervals.
    """
    def __init__(self, df: pl.DataFrame):
        self.df = df
        self.interval_weights = None
        self._initialize_weights()
        
    def _initialize_weights(self):
        """Initialize interval weights based on unique intervals in the data."""
        distinct_intervals = self.df.get_column("interval").unique().sort()
        self.interval_weights = {interval: weight for weight, interval in enumerate(distinct_intervals, 1)}
        
    def _evaluate_micro_interval_stocks(self, data: pl.DataFrame) -> dict:
        """
        Evaluate stocks based on micro-interval criteria (intervals <= 5).
        Analyzes acceleration and accumulation patterns.
        """
        # Process momentum alerts
        momentum_data = data.filter(pl.col("alert_type") == "momentum_alert")
        momentum_results = momentum_data.group_by(["symbol", "interval"]).agg([
            pl.when(pl.col("signal") == "accelerated").then(1).otherwise(0).sum().alias("momentum_alert_accelerated"),
            pl.when(pl.col("signal") == "decelerated").then(1).otherwise(0).sum().alias("momentum_alert_decelerated")
        ])
        
        # Process EMA touch alerts
        ema_data = data.filter(pl.col("alert_type") == "ema_touch")
        ema_results = ema_data.group_by(["symbol", "interval"]).agg([
            pl.when(pl.col("signal") == "resistance").then(1).otherwise(0).sum().alias("touch_type_resistance"),
            pl.when(pl.col("signal") == "support").then(1).otherwise(0).sum().alias("touch_type_support"),
            pl.len().alias("count")
        ])
        
        # Join the results
        results = momentum_results.join(
            ema_results,
            on=["symbol", "interval"],
            how="full"
        ).fill_null(0)
        
        # Apply interval weighting
        results = results.with_columns([
            pl.col("interval").map_elements(lambda x: self.interval_weights.get(x, 0), return_dtype=pl.Int64).alias("interval_weight")
        ])
        
        # Calculate weighted values
        results = results.with_columns([
            (pl.col("momentum_alert_accelerated") * pl.col("interval_weight")).alias("weighted_momentum_alert_accelerated"),
            (pl.col("momentum_alert_decelerated") * pl.col("interval_weight")).alias("weighted_momentum_alert_decelerated"),
            (pl.col("touch_type_resistance") * pl.col("interval_weight")).alias("weighted_touch_type_resistance"),
            (pl.col("touch_type_support") * pl.col("interval_weight")).alias("weighted_touch_type_support")
        ])
        
        # Filter for accelerating stocks
        short_acc_equ = results.filter(
            (pl.col("weighted_momentum_alert_accelerated") > 1) &
            (pl.col("weighted_momentum_alert_decelerated") < 1) &
            (pl.col("interval") <= 3)
        ).get_column("symbol")
        
        lng_acc_equ = results.filter(
            (pl.col("weighted_momentum_alert_accelerated") > 1) &
            (pl.col("weighted_momentum_alert_decelerated") < 1) &
            (pl.col("interval") == 5)
        ).get_column("symbol")
        
        lng_main_acc_equ = results.filter(
            (pl.col("weighted_touch_type_support") > 1) &
            (pl.col("weighted_touch_type_resistance") < 1) &
            (pl.col("count") >= 1) &
            (pl.col("interval") == 5)
        ).get_column("symbol")
        
        return {
            "accelerating": short_acc_equ.to_list(),
            "long_accelerating": lng_acc_equ.to_list(),
            "long_accumulating": lng_main_acc_equ.to_list()
        }

    def _evaluate_macro_interval_stocks(self, data: pl.DataFrame) -> dict:
        """
        Evaluate stocks based on macro-interval criteria (intervals >= 8).
        Analyzes velocity maintenance patterns.
        """
        # Process velocity alerts
        velocity_data = data.filter(pl.col("alert_type") == "velocity_alert")
        results = velocity_data.group_by(["symbol", "interval"]).agg([
            pl.when(pl.col("signal") == "velocity_maintained").then(1).otherwise(0).sum().alias("velocity_maintained"),
            pl.when(pl.col("signal") == "velocity_weak").then(1).otherwise(0).sum().alias("velocity_weak"),
            pl.when(pl.col("signal") == "velocity_loss").then(1).otherwise(0).sum().alias("velocity_loss")
        ])
        
        # Apply interval weighting
        results = results.with_columns([
        pl.col("interval").map_elements(lambda x: self.interval_weights.get(x, 0), return_dtype=pl.Int64).alias("interval_weight")
        ])
        
        # Calculate weighted values
        results = results.with_columns([
            (pl.col("velocity_maintained") * pl.col("interval_weight")).alias("weighted_velocity_maintained"),
            (pl.col("velocity_weak") * pl.col("interval_weight")).alias("weighted_velocity_weak"),
            (pl.col("velocity_loss") * pl.col("interval_weight")).alias("weighted_velocity_loss")
        ])
        
        # Filter for maintained velocity stocks
        maintained_stocks = results.filter(
            (pl.col("weighted_velocity_maintained") > 0) &
            (pl.col("weighted_velocity_weak") == 0) &
            (pl.col("weighted_velocity_loss") == 0) &
            (pl.col("interval") >= 8)
        ).get_column("symbol")
        
        return {
            "velocity_maintained": maintained_stocks.to_list()
        }

    def process(self) -> pl.DataFrame:
        """
        Process the data and generate stock candidates for each date.
        Returns a DataFrame with the results.
        """
        # Group data by date
        grouped_data = self.df.group_by("date")
        
        # Process candidates for each date
        results = []
        
        for date, group in grouped_data:
            # Process micro-interval data
            micro_data = group.filter(pl.col("interval") <= 5)
            micro_results = self._evaluate_micro_interval_stocks(micro_data)
            
            # Process macro-interval data
            macro_data = group.filter(pl.col("interval") >= 8)
            macro_results = self._evaluate_macro_interval_stocks(macro_data)
            
            # Combine results
            combined_results = {**micro_results, **macro_results}
            combined_results["date"] = date
            
            results.append(combined_results)
        
        # Convert results to DataFrame
        return pl.DataFrame(results)

## Completed OLAP Processor

In [None]:
# Step 1: Load and preprocess data
data_loader = DataLoader(user=user, password=password)
df = data_loader.load_data()

# Step 2: Calculate indicators
indicator_calculator = IndicatorCalculator()
df_with_indicators = indicator_calculator.calculate_indicators(df)

# Step 3: Add alerts
df_with_indicators = df_with_indicators.filter(pl.col("date").dt.replace_time_zone("America/Edmonton") >= pd.to_datetime("2020-01-01").tz_localize("America/Edmonton"))
trend_alert = TrendAlertProcessor(df_with_indicators, intervals=[1, 3, 5, 8, 13])
alert_df = trend_alert.apply()

# Step 4: Add signals
stock_candidates = StockCandidatesProcessor(alert_df)
results_df = stock_candidates.process()


In [None]:
results_df.sort("date", descending=True)

# Fast API Wrapper

## Resampled Data Request

In [None]:
# api.py
import duckdb
from fastapi import FastAPI, Query
from pydantic import BaseModel, RootModel
from typing import List, Any

app = FastAPI()
con = duckdb.connect()  # in-process, zero-config

# Optionally register your Postgres raw table via postgres_scan:
con.execute("""
    INSTALL httpfs; LOAD httpfs;
  /* or: INSTALL postgres_scanner; LOAD postgres_scanner; */
  /* then you can do: */
  /* CREATE VIEW raw AS SELECT * FROM postgres_scan('host=…','public','raw'); */
""")

class Row(RootModel):
    root: List[Any]

class QueryResult(BaseModel):
    columns: List[str]
    rows: List[Row]

@app.get("/resample", response_model=QueryResult)
def resample(
    interval: int = Query(3, description="Resample interval in days"),
    limit: int = Query(50, description="Max rows back")
):
    sql = f"""
    WITH ranked AS (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date) AS rn
        FROM raw
    ),
    grp AS (
      SELECT *,
        (rn - 1) / {interval} AS grp_id
        FROM ranked
    )
    SELECT
        symbol,
        MIN(date)   AS date,
        FIRST(open) AS open,
        MAX(high)   AS high,
        MIN(low)    AS low,
        LAST(close) AS close,
        SUM(volume) AS volume
    FROM grp
    GROUP BY symbol, grp_id
    ORDER BY symbol, date DESC
    LIMIT {limit};
    """
    # execute and grab both column names + native Python lists of tuples
    cur = con.execute(sql)
    cols = [c[0] for c in cur.description]
    data = cur.fetchall()
    return QueryResult(columns=cols, rows=[Row(root=list(r)) for r in data])