In [None]:
import numpy as np
%load_ext autoreload
%autoreload 2

In [None]:
# MT5 Imports
import MetaTrader5 as mt5
from MetaTrader5 import AccountInfo, TerminalInfo
import importlib
importlib.reload(mt5)

In [None]:
import dotenv
import os
import importlib
import pandas as pd
from datetime import datetime
import logging

#import naut_mt5 as n5
#from naut_mt5 import data_utils

import nautilus_trader
# test instrument provider
from nautilus_trader.test_kit.providers import TestInstrumentProvider
from nautilus_trader.persistence.wranglers import QuoteTickDataWrangler
from nautilus_trader.persistence.catalog import ParquetDataCatalog
import os
from nautilus_trader.data.engine import ParquetDataCatalog
from nautilus_trader.model.instruments import Instrument
from nautilus_trader.model.data import BarType, Bar
import pandas as pd
from pandas import DataFrame

from nautilus_trader.persistence.wranglers import BarDataWrangler


In [None]:
# ENVIRONMENT
# Load the .env file once. override=True makes values from the file replace any
# that are already present in the process environment.
if not dotenv.load_dotenv(override=True):
    # Logged at WARNING so the notice is visible with the default logging setup
    # (an unconfigured root logger drops INFO records).
    logging.warning("No .env file found")

# Required settings: a missing key raises KeyError here rather than failing later.
MT5_SERVER = os.environ["MT5_SERVER"]
MT5_LOGIN = os.environ["MT5_LOGIN"]
MT5_PASSWORD = os.environ["MT5_PASSWORD"]
DATA_PATH = os.environ["DATA_PATH"]
CATALOG_PATH = os.environ["CATALOG_PATH"]

# Only the server name is echoed; login and password are deliberately not printed.
print(f"MT5_SERVER: {MT5_SERVER}")


# Symbol Parameters

In [None]:
from nautilus_trader.model.identifiers import InstrumentId, Venue
from nautilus_trader.model.data import BarType, BarSpecification, BarAggregation
from nautilus_trader.model.data import QuoteTick

# Parameters for loading one symbol into the catalog through the loader.
symbol_broker = 'XAUUSD'
symbol_clean = 'XAUUSD'
venue = "SIM_IC"
timeframe = mt5.TIMEFRAME_M1
start_date, end_date = datetime(1971, 1, 1), datetime.now()
instrument = TestInstrumentProvider.default_fx_ccy(symbol_clean, Venue(venue))

# Derived value: use the clean symbol when it is non-empty, else the broker symbol.
symbol = symbol_clean or symbol_broker

# NOTE(review): `n5` is not bound by any active import in this notebook (the
# `import naut_mt5 as n5` line is commented out) - confirm where it comes from.
loader_config = n5.MTLoginConfig(
    server=MT5_SERVER,
    login=MT5_LOGIN,
    password=MT5_PASSWORD,
)
loader = n5.MT5Loader(
    data_path=DATA_PATH,
    catalog_path=CATALOG_PATH,
    config=loader_config,
    venue=venue,
)
loader.init()

In [None]:
# account_info() returns None when the terminal call fails, so check the result
# before reading the equity instead of running into an AttributeError.
account = mt5.account_info()
if account is None:
    raise RuntimeError(f"account_info() failed, error code = {mt5.last_error()}")
account.equity

# Delete the symbol from the catalog

In [None]:
# Ask data_utils to delete the parquet data stored for this symbol's bar type.
bar_type = loader.get_bar_type(symbol, timeframe)
print(bar_type)

# NOTE(review): `data_utils` has no active import in this notebook (its import
# line is commented out) - confirm where it comes from.
if data_utils.delete_parquet_data(bar_type, CATALOG_PATH):
    print(f"INFO: Deleted data for {bar_type}")
else:
    print("INFO: Could not delete")
    
    

# Load the symbol from MT5 to CSV

In [None]:
# Export the symbol's rates for the configured date range via the loader.
loader.load_symbol_rates_to_csv(
    symbol_broker,
    symbol_clean,
    timeframe,
    start_date,
    end_date,
    DATA_PATH,
)

# Load the symbol from CSV into the Parquet catalog

