In [1]:
# =========================
# HYBRID ML + TREND BAR STRATEGY WITH ADVANCED TRADE VISUALIZATION
# =========================

import pandas as pd
import numpy as np
from math import sqrt
from sklearn.linear_model import LogisticRegression
from xgboost import XGBClassifier
from hmmlearn.hmm import GaussianHMM
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from openalgo import api

# =========================
# CONFIG
# =========================
# Instrument requested from the OpenAlgo history endpoint.
SYMBOL = "IRCON"
EXCHANGE = "NSE"

# Bar intervals passed to client.history(): the training window is fetched as
# daily bars, the test window as 15-minute bars.
TRAIN_INTERVAL = "D"
TEST_INTERVAL = "15m"

# Inclusive date bounds of the training window (also used to slice the
# feature frame in main()).
TRAIN_START = "2025-11-01"
TRAIN_END   = "2025-12-31"

# Date bounds of the test window.
TEST_START  = "2026-01-01"
TEST_END    = "2026-01-08"

INITIAL_CAPITAL = 1_000_000   # starting cash of every backtest run
RISK_PER_TRADE = 0.005        # fraction of current capital risked per entry
STOP_LOSS_PCT = 0.005         # long exit when close <= entry * (1 - this)
TAKE_PROFIT_PCT = 0.028       # long exit when close >= entry * (1 + this)

# (start, end) timestamp pairs; filter_news() marks bars inside any window as
# blocked and main() zeroes the hybrid signal on them.
NEWS_WINDOWS = [
    ("2026-01-05 10:00:00", "2026-01-05 11:00:00"),
]

# NOTE(review): the API key is committed in plain text. Consider loading it
# from the environment or a secrets store and rotating this value.
OPENALGO_KEY = "02b4c23ab2a45a49fa11870d21833e57ec649d00435b78e0634a0958d2b41137"
OPENALGO_HOST = "http://127.0.0.1:5000"

# Module-level client shared by get_data(); constructed at import time.
client = api(api_key=OPENALGO_KEY, host=OPENALGO_HOST)

# =========================
# INDICATORS
# =========================

def compute_atr(df, n=14):
    """Return the simple n-bar rolling mean of the true range.

    The true range of a bar is the largest of: high - low,
    |high - previous close| and |low - previous close|.

    Args:
        df: Frame with ``high``, ``low`` and ``close`` columns.
        n: Rolling window length in bars.

    Returns:
        Series aligned with ``df``; the first ``n - 1`` values are NaN.
    """
    prev_close = df["close"].shift()
    candidates = pd.concat(
        [
            df["high"] - df["low"],
            (df["high"] - prev_close).abs(),
            (df["low"] - prev_close).abs(),
        ],
        axis=1,
    )
    # Row-wise max skips the NaNs produced by the shift on the first bar.
    true_range = candidates.max(axis=1)
    return true_range.rolling(n).mean()


def compute_rsi(series, n=14):
    """Return an RSI built from simple rolling means of gains and losses.

    Args:
        series: Price series.
        n: Rolling window length in bars.

    Returns:
        Series of values in [0, 100]; NaN until ``n`` changes are available
        (and wherever both average gain and average loss are zero).
    """
    changes = series.diff()
    ups = changes.clip(lower=0)
    downs = changes.clip(upper=0).mul(-1)

    avg_up = ups.rolling(n).mean()
    avg_down = downs.rolling(n).mean()

    strength = avg_up / avg_down
    return 100 - (100 / (1 + strength))

# =========================
# FEATURES
# =========================

def create_features(df):
    """Build the model feature columns and the next-bar direction label.

    Adds ``ret1``, ``ret3``, ``ema50``, ``ema_dist``, ``rsi``, ``atr``,
    ``atr_pct``, ``vol_chg`` and the binary target ``y`` (1 when the next
    close is above the current close) to a copy of ``df``.

    Args:
        df: Frame with ``high``, ``low``, ``close`` and ``volume`` columns.

    Returns:
        A new frame containing only rows where every column is finite and
        non-missing.
    """
    df = df.copy()
    df["ret1"] = df["close"].pct_change()
    df["ret3"] = df["close"].pct_change(3)
    df["ema50"] = df["close"].ewm(span=50).mean()
    df["ema_dist"] = (df["close"] - df["ema50"]) / df["ema50"]
    df["rsi"] = compute_rsi(df["close"])
    df["atr"] = compute_atr(df)
    df["atr_pct"] = df["atr"] / df["close"]
    df["vol_chg"] = df["volume"].pct_change()
    # NOTE(review): the last row has no next close, so its label is 0 by
    # construction (NaN > x is False) rather than "unknown".
    df["y"] = np.where(df["close"].shift(-1) > df["close"], 1, 0)

    # pct_change (and the ratio columns) yield +/-inf when the denominator is
    # 0, e.g. a zero-volume bar. dropna() keeps inf, and the estimators reject
    # or mis-handle it, so turn inf into NaN before dropping.
    df = df.replace([np.inf, -np.inf], np.nan)
    return df.dropna()

# =========================
# REGIME DETECTION
# =========================

