### Imports for strategy

In [1]:
from collections import deque
from datetime import timedelta
from typing import Dict

from nautilus_trader.core.message import Event
from nautilus_trader.core.functions import fast_mean
from nautilus_trader.model.enums import OrderSide, OrderPurpose, TimeInForce
from nautilus_trader.model.objects import Price, Tick, BarSpecification, BarType, Bar, Instrument
from nautilus_trader.model.identifiers import Symbol
from nautilus_trader.trading.strategy import TradingStrategy
from nautilus_trader.trading.sizing import FixedRiskSizer

from nautilus_indicators.average.ema import ExponentialMovingAverage
from nautilus_indicators.atr import AverageTrueRange
from nautilus_indicators.__info__ import __version__ as n_indicators_version

print(f'nautilus_indicators {n_indicators_version}')

nautilus_indicators 0.63.1


In [2]:
class EMACrossPy(TradingStrategy):
    """"
    A simple moving average cross example strategy. When the fast EMA crosses
    the slow EMA then a STOP_MARKET atomic order is placed for that direction
    with a trailing stop and profit target at 1R risk.
    """

    def __init__(self,
                 symbol: Symbol,
                 bar_spec: BarSpecification,
                 risk_bp: float=10.0,
                 fast_ema: int=10,
                 slow_ema: int=20,
                 atr_period: int=20,
                 sl_atr_multiple: float=2.0):
        """
        Initializes a new instance of the EMACrossPy class.

        :param symbol: The symbol for the strategy.
        :param bar_spec: The bar specification for the strategy.
        :param risk_bp: The risk per trade (basis points).
        :param fast_ema: The fast EMA period.
        :param slow_ema: The slow EMA period.
        :param atr_period: The ATR period.
        :param sl_atr_multiple: The ATR multiple for stop-loss prices.
        """
        # Order id tag must be unique at trader level
        super().__init__(order_id_tag=symbol.code, bar_capacity=40)

        # Custom strategy variables
        self.symbol = symbol
        self.bar_type = BarType(symbol, bar_spec)
        self.precision = 5          # dummy initial value for FX
        self.risk_bp = risk_bp
        self.entry_buffer = 0.0     # instrument.tick_size
        self.SL_buffer = 0.0        # instrument.tick_size * 10
        self.SL_atr_multiple = sl_atr_multiple

        self.instrument = None      # initialized in on_start()
        self.position_sizer = None  # initialized in on_start()

        # Create the indicators for the strategy
        self.fast_ema = ExponentialMovingAverage(fast_ema)
        self.slow_ema = ExponentialMovingAverage(slow_ema)
        self.atr = AverageTrueRange(atr_period)

    def on_start(self):
        """
        This method is called when self.start() is called, and after internal start logic.
        """
        # Put custom code to be run on strategy start here (or pass)
        self.instrument = self.get_instrument(self.symbol)
        self.precision = self.instrument.price_precision
        self.entry_buffer = self.instrument.tick_size.as_double() * 3.0
        self.SL_buffer = self.instrument.tick_size * 10.0
        self.position_sizer = FixedRiskSizer(self.instrument)

        # Register the indicators for updating
        self.register_indicator(
            data_source=self.bar_type,
            indicator=self.fast_ema,
            update_method=self.fast_ema.update)
        self.register_indicator(
            data_source=self.bar_type,
            indicator=self.slow_ema,
            update_method=self.slow_ema.update)
        self.register_indicator(
            data_source=self.bar_type,
            indicator=self.atr,
            update_method=self.atr.update)

        # Get historical data
        self.get_ticks(self.symbol)
        self.get_bars(self.bar_type)

        # Subscribe to live data
        self.subscribe_instrument(self.symbol)
        self.subscribe_bars(self.bar_type)
        self.subscribe_ticks(self.symbol)

        # Kick off strategy immediately with last bar
        if self.bar_count(self.bar_type) > 0:
            self.on_bar(self.bar_type, self.bar(self.bar_type, 0))

    def on_tick(self, tick: Tick):
        """
        This method is called whenever a Tick is received by the strategy, and
        after the Tick has been processed by the base class.
        The received Tick object is then passed into this method.

        :param tick: The received tick.
        """
        # self.log.info(f"Received Tick({tick})")  # For debugging

    def on_bar(self, bar_type: BarType, bar: Bar):
        """
        This method is called whenever the strategy receives a Bar, and after the
        Bar has been processed by the base class.
        The received BarType and Bar objects are then passed into this method.

        :param bar_type: The received bar type.
        :param bar: The received bar.
        """
        self.log.info(f"Received {bar_type} Bar({bar})")  # For debugging

        # Check if indicators ready
        if not self.indicators_initialized():
            self.log.info(f"Waiting for indicators to warm up "
                           f"[{self.bar_count(self.bar_type)}] ...")
            return  # Wait for indicators to warm up...

        # Check if tick data available
        if not self.has_ticks(self.symbol):
            self.log.info(f"Waiting for {self.symbol.value} ticks...")
            return  # Wait for ticks...

        # Get average spread
        average_spread = self.average_spread(self.symbol)

        # Check market liquidity
        if average_spread == 0.0:
            return  # Protect divide by zero
        else:
            liquidity_ratio = self.atr.value / average_spread
            if liquidity_ratio < 2.0:
                self.log.info(f"Liquidity Ratio == {liquidity_ratio} (no liquidity).")
                return

        spread_buffer = max(average_spread, self.spread(self.symbol))
        sl_buffer = self.atr.value * self.SL_atr_multiple

        if self.count_orders_working() == 0 and self.is_flat():  # No active or pending positions
            # BUY LOGIC
            if self.fast_ema.value >= self.slow_ema.value:
                self._enter_long(bar, sl_buffer, spread_buffer)
            # SELL LOGIC
            elif self.fast_ema.value < self.slow_ema.value:
                self._enter_short(bar, sl_buffer, spread_buffer)

        self._check_trailing_stops(bar, sl_buffer, spread_buffer)

    def _enter_long(self, bar: Bar, sl_buffer: float, spread_buffer: float):
        price_entry = Price(bar.high.as_double() + self.entry_buffer + spread_buffer, self.precision)
        price_stop_loss = Price(bar.low.as_double() - sl_buffer, self.precision)

        risk = price_entry.as_double() - price_stop_loss.as_double()
        price_take_profit = Price(price_entry.as_double() + risk, self.precision)

        exchange_rate = self.get_exchange_rate_for_account(
            quote_currency=self.instrument.quote_currency,
            price_type=PriceType.ASK)

        position_size = self.position_sizer.calculate(
            equity=self.account().free_equity,
            risk_bp=self.risk_bp,
            entry=price_entry,
            stop_loss=price_stop_loss,
            exchange_rate=exchange_rate,
            commission_rate_bp=0.15,
            hard_limit=20000000,
            units=1,
            unit_batch_size=10000)
        if position_size > 0:
            atomic_order = self.order_factory.atomic_stop_market(
                symbol=self.symbol,
                order_side=OrderSide.BUY,
                quantity=position_size,
                price_entry=price_entry,
                price_stop_loss=price_stop_loss,
                price_take_profit=price_take_profit,
                time_in_force=TimeInForce.GTD,
                expire_time=bar.timestamp + timedelta(minutes=1))

            self.submit_atomic_order(atomic_order, self.position_id_generator.generate())
        else:
            self.log.info("Insufficient equity for BUY signal.")

    def _enter_short(self, bar: Bar, sl_buffer: float, spread_buffer: float):
        price_entry = Price(bar.low.as_double() - self.entry_buffer, self.precision)
        price_stop_loss = Price(bar.high.as_double() + sl_buffer + spread_buffer, self.precision)

        risk = price_stop_loss.as_double() - price_entry.as_double()
        price_take_profit = Price(price_entry.as_double() - risk, self.precision)

        exchange_rate = self.get_exchange_rate_for_account(
            quote_currency=self.instrument.quote_currency,
            price_type=PriceType.ASK)

        position_size = self.position_sizer.calculate(
            equity=self.account().free_equity,
            risk_bp=self.risk_bp,
            entry=price_entry,
            stop_loss=price_stop_loss,
            exchange_rate=exchange_rate,
            commission_rate_bp=0.15,
            hard_limit=20000000,
            units=1,
            unit_batch_size=10000)

        if position_size > 0:  # Sufficient equity for a position
            atomic_order = self.order_factory.atomic_stop_market(
                symbol=self.symbol,
                order_side=OrderSide.SELL,
                quantity=position_size,
                price_entry=price_entry,
                price_stop_loss=price_stop_loss,
                price_take_profit=price_take_profit,
                time_in_force=TimeInForce.GTD,
                expire_time=bar.timestamp + timedelta(minutes=1))

            self.submit_atomic_order(atomic_order, self.position_id_generator.generate())
        else:
            self.log.info("Insufficient equity for SELL signal.")

    def _check_trailing_stops(self, bar: Bar, sl_buffer: float, spread_buffer: float):
        for working_order in self.orders_working().values():
            if working_order.purpose == OrderPurpose.STOP_LOSS:
                # SELL SIDE ORDERS
                if working_order.is_sell:
                    temp_price = Price(bar.low.as_double() - sl_buffer, self.precision)
                    if temp_price.gt(working_order.price):
                        self.modify_order(working_order, working_order.quantity, temp_price)
                # BUY SIDE ORDERS
                elif working_order.is_buy:
                    temp_price = Price(bar.high.as_double() + sl_buffer + spread_buffer, self.precision)
                    if temp_price.lt(working_order.price):
                        self.modify_order(working_order, working_order.quantity, temp_price)

    def on_instrument(self, instrument: Instrument):
        """
        This method is called whenever the strategy receives an Instrument update.

        :param instrument: The received instrument.
        """
        if self.instrument.symbol.equals(instrument.symbol):
            self.instrument = instrument

        self.log.info(f"Updated instrument {instrument}.")

    def on_event(self, event: Event):
        """
        This method is called whenever the strategy receives an Event object,
        and after the event has been processed by the TradingStrategy base class.
        These events could be AccountEvent, OrderEvent, PositionEvent, TimeEvent.

        :param event: The received event.
        """
        # Put custom code for event handling here (or pass)
        pass

    def on_stop(self):
        """
        This method is called when self.stop() is called and after internal
        stopping logic.
        """
        # Put custom code to be run on strategy stop here (or pass)
        pass

    def on_reset(self):
        """
        This method is called when self.reset() is called, and after internal
        reset logic such as clearing the internally held bars, ticks and resetting
        all indicators.
        """
        # Put custom code to be run on a strategy reset here (or pass)
        pass

    def on_save(self) -> {}:
        # Put custom state to be saved here (or return empty dictionary)
        return {}

    def on_load(self, state: {}):
        # Put custom state to be loaded here (or pass)
        pass

    def on_dispose(self):
        """
        This method is called when self.dispose() is called. Dispose of any
        resources that has been used by the strategy here.
        """
        # Put custom code to be run on a strategy disposal here (or pass)
        self.unsubscribe_instrument(self.symbol)
        self.unsubscribe_bars(self.bar_type)
        self.unsubscribe_ticks(self.symbol)