In [None]:
from nautilus_trader.model.identifiers import InstrumentId, Venue, Symbol
from nautilus_trader.model.data import BarType, BarSpecification, BarAggregation
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.enums import AssetClass
from nautilus_trader.model.instruments import CurrencyPair
from nautilus_trader.model.instruments import Cfd
from nautilus_trader.model.objects import Currency
from nautilus_trader.model.objects import Money
from nautilus_trader.model.objects import Price
from nautilus_trader.model.objects import Quantity
from nautilus_trader.model.currencies import USD
from decimal import Decimal

In [None]:
# Catalog handle and the identifiers used for the instrument defined below.
# NOTE(review): this rebinds `symbol` and `venue`, which earlier cells bound to
# plain strings - cells that expect the string form must be re-run with care.
catalog = ParquetDataCatalog("./catalog")
symbol = Symbol("XAUUSD")
venue = Venue("SIM")
instrument_id = InstrumentId(symbol, venue)

print(
    f"Catalog: {catalog}",
    f"Symbol: {symbol}",
    f"Venue: {venue}",
    f"Instrument: {instrument_id}",
    sep="\n",
)

# Split the six-letter symbol into its leading and trailing three letters.
symbol_str = symbol.value
base_currency, quote_currency = symbol_str[:3], symbol_str[-3:]
price_precision = 2

c = CurrencyPair(
    # identity
    instrument_id=instrument_id,
    raw_symbol=symbol,
    base_currency=Currency.from_str(base_currency),
    quote_currency=Currency.from_str(quote_currency),
    # precision and increments
    price_precision=price_precision,
    size_precision=0,
    price_increment=Price(1 / 10 ** price_precision, price_precision),
    size_increment=Quantity.from_int(1),
    # size, price and notional limits
    lot_size=Quantity.from_str("100"),
    max_quantity=Quantity.from_str("1e7"),
    min_quantity=Quantity.from_str("1"),
    max_price=None,
    min_price=None,
    max_notional=Money(50_000_000.00, USD),
    min_notional=Money(1.00, USD), # todo: check this
    # margins and fees
    margin_init=Decimal("0.03"),
    margin_maint=Decimal("0.03"),
    maker_fee=Decimal("0.00002"),
    taker_fee=Decimal("0.00002"),
    # timestamps
    ts_event=0,
    ts_init=0,
)

In [None]:
# Only the last expression of a cell is rendered, so the first result is shown
# explicitly. `display` is the builtin provided by the IPython/Jupyter kernel.
display(catalog.list_data_types())
catalog.instruments()

In [None]:
# Read the bar CSV, using its "time" column as a parsed datetime index.
csv_path = "./XAUUSD_data_m1.csv"
df = pd.read_csv(
    csv_path,
    parse_dates=True,
    index_col="time",
)

# Quick look at the first rows.
print(df.head())

In [None]:
# Build the bar type from the instrument id plus a fixed 1-MINUTE-MID-EXTERNAL suffix.
bar_type = BarType.from_str(
    f"{instrument_id}-1-MINUTE-MID-EXTERNAL"
)
print(
    f"Bar type: {bar_type}"
)

In [None]:
# Convert the bar DataFrame into Bar objects for `bar_type`.
# NOTE(review): `instrument` is whatever an earlier cell bound; confirm that its
# id matches the instrument id embedded in `bar_type`.
wrangler = BarDataWrangler(bar_type, instrument)
bars: list[Bar] = wrangler.process(df)
# Print a count and a short sample; dumping every Bar floods the cell output.
print(f"Bars: {len(bars)} total, first 5: {bars[:5]}")

In [None]:
# Write the instrument first, then the bars, into the catalog.
catalog.write_data([instrument])
catalog.write_data(bars)

In [None]:
def load_df_bars(df: pd.DataFrame, bar_type: BarType, instrument: Instrument) -> list[Bar]:
    """Convert a bar DataFrame into Bar objects.

    Builds a BarDataWrangler from ``bar_type`` and ``instrument`` and returns
    whatever its ``process`` method produces for ``df``.
    """
    return BarDataWrangler(bar_type, instrument).process(df)

def bar_type_to_str(bar_type: BarType) -> str:
    """Format a bar type as ``<instrument_id>-<spec>-<aggregation source name>``.

    The aggregation source is looked up by value (1 or 2); any other value
    raises KeyError.
    """
    source_names = {1: 'EXTERNAL', 2: 'INTERNAL'}
    prefix = f"{bar_type.instrument_id}-{bar_type.spec}"
    return f"{prefix}-{source_names[bar_type.aggregation_source]}"

