![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
Edge Score From ADX and Time
<hr>
Better understand ADX and Time Stats

In [33]:
# 1. Reload the project research modules so on-disk edits are picked up
#    without restarting the kernel.
import importlib
import json

import pandas as pd

import research.condor_research as cr
import research.config as cfg

importlib.reload(cr)
importlib.reload(cfg)
CondorResearch = cr.CondorResearch
CondorResearchConfig = cfg.CondorResearchConfig

# 2. Build the research configuration.
#    Note: this rebinds `cfg` from the config module to a config instance.
cfg = CondorResearchConfig()

# 3. Pull the stored trade analytics / snapshots out of the Object Store.
qb = QuantBook()

key = 'ec279b1c2a070c50a299f2a66da4aaef_2024-10-02_00-00-00_trade_analytics.json'
snp_key = 'ec279b1c2a070c50a299f2a66da4aaef_2024-10-02_00-00-00_trade_snapshots.json'


def _read_json_frame(store_key):
    """Return the JSON stored under *store_key* as a flattened DataFrame.

    Returns None when the Object Store does not contain the key.
    """
    if not qb.object_store.contains_key(store_key):
        return None
    payload = json.loads(qb.object_store.read(store_key))
    return pd.json_normalize(payload)


# Trade analytics feed the research helper; CR stays None when the key is absent.
df = _read_json_frame(key)
CR = CondorResearch(df, cfg) if df is not None else None

# Snapshots are only loaded into a frame here.
snp_df = _read_json_frame(snp_key)
    


### Plot Time and ADX Buckets to Mean PNL

In [None]:
import matplotlib.pyplot as plt

# Per-(time bucket, ADX bucket) PnL statistics from the research helper.
out = CR.group_stats(keys=["time_bucket", "adx_bucket"], value_col="pnl",
    include_expectancy=False)

# Drop low-signal cells. The explicit .copy() makes plot_df an independent
# frame, so the column assignments below do not raise pandas'
# SettingWithCopyWarning (they previously wrote into a filtered slice).
MIN_CELL_COUNT = 20
plot_df = out[out["count"] >= MIN_CELL_COUNT].copy()

# Map categorical buckets to numeric positions (order of first appearance).
time_order = list(plot_df["time_bucket"].unique())
adx_order  = list(plot_df["adx_bucket"].unique())

time_map = {k: i for i, k in enumerate(time_order)}
adx_map  = {k: i for i, k in enumerate(adx_order)}

plot_df["time_x"] = plot_df["time_bucket"].map(time_map)
plot_df["adx_y"]  = plot_df["adx_bucket"].map(adx_map)

plt.figure(figsize=(12, 6))

plt.scatter(
    plot_df["time_x"],
    plot_df["adx_y"],
    s=plot_df["count"] * 1.5,          # bubble size = sample size
    c=plot_df["mean"],                 # color = avg pnl
    cmap="RdYlGn",
    alpha=0.75,
    edgecolors="black"
)

# Label the numeric axis positions with the original bucket names.
plt.xticks(range(len(time_order)), time_order, rotation=45)
plt.yticks(range(len(adx_order)), adx_order)
plt.colorbar(label="Mean PnL")
plt.title("Time × ADX Regime Bubble Map")
plt.xlabel("Time Bucket")
plt.ylabel("ADX Bucket")

plt.tight_layout()
plt.show()



### Create Regime Edge Score 
The score is computed per (time bucket, ADX bucket) cell from its mean PnL, win rate, and tail ratio, and is halved for low-confidence cells.

In [35]:
# numpy is imported here because this cell uses np.log1p; relying on an import
# in a later cell breaks a top-to-bottom run.
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Per-(time bucket, ADX bucket) PnL statistics from the research helper.
regime_group = CR.group_stats(keys=["time_bucket", "adx_bucket"], value_col="pnl",
    include_expectancy=False)

# Work on an independent copy holding only the columns the score needs.
regime = regime_group[[
    "time_bucket",
    "adx_bucket",
    "count",
    "mean",
    "win_rate",
    "tail_ratio_q05",
    "low_confidence"
]].copy()

# Min-max scale mean PnL and win rate across cells so they are comparable.
scaler = MinMaxScaler()
regime[["mean_n", "win_n"]] = scaler.fit_transform(regime[["mean", "win_rate"]])

# safer tail transform (log helps with huge ratios): negative ratios are
# clipped to 0, and a larger tail ratio yields a smaller tail_n.
regime["tail_n"] = 1 / (1 + np.log1p(regime["tail_ratio_q05"].clip(lower=0)))

# Weighted blend of the three components; low-confidence cells are halved.
regime["edge_score"] = 0.5*regime["mean_n"] + 0.3*regime["win_n"] + 0.2*regime["tail_n"]
regime.loc[regime["low_confidence"], "edge_score"] *= 0.5

# Lookup table: (time_bucket, adx_bucket) -> edge_score.
trade_edge = regime.set_index(["time_bucket","adx_bucket"])["edge_score"]

# Attach each trade's cell score; trades without a matching cell get 0.
CR.df["regime_edge"] = CR.df.set_index(
    ["time_bucket","adx_bucket"]
).index.map(trade_edge)

CR.df["regime_edge"] = CR.df["regime_edge"].fillna(0)



In [None]:
# Re-run the grouped stats with an extra aggregation request for regime_edge
# (presumably the per-cell mean; semantics are defined by CR.group_stats).
edge_agg = {"regime_edge": {"regime_edge": "mean"}}
tidy = CR.group_stats(
    keys=["time_bucket", "adx_bucket"],
    value_col="pnl",
    include_expectancy=False,
    extra_aggs=edge_agg,
)

# View the six cells with the lowest regime_edge.
summary_cols = [
    "time_bucket",
    "adx_bucket",
    "count",
    "mean",
    "win_rate",
    "tail_ratio_q05",
    "low_confidence",
    "regime_edge",
]
bad = tidy.sort_values("regime_edge").head(6)
bad[summary_cols]

#### Discover Edge Thresholds

In [None]:
# Sweep candidate regime_edge cut-offs: flag trades at or above each cut-off
# and print the PnL stats of the flagged vs. unflagged groups.
report_cols = ["count", "mean", "win_rate", "tail_ratio_q05", "low_confidence"]
for t in (0.75, 0.78, 0.80, 0.82, 0.84, 0.86, 0.88, 0.9, 0.92, 0.94):
    flag_col = f"edge_ge_{int(t*100)}"
    CR.df[flag_col] = CR.df["regime_edge"] >= t
    out = CR.group_stats(keys=[flag_col], value_col="pnl", include_expectancy=False)
    print("\nThreshold", t)
    print(out[report_cols])


#### Create Edge Bucket

In [None]:
import numpy as np

# Keep the score inside [0, 1] before deriving bin edges.
CR.df["regime_edge"] = CR.df["regime_edge"].clip(lower=0, upper=1)

# 1st and 99th percentiles bound the evenly spaced interior edges.
q01, q99 = CR.df["regime_edge"].quantile([0.01, 0.99])

# Six evenly spaced edges between the percentiles, with open-ended outer bins.
interior_edges = np.linspace(q01, q99, 6)
EDGE_BINS = np.concatenate(([-np.inf], interior_edges, [np.inf]))
display(EDGE_BINS)

In [36]:
# Bin edges at the min, 20/40/60/80th percentiles and max of regime_edge.
edge_quantile_levels = [0, .2, .4, .6, .8, 1.0]
EDGE_BINS = CR.df["regime_edge"].quantile(edge_quantile_levels).to_numpy()

In [None]:
# Sanity check: counts should be somewhat evenly distributed across buckets.
#
# duplicates="drop" keeps pd.cut from raising "Bin edges must be unique" when
# quantile-derived EDGE_BINS contains repeated edges, which can happen when
# many trades share the same regime_edge value. This matches the pd.qcut call
# used for edge_bucket elsewhere in this notebook.
CR.df["edge_bucket"] = pd.cut(
    CR.df["regime_edge"],
    bins=EDGE_BINS,
    labels=None,
    include_lowest=True,
    right=True,
    duplicates="drop",
)

CR.group_stats(
    keys=["edge_bucket"],
    value_col="pnl",
    include_expectancy=False
)[["edge_bucket","count","mean","tail_ratio_q05"]]



In [None]:
# Fraction of rows with no edge_bucket assigned (NaN).
edge_bucket_missing = CR.df["edge_bucket"].isna()
edge_bucket_missing.mean()

#### PnL for Regime Edge ≥ 0.8

In [38]:
# Flag trades whose regime_edge is at or above the cut-off.
# NOTE(review): the column is named "edge_ge_06" but the cut-off applied is
# 0.8 — confirm which is intended. The name is left unchanged here so any
# downstream use of the column keeps working.
CR.df["edge_ge_06"] = CR.df["regime_edge"] >= 0.8

# Rebuild the quintile buckets so this cell can run on its own.
# duplicates="drop" keeps pd.cut from raising when the quantile edges repeat,
# which can happen when many trades share the same regime_edge value.
EDGE_BINS = CR.df["regime_edge"].quantile([0, .2, .4, .6, .8, 1.0]).to_numpy()
CR.df["edge_bucket"] = pd.cut(
    CR.df["regime_edge"],
    bins=EDGE_BINS,
    labels=None,
    include_lowest=True,
    right=True,
    duplicates="drop",
)

# PnL stats for flagged vs. unflagged trades.
CR.group_stats(
    keys=["edge_ge_06"],
    value_col="pnl",
    include_expectancy=False
)[["edge_ge_06","count","mean","win_rate","tail_ratio_q05"]]


In [None]:
# Cross time_bucket with edge_bucket and show PnL stats per combination.
# Author's notes on this table:
#   - The edge already considers time.
#   - Edge is NOT independent of time.
#   - The regime edge already contains time information, but time still
#     modulates risk.
time_edge_cols = ["time_bucket", "edge_bucket", "count", "mean", "tail_ratio_q05"]
time_edge_stats = CR.group_stats(
    keys=["time_bucket", "edge_bucket"],
    value_col="pnl",
    include_expectancy=False,
)
edge_bucket_group_stats = time_edge_stats[time_edge_cols]

display(edge_bucket_group_stats)


In [None]:
#cleaner tables

# out = out[out["count"] > 0].sort_values(["time_bucket", "edge_bucket"])


#### 3rd Dimension - Cushion


# Re-bucket regime_edge into up to five quantile bins (repeated edges dropped).
edge_quantile_bins = pd.qcut(CR.df["regime_edge"], q=5, duplicates="drop")
CR.df["edge_bucket"] = edge_quantile_bins

# PnL stats for every edge bucket x normalized-cushion bucket combination.
cushion_cols = ["edge_bucket", "cushion_norm_bucket", "count", "mean", "tail_ratio_q05"]
cushion_stats = CR.group_stats(
    keys=["edge_bucket", "cushion_norm_bucket"],
    value_col="pnl",
    include_expectancy=False,
)
cushion_stats[cushion_cols]


### Size Mult From Edge

In [None]:
# Translate regime_edge into a position-size multiplier and apply it to PnL.
#
# pd.cut needs a flat sequence of bin edges, and numeric labels are required
# for the float conversion to work. Passing a list that wraps the edge_bucket
# Series as `bins` is not a valid bin specification.
#
# NOTE(review): the edges and multipliers below are an assumption. They reuse
# the 0.78 / 0.82 cut-offs from the "Edge Bands" cell and the 0.5 / 0.75 / 1.0
# levels returned by size_multiplier(); confirm against the intended sizing.
SIZE_EDGE_BINS = [float("-inf"), 0.78, 0.82, float("inf")]
SIZE_MULTS = [0.5, 0.75, 1.0]

CR.df["size_mult"] = pd.cut(
    CR.df["regime_edge"],
    bins=SIZE_EDGE_BINS,
    labels=SIZE_MULTS,
).astype(float)

# PnL each trade would have had at the scaled size.
CR.df["pnl_sized"] = CR.df["pnl"] * CR.df["size_mult"]

#### Edge Bands

In [None]:
# Three right-closed bands over regime_edge:
#   (-1, 0.78] -> "skip", (0.78, 0.82] -> "cautious", (0.82, 10] -> "full"
band_edges = [-1, 0.78, 0.82, 10]
band_names = ["skip", "cautious", "full"]
CR.df["edge_band"] = pd.cut(CR.df["regime_edge"], bins=band_edges, labels=band_names)

# PnL stats for every edge band x normalized-cushion bucket combination.
CR.group_stats(
    keys=["edge_band", "cushion_norm_bucket"],
    value_col="pnl",
    include_expectancy=False,
)


### Create My Edge Scores Directly

In [None]:

import pandas as pd
import numpy as np

# 1) Hand-assigned score per (time_bucket, adx_bucket) cell.
#    Cells not listed here score 0.0.
REGIME_SCORE = {
    ("0-30", pd.Interval(-np.inf, 15.0, closed="right")): 0.5,
    ("0-30", pd.Interval(15.0, 25.0, closed="right")): 0.4,

    ("30-90", pd.Interval(15.0, 25.0, closed="right")): 0.6,
    ("30-90", pd.Interval(25.0, 40.0, closed="right")): 0.6,

    ("90-180", pd.Interval(15.0, 25.0, closed="right")): 0.9,
    ("90-180", pd.Interval(25.0, 40.0, closed="right")): 0.85,

    ("180-300", pd.Interval(15.0, 25.0, closed="right")): 1.0,
    ("180-300", pd.Interval(25.0, 40.0, closed="right")): 0.95,

    ("300+", pd.Interval(15.0, 25.0, closed="right")): 0.7,
    ("300+", pd.Interval(25.0, 40.0, closed="right")): 0.7,
}

# 2) Look up each trade's (time_bucket, adx_bucket) pair in the dict.
#    This is the single source of regime_score: a MultiIndex-based mapping
#    that was computed first and then overwritten has been removed. Zipping
#    the two columns gives the same result as a row-wise apply and also
#    works when the frame is empty.
CR.df["regime_score"] = [
    REGIME_SCORE.get(cell, 0.0)
    for cell in zip(CR.df["time_bucket"], CR.df["adx_bucket"])
]

# Optional diagnostics:
# CR.df["regime_edge"].value_counts()
# CR.df["regime_score"].value_counts()
# CR.df["regime_score"].describe()

# PnL stats grouped by the assigned score.
CR.group_stats(
    keys=["regime_score"],
    value_col="pnl"
)


# out = CR.group_stats(keys=["time_bucket", "adx_bucket"], value_col="pnl",
#             include_expectancy=False)

# CR.as_multiindex(out, keys=["time_bucket", "adx_bucket"], cols=["count","mean","win_rate","tail_ratio_q05","expectancy","low_confidence"])



In [None]:
# Most frequent regime_score values. display() is needed because a notebook
# only renders the final expression of a cell automatically.
display(CR.df["regime_score"].value_counts().head(10))

# First 20 trades that received a non-zero score, with their bucket pair.
CR.df.loc[CR.df["regime_score"] > 0, ["time_bucket","adx_bucket","regime_score"]].head(20)


### Misc
##### Check NaN On ADX

In [None]:
# Compare missing ADX buckets against missing raw ADX readings.
ADX_RAW_COL = "pos.position.technicals.current_adx"

adx_bucket_missing = CR.df["adx_bucket"].isna()
nan_adx = adx_bucket_missing.value_counts()
nan_adx_raw = CR.df[ADX_RAW_COL].isna()

display(nan_adx)
display(nan_adx_raw.value_counts())

# Summary of the raw ADX values on rows that have no bucket.
CR.df.loc[adx_bucket_missing, ADX_RAW_COL].describe()


##### Freeze “top regimes” into a dict

In [None]:
# Keep the (up to) 50 highest-scoring cells that are not low-confidence and
# freeze them into a {(time_bucket, adx_bucket): edge_score} dict.
# Related column for reference: CR.df["time_bucket"]
confident_cells = regime.loc[~regime["low_confidence"]]
top = confident_cells.sort_values("edge_score", ascending=False).head(50)
REGIME_SCORE = {
    (time_bucket, adx_bucket): float(edge_score)
    for time_bucket, adx_bucket, edge_score in zip(
        top["time_bucket"], top["adx_bucket"], top["edge_score"]
    )
}
display(REGIME_SCORE)

##### Trade Rules

In [None]:
def trade_allowed(edge_band, cushion_norm):
    """Return whether a trade passes the edge-band / cushion filter.

    A trade is rejected when its band has a minimum cushion and
    ``cushion_norm`` is strictly below it: 0.25 for "skip", 0.10 for
    "cautious". Every other combination, including any other band,
    is allowed.
    """
    minimum_cushion_by_band = (("skip", 0.25), ("cautious", 0.10))
    for band, minimum_cushion in minimum_cushion_by_band:
        if edge_band == band and cushion_norm < minimum_cushion:
            return False
    return True

def size_multiplier(edge_band, cushion_norm):
    """Return the position-size multiplier for an edge band and cushion.

    "full" gives 1.0 when ``cushion_norm`` >= 0.1, otherwise 0.75.
    "cautious" gives 0.75 when ``cushion_norm`` >= 0.25, otherwise 0.5.
    Any other band (e.g. a skip-band trade with a wide cushion) gives 0.5.
    """
    # (band, cushion threshold, multiplier at/above threshold, multiplier below)
    tiers = (
        ("full", 0.1, 1.0, 0.75),
        ("cautious", 0.25, 0.75, 0.5),
    )
    for band, threshold, at_or_above, below in tiers:
        if edge_band == band:
            return at_or_above if cushion_norm >= threshold else below
    return 0.5  # skip-band but wide cushion
