In [2]:
import json
import logging
import os

import nasdaqdatalink as ndl
import vectorbtpro as vbt

from tree import Node

vbt.settings.set_theme("dark")
# Read the Nasdaq Data Link key from the environment rather than committing it
# in the notebook. The placeholder is only a fallback so the cell still runs.
NDL_API_KEY = os.environ.get("NDL_API_KEY", "NDL_API_KEY_HERE")
# Both the raw NDL client and vectorbt's NDLData wrapper need the key.
ndl.ApiConfig.api_key = NDL_API_KEY
vbt.NDLData.set_custom_settings(api_key=NDL_API_KEY)
logger = logging.getLogger(__name__)

In [3]:
def get_ndl_symbol(symbol, start=None, end=None, **kwargs):
    """Fetch QUOTEMEDIA/PRICES rows for ``symbol`` from Nasdaq Data Link.

    ``start`` and ``end`` are converted to timezone-aware datetimes in the
    local timezone and sent as the ``date.gte`` / ``date.lte`` filters. A bound
    left as ``None`` is passed through as ``None``. Any extra keyword arguments
    are forwarded unchanged to ``ndl.get_table``.
    """

    def to_local(moment):
        # Leave a missing bound alone; otherwise localise it.
        if moment is None:
            return None
        return vbt.dt.to_tzaware_datetime(moment, tz=vbt.dt.get_local_tz())

    date_filter = {"gte": to_local(start), "lte": to_local(end)}
    return ndl.get_table(
        "QUOTEMEDIA/PRICES",
        ticker=symbol,
        date=date_filter,
        **kwargs,
    )


class NDLTableData(vbt.Data):
    """vectorbt ``Data`` subclass that pulls symbols from the NDL QUOTEMEDIA/PRICES table."""

    @classmethod
    def fetch_symbol(cls, symbol, **kwargs):
        """Download ``symbol`` and return a date-indexed, date-sorted frame.

        Lowercase OHLCV columns are renamed to the capitalised names vectorbt
        uses. ``adj_close`` is deliberately not renamed; later cells read it
        under that name. ``kwargs`` (e.g. ``start``/``end``) are forwarded to
        ``get_ndl_symbol``.
        """
        ohlcv_names = {
            "open": "Open",
            "high": "High",
            "low": "Low",
            "close": "Close",
            "volume": "Volume",
        }
        raw = get_ndl_symbol(symbol, **kwargs)
        return raw.set_index("date").rename(columns=ohlcv_names).sort_index()


def find_assets_with_ticker(data):
    """Collect every ticker referenced anywhere in a Composer strategy tree.

    Tickers are gathered from:
      * ``asset`` steps (their ``ticker`` field);
      * the ``lhs-val`` of a non-else ``if-child`` condition;
      * the ``rhs-val`` of such a condition, unless ``rhs-fixed-value?`` is
        true (then the right-hand side is a number, not a ticker).

    Args:
        data: A strategy node (dict). Its ``children`` are walked recursively;
            a node without ``children`` yields no tickers.

    Returns:
        A list of unique, non-empty tickers in no particular order.
    """
    tickers = set()

    def add(ticker):
        # A condition may omit a side; never let None/"" leak into the list,
        # since every entry later becomes a data path like f"monthly/{ticker}".
        if ticker:
            tickers.add(ticker)

    def recurse_children(children):
        for child in children:
            step = child.get("step")
            if step == "asset":
                add(child.get("ticker"))
            # TODO: handle lhs-val and non-static rhs-val for tickers
            if step == "if-child" and child.get("is-else-condition?") is False:
                add(child.get("lhs-val"))
                if child.get("rhs-fixed-value?", None) != True:
                    add(child.get("rhs-val"))
            recurse_children(child.get("children") or [])

    recurse_children(data.get("children") or [])
    return list(tickers)