In [None]:
def load_df_bars(df: DataFrame, bar_type: BarType, instrument: Instrument) -> list[Bar]:
    """Run ``df`` through a BarDataWrangler and return the resulting bars.

    NOTE(review): a function with this name and the same body is already defined
    in the previous cell; running this cell rebinds the name.
    """
    converter = BarDataWrangler(bar_type, instrument)
    return converter.process(df)

In [None]:

# This cell was lifted from a loader method: its `self.*` references do not exist
# at notebook scope, so they are mapped onto the notebook-level equivalents
# (CATALOG_PATH, DATA_PATH, loader).

# create catalog directory if it doesn't exist (makedirs also creates parents)
if not os.path.exists(CATALOG_PATH):
    print(f"INFO: Creating catalog directory at: {CATALOG_PATH}")
    os.makedirs(CATALOG_PATH, exist_ok=True)

# `symbol` / `venue` may hold plain strings or Symbol / Venue objects depending on
# which cells ran before; normalise both to their string form first.
symbol_name = str(symbol)
instrument = TestInstrumentProvider.default_fx_ccy(symbol_name, Venue(str(venue)))
ticker_path = os.path.join(DATA_PATH, symbol_name.replace('/', '') + ".csv")

# NOTE(review): assumes loader.get_bar_type accepts these keyword names - the
# earlier call in this notebook passes the same two values positionally.
bar_type = loader.get_bar_type(symbol=symbol_name, timeframe=timeframe)
wrangler = BarDataWrangler(bar_type, instrument)
bars: list[Bar] = wrangler.process(df)

# instrument also has to be written in order to access data for the instrument
catalog.write_data([instrument], basename_template="part-{i}")
catalog.write_data(bars, basename_template="part-{i}")

In [None]:
# Let the loader move the exported CSV data for this symbol into the catalog.
loader.load_csv_to_catalog(
    symbol_broker,
    symbol_clean,
    timeframe,
    start_date,
    end_date,
)

In [None]:
# Rebind `instrument` to the loader's FOREX instrument for the current symbol.
instrument = loader.get_instrument_FOREX(
    symbol=symbol,
)

# MT5 Ticks

In [None]:
from datetime import datetime
import MetaTrader5 as mt5
# display data on the MetaTrader 5 package
print("MetaTrader5 package author: ",mt5.__author__)
print("MetaTrader5 package version: ",mt5.__version__)

# import the 'pandas' module for displaying data obtained in the tabular form
import pandas as pd
pd.set_option('display.max_columns', 500) # number of columns to be displayed
pd.set_option('display.width', 1500)      # max table width to display
# import pytz module for working with time zone
import pytz

# establish connection to MetaTrader 5 terminal
if not mt5.initialize():
    # raise instead of quit(): quit() would shut down the whole notebook kernel
    raise RuntimeError(f"initialize() failed, error code = {mt5.last_error()}")

try:
    # set time zone to UTC
    timezone = pytz.timezone("Etc/UTC")
    # create 'datetime' objects in UTC time zone to avoid the implementation of a local time zone offset
    utc_from = datetime(2020, 1, 10, tzinfo=timezone)
    utc_to = datetime(2020, 1, 11, hour = 13, tzinfo=timezone)
    # get XAUUSD M1 bars within the interval 2020.01.10 00:00 - 2020.01.11 13:00 in UTC time zone
    rates = mt5.copy_rates_range("XAUUSD", mt5.TIMEFRAME_M1, utc_from, utc_to)
    if rates is None:
        # read the error while the connection is still open
        raise RuntimeError(f"copy_rates_range() failed, error code = {mt5.last_error()}")
finally:
    # shut down connection to the MetaTrader 5 terminal, also on failure
    mt5.shutdown()

# display the first ten elements of the obtained data, one per line
print("Display obtained data 'as is'")
for rate in rates[:10]:
    print(rate)

# create DataFrame out of the obtained data
rates_frame = pd.DataFrame(rates)
# convert time in seconds into the 'datetime' format
rates_frame['time']=pd.to_datetime(rates_frame['time'], unit='s')