def detect_regime(df, n_states=3, n_iter=200, random_state=42):
    """Label each bar with a hidden-Markov regime id.

    Fits a Gaussian HMM on the ``ret1`` and ``atr_pct`` columns and stores
    the decoded state sequence in a new ``regime`` column.

    Args:
        df: Feature frame with ``ret1`` and ``atr_pct``; modified in place.
        n_states: Number of hidden states.
        n_iter: Maximum number of EM iterations.
        random_state: Seed for the HMM initialisation. A fixed seed keeps the
            regime labels, and therefore the downstream feature, reproducible
            between runs; pass ``None`` for the previous unseeded behaviour.

    Returns:
        The same ``df`` object with the ``regime`` column added.
    """
    X = np.column_stack([df["ret1"], df["atr_pct"]])
    hmm = GaussianHMM(
        n_components=n_states,
        n_iter=n_iter,
        random_state=random_state,
    )
    # NOTE(review): the model is fitted and decoded on the same rows, so when
    # called on a frame spanning train and test data the regime feature sees
    # the test period.
    hmm.fit(X)
    df["regime"] = hmm.predict(X)
    return df

# =========================
# NEWS FILTER
# =========================

def filter_news(df, windows=None):
    """Return a boolean mask that is False for bars inside a news window.

    Args:
        df: Frame indexed by timestamps.
        windows: Iterable of ``(start, end)`` pairs parseable by
            ``pd.Timestamp``; both ends are inclusive. Defaults to the
            module-level ``NEWS_WINDOWS``.

    Returns:
        Series aligned with ``df.index``: True where trading is allowed.
    """
    if windows is None:
        windows = NEWS_WINDOWS

    allowed = np.ones(len(df), dtype=bool)
    for start_str, end_str in windows:
        start, end = pd.Timestamp(start_str), pd.Timestamp(end_str)
        # Comparing values instead of label-slicing also works when the index
        # is unsorted or the window bounds are not present in it.
        blocked = (df.index >= start) & (df.index <= end)
        allowed &= ~blocked
    return pd.Series(allowed, index=df.index)

# =========================
# TREND BAR SIGNAL
# =========================

def trend_bar_signals(df):
    """Return breakout signals relative to the previous bar's range.

    Args:
        df: Frame with ``high``, ``low`` and ``close`` columns.

    Returns:
        Float array, one entry per row: 1 when the close is above the prior
        high, -1 when it is below the prior low, otherwise 0.
    """
    closes = df["close"]
    broke_up = (closes > df["high"].shift(1)).to_numpy()
    broke_down = (closes < df["low"].shift(1)).to_numpy()
    # The downside test is applied last so it wins if both were ever true.
    return np.where(broke_down, -1.0, np.where(broke_up, 1.0, 0.0))

# =========================
# BACKTEST ENGINE
# =========================

def backtest_hybrid(df, signals, risk_per_trade, stop_loss_pct, take_profit_pct,
                    initial_capital=None):
    """Simulate the long (carry) / short (single-bar) strategy bar by bar.

    Longs are opened at the close of a bar whose signal is 1 and closed at
    the close of the first later bar that hits the stop, the target, or a -1
    signal. A -1 signal while flat schedules a short that is opened at the
    next bar's open and covered at that same bar's close.

    Args:
        df: Frame with ``close`` (and ``open`` if shorts can occur) columns.
        signals: Sequence aligned with ``df`` rows containing 1, -1 or 0.
        risk_per_trade: Fraction of current capital risked per entry.
        stop_loss_pct: Stop distance as a fraction of the entry price; also
            used to size positions.
        take_profit_pct: Target distance as a fraction of the entry price.
        initial_capital: Starting cash; defaults to ``INITIAL_CAPITAL``.

    Returns:
        ``(capital, equity_curve, trade_log)``. ``equity_curve`` has one
        mark-to-market value per processed bar (rows 1 .. len(df) - 2) and
        ``trade_log`` rows are
        ``[entry_time, exit_time, type, entry_price, exit_price, pnl]``.
        A long still open on the last processed bar is not closed, so its
        PnL is not in ``capital``.
    """
    capital = INITIAL_CAPITAL if initial_capital is None else initial_capital

    # Pull columns out once; per-row df.iloc[i][col] is very slow in a loop.
    closes = df["close"].to_numpy()
    opens = df["open"].to_numpy() if "open" in df.columns else None
    times = df.index

    position = 0
    entry_price = 0
    entry_time = None
    trade_log = []
    equity_curve = []
    pending_short = False

    for i in range(1, len(df) - 1):
        price = closes[i]
        time = times[i]

        unrealized = position * (price - entry_price) if position != 0 else 0
        equity_curve.append(capital + unrealized)

        if position > 0:
            sl = entry_price * (1 - stop_loss_pct)
            tp = entry_price * (1 + take_profit_pct)
            if price <= sl or price >= tp or signals[i] == -1:
                pnl = position * (price - entry_price)
                capital += pnl
                trade_log.append([entry_time, time, "LONG(CNC)", entry_price, price, pnl])
                position = 0

        if pending_short:
            # The signal fired on the previous bar, so this bar is the first
            # one that can be traded: sell at its open, cover at its close.
            if opens is None:
                raise KeyError("open")
            entry_price_s = opens[i]
            exit_price_s = price
            # Size the short like a long. The previous code multiplied by
            # `position`, which is always 0 here, so shorts never had PnL.
            short_qty = (capital * risk_per_trade) / (entry_price_s * stop_loss_pct)
            pnl = short_qty * (entry_price_s - exit_price_s)
            capital += pnl
            trade_log.append([time, time, "SHORT(MIS)", entry_price_s, exit_price_s, pnl])
            pending_short = False

        if position == 0:
            if signals[i] == 1:
                risk_amount = capital * risk_per_trade
                position = risk_amount / (price * stop_loss_pct)
                entry_price = price
                entry_time = time
            elif signals[i] == -1:
                pending_short = True

    return capital, equity_curve, trade_log