def find_computable_operations(data):
    """Collect the indicator computations a Composer strategy will request.

    Each operation is a ``(window, function, ticker)`` tuple, where ``window``
    is an int (0 when the definition gives none) and ``function`` is the
    Composer function name such as ``"relative-strength-index"``. Sources:
      * both sides of every non-else ``if-child`` condition (the right-hand
        side is skipped when ``rhs-fixed-value?`` is true);
      * every ``filter`` step that actually sorts (``sort-by? == True``),
        producing one op per ticker found under that filter.

    Args:
        data: A strategy node (dict) whose ``children`` are walked recursively.

    Returns:
        A list of unique operation tuples in no particular order.
    """
    # (window, operation, ticker[s])
    ops = set()

    def window_days(child, side):
        # Definitions carry the window either as ``<side>-window-days`` or
        # nested under ``<side>-fn-params.window``; missing -> 0.
        days = child.get(f"{side}-window-days")
        if not days:
            days = (child.get(f"{side}-fn-params") or {}).get("window")
        return int(days or 0)

    def recurse_children(children):
        for child in children:
            step = child.get("step")
            if step == "if-child" and child.get("is-else-condition?") is False:
                ops.add((window_days(child, "lhs"), child.get("lhs-fn"), child.get("lhs-val")))
                if child.get("rhs-fixed-value?", None) != True:
                    ops.add((window_days(child, "rhs"), child.get("rhs-fn"), child.get("rhs-val")))
            # Only sorting filters evaluate an indicator; a filter without
            # sorting would otherwise yield ops with no (or an unused) function.
            if step == "filter" and child.get("sort-by?") == True:
                window = int(child.get("sort-by-window-days") or 0)
                sort_fn = child.get("sort-by-fn")
                for asset in find_assets_with_ticker(child):
                    ops.add((window, sort_fn, asset))
            recurse_children(child.get("children") or [])

    recurse_children(data.get("children") or [])
    return list(ops)

In [4]:
# Strategy definition to backtest. Other definitions used with this notebook:
#   "v2.2a_commander_bnd_monthly.json"
#   "20d_bnd_vs_60d_sh.json"
with open("v1a_tqqq_or_not.json", "r") as f:
    data = json.load(f)
assets = find_assets_with_ticker(data)
# SPY is always loaded because it serves as the benchmark.
if "SPY" not in assets:
    assets += ["SPY"]
computable_ops = find_computable_operations(data)
# Rebalance every day; use "BM" to rebalance every month instead.
rebalance_frequency = "1d"

In [5]:
# Pull from upstream, and store in a local directory
# d = NDLTableData.pull(
#     assets,
#     start="2011-09-14",
#     # end="2023-12-31",
# )
# d.to_parquet("monthly", partition_by="1 month", mkdir_kwargs=dict(mkdir=True))

In [14]:
# Load the locally cached partitions (written by the commented-out pull cell)
# from January 2015 onward; "group" is presumably the partition column created
# by partition_by="1 month".
d = vbt.ParquetData.pull(
    list(map("monthly/{}".format, assets)),
    filters=[("group", ">=", "2015-01")],
)
# Quick visual sanity check of one loaded symbol (assumes BND is among the assets).
d.select("BND").plot().show()

  0%|          | 0/11 [00:00<?, ?it/s]