# display data
print("\nDisplay dataframe with data")
print(rates_frame.head(10))

In [None]:
import MetaTrader5 as mt5

# Request the latest 10000 M1 bars up to `maxbars` times and stop at the first
# call after which last_error() does not report success.
# NOTE(review): every iteration asks for the same window (position 0, 10000 bars);
# `count` is only used in the failure message - confirm this is intended.
if not mt5.initialize():
    raise RuntimeError(f"initialize() failed, error code = {mt5.last_error()}")

try:
    terminal = mt5.terminal_info()
    if terminal is None:
        raise RuntimeError(f"terminal_info() failed, error code = {mt5.last_error()}")
    maxbars = terminal.maxbars
    for count in range(maxbars):
        rates = mt5.copy_rates_from_pos('XAUUSD', mt5.TIMEFRAME_M1, 0, 10000)
        errno, strerror = mt5.last_error()
        if errno != mt5.RES_S_OK:
            print(f"Failed on count={count} with strerror={strerror}")
            break
finally:
    # always release the terminal connection, even if a call above raised
    mt5.shutdown()

In [None]:
# Tabulate the fetched rates and turn the epoch-second 'time' column into datetimes.
rates_frame = pd.DataFrame(rates).assign(
    time=lambda frame: pd.to_datetime(frame['time'], unit='s')
)
rates_frame

In [None]:
# Show the symbol lookup result and the account info, one per line.
print(
    mt5.symbols_get("XAUUSD"),
    mt5.account_info(),
    sep="\n",
)

In [None]:
# Fetch the total reported by mt5.positions_total() and echo it.
print(pos := mt5.positions_total())

In [38]:
if not mt5.initialize():
    raise RuntimeError(f"initialize() failed, error code = {mt5.last_error()}")

# date_to must be later than date_from, otherwise the requested interval is empty;
# request one full day starting at 2024-01-01.
ticks = mt5.copy_ticks_range("XAUUSD", datetime(2024, 1, 1), datetime(2024, 1, 2), mt5.COPY_TICKS_ALL)
if ticks is None:
    raise RuntimeError(f"copy_ticks_range() failed, error code = {mt5.last_error()}")

df = pd.DataFrame(ticks)
# 'time' is converted from epoch seconds
df['time'] = pd.to_datetime(df['time'], unit='s')
df.head()

In [None]:
# Fetch XAUUSD rates through the loader from 2023-01-01 up to the current time.
data = loader.copy_rates_range(
    "XAUUSD",
    timeframe,
    datetime(2023, 1, 1),
    datetime.now(),
)

## Copy Rates to Disk

In [None]:
import MetaTrader5 as mt5
import pandas as pd
from datetime import datetime
import os

def download_and_save_data(symbol, timeframe, start_pos, num_bars, chunk_size, csv_filename, parquet_filename):
    """Download bars from MetaTrader 5 in chunks and append them to CSV and Parquet.

    Bars are requested with ``mt5.copy_rates_from_pos`` starting at ``start_pos``,
    ``chunk_size`` bars at a time, until ``num_bars`` have been requested or the
    terminal returns no more data. Each chunk is appended to both files as soon
    as it arrives; existing files are appended to, not replaced.

    Args:
        symbol: Symbol name passed to MetaTrader 5.
        timeframe: MetaTrader 5 timeframe constant (e.g. ``mt5.TIMEFRAME_M1``).
        start_pos: Position of the first bar to request.
        num_bars: Upper bound on the number of bars to request.
        chunk_size: Number of bars per request; must be positive.
        csv_filename: CSV file to append to (header written only when it is new).
        parquet_filename: Parquet file to create or append to (fastparquet engine).

    Returns:
        None. Progress and errors are reported with ``print``.

    NOTE(review): chunks are written in the order fetched (increasing position).
    In MT5 position 0 is presumably the most recent bar, so later chunks would
    hold older bars - sort by 'time' after reading if chronological order matters.
    """
    # Initialize MetaTrader 5 connection
    if not mt5.initialize():
        print("initialize() failed, error code =", mt5.last_error())
        return

    # Existing files are appended to; this also decides whether a CSV header is written
    csv_exists = os.path.exists(csv_filename)
    parquet_exists = os.path.exists(parquet_filename)

    try:
        # Download the data in chunks
        for offset in range(0, num_bars, chunk_size):
            position = start_pos + offset
            chunk_bars = min(chunk_size, num_bars - offset)

            # Fetch the data for the chunk
            rates = mt5.copy_rates_from_pos(symbol, timeframe, position, chunk_bars)

            if rates is None:
                print(f"Error retrieving data at position {position}.")
                errno, strerror = mt5.last_error()
                if errno != mt5.RES_S_OK:
                    print(f"Failed on position={position} with strerror={strerror}")
                break

            if len(rates) == 0:
                # Nothing returned for this position: stop instead of writing empty chunks
                print(f"No more bars available at position {position}.")
                break

            # Convert to DataFrame
            rates_frame = pd.DataFrame(rates)
            rates_frame['time'] = pd.to_datetime(rates_frame['time'], unit='s')

            # Append to CSV
            rates_frame.to_csv(
                csv_filename,
                mode='a',
                header=not csv_exists,  # Write header only if file doesn't exist
                index=False
            )
            csv_exists = True

            # Append to Parquet (the first write creates the file, later ones append)
            rates_frame.to_parquet(
                parquet_filename,
                engine='fastparquet',
                append=parquet_exists,
                index=False
            )
            parquet_exists = True

            # Update progress
            print(f"Saved {len(rates_frame)} rows to CSV and Parquet.")
    finally:
        # Shutdown MetaTrader 5 connection, also when a write or request raised
        mt5.shutdown()

    print("Download complete.")

