In [None]:
import datetime as dt
import logging
import sys

import polars as pl

from backtest_lib.backtest import Backtest
from backtest_lib.examples.get_sp500_historical import get_sp500_market_view
from backtest_lib.market import MarketView
from backtest_lib.market.polars_impl import PolarsPastView, PolarsUniverseMapping
from backtest_lib.portfolio import WeightedPortfolio

# Send INFO-and-above records to stdout so they show up in the cell output.
# force=True replaces any handlers already attached to the root logger, which
# keeps re-running this cell from stacking duplicate handlers.
logging.basicConfig(
    force=True,
    stream=sys.stdout,
    format="[%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)

# Build the market view used by every later cell, starting at 2020-01-03
# (midnight, naive datetime).
market_view = get_sp500_market_view(start=dt.datetime(2020, 1, 3))

# Disabled alternative, kept for reference: load a pickled view and truncate it.
# NOTE(review): `pkl` is not imported in the visible cells, and unpickling
# should only ever be done on trusted files.
# market_view = pkl.load(open("2010-sp500-market-view.pkl", "rb"))
# market_view = market_view.truncated_to(500)

# Snapshot the securities of the close-price view as an immutable tuple.
universe = tuple(market_view.prices.close.securities)

In [None]:
from backtest_lib.portfolio import uniform_portfolio
from backtest_lib.engine.decision import target_weights

# Starting portfolio (value 100000, polars backend) built over the full universe;
# the same object is passed as `initial_portfolio` to both backtests below.
pf = uniform_portfolio(universe, backend="polars", value=100_000)


def index_strategy(universe, current_portfolio, market, ctx):
    """Benchmark strategy: target a uniform portfolio over tradable securities.

    Args:
        universe: Securities passed through to ``uniform_portfolio``.
        current_portfolio: Supplies ``total_value`` and the backend for the
            newly built portfolio.
        market: Market view; only the last entry of ``market.tradable.by_period``
            is read.
        ctx: Unused.

    Returns:
        ``target_weights`` applied to the holdings of the uniform portfolio.
    """
    latest_flags = market.tradable.by_period[-1]

    # Keep every security whose most recent tradable flag is truthy.
    tradable = []
    for security, is_tradable in latest_flags.items():
        if is_tradable:
            tradable.append(security)

    # NOTE(review): `_backend` is a private attribute of the portfolio; switch
    # to a public accessor if the library offers one.
    equal_weighted = uniform_portfolio(
        universe,
        tradable,
        value=current_portfolio.total_value,
        backend=current_portfolio._backend,
    )
    return target_weights(equal_weighted.holdings)


# Signal frame: every numeric close-price column is replaced by the ratio of
# the price to its own exponentially weighted moving average (centre of mass 2).
# A value above 1 means the price sits above its EWMA.
df = market_view.prices.close.by_security.as_df()
cols = df.columns
numeric_cols = [col for col in cols if df.get_column(col).dtype.is_numeric()]

# The ratio is computed in a single expression per column. The result takes the
# name of the left-most input column, so it overwrites that column in place and
# the frame keeps its original column order. This avoids materialising a
# temporary "<col>_EWM" column per security (which would clobber any real
# column that happened to carry that name) and the extra `select` to drop them.
ewma = df.with_columns(
    pl.col(col) / pl.col(col).ewm_mean(com=2) for col in numeric_cols
)

# Downstream code expects the same columns, in the same order, as the price frame.
assert ewma.columns == cols, "EWMA ratio frame must keep the price frame's column layout"

# Wrap the ratio frame so it can be exposed as a market signal.
ewma_past_view = PolarsPastView.from_dataframe(ewma)

# Rebuild the market view with the same prices and tradable data, plus the new
# "ewma_ratio" signal. The name `market_view` is deliberately rebound because
# the cells below read the signal-carrying view through it.
market_view = MarketView(
    signals={"ewma_ratio": ewma_past_view},
    tradable=market_view.tradable,
    prices=market_view.prices,
)


def ewma_strategy(universe, current_portfolio, market, ctx):
    """Weight tradable securities in proportion to their latest EWMA ratio.

    Each selected security gets ``ratio / mean(ratio) / n``, where ``n`` is the
    number of selected securities.

    Args:
        universe: Securities to consider, in order.
        current_portfolio: Unused.
        market: Market view; reads the last entry of ``market.tradable.by_period``
            and the ``"ewma_ratio"`` entry of ``market.signals``.
        ctx: Unused.

    Returns:
        ``target_weights`` applied to the computed weights.
    """
    tradable_flags = market.tradable.by_period[-1]
    ratio_by_security = market.signals["ewma_ratio"].by_security

    # Identity check on purpose: only a flag that is literally `True` selects
    # the security, exactly as the selection has always worked.
    eligible = []
    for security in universe:
        if tradable_flags[security] is True:
            eligible.append(security)

    latest_ratio = ratio_by_security[eligible].by_period[-1]

    # Scale by the cross-sectional mean first, then spread over the count.
    # NOTE(review): the weights sum to 1 provided `.mean()` is the arithmetic
    # mean over the same elements that `len()` counts - confirm for this type.
    relative_ratio = latest_ratio / latest_ratio.mean()
    return target_weights(relative_ratio / len(relative_ratio))


def _build_backtest(strategy):
    """Create a Backtest over the shared universe, market view and starting portfolio.

    Only the strategy differs between the runs compared in this notebook.
    NOTE(review): both backtests receive the very same `pf` object; confirm that
    Backtest does not mutate its `initial_portfolio`.
    """
    return Backtest(
        universe=universe,
        strategy=strategy,
        market_view=market_view,
        initial_portfolio=pf,
    )


benchmark_backtest = _build_backtest(index_strategy)
ewma_backtest = _build_backtest(ewma_strategy)

In [None]:
# Sanity probe: prints True only when every cell of the first close-price row
# compares equal to 0.
first_close_row = ewma_backtest.market_view.prices.close.by_security.as_df().row(0)
print(all(cell == 0 for cell in first_close_row))

# Run the EWMA-ratio backtest; later cells read `ewma_results`.
ewma_results = ewma_backtest.run()

In [None]:
# NOTE(review): subtracting 1 presumes `total_return` is a growth factor - confirm.
print(f"total return: {(ewma_results.total_return - 1) * 100:.2f}%")

# Final holdings of the EWMA backtest, largest first.
# NOTE(review): `_current_portfolio` is a private attribute of Backtest.
sorted_weights = list(ewma_backtest._current_portfolio.holdings.items())
sorted_weights.sort(key=lambda holding: holding[1], reverse=True)
sorted_weights

In [None]:
# Run the benchmark backtest (the one built with `index_strategy`) and print its
# total return as a percentage. Later cells read this run through `results`.
# NOTE(review): subtracting 1 presumes `total_return` is a growth factor - confirm.
results = benchmark_backtest.run()
print(f"total return: {(results.total_return - 1) * 100:.2f}%")

# what a strategy

In [None]:
# Display the length of the tradable data's per-period view
# (presumably the number of periods in the market view - confirm).
len(market_view.tradable.by_period)

In [None]:
import altair as alt

# Weights of the last period only, as a frame without the period column.
last_period_weights = ewma_results.weights.by_period[-1:].by_security.as_df(
    show_periods=False
)

# Reshape to long format ("variable" / "value" columns) and keep only the
# strictly positive weights.
final_allocations = (
    last_period_weights.lazy().unpivot().filter(pl.col("value") > 0).collect()
)

# Zoom the y-axis to just around the observed weights (1% padding on each
# side); clamp=True pins anything outside that window to its edge.
positive_weights = final_allocations["value"]
weight_axis = alt.Y(
    "value",
    scale=alt.Scale(
        domain=[positive_weights.min() * 0.99, positive_weights.max() * 1.01],
        clamp=True,
    ),
)
alt.Chart(final_allocations).mark_bar().encode(x="variable", y=weight_axis)

In [None]:
# Same picture as a bar chart, this time built straight from the last-period
# weights object instead of an unpivoted frame.
final_allocations_series = ewma_results.weights.by_period[-1]

allocation_columns = {
    "security": final_allocations_series.names,
    "value": final_allocations_series.to_series(),
}
final_allocations = pl.DataFrame(data=allocation_columns).filter(pl.col("value") > 0)

# Zoom the y-axis to just around the observed weights (1% padding on each
# side); clamp=True pins anything outside that window to its edge.
allocation_values = final_allocations["value"]
allocation_axis = alt.Y(
    "value",
    scale=alt.Scale(
        domain=[allocation_values.min() * 0.99, allocation_values.max() * 1.01],
        clamp=True,
    ),
)
alt.Chart(final_allocations).mark_bar().encode(x="security", y=allocation_axis)

In [None]:
# Cumulative growth factor of NVDA from the per-period asset returns in
# `results`, smoothed with a 10-sample rolling mean.
nvda_growth = (results.asset_returns.by_security["NVDA"].to_series() + 1).cum_prod()
nvda_pnl = list(nvda_growth.rolling_mean(10, min_samples=10))

print(nvda_pnl)

# The rolling window yields None until it has 10 samples; drop those entries
# before taking min/max for the axis range.
nvda_pnl_values = [point for point in nvda_pnl if point is not None]

print(nvda_pnl_values)

# NOTE(review): skipping the first period presumes the return series has one
# entry fewer than the period axis - confirm.
dates = ewma_backtest.market_view.periods.array[1:]

pnl_frame = pl.DataFrame({"pnl": nvda_pnl, "date": dates})

# 2% padding around the observed range; clamp=True pins outliers to the edge.
pnl_floor = min(nvda_pnl_values) * 0.98
pnl_ceiling = max(nvda_pnl_values) * 1.02
alt.Chart(pnl_frame).mark_line().encode(
    x="date",
    y=alt.Y("pnl", scale=alt.Scale(domain=[pnl_floor, pnl_ceiling], clamp=True)),
)

In [None]:
# Spot-check the benchmark run's market data: print the first NVDA close and
# the first period label, then show the last NVDA close as the cell output.
print(results.market.prices.close.by_security["NVDA"][0])
print(results.market.prices.close.periods[0])
results.market.prices.close.by_security["NVDA"][-1]