In [7]:
class ComposerStrategy(object):
    """Interpret a Composer strategy definition into per-asset allocations.

    The definition is a nested dict of steps (``root``, ``group``, ``if``,
    ``if-child``, ``filter``, ``wt-cash-equal``, ``wt-cash-specified``,
    ``wt-inverse-vol``, ``asset``). A step name maps to a ``step_<name>``
    method (dashes become underscores). Indicator function names such as
    ``cumulative-return`` map to methods of the same normalised name.

    Each call to :meth:`parse` walks the definition for one point in time and
    builds a fresh ``Node`` tree (stored in ``self.root`` by ``step_root``).
    :meth:`get_asset_allocations` then flattens that tree into
    ``{ticker: account_value}``.

    Args:
        data: Strategy definition (the top-level step dict). It is only read,
            never modified, so the same definition can be parsed once per
            rebalance.
        indicators: Mapping ``ticker -> {indicator_key -> series}`` with keys
            like ``"10d_rsi"`` or ``"current_price"``. Each series is sliced
            with the current ``index_slice`` and its last value is used.
        weight: ``weight`` passed to the root ``Node``.
        starting_value: ``account_value`` passed to the root ``Node``.
    """

    def __init__(self, data, indicators, weight=100, starting_value=1):
        self.data = data
        self.indicators = indicators
        self.weight = weight
        self.account_value = starting_value
        # Rebuilt by step_root on every parse().
        self.root = None
        # Slice applied to every indicator series; set by parse().
        self.index_slice = None

    # ------------------------------------------------------------------ helpers

    def _get_non_children(self, data):
        """Return ``data`` without its ``children`` entry (for debug logging)."""
        return {k: v for k, v in data.items() if k != "children"}

    def _get_normalized_func(self, step, prefix=""):
        """Resolve a dashed Composer name (e.g. ``"wt-cash-equal"``) to a bound method.

        Raises:
            Exception: if no method named ``<prefix><name_with_underscores>`` exists.
        """
        normalized_func = step.replace("-", "_")
        requested_func = f"{prefix}{normalized_func}"
        func = getattr(self, requested_func, None)
        if not func:
            raise Exception(f"Unknown step {step} (requested func: {requested_func})")
        return func

    def _compare_values(self, lhs_val, comparator, rhs_val):
        """Compare two values numerically using a Composer comparator name.

        Both sides are coerced with ``float`` (a fixed ``rhs-val`` may arrive
        as a string).

        Raises:
            Exception: for a comparator other than gt/gte/lt/lte/eq.
        """
        # The arguments should always be numbers AFAIK
        logger.debug(f"Comparing values {lhs_val} {comparator} {rhs_val}")
        lhs_val = float(lhs_val)
        rhs_val = float(rhs_val)
        if comparator == "gt":
            return lhs_val > rhs_val
        elif comparator == "gte":
            return lhs_val >= rhs_val
        elif comparator == "lt":
            return lhs_val < rhs_val
        elif comparator == "lte":
            return lhs_val <= rhs_val
        elif comparator == "eq":
            return lhs_val == rhs_val
        else:
            raise Exception(f"Unknown comparator: {comparator}")

    def _find_all_tickers(self):
        """Return tickers of ``asset`` steps and ``lhs-val`` of ``if-child`` steps in ``self.data``."""
        tickers = set()

        def recurse_children(children):
            for child in children:
                if child.get("step") == "asset":
                    if "ticker" in child:
                        tickers.add(child["ticker"])
                elif child.get("step") == "if-child":
                    if "lhs-val" in child:
                        tickers.add(child["lhs-val"])
                if "children" in child:
                    recurse_children(child["children"])
            return list(tickers)

        return recurse_children(self.data["children"])

    def _to_days_operator(self, days: str):
        """Format a window length as an ``"<days>d"`` string."""
        return f"{days}d"

    def _latest_indicator(self, symbol, key):
        """Last value of ``self.indicators[symbol][key]`` within ``self.index_slice``."""
        return self.indicators[symbol][key][self.index_slice].iloc[-1]

    def _windowed_indicator(self, symbol, days, suffix):
        """Latest value of the ``<days>d_<suffix>`` indicator for ``symbol``.

        ``days`` is normalised with ``int(days or 0)`` — the same normalisation
        used when the indicator keys were built — so a window given as ``"10"``,
        ``10`` or ``10.0`` resolves to the same ``"10d_..."`` key.
        """
        return self._latest_indicator(symbol, f"{int(days or 0)}d_{suffix}")

    # ------------------------------------------------- indicator functions
    # Names match Composer's ``*-fn`` values with dashes -> underscores.

    def cumulative_return(self, symbol, days):
        logger.debug(f"Get cumulative return for asset {symbol} over {days} days")
        return self._windowed_indicator(symbol, days, "cumret")

    def relative_strength_index(self, symbol, days):
        logger.debug(f"Get RSI for asset {symbol} over {days} days")
        return self._windowed_indicator(symbol, days, "rsi")

    def current_price(self, symbol, _):
        logger.debug(f"Get current price for asset {symbol}")
        return self._latest_indicator(symbol, "current_price")

    def moving_average_price(self, symbol, days):
        logger.debug(f"Get moving average price for asset {symbol} over {days} days")
        return self._windowed_indicator(symbol, days, "sma")

    def moving_average_return(self, symbol, days):
        logger.debug(f"Get moving average return for asset {symbol} over {days} days")
        return self._windowed_indicator(symbol, days, "smar")

    def max_drawdown(self, symbol, days):
        logger.debug(f"Get max drawdown for asset {symbol} over {days} days")
        return self._windowed_indicator(symbol, days, "max_dd")

    def exponential_moving_average_price(self, symbol, days):
        logger.debug(
            f"Get exponential moving average price for asset {symbol} over {days} days"
        )
        return self._windowed_indicator(symbol, days, "ema")

    def standard_deviation_price(self, symbol, days):
        logger.debug(
            f"Get standard deviation price for asset {symbol} over {days} days"
        )
        return self._windowed_indicator(symbol, days, "stddev")

    def standard_deviation_return(self, symbol, days):
        logger.debug(
            f"Get standard deviation return for asset {symbol} over {days} days"
        )
        return self._windowed_indicator(symbol, days, "stddev_ret")

    # ------------------------------------------------- filter selectors
    # step_filter sorts children highest-first before calling these.

    def top(self, select_n, children):
        """Return the first ``select_n`` children (the highest sort values)."""
        return children[:select_n]

    def bottom(self, select_n, children):
        """Return the last ``select_n`` children (the lowest sort values).

        The start index is computed explicitly because ``children[-0:]`` would
        return every child; ``select_n == 0`` therefore yields ``[]``.
        """
        return children[max(len(children) - select_n, 0):]

    # ------------------------------------------------- step handlers

    def _window_days(self, branch, side):
        """Window for ``side`` ("lhs"/"rhs"): ``<side>-window-days`` or ``<side>-fn-params.window``."""
        days = branch.get(f"{side}-window-days")
        if not days:
            days = (branch.get(f"{side}-fn-params") or {}).get("window")
        return days

    def step_if(self, args, node: Node):
        """Evaluate an ``if`` step and run the branch whose condition holds.

        The step must have exactly two children: the ``if-child`` whose
        ``is-else-condition?`` is False carries the condition, the other is
        the else branch. The condition is
        ``lhs-fn(lhs-val, window) <comparator> rhs``, where ``rhs`` is the
        literal ``rhs-val`` when ``rhs-fixed-value?`` is true and
        ``rhs-fn(rhs-val, window)`` otherwise.

        Raises:
            Exception: if the step does not have exactly two children.
        """
        logger.debug(f"Parsing if: {args.get('id')}")
        children = args.get("children") or []
        # There should be two children
        if len(children) != 2:
            raise Exception(f"Expected 2 children, got {len(children)}")

        if_index = 0 if children[0].get("is-else-condition?", None) == False else 1
        if_branch = children[if_index]
        else_branch = children[1 - if_index]
        lhs_func = self._get_normalized_func(if_branch.get("lhs-fn"))
        comparator = if_branch.get("comparator")
        # TODO: are there other possible args?
        is_fixed_value = if_branch.get("rhs-fixed-value?", None) == True
        # A fixed rhs is a number; otherwise it is an indicator on an asset.
        if not is_fixed_value:
            rhs_func = self._get_normalized_func(if_branch.get("rhs-fn"))
            rhs_val = rhs_func(if_branch.get("rhs-val"), self._window_days(if_branch, "rhs"))
        else:
            rhs_val = if_branch.get("rhs-val")
        lhs_val = lhs_func(if_branch.get("lhs-val"), self._window_days(if_branch, "lhs"))
        n = Node(args.get("id"))
        node.add_child(n)
        # True means take the then branch, False means take the else branch
        if self._compare_values(lhs_val, comparator, rhs_val):
            logger.debug("Taking the then branch")
            return self.run_children(if_branch, n)
        else:
            logger.debug("Taking the else branch")
            return self.run_children(else_branch, n)

    def step_filter(self, args, node: Node):
        """Optionally sort and select a filter's asset children, then run them.

        Example step:
            {"id": "...", "select-fn": "top", "select-n": "1", "select?": True,
             "sort-by-fn": "cumulative-return", "sort-by-window-days": "10",
             "sort-by?": True, "step": "filter"}

        Sorting is highest-first, so ``top`` keeps the largest indicator
        values and ``bottom`` the smallest. Children are assumed to be
        ``asset`` steps (each must have a ``ticker``).
        """
        select = args.get("select?", None)
        sort = args.get("sort-by?", None)
        # Work on a copy: ``args`` belongs to ``self.data``, which is parsed
        # again on every rebalance. Writing the selection back into
        # ``args["children"]`` would permanently shrink the candidate set to
        # whatever was chosen the first time.
        children = list(args.get("children") or [])
        if sort == True:
            logger.debug(f"Sort children by {args.get('sort-by-fn')}")
            sort_function = self._get_normalized_func(args.get("sort-by-fn"))
            sort_window_days = args.get("sort-by-window-days")
            children.sort(
                key=lambda x: sort_function(x["ticker"], sort_window_days),
                reverse=True,
            )
        if select == True:
            select_function = self._get_normalized_func(args.get("select-fn"))
            select_n = int(args.get("select-n"))
            logger.debug(f"Select {args.get('select-fn')} {select_n} children")
            children = select_function(select_n, children)
        return self.run_children({**args, "children": children}, node)

    def step_if_child(self, args, node: Node):
        """Run an ``if-child`` branch directly under ``node`` (no Node of its own)."""
        # if-child should just be is-else-condition? and children
        logger.debug(f"Parsing if-child: {args.get('id')}")
        return self.run_children(args, node)

    def step_group(self, args, node: Node):
        """Add a Node named after the group's id (with its friendly name) and run its children."""
        # We can basically ignore groups, unless we want to bubble up naming
        logger.debug(f"Parsing Group: {args.get('name')}")
        n = Node(args.get("id"), friendly_name=args.get("name"))
        node.add_child(n)
        return self.run_children(args, n)

    def step_wt_cash_specified(self, args, node: Node):
        """Add a Node using the children's own specified weights and run the children.

        Each child carries ``"weight": {"num": "60", "den": 100}``. Only
        ``num`` is read (missing -> 0).
        NOTE(review): ``den`` is ignored, which presumes it is always 100 and
        that Node expects percentages — confirm against ``tree.Node``.
        """
        logger.debug(
            f"Weight the following children with specified weights ({args.get('id')})"
        )
        n = Node(args["id"], weight_strategy="specified")
        weights = {
            child.get("id"): float(child.get("weight", {}).get("num", 0))
            for child in args.get("children")
        }
        n.set_specified_weights(weights)
        node.add_child(n)
        return self.run_children(args, n)

    def step_wt_cash_equal(self, args, node: Node):
        """Add an equal-weight Node and run the children under it."""
        logger.debug(f"Weight the following children equally ({args.get('id')})")
        n = Node(args["id"], weight_strategy="equal")
        node.add_child(n)
        return self.run_children(args, n)

    def step_wt_inverse_vol(self, args, node: Node):
        """Add an inverse-volatility Node over ``window-days`` and run the children under it."""
        days = args.get("window-days")
        logger.debug(
            f"Weight the following children with by their inverse volatility over the last {days} days"
        )
        n = Node(args["id"], weight_strategy="inverse-volatility", window_days=days)
        node.add_child(n)
        return self.run_children(args, n)

    def step_asset(self, args, node: Node):
        """Attach a leaf Node named after the asset's ticker."""
        return node.add_child(Node(args.get("ticker")))

    def step_root(self, args, node: Node):
        """Record root metadata, create ``self.root`` and run the top-level children."""
        self.id = args.get("id")
        self.name = args.get("name")
        self.description = args.get("description")
        self.rebalance = args.get("rebalance")
        self.rebalance_corridor_width = args.get("rebalance-corridor-width")

        logger.debug(f"ID: {self.id}")
        logger.debug(f"Name: {self.name}")
        logger.debug(f"Rebalance: {self.rebalance}")
        logger.debug(f"Rebalance Corridor Width: {self.rebalance_corridor_width}")
        # Set up root node
        self.root = Node(
            self.name, weight=self.weight, account_value=self.account_value
        )
        return self.run_children(args, self.root)

    # ------------------------------------------------- driver

    def get_asset_allocations(self, node: Node):
        """Flatten the Node tree under ``node`` into ``{leaf name: summed account_value}``.

        Leaves are the asset Nodes created by ``step_asset``, so keys are
        tickers. A ticker reached through several branches has its values summed.
        """
        asset_allocations = {}
        if node.children:  # Node is not a leaf
            for child in node.children:
                for asset, amount in self.get_asset_allocations(child).items():
                    asset_allocations[asset] = asset_allocations.get(asset, 0.0) + amount
        else:  # Node is a leaf
            asset_allocations[node.name] = node.account_value
        return asset_allocations

    def parse(self, index_slice):
        """Evaluate the whole definition using indicator values visible in ``index_slice``."""
        self.index_slice = index_slice
        self.next(self.data)

    def run_children(self, args, node: Node):
        """Run every child step of ``args`` under ``node``, then apply ``node``'s weighting.

        Raises:
            Exception: if ``args`` has no children.
        """
        children = args.get("children")
        if not children:
            raise Exception("No children. Check your strategy definition.")
        # Process the children, allow them to self-organize their structure
        for child in children:
            self.next(child, node)
        # Apply the strategy after the child-processing is complete
        node.apply_strategy()

    def next(self, data, node: Node = None):
        """Dispatch ``data`` to its ``step_<step>`` handler.

        Raises:
            Exception: if ``data`` has no ``step`` or the step is unknown.
        """
        step = data.get("step")
        if not step:
            raise Exception("No step specified")

        # Only build the argument summary when debug output will be emitted.
        if logger.isEnabledFor(logging.DEBUG):
            nca = self._get_non_children(data)
            logger.debug(f"Step: {step}, Args: {nca}")

        step_func = self._get_normalized_func(step, prefix="step_")
        return step_func(data, node)