# Download settings
symbol = "XAUUSD"
timeframe, timeframe_str = mt5.TIMEFRAME_M1, "m1"  # 1-minute bars
start_pos = 0
num_bars = 100_000_000  # upper bound on the number of bars to request
chunk_size = 1_000      # bars fetched per request
csv_filename = f"{symbol}_data_{timeframe_str}.csv"
parquet_filename = f"{symbol}_data_{timeframe_str}.parquet"

# Run the chunked download with the settings above.
download_and_save_data(
    symbol=symbol,
    timeframe=timeframe,
    start_pos=start_pos,
    num_bars=num_bars,
    chunk_size=chunk_size,
    csv_filename=csv_filename,
    parquet_filename=parquet_filename,
)


# Load an MT5 CSV tick file

In [None]:
import duckdb

# Open a DuckDB connection with default arguments; `con` is used by the query cell below.
con = duckdb.connect()

In [None]:
# Load the M1 parquet file into a DataFrame through DuckDB.
df = (
    con
    .sql("from parquet_scan('XAUUSD_data_m1.parquet')")
    .fetchdf()
)

import matplotlib.pyplot as plt
# Close over time.
plt.plot(
    df['time'],
    df['close'],
)

In [None]:
# Tick export to import, located under DATA_PATH.
csv_file = os.path.join(
    DATA_PATH,
    "EURUSD.i_201808220305_202312012359.csv",
)

In [None]:
# Start / end timestamps taken from the digits in the CSV file name.
# NOTE(review): presumably meant as YYYYMMDDHHMM - confirm pandas parses the
# compact strings that way. Both names are reassigned inside the import loop below.
ts_start = pd.Timestamp("201808220305") 
ts_end = pd.Timestamp("202312012359")

In [None]:
#df: pd.DataFrame = dd.read_csv(csv_file, header=0, sep="\t", parse_dates={'time' : [0, 1]})

In [None]:
# time    <BID>    <ASK>   <LAST> <VOLUME> <FLAGS>
#new_columns = ['time', 'bid', 'ask', 'last', 'volume', 'flags']
#df = df.rename(columns=dict(zip(df.columns, new_columns)))

In [None]:
# Catalog to write into, and the quote-tick wrangler for the current `instrument`.
# NOTE(review): `instrument` comes from an earlier cell (an XAUUSD instrument there),
# while `csv_file` names an EURUSD export - confirm they are meant to match.
catalog = ParquetDataCatalog(CATALOG_PATH)
wrangler = QuoteTickDataWrangler(instrument)

In [None]:
# Process the csv in chunks: clean each chunk, convert it to QuoteTicks and
# write it to the catalog under its own basename.
chunk_size = 10**6 # ticks per parquet file
i=0

