In [1]:
from typing import Optional, Union, Dict, Any

import polars as pl

from vnpy.app.factor_maker.template import FactorTemplate
from vnpy.trader.constant import Interval
from vnpy.app.factor_maker.base import FactorMode

LOG: update SETTINGS from vt_setting.json


In [2]:
class OPEN(FactorTemplate):
    """
    Leaf factor that passes the raw 'open' price frame through unchanged.
    """
    factor_name = 'open'
    dependencies_factor = []
    freq = Interval.MINUTE
    factor_mode = FactorMode.Backtest

    def __init__(self, setting, **kwargs):
        """
        Forward the factor settings and any extra parameters to FactorTemplate.
        """
        super().__init__(setting, **kwargs)

    def __init_dependencies__(self):
        """
        OPEN reads bar data directly, so it registers no upstream factors.
        """

    def calculate(self, input_data: Optional[Union[pl.DataFrame, Dict[str, Any]]], memory: Optional[pl.DataFrame] = None, *args, **kwargs) -> Any:
        """
        Hand back the 'open' price frame in either Backtest or Live mode.

        Parameters:
            input_data (Optional[Union[pl.DataFrame, Dict[str, Any]]]): Either the
                open-price DataFrame itself, or a dict holding it under the key 'open'.
            memory (Optional[pl.DataFrame]): Ignored; accepted so every factor shares
                the same calculate() signature.

        Returns:
            pl.DataFrame: The open-price frame, returned as-is (no copy is made).

        Raises:
            ValueError: On an unsupported factor_mode, an input that is neither a dict
                nor a DataFrame, or a dict whose 'open' entry is not a DataFrame.
        """
        if self.factor_mode not in (FactorMode.Backtest, FactorMode.Live):
            raise ValueError("Invalid factor_mode. Must be 'Backtest' or 'Live'.")

        if isinstance(input_data, pl.DataFrame):
            frame = input_data
        elif isinstance(input_data, dict):
            frame = input_data.get('open')
        else:
            raise ValueError("Invalid input_data format. Expected pl.DataFrame or Dict[str, pl.DataFrame].")

        # A dict lookup may yield None or a non-frame value; reject anything but a DataFrame.
        if isinstance(frame, pl.DataFrame):
            return frame
        raise ValueError("'open' data must be a Polars DataFrame.")

    def calculate_polars(self, input_data: pl.DataFrame, *args, **kwargs) -> Any:
        """
        Placeholder kept for interface compatibility; currently returns None.
        """
        return None

In [3]:
class VOLUME(FactorTemplate):
    """
    Leaf factor that passes the raw 'volume' frame through unchanged.
    """
    factor_name = 'volume'
    dependencies_factor = []
    freq = Interval.MINUTE
    factor_mode = FactorMode.Backtest

    def __init__(self, setting, **kwargs):
        """
        Forward the factor settings and any extra parameters to FactorTemplate.
        """
        super().__init__(setting, **kwargs)

    def __init_dependencies__(self):
        """
        VOLUME reads bar data directly, so it registers no upstream factors.
        """

    def calculate(self, input_data: Optional[Union[pl.DataFrame, Dict[str, Any]]], memory: Optional[pl.DataFrame] = None, *args, **kwargs) -> Any:
        """
        Hand back the 'volume' frame in either Backtest or Live mode.

        Parameters:
            input_data (Optional[Union[pl.DataFrame, Dict[str, Any]]]): Either the
                volume DataFrame itself, or a dict holding it under the key 'volume'.
            memory (Optional[pl.DataFrame]): Ignored; accepted so every factor shares
                the same calculate() signature.

        Returns:
            pl.DataFrame: The volume frame, returned as-is (no copy is made).

        Raises:
            ValueError: On an unsupported factor_mode, an input that is neither a dict
                nor a DataFrame, or a dict whose 'volume' entry is not a DataFrame.
        """
        if self.factor_mode not in (FactorMode.Backtest, FactorMode.Live):
            raise ValueError("Invalid factor_mode. Must be 'Backtest' or 'Live'.")

        if isinstance(input_data, pl.DataFrame):
            frame = input_data
        elif isinstance(input_data, dict):
            frame = input_data.get('volume')
        else:
            raise ValueError("Invalid input_data format. Expected pl.DataFrame or Dict[str, pl.DataFrame].")

        # A dict lookup may yield None or a non-frame value; reject anything but a DataFrame.
        if isinstance(frame, pl.DataFrame):
            return frame
        raise ValueError("'volume' data must be a Polars DataFrame.")

    def calculate_polars(self, input_data: pl.DataFrame, *args, **kwargs) -> Any:
        """
        Placeholder kept for interface compatibility; currently returns None.
        """
        return None