# =========================
# DATA FETCH
# =========================

def get_data(start, end, interval):
    """Fetch historical bars for ``SYMBOL`` and return them tz-naive.

    Args:
        start: Start date string passed to the history endpoint.
        end: End date string passed to the history endpoint.
        interval: Bar interval string (e.g. ``"D"`` or ``"15m"``).

    Returns:
        DataFrame indexed by timezone-naive timestamps.

    Raises:
        RuntimeError: If the client does not return a DataFrame.
    """
    df = client.history(symbol=SYMBOL, exchange=EXCHANGE, interval=interval, start_date=start, end_date=end)
    # NOTE(review): presumably a failed request comes back as a non-DataFrame
    # payload; surface it instead of failing later on `.index` - confirm
    # against the client's documentation.
    if not isinstance(df, pd.DataFrame):
        raise RuntimeError(
            f"History request for {SYMBOL} ({interval}, {start}..{end}) "
            f"did not return a DataFrame: {df!r}"
        )
    df.index = pd.to_datetime(df.index).tz_localize(None)
    return df

# =========================
# ML MODELS
# =========================

# Candidate classifiers keyed by display name. select_model() fits every
# entry in place and returns one of these shared instances.
MODELS = {
    "logistic": LogisticRegression(max_iter=500),
    "xgb": XGBClassifier(n_estimators=150, max_depth=3, verbosity=0)
}


def select_model(train_df, features):
    """Fit every candidate in ``MODELS`` and return the best by Sharpe ratio.

    Each model is fitted on ``train_df``, its predictions on the same rows
    are mapped to +1 / -1 signals, and the resulting backtest equity curve is
    scored with an annualised (sqrt(252)) Sharpe ratio.

    NOTE(review): scoring is in-sample (fit and evaluated on the same rows),
    so it favours the model that overfits most.

    Args:
        train_df: Feature frame containing ``features`` and the label ``y``.
        features: List of feature column names.

    Returns:
        ``(model, name)`` of the highest-scoring candidate. A candidate is
        always returned when ``MODELS`` is non-empty.
    """
    best_model, best_name, best_score = None, None, -np.inf

    for name, model in MODELS.items():
        model.fit(train_df[features], train_df["y"])
        preds = model.predict(train_df[features])
        sigs = np.where(preds == 1, 1, -1)
        _, equity_curve, _ = backtest_hybrid(train_df, sigs, RISK_PER_TRADE, STOP_LOSS_PCT, TAKE_PROFIT_PCT)
        returns = pd.Series(equity_curve, dtype=float).pct_change().dropna()

        # std() is NaN with fewer than two returns; a NaN score would fail
        # every comparison below and leave the function returning None.
        vol = returns.std()
        if np.isfinite(vol) and vol > 0:
            sharpe = sqrt(252) * returns.mean() / vol
        else:
            sharpe = 0.0

        if best_model is None or sharpe > best_score:
            best_model, best_name, best_score = model, name, sharpe

    return best_model, best_name

# =========================
# ADVANCED VISUALIZATION
# =========================