for chunk in pd.read_csv(csv_file, sep='\t', chunksize=chunk_size):
    # build a datetime index from the <DATE> and <TIME> columns
    chunk['time'] = pd.to_datetime(chunk['<DATE>'] + ' ' +  chunk['<TIME>'])
    chunk.set_index('time', inplace=True)
    # rename the remaining columns positionally, then drop the raw date/time columns
    new_columns = ['date', 'time', 'bid', 'ask', 'last', 'volume', 'flags']
    chunk = chunk.rename(columns=dict(zip(chunk.columns, new_columns)))
    chunk: pd.DataFrame = chunk.drop(['date','time'], axis=1)

    # the instrument is written once, before the first chunk of ticks
    if i == 0:
        catalog.write_data([instrument])

    # metadata: first and last timestamp of the chunk
    ts_start = chunk.index[0]
    ts_end = chunk.index[-1]

    # i got some weird outliers in the resulting df and want to log
    # if they are from the filling or from the data

    # remember which rows contain NaNs so their values can be inspected after the fill
    nan_rows = chunk.isna().any(axis=1).to_numpy()
    nans = chunk[nan_rows]

    # Series.min() skips NaN; the builtin min() over an array that contains NaN
    # depends on element order and can return nan
    min_bid_before = chunk['bid'].min()
    min_ask_before = chunk['ask'].min()

    print(f"min bid before: {min_bid_before}")
    print(f"min ask before: {min_ask_before}")

    if len(nans) > 0:
        print(f"found {len(nans)} nans in chunk {i}")

    # important! fill the nans with previous values first (forward fill) ...
    # many nans cause mid price to be half of actual price because some nans remain and get to be filled with 0
    # ... and only fill nans that still remain (at the start of the chunk) with following values
    chunk = chunk.ffill().bfill()

    min_bid_after = chunk['bid'].min()
    min_ask_after = chunk['ask'].min()

    print(f"min bid after: {min_bid_after}")
    print(f"min ask after: {min_ask_after}")

    # log min value of filled nans (skipped when the chunk had none, which
    # previously raised ValueError from min() on an empty sequence)
    if len(nans) > 0:
        filled = chunk.loc[nan_rows, ['bid', 'ask']]
        print(f"min filled bid: {filled['bid'].min()}")
        print(f"min filled ask: {filled['ask'].min()}")

    ticks: list[QuoteTick] =  wrangler.process(chunk)
    catalog.write_data(ticks, basename_template=f"chunk-{i}")
    # report the real count: the last chunk is usually smaller than chunk_size
    print(f"written {len(ticks)} ticks to: chunk-{i} {ts_start} {ts_end}")
    i = i+1


In [None]:
# from last big import of EUR/USD
# written 1000000 ticks to: chunk-136 2023-11-13 18:03:49.591000 2023-12-01 23:59:56.437000

In [None]:

# Display the Timestamp pandas builds from this integer (used as `start` in the query below).
pd.Timestamp(1534907125665000000)

In [None]:
# Display the Timestamp pandas builds from a second integer value.
pd.Timestamp(1544908782211000000)

In [None]:
# Query quote ticks for the instrument's symbol value between two integer timestamps.
ticks = catalog.quote_ticks(
    instrument_ids=[instrument.symbol.value],
    start=pd.Timestamp(1534907125665000000),
    end=pd.Timestamp(1534909125665000000),
)

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Ask prices as a float array (not an object array of price objects), so the
# zero comparison and the NaN assignment below operate on numbers.
asks = np.array([float(t.ask_price) for t in ticks], dtype=float)
# a price of 0 is treated as missing
asks[asks == 0] = np.nan
# fill nans with previous value (Series.ffill replaces the deprecated fillna(method='ffill'))
asks = pd.Series(asks).ffill().values

# event timestamp of every tick, in the same order as `asks`
ts = [t.ts_event for t in ticks]

In [None]:
# Collect the tick event timestamps into a NumPy array (converted to datetimes in the next cell).
ts = np.array(ts)

In [None]:
# Convert the timestamp array to datetimes.
# NOTE(review): no `unit` is passed - presumably the values are nanoseconds; confirm.
ts = pd.to_datetime(ts)

In [None]:
# Ask price over time on an explicit figure/axes pair.
fig, ax = plt.subplots()
ax.plot(ts, asks, label='ask')
# ax.plot(bids, x, label='bid')
ax.legend()
plt.show()