In [8]:
import pandas as pd


# If the 10D RSI of TQQQ is greater than 79%, we're overbought.
# Weight a small short and go safety on. This would normally have VIX in it, but our QUOTEMEDIA sub doesn't cover BATS.
# tqqq_rsi = vbt.RSI.run(d.select("TQQQ").close, window=10).rsi
# overbought = tqqq_rsi.rsi > 79

# Precompute every indicator the strategy will request. Each result is stored
# under opt_arg[ticker][f"{window}d_<suffix>"], the key ComposerStrategy reads.
close = d.get("adj_close")
opt_arg = {
    "symbols": close.columns.tolist(),
    "close": close,
}


def asset_adj_close(asset):
    """Adjusted close series for ``asset`` from the loaded data ``d``."""
    return d.select(asset).get("adj_close")


def asset_returns(asset):
    """vectorbt returns accessor built from ``asset``'s adjusted close (daily freq)."""
    return pd.Series.vbt.returns.from_value(asset_adj_close(asset), freq="d")


for op in computable_ops:
    window, func, asset = op
    # if isinstance(asset, list):
    #     raise NotImplementedError("asset lists not supported yet")
    asset_indicators = opt_arg.setdefault(asset, {})
    if func == "relative-strength-index":
        print(f"running {window}d RSI for {asset}")
        asset_indicators[f"{window}d_rsi"] = vbt.RSI.run(
            asset_adj_close(asset),
            window=window,
        ).rsi
    elif func == "cumulative-return":
        print(f"running {window}d Cumulative Return for {asset}")
        asset_indicators[f"{window}d_cumret"] = (
            asset_returns(asset).rolling_total(window=window) * 100
        )
    elif func == "max-drawdown":
        print(f"running {window}d Max Drawdown for {asset}")
        asset_indicators[f"{window}d_max_dd"] = (
            abs(asset_returns(asset).rolling_max_drawdown(window=window)) * 100
        )
    elif func == "current-price":
        # The current price will just be the close price
        print(f"running Current Price for {asset}")
        asset_indicators["current_price"] = asset_adj_close(asset)
    elif func == "moving-average-price":
        print(f"running {window}d Moving Average Price for {asset}")
        asset_indicators[f"{window}d_sma"] = (
            asset_adj_close(asset).vbt.rolling_mean(window)
        )
    elif func == "moving-average-return":
        print(f"running {window}d Moving Average Return for {asset}")
        asset_indicators[f"{window}d_smar"] = (
            asset_returns(asset).rolling_mean(window=window) * 100
        )
    elif func == "exponential-moving-average-price":
        print(f"running {window}d Exponential Moving Average Price for {asset}")
        # NOTE(review): span-based, non-adjusted EMA; confirm it matches Composer's definition.
        asset_indicators[f"{window}d_ema"] = (
            asset_adj_close(asset).ewm(span=window, adjust=False).mean()
        )
    elif func == "standard-deviation-price":
        print(f"running {window}d Standard Deviation Price for {asset}")
        asset_indicators[f"{window}d_stddev"] = (
            asset_adj_close(asset).rolling(window).std()
        )
    elif func == "standard-deviation-return":
        print(f"running {window}d Standard Deviation Return for {asset}")
        # Percent units, like the other return-based indicators above.
        asset_indicators[f"{window}d_stddev_ret"] = (
            asset_adj_close(asset).pct_change(fill_method=None).rolling(window).std() * 100
        )
    else:
        raise NotImplementedError(f"func {func} not supported yet")