In [4]:
class MA(FactorTemplate):
    """
    Rolling moving-average factor.

    Averages every symbol column of its input over ``window`` bars. When given a dict
    of dependency outputs, the averaged frame is the 20-bar VWAP factor's output.
    """
    factor_name = 'ma'
    dependencies_factor = []
    freq = Interval.MINUTE
    factor_mode = FactorMode.Backtest

    def __init__(self, setting, window: Optional[int] = None):
        """
        Initialize the MA factor with its settings and rolling window size.

        Parameters:
            setting: Factor settings forwarded to FactorTemplate.
            window (Optional[int]): Rolling window length in bars; calculate() raises
                ValueError if it is left unset.
        """
        super().__init__(setting, window=window)

    def __init_dependencies__(self):
        """
        Define dependencies for the MA factor: a 20-bar VWAP.
        """
        # VWAP is defined in a later cell; the name is only resolved when this method runs.
        # calculate() fetches this dependency's output via self.vwap.factor_key.
        self.vwap = VWAP({}, window=20)
        # Keep the dependency in the same mode as this factor (as MACD and VWAP do).
        self.vwap.factor_mode = self.factor_mode
        setattr(self, 'dependencies_factor', [self.vwap])

    def calculate(self, input_data: Optional[Union[pl.DataFrame, Dict[str, pl.DataFrame]]], memory: Optional[pl.DataFrame] = None, *args, **kwargs) -> Any:
        """
        Calculate the rolling mean of every symbol column for Live Trading or Backtesting.

        Parameters:
            input_data (Optional[Union[pl.DataFrame, Dict[str, pl.DataFrame]]]): Either the
                frame to average (symbols as columns), or a dict of dependency outputs from
                which the VWAP frame is taken under ``self.vwap.factor_key``.
            memory (Optional[pl.DataFrame]): Factor history; required in Live mode, where
                one new row is appended to it.

        Returns:
            pl.DataFrame: Backtest - the rolling mean for every row, with the datetime column
            (if present) kept first. Live - ``memory`` with the newest averaged row appended.

        Raises:
            ValueError: On an invalid factor_mode, unsupported input type, unset window,
                or missing memory in Live mode.
        """
        if self.factor_mode not in [FactorMode.Backtest, FactorMode.Live]:
            raise ValueError("Invalid factor_mode. Must be 'Backtest' or 'Live'.")

        if isinstance(input_data, dict):
            df = input_data.get(self.vwap.factor_key)
        elif isinstance(input_data, pl.DataFrame):
            df = input_data
        else:
            raise ValueError("Invalid input_data format. Expected pl.DataFrame or Dict[str, pl.DataFrame].")

        if not isinstance(df, pl.DataFrame):
            raise ValueError("Input data must be a Polars DataFrame.")

        window = self.params.get_parameter('window')
        if window is None:
            raise ValueError("The rolling window size (window) is not set.")

        if self.factor_mode == FactorMode.Live:
            if memory is None:
                raise ValueError("Memory must be provided in 'Live' mode.")

            # Only the last `window` bars matter for the newest average; Live input
            # must carry a "datetime" column.
            latest_data = df.tail(window)
            datetime_col = latest_data["datetime"].tail(1)  # timestamp of the newest bar
            latest_data = latest_data.drop("datetime")

            # DataFrame.mean() aggregates column-wise into a one-row frame with the same
            # column names and scalar values, so it can be stacked directly under memory.
            new_row = latest_data.mean().insert_column(0, datetime_col)
            return pl.concat([memory, new_row], how="vertical")

        # Backtest mode: rolling mean over the full history, datetime column set aside.
        datetime_col = None
        if "datetime" in df.columns:
            datetime_col = df["datetime"]
            df = df.drop("datetime")

        # pl.all() keeps each column's original name.
        rolling_means = df.select(pl.all().rolling_mean(window))

        if datetime_col is not None:
            rolling_means = rolling_means.insert_column(0, datetime_col)

        return rolling_means