def plot_advanced(df, trades_df, equity_curve):
    """Show candles with trade markers, the equity curve and its drawdown.

    Args:
        df: Price frame with ``open``, ``high``, ``low``, ``close`` columns.
        trades_df: Frame with ``Entry Time``, ``Exit Time``, ``Type``,
            ``Entry Price``, ``Exit Price`` and ``PnL`` columns.
        equity_curve: Sequence of equity values. When its length equals
            ``len(df) - 2`` (what ``backtest_hybrid`` produces for ``df``) it
            is plotted against ``df.index[1:-1]``; otherwise against integer
            positions.
    """
    trades_df = trades_df.copy()
    trades_df["Trade #"] = np.arange(1, len(trades_df) + 1)

    fig = make_subplots(rows=3, cols=1, shared_xaxes=True,
                        subplot_titles=["Price + Trades", "Equity Curve", "Drawdown"],
                        row_heights=[0.6, 0.25, 0.15])

    # Candles
    fig.add_trace(go.Candlestick(x=df.index, open=df.open, high=df.high,
                                 low=df.low, close=df.close, name="Price"), row=1, col=1)

    # Trade markers
    for _, t in trades_df.iterrows():
        color = "green" if t["PnL"] > 0 else "red"
        fig.add_trace(go.Scatter(
            x=[t["Entry Time"]], y=[t["Entry Price"]],
            mode="markers+text",
            text=[f"#{t['Trade #']}"],
            textposition="top center",
            marker=dict(size=10, color=color),
            name="Entry",
            hovertext=f"Trade #{t['Trade #']}<br>Type: {t['Type']}<br>PnL: {t['PnL']:.2f}"
        ), row=1, col=1)

        fig.add_trace(go.Scatter(
            x=[t["Exit Time"]], y=[t["Exit Price"]],
            mode="markers+text",
            text=[f"{t['PnL']:.0f}"],
            textposition="bottom center",
            marker=dict(size=10, color=color, symbol="x"),
            name="Exit",
            hovertext=f"Exit Price: {t['Exit Price']}"
        ), row=1, col=1)

    # The rows share one x-axis with the datetime candles, so the equity and
    # drawdown traces need timestamps too; integer positions would be drawn
    # on a different scale from the prices.
    equity_series = pd.Series(equity_curve, dtype=float)
    if len(equity_series) == len(df) - 2:
        equity_x = df.index[1:-1]
    else:
        equity_x = None

    # Equity
    fig.add_trace(go.Scatter(x=equity_x, y=equity_series.to_numpy(), name="Equity"), row=2, col=1)

    drawdown = equity_series / equity_series.cummax() - 1

    fig.add_trace(go.Scatter(x=equity_x, y=drawdown.to_numpy(), name="Drawdown", fill="tozeroy"),
                  row=3, col=1)

    # The candlestick trace enables a range slider by default, which is drawn
    # over the lower subplot rows.
    fig.update_layout(template="plotly_dark", height=900, title="Advanced Trade Analysis",
                      xaxis_rangeslider_visible=False)
    fig.show()

# =========================
# MAIN
# =========================

def main():
    """Fetch data, train/select a model, backtest the hybrid signal and plot.

    Pipeline: download train (daily) and test (15m) bars, build features and
    HMM regimes on the combined frame, pick a classifier on the training
    slice, combine its prediction with the trend-bar breakout signal, blank
    out news windows, run the backtest and render the chart.
    """
    df_train = get_data(TRAIN_START, TRAIN_END, TRAIN_INTERVAL)
    df_test = get_data(TEST_START, TEST_END, TEST_INTERVAL)

    # NOTE(review): daily and 15-minute bars are concatenated into a single
    # series, so rolling features (returns, ATR, RSI, EMA) mix two bar sizes.
    full_raw = pd.concat([df_train, df_test]).sort_index()
    # NOTE(review): the regime model is fitted on the combined frame, i.e. it
    # also sees the test window.
    full_feat = detect_regime(create_features(full_raw))

    FEATURES = ["ret1", "ret3", "ema_dist", "rsi", "atr_pct", "vol_chg", "regime"]

    # Model selection only uses rows inside the training date range.
    model, model_name = select_model(full_feat.loc[TRAIN_START:TRAIN_END], FEATURES)
    print(f"Selected ML Model: {model_name.upper()}")

    # Predict on every row (train and test) and map class 1/0 to +1/-1.
    full_feat["pred"] = model.predict(full_feat[FEATURES])
    full_feat["ml_signal"] = np.where(full_feat["pred"] == 1, 1, -1)
    full_feat["tb_signal"] = trend_bar_signals(full_feat)

    # Trade only when the classifier and the breakout signal agree.
    full_feat["hybrid_signal"] = np.where(
        (full_feat.ml_signal == 1) & (full_feat.tb_signal == 1), 1,
        np.where((full_feat.ml_signal == -1) & (full_feat.tb_signal == -1), -1, 0)
    )

    # Suppress signals on bars that fall inside a configured news window.
    mask = filter_news(full_feat)
    full_feat.loc[~mask, "hybrid_signal"] = 0

    # NOTE(review): the backtest runs over the whole frame, including the
    # rows the model was trained on.
    capital, equity_curve, trade_log = backtest_hybrid(
        full_feat, full_feat["hybrid_signal"].values, RISK_PER_TRADE, STOP_LOSS_PCT, TAKE_PROFIT_PCT)

    trades_df = pd.DataFrame(trade_log, columns=["Entry Time", "Exit Time", "Type", "Entry Price", "Exit Price", "PnL"])

    plot_advanced(full_feat, trades_df, equity_curve)


if __name__ == "__main__":
    main()


Selected ML Model: XGB


In [5]:
import pandas as pd
import numpy as np
from math import sqrt
from sklearn.linear_model import LogisticRegression
from xgboost import XGBClassifier
from hmmlearn.hmm import GaussianHMM
import plotly.graph_objects as go
from openalgo import api

# =========================
# CONFIG
# =========================
# Instrument requested from the OpenAlgo history endpoint.
SYMBOL = "IRCON"
EXCHANGE = "NSE"