### Get Data

In [10]:
import sys
import pytz

from datetime import datetime

from nautilus_research.__info__ import __version__ as nautilus_research_version
from nautilus_research.database import ResearchDatabaseClient, Database, Library, MarketDataType
from nautilus_research.data.integrity import check_tick_data

print(f'nautilus_research {nautilus_research_version}')

nautilus_research 0.66.2


In [11]:
library = ResearchDatabaseClient(database=Database.MARKET_DATA)(Library.FXCM)

start = datetime(2017, 2, 1, 0, 0, 0, 0, pytz.UTC)
stop = datetime(2017, 3, 1, 0, 0, 0, 0, pytz.UTC)

In [12]:
tick_data_audusd = library.read_ticks(
        symbol='AUDUSD',
        start_datetime=start,
        end_datetime=stop)

print(f'{tick_data_audusd.shape[0]:,} rows')
print(f'{sys.getsizeof(tick_data_audusd):,} bytes')
tick_data_audusd.head()

TypeError: Cannot compare tz-naive and tz-aware datetime-like objects.

In [None]:
# tick_data_gbpusd = library.read_ticks(
#         symbol='GBPUSD',
#         start_datetime=start,
#         end_datetime=stop)

# print(f'{tick_data_gbpusd.shape[0]:,} rows')
# print(f'{sys.getsizeof(tick_data_gbpusd):,} bytes')
# tick_data_gbpusd.head()