running 6d Cumulative Return for TQQQ
running 45d RSI for SPY
running 1d Cumulative Return for TQQQ
running 10d Max Drawdown for TMF
running 10d Max Drawdown for QQQ
running Current Price for QQQ
running 45d RSI for BND
running 10d RSI for TQQQ
running 200d RSI for IEF
running 25d Moving Average Price for QQQ
running 200d RSI for TLT
running 60d RSI for SPY


In [15]:
# You only get the number of columns
# def allocate_func(n_cols):
#     weights = np.random.uniform(size=symbol_wrapper.shape[1])
#     return weights / weights.sum()


def weight_equal(assets: list):
    """Give every distinct asset the same weight so the weights sum to 1.

    Duplicates are collapsed (first-occurrence order kept) before weighting,
    so ``["A", "A", "B"]`` gives 0.5 each rather than weights summing to 2/3.
    An empty input returns ``{}`` instead of raising ZeroDivisionError.
    """
    unique_assets = list(dict.fromkeys(assets))
    if not unique_assets:
        return {}
    weight = 1 / len(unique_assets)
    return {asset: weight for asset in unique_assets}


def weights_to_vector(assets, weights: dict):
    """Order ``weights`` by ``assets``; an asset without a weight gets 0.0."""
    vector = []
    for name in assets:
        vector.append(weights[name] if name in weights else 0.0)
    return vector