# Bar intervals passed to client.history(): daily bars for the training
# window, 15-minute bars for the test window.
TRAIN_INTERVAL = "D"
TEST_INTERVAL = "15m"

# Inclusive date bounds of the training window (also used to slice the
# feature frame in main()).
TRAIN_START = "2025-11-01"
TRAIN_END   = "2025-12-31"

# Date bounds of the test window.
TEST_START  = "2026-01-01"
TEST_END    = "2026-01-08"

INITIAL_CAPITAL = 1_000_000   # starting cash of every backtest run
RISK_PER_TRADE = 0.005        # base fraction of capital risked per entry
STOP_LOSS_PCT = 0.005         # base stop distance as a fraction of entry price
TAKE_PROFIT_PCT = 0.028       # target distance as a fraction of entry price

# News adjustments applied at entry in backtest_hybrid().
POSITIVE_NEWS_RISK_BOOST = 0.30     # +30% risk, scaled by news_state, when news_state > 0
NEGATIVE_NEWS_RISK_REDUCE = 0.50    # risk multiplied by this when news_state < 0 (-50%)
NEGATIVE_NEWS_SL_MULTIPLIER = 0.6   # stop distance multiplied by this when news_state < 0

# NOTE(review): the API key is committed in plain text. Consider loading it
# from the environment or a secrets store and rotating this value.
OPENALGO_KEY = "02b4c23ab2a45a49fa11870d21833e57ec649d00435b78e0634a0958d2b41137"
OPENALGO_HOST = "http://127.0.0.1:5000"

# =========================
# CONNECT
# =========================
# Module-level client shared by get_data(); constructed at import time.
client = api(api_key=OPENALGO_KEY, host=OPENALGO_HOST)

# =========================
# INDICATORS
# =========================
def compute_atr(df, n=14):
    """Return the simple n-bar rolling mean of the true range.

    Args:
        df: Frame with ``high``, ``low`` and ``close`` columns.
        n: Rolling window length in bars.

    Returns:
        Series aligned with ``df``; the first ``n - 1`` values are NaN.
    """
    last_close = df["close"].shift()
    bar_range = df["high"] - df["low"]
    gap_to_high = (df["high"] - last_close).abs()
    gap_to_low = (df["low"] - last_close).abs()

    # True range = the largest of the three distances for each bar.
    true_range = pd.concat([bar_range, gap_to_high, gap_to_low], axis=1).max(axis=1)
    return true_range.rolling(n).mean()

def compute_rsi(series, n=14):
    """Return an RSI built from simple rolling means of gains and losses.

    Args:
        series: Price series.
        n: Rolling window length in bars.

    Returns:
        Series of values in [0, 100]; NaN until ``n`` changes are available.
    """
    step = series.diff()
    mean_gain = step.clip(lower=0).rolling(n).mean()
    mean_loss = step.clip(upper=0).mul(-1).rolling(n).mean()

    ratio = mean_gain / mean_loss
    return 100 - (100 / (1 + ratio))

# =========================
# FEATURES
# =========================
def create_features(df):
    """Build the model feature columns and the next-bar direction label.

    Adds ``ret1``, ``ret3``, ``ema50``, ``ema_dist``, ``rsi``, ``atr``,
    ``atr_pct``, ``vol_chg`` and the binary target ``y`` (1 when the next
    close is above the current close) to a copy of ``df``.

    Args:
        df: Frame with ``high``, ``low``, ``close`` and ``volume`` columns.

    Returns:
        A new frame containing only rows where every column is finite and
        non-missing.
    """
    df = df.copy()
    df["ret1"] = df["close"].pct_change()
    df["ret3"] = df["close"].pct_change(3)
    df["ema50"] = df["close"].ewm(span=50).mean()
    df["ema_dist"] = (df["close"] - df["ema50"]) / df["ema50"]
    df["rsi"] = compute_rsi(df["close"])
    df["atr"] = compute_atr(df)
    df["atr_pct"] = df["atr"] / df["close"]
    df["vol_chg"] = df["volume"].pct_change()
    # NOTE(review): the last row has no next close, so its label is 0 by
    # construction (NaN > x is False) rather than "unknown".
    df["y"] = np.where(df["close"].shift(-1) > df["close"], 1, 0)

    # pct_change (and the ratio columns) yield +/-inf when the denominator is
    # 0, e.g. a zero-volume bar. dropna() keeps inf, and the estimators reject
    # or mis-handle it, so turn inf into NaN before dropping.
    df = df.replace([np.inf, -np.inf], np.nan)
    return df.dropna()

# =========================
# REGIME
# =========================
def detect_regime(df, n_states=3, n_iter=200, random_state=42):
    """Label each bar with a hidden-Markov regime id.

    Fits a Gaussian HMM on the ``ret1`` and ``atr_pct`` columns and stores
    the decoded state sequence in a new ``regime`` column.

    Args:
        df: Feature frame with ``ret1`` and ``atr_pct``; modified in place.
        n_states: Number of hidden states.
        n_iter: Maximum number of EM iterations.
        random_state: Seed for the HMM initialisation. A fixed seed keeps the
            regime labels, and therefore the downstream feature, reproducible
            between runs; pass ``None`` for the previous unseeded behaviour.

    Returns:
        The same ``df`` object with the ``regime`` column added.
    """
    X = np.column_stack([df["ret1"], df["atr_pct"]])
    hmm = GaussianHMM(
        n_components=n_states,
        n_iter=n_iter,
        random_state=random_state,
    )
    # NOTE(review): the model is fitted and decoded on the same rows, so when
    # called on a frame spanning train and test data the regime feature sees
    # the test period.
    hmm.fit(X)
    df["regime"] = hmm.predict(X)
    return df