In [None]:
# tick_data_eurusd = library.read_ticks(
#         symbol='EURUSD',
#         start_datetime=start,
#         end_datetime=stop)

# print(f'{tick_data_eurusd.shape[0]:,} rows')
# print(f'{sys.getsizeof(tick_data_eurusd):,} bytes')
# tick_data_eurusd.head()

In [None]:
# tick_data_usdjpy = library.read_ticks(
#         symbol='USDJPY',
#         start_datetime=start,
#         end_datetime=stop)

# print(f'{tick_data_usdjpy.shape[0]:,} rows')
# print(f'{sys.getsizeof(tick_data_usdjpy):,} bytes')
# tick_data_usdjpy.head()

### Build Backtest Engine

In [13]:
from nautilus_trader.common.logger import LogLevel
from nautilus_trader.model.enums import Currency, SecurityType, BarStructure, PriceType
from nautilus_trader.model.identifiers import Venue, TraderId
from nautilus_trader.backtest.loaders import InstrumentLoader
from nautilus_trader.backtest.config import BacktestConfig
from nautilus_trader.backtest.data import BacktestDataContainer
from nautilus_trader.backtest.models import FillModel
from nautilus_trader.backtest.engine import BacktestEngine

from nautilus_trader import __version__ as n_trader_version
print(f'nautilus_trader {n_trader_version}')

