In [14]:
# normal imports
import time
import pandas as pd
import numpy as np
import datetime as dt
import yfinance as yf
from datetime import datetime
from decimal import Decimal

# nautilus trader imports
from nautilus_trader.backtest.engine import BacktestEngine, BacktestEngineConfig
from nautilus_trader.model.currencies import USD
from nautilus_trader.model.enums import AccountType, OmsType
from nautilus_trader.model.identifiers import Venue, ClientId
from nautilus_trader.model.objects import Money
from nautilus_trader.persistence.wranglers import BarDataWrangler
from nautilus_trader.test_kit.providers import TestInstrumentProvider, TestDataProvider
from nautilus_trader.config import StrategyConfig
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.instruments import Instrument
from nautilus_trader.model.orders.list import OrderList
from nautilus_trader.trading.strategy import Strategy
from nautilus_trader.config import LoggingConfig
from nautilus_trader.model.data import Bar, BarSpecification, BarType
from nautilus_trader.model.enums import OrderSide, PositionSide, TimeInForce
from nautilus_trader.persistence.wranglers import BarDataWrangler
from BasicMRStrategy import BasicMR, BasicMRConfig
from nautilus_trader.backtest.node import BacktestNode, BacktestVenueConfig, BacktestDataConfig, BacktestRunConfig, BacktestEngineConfig
from nautilus_trader.config import ImportableStrategyConfig,  ImportableActorConfig, StreamingConfig
from nautilus_trader.persistence.catalog import ParquetDataCatalog
from nautilus_trader.model.enums import AggregationSource
from nautilus_trader.core.datetime import dt_to_unix_nanos
from util import yf_to_timeseries
from nautilus_trader.core.data import Data

from itertools import repeat
from nautilus_trader.model.objects import Price

# other file related imports
from pathlib import Path
import fsspec 
import shutil


# creating instrument
MSFT_SIM = TestInstrumentProvider.equity(symbol="MSFT", venue="SIM")

# downloading data
start_str = "2023-01-01"
end_str = "2023-01-31"

msft_df = yf.download("MSFT", start=start_str, end=end_str, interval="1h")
msft_ts = yf_to_timeseries(msft_df, 7)


# processing data
bartype = BarType.from_str("MSFT.SIM-1-HOUR-LAST-EXTERNAL")


[*********************100%%**********************]  1 of 1 completed


In [27]:
from nautilus_trader.core.datetime import dt_to_unix_nanos, unix_nanos_to_dt, format_iso8601
import msgspec
import pyarrow as pa

def unix_nanos_to_str(unix_nanos):
    return format_iso8601(unix_nanos_to_dt(unix_nanos))

In [28]:
ts_event = msft_ts.index.view(np.uint64)
ts_init = ts_event.copy()

In [30]:
class SingleBar(Data):
    def __init__(self, price: float, ts_event=0, ts_init=0):
        self.price = price
        self._ts_event = ts_event  
        self._ts_init = ts_init 

    def __repr__(self):
        return (f"SingleBar("
                f"price={self.price:.2f}, "
                f"ts_event={unix_nanos_to_str(self._ts_event)}, "
                f"ts_init={unix_nanos_to_str(self._ts_init)}, ")

    @property
    def ts_event(self) -> int:
        return self._ts_event
    
    @ts_event.setter
    def ts_event(self, value):
        self._ts_event = value 

    @property
    def ts_init(self) -> int:
        return self._ts_init
    
    @ts_init.setter
    def ts_init(self, value):
        self._ts_init = value  

    def to_dict(self):
        return {
            "price": self.price,
            "ts_event": self._ts_event,
            "ts_init": self._ts_init
        }

    @classmethod
    def from_dict(cls, data: dict):
        return SingleBar(data["price"], data["ts_event"], data["ts_init"])

    @classmethod
    def from_bytes(cls, data: bytes):
        return cls.from_dict(msgspec.msgpack.decode(data))
    
    def to_bytes(self):
        return msgspec.msgpack.encode(self.to_dict())


    def to_catalog(self):
        return pa.RecordBatch.from_pylist([self.to_dict()], schema=SingleBar.schema())

    @classmethod
    def from_catalog(cls, table: pa.Table):
        return [SingleBar.from_dict(d) for d in table.to_pylist()]

    @classmethod
    def schema(cls):
        return pa.schema(
            {
                "price": pa.float64(),
                "ts_event": pa.int64(),
                "ts_init": pa.int64()
            }
        )

In [31]:
def build_bar(price, ts_event, ts_init, price_precision=2):
    return SingleBar(
        price=price,
        ts_event=ts_event,
        ts_init=ts_init
    )

bars = list(map(build_bar, msft_ts.Price, ts_event, ts_init))
bars[:5]

[SingleBar(price=243.08, ts_event=2023-01-03T09:30:00.000Z, ts_init=2023-01-03T09:30:00.000Z, ,
 SingleBar(price=239.79, ts_event=2023-01-03T10:30:00.000Z, ts_init=2023-01-03T10:30:00.000Z, ,
 SingleBar(price=238.36, ts_event=2023-01-03T11:30:00.000Z, ts_init=2023-01-03T11:30:00.000Z, ,
 SingleBar(price=238.15, ts_event=2023-01-03T12:30:00.000Z, ts_init=2023-01-03T12:30:00.000Z, ,
 SingleBar(price=238.44, ts_event=2023-01-03T13:30:00.000Z, ts_init=2023-01-03T13:30:00.000Z, ]

In [32]:
from nautilus_trader.serialization.base import register_serializable_type
from nautilus_trader.serialization.arrow.serializer import register_arrow

register_serializable_type(SingleBar, SingleBar.to_dict, SingleBar.from_dict)
register_arrow(SingleBar, SingleBar.schema(), SingleBar.to_catalog, SingleBar.from_catalog)

In [33]:
# create path and clear if it exists
CATALOG_PATH = Path.cwd() / "Data" / "MSFT2023catalog"

if CATALOG_PATH.exists():
    shutil.rmtree(CATALOG_PATH)
CATALOG_PATH.mkdir(parents=True)

# create catalog
catalog = ParquetDataCatalog(CATALOG_PATH)

# write to catalog
catalog.write_data([MSFT_SIM])
catalog.write_data(bars)

instrument = catalog.instruments()[0]

In [34]:
instrument

Equity(id=MSFT.SIM, raw_symbol=MSFT, asset_class=EQUITY, instrument_class=SPOT, quote_currency=USD, is_inverse=False, price_precision=2, price_increment=0.01, size_precision=0, size_increment=1, multiplier=1, lot_size=100, margin_init=0, margin_maint=0, maker_fee=0, taker_fee=0, info=None)