def optimize_func(args, strategy: ComposerStrategy, index_slice):
    """Evaluate ``strategy`` on ``index_slice`` and return weights ordered like ``args["symbols"]``.

    Passed to ``vbt.PortfolioOptimizer.from_optimize_func``, which supplies
    ``index_slice`` through ``vbt.Rep("index_slice")``.
    """
    strategy.parse(index_slice)
    symbols = args["symbols"]
    allocations = strategy.get_asset_allocations(strategy.root)
    return weights_to_vector(symbols, allocations)


strategy = ComposerStrategy(data, opt_arg)
logger.setLevel(logging.INFO)
# logger.setLevel(logging.DEBUG)
pfo = vbt.PortfolioOptimizer.from_optimize_func(
    close.vbt.wrapper,
    optimize_func,
    opt_arg,
    strategy,
    vbt.Rep("index_slice"),
    every=rebalance_frequency,
    start="2015-01-01",
    # end="2020-01-01",
    alloc_wait=0,
)
print(pfo.allocations.tail(120))
# pfo.plot().show()
# Simulate from the optimizer
pf = vbt.Portfolio.from_optimizer(
    close,
    pfo,
    freq="1d",
    # Benchmark on the same adjusted close the strategy is simulated on. The
    # raw close would leave out SPY's dividends and understate the benchmark.
    bm_close=close["SPY"],
)
print(pf.stats())
print(pf.annualized_return)
pf.plot().show()