In [5]:
class MACD(FactorTemplate):
    """
    MACD histogram factor built from a fast and a slow MA dependency.

    MACD line = MA(fast) - MA(slow); signal line = mean of the MACD line over
    ``signal_period`` bars; histogram = MACD line - signal line.
    """
    factor_name = 'macd'
    dependencies_factor = []
    freq = Interval.MINUTE
    factor_mode = FactorMode.Backtest

    def __init__(self, setting, fast_period: Optional[int] = None, slow_period: Optional[int] = None, signal_period: Optional[int] = None):
        """
        Initialize the MACD factor with settings and periods.

        Parameters:
            setting: Factor settings forwarded to FactorTemplate.
            fast_period (Optional[int]): Window of the fast MA dependency.
            slow_period (Optional[int]): Window of the slow MA dependency.
            signal_period (Optional[int]): Window used to smooth the MACD line into the signal line.
        """
        super().__init__(setting=setting, fast_period=fast_period, slow_period=slow_period, signal_period=signal_period)

    def __init_dependencies__(self):
        """
        Define dependencies for the MACD factor: a fast and a slow MA.
        """
        self.ma_fast = MA({}, self.params.get_parameter('fast_period'))
        self.ma_slow = MA({}, self.params.get_parameter('slow_period'))
        self.ma_fast.factor_mode = self.factor_mode
        self.ma_slow.factor_mode = self.factor_mode
        setattr(self, 'dependencies_factor', [self.ma_fast, self.ma_slow])

    def calculate(self, input_data: Optional[Dict[str, pl.DataFrame]], memory: Optional[pl.DataFrame] = None, *args, **kwargs) -> pl.DataFrame:
        """
        Calculate the MACD histogram for Live Trading or Backtesting.

        Parameters:
            input_data (Optional[Dict[str, pl.DataFrame]]): Dependency outputs containing the
                pre-calculated fast and slow MAs, keyed by their factor_key.
            memory (Optional[pl.DataFrame]): Factor history; required in Live mode, where one
                new row is appended to it.

        Returns:
            pl.DataFrame: Backtest - histogram for every row with the datetime column preserved.
            Live - ``memory`` with the newest histogram row appended.

        Raises:
            ValueError: On an invalid factor_mode, non-dict input, missing or non-DataFrame
                MAs, unset signal_period, or missing memory in Live mode.
        """
        if self.factor_mode not in [FactorMode.Backtest, FactorMode.Live]:
            raise ValueError("Invalid factor_mode. Must be 'Backtest' or 'Live'.")

        # Guard before .get(): None or a bare DataFrame would otherwise raise AttributeError.
        if not isinstance(input_data, dict):
            raise ValueError("Invalid input_data format. Expected Dict[str, pl.DataFrame].")

        ma_fast = input_data.get(self.ma_fast.factor_key)
        ma_slow = input_data.get(self.ma_slow.factor_key)

        if ma_fast is None or ma_slow is None:
            raise ValueError("Missing required moving averages (ma_fast or ma_slow) in input_data.")

        if not isinstance(ma_fast, pl.DataFrame) or not isinstance(ma_slow, pl.DataFrame):
            raise ValueError("ma_fast and ma_slow must be Polars DataFrames.")

        signal_period = self.params.get_parameter('signal_period')
        if signal_period is None:
            raise ValueError("The signal period (signal_period) is not set.")

        if self.factor_mode == FactorMode.Live:
            if memory is None:
                raise ValueError("Memory must be provided in 'Live' mode.")

            # Only the last `signal_period` bars feed the newest signal value; Live input
            # must carry a "datetime" column.
            ma_fast_tail = ma_fast.tail(signal_period)
            ma_slow_tail = ma_slow.tail(signal_period)
            datetime_col = ma_fast_tail["datetime"].tail(1)  # timestamp of the newest bar

            macd_line = ma_fast_tail.drop("datetime") - ma_slow_tail.drop("datetime")
            # Column-wise mean -> one-row frame with scalar values per symbol.
            signal_line = macd_line.mean()

            # One-row minus one-row frame keeps the column names and scalar values, so the
            # result stacks directly under memory.
            new_row = (macd_line.tail(1) - signal_line).insert_column(0, datetime_col)
            return pl.concat([memory, new_row], how="vertical")

        # Backtest mode: compute over the full history, datetime column set aside.
        datetime_col = None
        if "datetime" in ma_fast.columns and "datetime" in ma_slow.columns:
            datetime_col = ma_fast["datetime"]
            ma_fast = ma_fast.drop("datetime")
            ma_slow = ma_slow.drop("datetime")

        macd_line = ma_fast - ma_slow
        signal_line = macd_line.select(pl.all().rolling_mean(signal_period))
        histogram = macd_line - signal_line

        if datetime_col is not None:
            histogram = histogram.insert_column(0, datetime_col)

        return histogram