# =========================
# NEWS DECAY INJECTION
# =========================
def inject_manual_news(df, base_date="2026-01-02", decay=(1.0, 0.6, 0.3)):
    """Add a ``news_state`` column holding a decaying manual news score.

    Every bar starts at 0.0. Bars dated ``base_date`` get ``decay[0]``, bars
    one calendar day later get ``decay[1]``, and so on.

    Args:
        df: Frame with a datetime index; modified in place.
        base_date: Calendar date of the news event (anything accepted by
            ``pd.Timestamp``).
        decay: Score for the event day and each following calendar day.
            Negative values can be used for negative news.

    Returns:
        The same ``df`` object with the ``news_state`` column added.
    """
    df["news_state"] = 0.0

    first_day = pd.Timestamp(base_date)
    # Computed once: index.date builds a new object array on every access.
    bar_dates = df.index.date

    for offset, val in enumerate(decay):
        day = (first_day + pd.Timedelta(days=offset)).date()
        df.loc[bar_dates == day, "news_state"] = val

    return df

# =========================
# TREND BAR
# =========================
def trend_bar_signals(df):
    """Return breakout signals relative to the previous bar's range.

    Args:
        df: Frame with ``high``, ``low`` and ``close`` columns.

    Returns:
        Float array, one entry per row: 1 when the close is above the prior
        high, -1 when it is below the prior low, otherwise 0.
    """
    last_high = df["high"].shift(1)
    last_low = df["low"].shift(1)

    above = (df["close"] > last_high).to_numpy()
    below = (df["close"] < last_low).to_numpy()

    out = np.where(above, 1.0, 0.0)
    # Applied second so a downside break overrides an upside one.
    out = np.where(below, -1.0, out)
    return out

# =========================
# BACKTEST ENGINE
# =========================
def backtest_hybrid(df, signals, initial_capital=None, risk_per_trade=None,
                    stop_loss_pct=None, take_profit_pct=None,
                    positive_news_risk_boost=None, negative_news_risk_reduce=None,
                    negative_news_sl_multiplier=None):
    """Simulate the long-only, news-adjusted strategy bar by bar.

    A long is opened at the close of a bar whose signal is 1 and closed at
    the close of the first later bar that hits the stop, the target, or a -1
    signal. At entry the risk fraction and stop distance are adjusted by the
    bar's ``news_state``; the stop distance chosen at entry is the one used
    for that trade's exit.

    Args:
        df: Frame with ``close`` and ``news_state`` columns.
        signals: Sequence aligned with ``df`` rows containing 1, -1 or 0.
        initial_capital: Starting cash (default ``INITIAL_CAPITAL``).
        risk_per_trade: Base risk fraction (default ``RISK_PER_TRADE``).
        stop_loss_pct: Base stop distance (default ``STOP_LOSS_PCT``).
        take_profit_pct: Target distance (default ``TAKE_PROFIT_PCT``).
        positive_news_risk_boost: Default ``POSITIVE_NEWS_RISK_BOOST``.
        negative_news_risk_reduce: Default ``NEGATIVE_NEWS_RISK_REDUCE``.
        negative_news_sl_multiplier: Default ``NEGATIVE_NEWS_SL_MULTIPLIER``.

    Returns:
        ``(capital, equity_curve, trade_log)``. ``equity_curve`` has one
        mark-to-market value per processed bar (rows 1 .. len(df) - 2) and
        ``trade_log`` rows are
        ``[entry_time, exit_time, "LONG", entry_price, exit_price, pnl]``.
        A long still open on the last processed bar is not closed.
    """
    # Module constants are only looked up for arguments left as None.
    if initial_capital is None:
        initial_capital = INITIAL_CAPITAL
    if risk_per_trade is None:
        risk_per_trade = RISK_PER_TRADE
    if stop_loss_pct is None:
        stop_loss_pct = STOP_LOSS_PCT
    if take_profit_pct is None:
        take_profit_pct = TAKE_PROFIT_PCT

    capital = initial_capital
    position = 0
    entry_price = 0
    entry_time = None
    entry_sl_pct = stop_loss_pct  # stop distance of the currently open trade
    trade_log = []
    equity_curve = []

    # Pull the price column out once; per-row df.iloc[i] is slow in a loop.
    closes = df["close"].to_numpy()
    times = df.index

    for i in range(1, len(df) - 1):
        price = closes[i]

        equity_curve.append(capital + (position * (price - entry_price) if position else 0))

        # Exit
        if position > 0:
            # Use the stop the position was sized with. Checking the base
            # stop here would let a news-tightened trade lose more than the
            # risk it was sized for.
            sl = entry_price * (1 - entry_sl_pct)
            tp = entry_price * (1 + take_profit_pct)
            if price <= sl or price >= tp or signals[i] == -1:
                pnl = position * (price - entry_price)
                capital += pnl
                trade_log.append([entry_time, times[i], "LONG", entry_price, price, pnl])
                position = 0

        # Entry
        if position == 0 and signals[i] == 1:
            news_state = df["news_state"].iloc[i]

            current_risk = risk_per_trade
            current_sl = stop_loss_pct

            if news_state > 0:
                boost = (POSITIVE_NEWS_RISK_BOOST if positive_news_risk_boost is None
                         else positive_news_risk_boost)
                current_risk *= (1 + boost * news_state)
            elif news_state < 0:
                reduce_factor = (NEGATIVE_NEWS_RISK_REDUCE if negative_news_risk_reduce is None
                                 else negative_news_risk_reduce)
                sl_factor = (NEGATIVE_NEWS_SL_MULTIPLIER if negative_news_sl_multiplier is None
                             else negative_news_sl_multiplier)
                current_risk *= reduce_factor
                current_sl *= sl_factor

            risk_amount = capital * current_risk
            qty = risk_amount / (price * current_sl)

            position = qty
            entry_price = price
            entry_time = times[i]
            entry_sl_pct = current_sl

    return capital, equity_curve, trade_log