symbol                     SPY  SOXL  TQQQ  PSQ  BIL  BND  IEF  BTAL  TLT  \
date                                                                        
2023-10-04 00:00:00+00:00  0.0   0.0   0.0  0.0  1.0  0.0  0.0   0.0  0.0   
2023-10-05 00:00:00+00:00  0.0   0.0   0.0  0.0  1.0  0.0  0.0   0.0  0.0   
2023-10-06 00:00:00+00:00  0.0   0.0   0.0  0.0  1.0  0.0  0.0   0.0  0.0   
2023-10-09 00:00:00+00:00  0.0   0.0   0.0  0.0  1.0  0.0  0.0   0.0  0.0   
2023-10-10 00:00:00+00:00  0.0   0.0   0.0  0.0  1.0  0.0  0.0   0.0  0.0   
...                        ...   ...   ...  ...  ...  ...  ...   ...  ...   
2024-03-20 00:00:00+00:00  0.0   0.0   0.0  0.0  1.0  0.0  0.0   0.0  0.0   
2024-03-21 00:00:00+00:00  0.0   0.0   0.0  0.0  1.0  0.0  0.0   0.0  0.0   
2024-03-22 00:00:00+00:00  0.0   0.0   0.0  0.0  1.0  0.0  0.0   0.0  0.0   
2024-03-25 00:00:00+00:00  0.0   0.0   0.0  0.0  1.0  0.0  0.0   0.0  0.0   
2024-03-26 00:00:00+00:00  0.0   0.0   0.0  0.0  1.0  0.0  0.0   0.0  0.0   


Subplot 'orders' does not support grouped data


Subplot 'trade_pnl' does not support grouped data