In [6]:
class VWAP(FactorTemplate):
    """
    Rolling volume-weighted average price, using the bar 'open' as the price input.

    VWAP = rolling_sum(open * volume, window) / rolling_sum(volume, window), per symbol.
    """
    factor_name = 'vwap'
    dependencies_factor = []
    freq = Interval.MINUTE
    factor_mode = FactorMode.Backtest

    def __init__(self, setting, window: Optional[int] = None):
        """
        Initialize VWAP with its settings and rolling window size.

        Parameters:
            setting: Factor settings forwarded to FactorTemplate.
            window (Optional[int]): Rolling window length in bars; calculate() raises
                ValueError if it is left unset.
        """
        super().__init__(setting, window=window)

    def __init_dependencies__(self):
        """
        Define dependencies for the VWAP factor: the OPEN and VOLUME leaf factors.
        """
        self.open = OPEN({})
        self.volume = VOLUME({})
        self.open.factor_mode = self.factor_mode
        self.volume.factor_mode = self.factor_mode
        setattr(self, 'dependencies_factor', [self.open, self.volume])

    def calculate(self, input_data: Dict[str, Any], memory: Optional[pl.DataFrame] = None, *args, **kwargs) -> pl.DataFrame:
        """
        Calculate the rolling VWAP (Volume Weighted Average Price) for Live Trading or Backtesting.

        Parameters:
            input_data (Dict[str, Any]): Dependency outputs containing the open and volume
                frames, keyed by the OPEN and VOLUME factor keys.
            memory (Optional[pl.DataFrame]): Factor history; required in Live mode, where one
                new row is appended to it.

        Returns:
            pl.DataFrame: Backtest - VWAP for every row with the datetime column preserved.
            Live - ``memory`` with the newest VWAP row appended.

        Raises:
            ValueError: On an invalid factor_mode, non-dict input, missing or non-DataFrame
                open/volume data, unset window, or missing memory in Live mode.
        """
        if self.factor_mode not in [FactorMode.Backtest, FactorMode.Live]:
            raise ValueError("Invalid factor_mode. Must be 'Backtest' or 'Live'.")

        # Guard before .get(): None or a bare DataFrame would otherwise raise AttributeError.
        if not isinstance(input_data, dict):
            raise ValueError("Invalid input_data format. Expected Dict[str, pl.DataFrame].")

        open_prices = input_data.get(self.open.factor_key)
        volumes = input_data.get(self.volume.factor_key)

        if not isinstance(open_prices, pl.DataFrame) or not isinstance(volumes, pl.DataFrame):
            raise ValueError("Both open and volume bar_data must be Polars DataFrames.")

        window = self.params.get_parameter('window')
        if window is None:
            raise ValueError("The rolling window size (window) is not set.")

        if self.factor_mode == FactorMode.Live:
            if memory is None:
                raise ValueError("Memory must be provided in 'Live' mode.")

            # Only the last `window` bars matter; Live input must carry a "datetime" column.
            open_tail = open_prices.tail(window)
            volume_tail = volumes.tail(window)
            datetime_col = open_tail["datetime"].tail(1)  # timestamp of the newest bar
            open_tail = open_tail.drop("datetime")
            volume_tail = volume_tail.drop("datetime")

            # Column-wise sums are one-row frames; dividing them gives one VWAP per symbol
            # with scalar values, so the row stacks directly under memory.
            # NOTE(review): a window with zero total volume divides by zero (inf/NaN) - unguarded.
            sum_weighted = (open_tail * volume_tail).sum()
            sum_volume = volume_tail.sum()
            new_row = (sum_weighted / sum_volume).insert_column(0, datetime_col)
            return pl.concat([memory, new_row], how="vertical")

        # Backtest mode: compute over the full history, datetime column set aside.
        datetime_col = None
        if "datetime" in open_prices.columns and "datetime" in volumes.columns:
            datetime_col = open_prices["datetime"]
            open_prices = open_prices.drop("datetime")
            volumes = volumes.drop("datetime")

        weighted_prices = open_prices * volumes
        rolling_sum_weighted = weighted_prices.select(pl.all().rolling_sum(window))
        rolling_sum_volume = volumes.select(pl.all().rolling_sum(window))
        vwap = rolling_sum_weighted / rolling_sum_volume

        if datetime_col is not None:
            vwap = vwap.insert_column(0, datetime_col)

        return vwap

