In [170]:
from nautilus_trader.trading.strategy import Strategy
from nautilus_trader.model.data import DataType
from nautilus_trader.core import Data
from nautilus_trader.model.data import BarType
from nautilus_trader.model.data import Bar
from nautilus_trader.model import InstrumentId
from nautilus_trader.common.enums import LogColor
from nautilus_trader.config import StrategyConfig
import pyarrow as pa
from nautilus_trader.core.datetime import unix_nanos_to_iso8601

class TweetSignalData(Data):
    def __init__(
        self, instrument_id: InstrumentId | None = None,
        ts_init: int = 0,
        signal: int = 0,
    ) -> None:
        self.instrument_id = instrument_id
        self._ts_init = ts_init
        self.signal = signal

    def __repr__(self):
        return (f"Tweet Data(ts_init={unix_nanos_to_iso8601(self._ts_init)}, instrument_id={self.instrument_id}, signal={self.signal:.2f})")

    @property
    def ts_init(self):
        return self._ts_init

    def to_dict(self):
        return {
            "instrument_id": self.instrument_id.value,
            "ts_init": self._ts_init,
            "signal": self.signal,
        }

    @classmethod
    def from_dict(cls, data: dict):
        return TweetSignalData(InstrumentId.from_str(data["instrument_id"]), data["ts_init"], data["signal"])

    def to_bytes(self):
        return msgspec.msgpack.encode(self.to_dict())

    @classmethod
    def from_bytes(cls, data: bytes):
        return cls.from_dict(msgspec.msgpack.decode(data))

    def to_catalog(self):
        return pa.RecordBatch.from_pylist([self.to_dict()], schema=TweetSignalData.schema())

    @classmethod
    def from_catalog(cls, table: pa.Table):
        return [TweetSignalData.from_dict(d) for d in table.to_pylist()]

    @classmethod
    def schema(cls):
        return pa.schema(
            {
                "instrument_id": pa.string(),
                "ts_init": pa.int64(),
                "signal": pa.int64(),
            }
        )

class MyStrategyConfig(StrategyConfig, frozen = True):
    bar_type: BarType
    tweet_signal_data: TweetSignalData

class MyStrategy(Strategy):
    def __init__(self, config: MyStrategyConfig):
        super().__init__(config = config)

        self.count_of_bars: int = 0
        self.count_of_tweets: int = 0

    def on_start(self):
        
        self.subscribe_bars(self.config.bar_type)
        self.subscribe_data(self.config.tweet_signal_data)

    def on_bar(self, bar: Bar):
        if isinstance(bar, Bar):
            self.count_of_bars += 1

    def on_data(self, data: Data):
        if isinstance(data, TweetSignalData):
            self.on_tweet_data(data)
        else:
            super().on_data(data)  # Pass other types (like Bar, Quote, etc.)
    
    def on_tweet_data(self, data: TweetSignalData):
        # Handle your tweet signal
        print(f"Received tweet signal: {data.signal}")
        self.count_of_tweets += 1

    def on_end(self):
        self.log.info(f"Number of bars: {self.count_of_bars}",
                      color = LogColor.GREEN
                     )
        self.log.info(f"Number of tweets: {self.count_of_tweets}",
                      color = LogColor.GREEN
                     )



In [161]:
from nautilus_trader.backtest.engine import BacktestEngine

from nautilus_trader.config import BacktestEngineConfig
from nautilus_trader.config import LoggingConfig

from nautilus_trader.model import TraderId
from nautilus_trader.model import Bar

engine_config = BacktestEngineConfig(
    trader_id = TraderId("BACKTEST-TWEET-001"),
    logging = LoggingConfig(log_level = "DEBUG")
)

engine = BacktestEngine(config = engine_config)