# =========================
# DATA FETCH
# =========================
def get_data(start, end, interval):
    """Fetch historical bars for ``SYMBOL`` and return them tz-naive.

    Args:
        start: Start date string passed to the history endpoint.
        end: End date string passed to the history endpoint.
        interval: Bar interval string (e.g. ``"D"`` or ``"15m"``).

    Returns:
        DataFrame indexed by timezone-naive timestamps.

    Raises:
        RuntimeError: If the client does not return a DataFrame.
    """
    df = client.history(
        symbol=SYMBOL,
        exchange=EXCHANGE,
        interval=interval,
        start_date=start,
        end_date=end
    )
    # NOTE(review): presumably a failed request comes back as a non-DataFrame
    # payload; surface it instead of failing later on `.index` - confirm
    # against the client's documentation.
    if not isinstance(df, pd.DataFrame):
        raise RuntimeError(
            f"History request for {SYMBOL} ({interval}, {start}..{end}) "
            f"did not return a DataFrame: {df!r}"
        )
    df.index = pd.to_datetime(df.index).tz_localize(None)
    return df

# =========================
# ML MODELS
# =========================
# Candidate classifiers keyed by display name. select_model() fits every
# entry in place and returns one of these shared instances.
MODELS = {
    "logistic": LogisticRegression(max_iter=500),
    "xgb": XGBClassifier(n_estimators=200, max_depth=4, learning_rate=0.05, verbosity=0)
}

def select_model(train_df, features):
    """Fit every candidate in ``MODELS`` and return the best by Sharpe ratio.

    Each model is fitted on ``train_df``; its class-1 probabilities on the
    same rows are mapped to signals (> 0.55 -> 1, < 0.45 -> -1, else 0) and
    the resulting backtest equity curve is scored with an annualised
    (sqrt(252)) Sharpe ratio.

    NOTE(review): scoring is in-sample (fit and evaluated on the same rows),
    so it favours the model that overfits most.

    Args:
        train_df: Feature frame containing ``features`` and the label ``y``.
        features: List of feature column names.

    Returns:
        The highest-scoring fitted model. A model is always returned when
        ``MODELS`` is non-empty.
    """
    best_model, best_score = None, -np.inf
    for model in MODELS.values():
        model.fit(train_df[features], train_df["y"])
        probs = model.predict_proba(train_df[features])[:, 1]
        sigs = np.where(probs > 0.55, 1, np.where(probs < 0.45, -1, 0))

        _, equity, _ = backtest_hybrid(train_df, sigs)
        returns = pd.Series(equity, dtype=float).pct_change().dropna()

        # std() is NaN with fewer than two returns and NaN is truthy, so a
        # plain truthiness test lets a NaN score through; NaN then fails
        # every comparison and the function would return None.
        vol = returns.std()
        if np.isfinite(vol) and vol > 0:
            sharpe = sqrt(252) * returns.mean() / vol
        else:
            sharpe = 0.0

        if best_model is None or sharpe > best_score:
            best_score = sharpe
            best_model = model

    return best_model