In [7]:
# MACD on 1-minute bars: fast MA window 5, slow MA window 20, signal window 5.
# Construction presumably triggers __init_dependencies__ (via FactorTemplate), which
# creates the two MA dependencies - TODO confirm against FactorTemplate.__init__.
macd = MACD({}, fast_period=5, slow_period=20, signal_period=5)

In [8]:
import numpy as np
import polars as pl
import pandas as pd

# Simulation settings. Seeding NumPy's global RNG makes "Restart & Run All"
# regenerate the same synthetic bars, and hence the same factor outputs.
SEED = 42
N_BARS = 200
BAR_FREQ = "1min"
np.random.seed(SEED)

# Step 1: Generate simulated OHLCV data. Each field is a wide frame with a shared
# "datetime" column plus one column per symbol.
date_range = pd.date_range("2024-01-01", periods=N_BARS, freq=BAR_FREQ)

# Uniform (low, high) bounds for each price field and symbol.
PRICE_BOUNDS = {
    "open":  {"AAPL": (150, 155), "MSFT": (300, 305), "GOOG": (2800, 2810)},
    "high":  {"AAPL": (155, 160), "MSFT": (305, 310), "GOOG": (2810, 2820)},
    "low":   {"AAPL": (145, 150), "MSFT": (295, 300), "GOOG": (2790, 2800)},
    "close": {"AAPL": (150, 155), "MSFT": (300, 305), "GOOG": (2800, 2810)},
}
# Integer volume bounds per symbol (np.random.randint: high is exclusive).
VOLUME_BOUNDS = {"AAPL": (1000, 2000), "MSFT": (1000, 2000), "GOOG": (1000, 2000)}