nautilus_trader 1.18.1


In [14]:
loader = InstrumentLoader()
AUDUSD = loader.default_fx_ccy(Symbol('AUDUSD', Venue('FXCM')))  # Test Instrument

data = BacktestDataContainer()
data.add_instrument(AUDUSD)
data.add_ticks(AUDUSD.symbol, tick_data_audusd)

strategies = [EMACrossPy(
    symbol=AUDUSD.symbol,
    bar_spec=BarSpecification(200, BarStructure.TICK, PriceType.MID),
    risk_bp=10,
    fast_ema=10,
    slow_ema=20,
    atr_period=20,
    sl_atr_multiple=2.0)]

config = BacktestConfig(
    exec_db_type='in-memory',
    exec_db_flush=False,
    frozen_account=False,
    starting_capital=1000000,
    account_currency=Currency.USD,
    bypass_logging=False,
    level_console=LogLevel.INFO,
    level_file=LogLevel.DEBUG,
    level_store=LogLevel.WARNING,
    log_thread=False,
    log_to_file=False)

fill_model = FillModel(
    prob_fill_at_limit=0.2,
    prob_fill_at_stop=0.95,
    prob_slippage=0.5,
    random_seed=None)

engine = BacktestEngine(
    data=data,
    strategies=strategies,
    config=config,          # Optional (can be None)
    fill_model=fill_model)  # Optional (can be None)

NameError: name 'tick_data_audusd' is not defined

In [None]:
help(engine.run)

In [None]:
engine.run()

In [None]:
engine.trader.generate_account_report()

In [None]:
import matplotlib.pyplot as plt
from pandas.plotting import register_matplotlib_converters

register_matplotlib_converters()

In [None]:
plt.plot(engine.trader.generate_account_report()['cash_balance'])

In [None]:
engine.trader.generate_order_fills_report()

In [None]:
engine.trader.generate_positions_report()