# =========================
# PERFORMANCE METRICS
# =========================
def print_performance(initial_capital, final_capital, equity_curve, trades_df,
                      periods_per_year=252):
    """Print a summary report for a finished backtest.

    Args:
        initial_capital: Starting cash.
        final_capital: Realised capital at the end of the run.
        equity_curve: Sequence of per-bar equity values.
        trades_df: Frame with a ``PnL`` column, one row per closed trade.
        periods_per_year: Number of equity points per year, used to
            annualise CAGR and Sharpe. The default of 252 assumes daily
            bars; pass the matching value for intraday curves.
    """
    equity = pd.Series(equity_curve, dtype=float).dropna()
    returns = equity.pct_change().dropna()

    total_return = (final_capital - initial_capital) / initial_capital
    years = len(equity) / periods_per_year
    growth = final_capital / initial_capital
    if years <= 0:
        cagr = 0
    elif growth <= 0:
        # A fractional power of a non-positive ratio is complex/undefined;
        # report a total loss instead.
        cagr = -1.0
    else:
        cagr = growth ** (1 / years) - 1

    # std() is NaN with fewer than two returns and NaN is truthy, so test
    # for a finite, positive value explicitly.
    vol = returns.std()
    if np.isfinite(vol) and vol > 0:
        sharpe = sqrt(periods_per_year) * returns.mean() / vol
    else:
        sharpe = 0.0

    dd = equity / equity.cummax() - 1
    max_dd = dd.min() if len(dd) else 0.0

    wins = trades_df[trades_df["PnL"] > 0]
    losses = trades_df[trades_df["PnL"] < 0]

    win_rate = len(wins) / len(trades_df) * 100 if len(trades_df) else 0
    profit_factor = wins["PnL"].sum() / abs(losses["PnL"].sum()) if len(losses) else np.inf

    print("\n================ BACKTEST REPORT ================")
    print(f"Initial Capital      : ₹{initial_capital:,.0f}")
    print(f"Final Capital        : ₹{final_capital:,.0f}")
    print(f"Net Profit           : ₹{final_capital-initial_capital:,.0f}")
    print(f"Total Return         : {total_return*100:.2f}%")
    print(f"CAGR                 : {cagr*100:.2f}%")
    print(f"Sharpe Ratio         : {sharpe:.2f}")
    print(f"Max Drawdown         : {max_dd*100:.2f}%")
    print(f"Win Rate             : {win_rate:.2f}%")
    print(f"Profit Factor        : {profit_factor:.2f}")
    print(f"Number of Trades     : {len(trades_df)}")
    print("=================================================\n")

# =========================
# MAIN
# =========================
def main():
    """Fetch data, select a model, backtest the hybrid signal and report.

    Pipeline: download train (daily) and test (15m) bars, build features,
    HMM regimes and the manual news score on the combined frame, pick a
    classifier on the training slice, combine its probability signal with
    the trend-bar signal, run the backtest, write the trades to
    ``hybrid_news_trades.csv``, print the report and plot the equity curve.
    """
    df_train = get_data(TRAIN_START, TRAIN_END, TRAIN_INTERVAL)
    df_test = get_data(TEST_START, TEST_END, TEST_INTERVAL)

    # NOTE(review): daily and 15-minute bars are concatenated into a single
    # series, so rolling features (returns, ATR, RSI, EMA) mix two bar sizes.
    raw = pd.concat([df_train, df_test]).sort_index()

    df = create_features(raw)
    # NOTE(review): the regime model is fitted on the combined frame, i.e. it
    # also sees the test window.
    df = detect_regime(df)
    df = inject_manual_news(df)

    FEATURES = ["ret1", "ret3", "ema_dist", "rsi", "atr_pct", "vol_chg", "regime", "news_state"]

    # Model selection only uses rows inside the training date range.
    model = select_model(df.loc[TRAIN_START:TRAIN_END], FEATURES)

    # Class-1 probability -> +1 above 0.55, -1 below 0.45, otherwise flat.
    probs = model.predict_proba(df[FEATURES])[:,1]
    df["ml_signal"] = np.where(probs > 0.55, 1, np.where(probs < 0.45, -1, 0))

    df["tb_signal"] = trend_bar_signals(df)

    # The trend-bar signal acts as a veto here: an ML signal is kept unless
    # the trend bar points the opposite way (a neutral bar, 0, passes).
    df["hybrid_signal"] = np.where(
        (df["ml_signal"] == 1) & (df["tb_signal"] >= 0), 1,
        np.where((df["ml_signal"] == -1) & (df["tb_signal"] <= 0), -1, 0)
    )

    # NOTE(review): the backtest runs over the whole frame, including the
    # rows the model was trained on.
    capital, equity, trades = backtest_hybrid(df, df["hybrid_signal"].values)

    trades_df = pd.DataFrame(trades, columns=["Entry", "Exit", "Type", "EntryPrice", "ExitPrice", "PnL"])
    trades_df.to_csv("hybrid_news_trades.csv", index=False)

    print_performance(INITIAL_CAPITAL, capital, equity, trades_df)

    fig = go.Figure()
    fig.add_trace(go.Scatter(y=equity, name="Equity Curve"))
    fig.update_layout(title="Optimized Hybrid ML + Trend Bar + News Strategy", template="plotly_dark")
    fig.show()

if __name__ == "__main__":
    main()



Initial Capital      : ₹1,000,000
Final Capital        : ₹1,196,297
Net Profit           : ₹196,297
Total Return         : 19.63%
CAGR                 : 29.45%
Sharpe Ratio         : 1.93
Max Drawdown         : -3.94%
Win Rate             : 64.29%
Profit Factor        : 4.33
Number of Trades     : 14