def _simulated_frame(bounds, draw):
    """Build one wide frame: datetime plus one column per symbol drawn via draw(low, high, size=N_BARS)."""
    return pl.DataFrame({
        "datetime": date_range,
        **{symbol: draw(low, high, size=N_BARS) for symbol, (low, high) in bounds.items()},
    })


# Field order (open, high, low, close, volume) matches the original cell.
raw_data = {field: _simulated_frame(bounds, np.random.uniform) for field, bounds in PRICE_BOUNDS.items()}
raw_data["volume"] = _simulated_frame(VOLUME_BOUNDS, np.random.randint)

In [9]:
from vnpy.app.factor_maker.backtesting import FactorBacktester
from vnpy.app.factor_maker.optimizer import FactorOptimizer
# Backtester and optimizer both operate on the simulated raw_data dict built earlier.
# NOTE(review): trading_freq='2h' presumably sets the rebalancing/holding frequency - confirm.
bt = FactorBacktester(data=raw_data, trading_freq='2h')
opt = FactorOptimizer(backtester=bt, data=raw_data)

In [10]:
# Register MACD with the optimizer. Judging by opt.tasks, its dependency chain
# (open, volume, vwap, two MAs) is presumably scheduled as well - confirm.
opt.add_factor(macd)

In [11]:
# Inspect computed factor frames, keyed by "<factor_name>@<params>" (e.g. 'open@noparams').
opt.factor_data

{'open@noparams': shape: (200, 4)
 ┌─────────────────────┬────────────┬────────────┬─────────────┐
 │ datetime            ┆ AAPL       ┆ MSFT       ┆ GOOG        │
 │ ---                 ┆ ---        ┆ ---        ┆ ---         │
 │ datetime[ns]        ┆ f64        ┆ f64        ┆ f64         │
 ╞═════════════════════╪════════════╪════════════╪═════════════╡
 │ 2024-01-01 00:00:00 ┆ 150.098749 ┆ 300.001159 ┆ 2804.26789  │
 │ 2024-01-01 00:01:00 ┆ 153.615817 ┆ 300.7539   ┆ 2804.712822 │
 │ 2024-01-01 00:02:00 ┆ 151.926714 ┆ 302.197401 ┆ 2801.368064 │
 │ 2024-01-01 00:03:00 ┆ 153.63471  ┆ 303.729307 ┆ 2804.737348 │
 │ 2024-01-01 00:04:00 ┆ 151.977851 ┆ 301.553481 ┆ 2803.704407 │
 │ …                   ┆ …          ┆ …          ┆ …           │
 │ 2024-01-01 03:15:00 ┆ 152.474404 ┆ 301.294196 ┆ 2805.005212 │
 │ 2024-01-01 03:16:00 ┆ 153.601673 ┆ 304.335578 ┆ 2809.289586 │
 │ 2024-01-01 03:17:00 ┆ 153.642465 ┆ 303.627632 ┆ 2809.25477  │
 │ 2024-01-01 03:18:00 ┆ 152.925415 ┆ 303.462689 ┆ 2805.

In [12]:
# Inspect the scheduled calculation tasks (Delayed objects), keyed by factor key.
opt.tasks

{'open@noparams': Delayed('calculate-9a06b3a6-afe2-4093-9f3c-51a1d352e0a1'),
 'volume@noparams': Delayed('calculate-b840d7f9-4535-4067-be19-196582249a64'),
 'vwap@window_20': Delayed('calculate-54372db5-0aad-4452-abe1-fda07071071d'),
 'ma@window_5': Delayed('calculate-82f1f446-b05e-46cc-b72d-04da60f29d75'),
 'ma@window_20': Delayed('calculate-a1af6480-dc05-47fa-ad86-f5e13a2ce054')}