[1m2025-04-28T10:50:59.180701000Z[0m [INFO] BACKTEST-TWEET-001.BacktestEngine: Building system kernel[0m
[1m2025-04-28T10:50:59.182905000Z[0m [94m[INFO] BACKTEST-TWEET-001.MessageBus: config.database=None[0m
[1m2025-04-28T10:50:59.182909000Z[0m [94m[INFO] BACKTEST-TWEET-001.MessageBus: config.encoding='msgpack'[0m
[1m2025-04-28T10:50:59.182915000Z[0m [94m[INFO] BACKTEST-TWEET-001.MessageBus: config.timestamps_as_iso8601=False[0m
[1m2025-04-28T10:50:59.182915001Z[0m [94m[INFO] BACKTEST-TWEET-001.MessageBus: config.buffer_interval_ms=None[0m
[1m2025-04-28T10:50:59.182923000Z[0m [94m[INFO] BACKTEST-TWEET-001.MessageBus: config.autotrim_mins=None[0m
[1m2025-04-28T10:50:59.182924000Z[0m [94m[INFO] BACKTEST-TWEET-001.MessageBus: config.use_trader_prefix=True[0m
[1m2025-04-28T10:50:59.182924001Z[0m [94m[INFO] BACKTEST-TWEET-001.MessageBus: config.use_trader_id=True[0m
[1m2025-04-28T10:50:59.182925000Z[0m [94m[INFO] BACKTEST-TWEET-001.MessageBus: config.use_in

In [162]:
from data_provider import prepare_demo_data_eurusd_futures_1min

prepared_data: dict = prepare_demo_data_eurusd_futures_1min()


In [163]:
from nautilus_trader.model.instruments.base import Instrument

venue_name: str = prepared_data["venue_name"]
eurusd_instrument: Instrument = prepared_data["instrument"]
eurusd_1min_bartype: BarType = prepared_data["bar_type"]
eurusd_1min_bars_list: list[Bar] = prepared_data["bars_list"]


In [164]:
# Data type
# cần data
from nautilus_trader.test_kit.providers import TestInstrumentProvider

VENUE_NAME = "XCME"

tweet_instrument: Instrument = TestInstrumentProvider.eurusd_future(
        expiry_year=2024,
        expiry_month=3,
        venue_name=VENUE_NAME,
    )

tweet_data_list: list[TweetSignalData] = [
    TweetSignalData(instrument_id=tweet_instrument.id, ts_init = 1704150060000000000, signal = 0),
    TweetSignalData(instrument_id=tweet_instrument.id, ts_init = 1704150120000000000, signal = 1),
]


In [165]:
from decimal import Decimal
from nautilus_trader.backtest.engine import BacktestEngine
from nautilus_trader.config import BacktestEngineConfig
from nautilus_trader.config import LoggingConfig
from nautilus_trader.core.nautilus_pyo3 import BarType
from nautilus_trader.model import Bar
from nautilus_trader.model import TraderId
from nautilus_trader.model.currencies import USD
from nautilus_trader.model.enums import AccountType
from nautilus_trader.model.enums import OmsType
from nautilus_trader.model.identifiers import Venue
from nautilus_trader.model.instruments.base import Instrument
from nautilus_trader.model.objects import Money

engine.add_venue(
        venue=Venue(venue_name),
        oms_type=OmsType.NETTING,  # Order Management System type
        account_type=AccountType.MARGIN,  # Type of trading account
        starting_balances=[Money(1_000_000, USD)],  # Initial account balance
        base_currency=USD,  # Base currency for account
        default_leverage=Decimal(1),  # No leverage used for account
    )

# Add instrument and market data to the engine
engine.add_instrument(eurusd_instrument)


[1m2025-04-28T10:51:01.018602000Z[0m [INFO] BACKTEST-TWEET-001.SimulatedExchange(XCME): OmsType=NETTING[0m
[1m2025-04-28T10:51:01.020314000Z[0m [INFO] BACKTEST-TWEET-001.ExecClient-XCME: READY[0m
[1m2025-04-28T10:51:01.021599000Z[0m [INFO] BACKTEST-TWEET-001.SimulatedExchange(XCME): Registered ExecutionClient-XCME[0m
[1m2025-04-28T10:51:01.021981000Z[0m [INFO] BACKTEST-TWEET-001.ExecEngine: Registered ExecutionClient-XCME[0m
[1m2025-04-28T10:51:01.022491000Z[0m [INFO] BACKTEST-TWEET-001.BacktestEngine: Added SimulatedExchange(id=XCME, oms_type=NETTING, account_type=MARGIN)[0m
[1m2025-04-28T10:51:01.024914000Z[0m [INFO] BACKTEST-TWEET-001.DataClient-XCME: READY[0m
[1m2025-04-28T10:51:01.026262000Z[0m [INFO] BACKTEST-TWEET-001.DataEngine: Registered XCME[0m
[1m2025-04-28T10:51:01.026891000Z[0m [DEBUG] BACKTEST-TWEET-001.Cache: Added instrument 6EH4.XCME[0m
[1m2025-04-28T10:51:01.030748000Z[0m [INFO] BACKTEST-TWEET-001.SimulatedExchange(XCME): Added instrument 6E

In [166]:
engine.add_data(eurusd_1min_bars_list)

[1m2025-04-28T10:51:01.255567000Z[0m [INFO] BACKTEST-TWEET-001.BacktestEngine: Added 29_996 6EH4.XCME-1-MINUTE-LAST-EXTERNAL Bar elements[0m


In [167]:
engine.add_data(tweet_data_list)

[1m2025-04-28T10:51:01.441338000Z[0m [INFO] BACKTEST-TWEET-001.BacktestEngine: Added 2 6EH4.XCME TweetSignalData elements[0m


In [168]:
strategy = MyStrategy(config=MyStrategyConfig(eurusd_1min_bartype, DataType(TweetSignalData)))
engine.add_strategy(strategy)

# Execute the backtest
engine.run()

# Clean up resources
engine.dispose()

[1m2025-04-28T10:51:01.801196000Z[0m [INFO] BACKTEST-TWEET-001.MyStrategy: READY[0m
[1m2025-04-28T10:51:01.802305000Z[0m [DEBUG] BACKTEST-TWEET-001.MessageBus: Added Subscription(topic=events.order.MyStrategy-000, handler=<bound method Strategy.handle_event of MyStrategy(MyStrategy-000)>, priority=0)[0m
[1m2025-04-28T10:51:01.802316000Z[0m [DEBUG] BACKTEST-TWEET-001.MessageBus: Added Subscription(topic=events.position.MyStrategy-000, handler=<bound method Strategy.handle_event of MyStrategy(MyStrategy-000)>, priority=0)[0m
[1m2025-04-28T10:51:01.802358000Z[0m [INFO] BACKTEST-TWEET-001.ExecEngine: Registered OMS.UNSPECIFIED for Strategy MyStrategy-000[0m
[1m2025-04-28T10:51:01.802379000Z[0m [INFO] BACKTEST-TWEET-001.BACKTEST-TWEET-001: Registered Strategy MyStrategy-000[0m
[1m2025-04-28T10:51:01.808060000Z[0m [DEBUG] BACKTEST-TWEET-001.Cache: Added Account(id=XCME-001)[0m
[1m2025-04-28T10:51:01.808315000Z[0m [DEBUG] BACKTEST-TWEET-001.Cache: Indexed AccountId('XCME-0

[1m2024-01-01T23:01:00.000000000Z[0m [1;31m[ERROR] BACKTEST-TWEET-001.DataEngine: Cannot handle data: unrecognized type <class '__main__.TweetSignalData'> Tweet Data(ts_init=2024-01-01T23:01:00.000000000Z, instrument_id=6EH4.XCME, signal=0.00)[0m
[1m2024-01-01T23:02:00.000000000Z[0m [1;31m[ERROR] BACKTEST-TWEET-001.DataEngine: Cannot handle data: unrecognized type <class '__main__.TweetSignalData'> Tweet Data(ts_init=2024-01-01T23:02:00.000000000Z, instrument_id=6EH4.XCME, signal=1.00)[0m


-MINUTE-LAST-EXTERNAL,1.10710,1.10720,1.10705,1.10715,30,1704152160000000000)[0m
[1m2024-01-01T23:36:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with open 1.10710[0m
[1m2024-01-01T23:36:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with high 1.10720[0m
[1m2024-01-01T23:36:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with low 1.10705[0m
[1m2024-01-01T23:36:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with close 1.10715[0m
[1m2024-01-01T23:37:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Processing Bar(6EH4.XCME-1-MINUTE-LAST-EXTERNAL,1.10710,1.10710,1.10710,1.10710,15,1704152220000000000)[0m
[1m2024-01-01T23:37:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with open 1.10710[0m
[1m2024-01-01T23:38:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Proc

In [169]:
engine.data

024-01-02T03:30:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with close 1.10555[0m
[1m2024-01-02T03:31:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Processing Bar(6EH4.XCME-1-MINUTE-LAST-EXTERNAL,1.10555,1.10560,1.10555,1.10560,58,1704166260000000000)[0m
[1m2024-01-02T03:31:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with high 1.10560[0m
[1m2024-01-02T03:31:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with low 1.10555[0m
[1m2024-01-02T03:31:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with close 1.10560[0m
[1m2024-01-02T03:32:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Processing Bar(6EH4.XCME-1-MINUTE-LAST-EXTERNAL,1.10565,1.10565,1.10565,1.10565,3,1704166320000000000)[0m
[1m2024-01-02T03:32:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with 

[]

550,5,1704166740000000000)[0m
[1m2024-01-02T03:39:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with low 1.10550[0m
[1m2024-01-02T03:40:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Processing Bar(6EH4.XCME-1-MINUTE-LAST-EXTERNAL,1.10550,1.10560,1.10550,1.10560,9,1704166800000000000)[0m
[1m2024-01-02T03:40:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with high 1.10560[0m
[1m2024-01-02T03:40:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with low 1.10550[0m
[1m2024-01-02T03:40:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with close 1.10560[0m
[1m2024-01-02T03:41:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Processing Bar(6EH4.XCME-1-MINUTE-LAST-EXTERNAL,1.10560,1.10565,1.10560,1.10560,14,1704166860000000000)[0m
tchingEngine(XCME): Updating with high 1.10565[0mKTEST-TWEET-001.OrderMa

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



[1m2024-01-31T20:14:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with open 1.08310[0m
[1m2024-01-31T20:14:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with high 1.08355[0m
[1m2024-01-31T20:14:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with low 1.08280[0m
[1m2024-01-31T20:14:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with close 1.08335[0m
[1m2024-01-31T20:15:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Processing Bar(6EH4.XCME-1-MINUTE-LAST-EXTERNAL,1.08340,1.08350,1.08325,1.08335,520,1706732100000000000)[0m
[1m2024-01-31T20:15:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with open 1.08340[0m
[1m2024-01-31T20:15:00.000000000Z[0m [DEBUG] BACKTEST-TWEET-001.OrderMatchingEngine(XCME): Updating with high 1.08350[0m
[1m2024-01-31T20:15:00.000000000Z[0m [DEBUG] BACKTES