In [None]:
import rpa as r
import os
from datetime import datetime
import calendar
import pandas as pd


In [None]:
class Streak():
    """Drive backtests on streak.tech through an `rpa` browser session.

    The constructor opens the browser, navigates to Streak and waits for the
    user to log in manually. The remaining methods click through the backtest
    page to change parameters, run a backtest and save the downloaded
    transactions CSV under ``data/``.
    """

    # Seconds to wait for the manual login, and for every later UI lookup.
    _LOGIN_TIMEOUT_SECONDS = 300
    _ACTION_TIMEOUT_SECONDS = 5

    # Element whose presence is treated as "user is logged in".
    _SEL_LOGGED_IN = '//*[@id="authScreeens"]/div[2]/section/div[1]/div/p[1]'

    # JS expression for the scrollable container of the backtest side panel.
    _SCROLL_CONTAINER = 'window.document.getElementById("tour_bt_6").getElementsByTagName("div")[0].getElementsByTagName("div")[0]'

    # Stop-date calendar: header cells (previous / current month / next) and inputs.
    _SEL_CALENDAR_PREV = '//*[@id="tour_bt_6"]/div/div/div[6]/div/div/div[3]/div[2]/div/table/thead/tr[1]/th[1]'
    _SEL_CALENDAR_MONTH = '//*[@id="tour_bt_6"]/div/div/div[6]/div/div/div[3]/div[2]/div/table/thead/tr[1]/th[2]'
    _SEL_CALENDAR_NEXT = '//*[@id="tour_bt_6"]/div/div/div[6]/div/div/div[3]/div[2]/div/table/thead/tr[1]/th[3]'
    _SEL_START_DATE_INPUT = '//*[@id="tour_bt_6"]/div/div/div[6]/div/div/div[2]/div[1]/div/input'
    _SEL_STOP_DATE_INPUT = '//*[@id="tour_bt_6"]/div/div/div[6]/div/div/div[3]/div[1]/div/input'

    def __init__(self):
        """Open the browser, go to Streak and wait for a manual login.

        Raises:
            RuntimeError: if the logged-in marker element does not appear
                within the login timeout. The browser session is closed
                before the exception is raised.
        """
        self._closed = False
        self.r = r

        self.r.init()

        # go to streak
        self.r.url('https://streak.tech')

        # set a maximum 5-minute timeout for user to login
        self.r.timeout(self._LOGIN_TIMEOUT_SECONDS)

        # use exist() function with XPath to check if logged in
        if not self.r.exist(self._SEL_LOGGED_IN):
            self.r.dom(
                'alert("Quitting as login is not detected after 5 min. Bye!")')
            # Actually quit: carrying on would report a login that never happened.
            self.close()
            raise RuntimeError(
                'Quitting as login is not detected after 5 min.')

        print('Logged in to streak.')

        # use a short 5-second timeout from here on to quit fast on error
        self.r.timeout(self._ACTION_TIMEOUT_SECONDS)

    def close(self):
        """Close the browser session; calling it more than once is a no-op."""
        if getattr(self, '_closed', False):
            return
        self._closed = True
        session = getattr(self, 'r', None)
        if session is not None:
            session.close()

    def __del__(self):
        """Close the browser session when the object is finalized."""
        self.close()

    @staticmethod
    def _last_day_column(year, month):
        """Return the 1-based calendar column of the last day of the month.

        ``datetime.weekday()`` is Monday=0 ... Sunday=6; the arithmetic maps
        Sunday to column 1 and Saturday to column 7 (presumably because the
        on-screen calendar starts its weeks on Sunday -- confirm against the UI).
        """
        days_in_month = calendar.monthrange(year, month)[1]
        col = datetime(year, month, days_in_month).weekday() + 2
        return (col - 7) if col > 7 else col

    def _select_calendar_cell(self, row, col):
        """Click the calendar cell at 1-based ``row`` / ``col`` via the DOM."""
        self.r.wait(1)
        self.r.dom(
            f'window.document.getElementsByTagName("tbody")[3].children[{row - 1}].children[{col - 1}].click()')

    def change_month(self, month):
        """Set the backtest stop date to the last day of ``month``.

        Args:
            month: target month formatted as ``"%B %Y"`` (e.g. ``"January 2020"``).

        Raises:
            ValueError: if ``month`` or the month read from the screen does
                not parse with ``"%B %Y"``.
        """
        _sel_backtest_parameter_button = '//*[@id="tour_bt_6"]/div/div/div[6]/button'

        print(f'Changing month to {month}.')

        screen_month = self.r.read(self._SEL_CALENDAR_MONTH)
        screen_month_date = datetime.strptime(screen_month, "%B %Y")
        month_date = datetime.strptime(month, "%B %Y")

        # Signed number of calendar pages between the shown and target month.
        month_delta = (month_date.year - screen_month_date.year) * \
            12 + month_date.month - screen_month_date.month

        print(f'{month_delta} month(s) to change.')

        # Scroll down
        self.r.wait(1)
        self.r.dom(f'{self._SCROLL_CONTAINER}.scrollBy(0, 1000)')

        # Open Backtest Parameters
        self.r.wait(1)
        self.r.click(_sel_backtest_parameter_button)

        self.r.wait(1)
        self.r.dom(f'{self._SCROLL_CONTAINER}.scrollBy(0, 1000)')

        # Backtesting stop date
        self.r.wait(1)
        self.r.click(self._SEL_STOP_DATE_INPUT)

        # Change month: page right for a later month, left otherwise
        self.r.wait(1)
        step_selector = self._SEL_CALENDAR_NEXT if month_delta > 0 else self._SEL_CALENDAR_PREV
        for _ in range(abs(month_delta)):
            self.r.click(step_selector)

        # Calculate calendar col of the month's last day
        days_in_month = calendar.monthrange(
            month_date.year, month_date.month)[1]
        col = self._last_day_column(month_date.year, month_date.month)

        # Try calendar row 5 first
        row = 5

        # Select last date by row and col
        print(f'Selecting row {row} column {col} from calendar.')
        self._select_calendar_cell(row, col)

        # Check if row needs to be re-selected: if another full week still fits
        # in the month, the last day sits one row further down.
        end_day = int(self.r.read(self._SEL_STOP_DATE_INPUT).split(' ')[0])
        if (end_day + 7) <= days_in_month:
            row = 6
            print(f'Re-selecting row {row} column {col} from calendar.')
            self._select_calendar_cell(row, col)

        # Close Backtest Parameters
        self.r.wait(1)
        self.r.click(_sel_backtest_parameter_button)

        # Scroll up
        self.r.wait(1)
        self.r.dom(f'{self._SCROLL_CONTAINER}.scrollBy(0, -1000)')

    def change_entry(self, start_hour=9, start_min=30):
        """Set the entry start time by clicking the hour/minute spinners.

        Args:
            start_hour: target hour (integer).
            start_min: target minute (integer).
        """
        _sel_backtest_parameter_button = '//*[@id="tour_bt_6"]/div/div/div[4]/button'
        _sel_backtest_parameter_start_time = '//*[@id="tour_bt_6"]/div/div/div[4]/div/div/div[3]/div/div[1]/div/input'
        _sel_backtest_parameter_start_time_hour = '//*[@id="tour_bt_6"]/div/div/div[4]/div/div/div[3]/div/div[2]/div/table/tbody/tr/td/div/div[1]/div'
        _sel_backtest_parameter_start_time_hour_up = '//*[@id="tour_bt_6"]/div/div/div[4]/div/div/div[3]/div/div[2]/div/table/tbody/tr/td/div/div[1]/span[1]'
        _sel_backtest_parameter_start_time_hour_down = '//*[@id="tour_bt_6"]/div/div/div[4]/div/div/div[3]/div/div[2]/div/table/tbody/tr/td/div/div[1]/span[2]'
        _sel_backtest_parameter_start_time_min = '//*[@id="tour_bt_6"]/div/div/div[4]/div/div/div[3]/div/div[2]/div/table/tbody/tr/td/div/div[3]/div'
        _sel_backtest_parameter_start_time_min_up = '//*[@id="tour_bt_6"]/div/div/div[4]/div/div/div[3]/div/div[2]/div/table/tbody/tr/td/div/div[3]/span[1]'
        _sel_backtest_parameter_start_time_min_down = '//*[@id="tour_bt_6"]/div/div/div[4]/div/div/div[3]/div/div[2]/div/table/tbody/tr/td/div/div[3]/span[2]'

        print(
            f'Changing start time to {start_hour}:{start_min}.')

        # Scroll down
        self.r.dom(f'{self._SCROLL_CONTAINER}.scrollBy(0, 1000)')

        # Open Entry Parameters
        self.r.wait(1)
        self.r.click(_sel_backtest_parameter_button)

        self.r.dom(f'{self._SCROLL_CONTAINER}.scrollBy(0, 1000)')

        self.r.wait(1)
        self.r.click(_sel_backtest_parameter_start_time)

        # Read current values
        current_hour = int(self.r.read(
            _sel_backtest_parameter_start_time_hour))
        current_min = int(self.r.read(_sel_backtest_parameter_start_time_min))

        print(f'Changing current time {current_hour}:{current_min}.')

        change_hour = start_hour - current_hour
        change_min = start_min - current_min

        # One click per unit of difference, in the direction of the target.
        hour_button = (_sel_backtest_parameter_start_time_hour_up if change_hour > 0
                       else _sel_backtest_parameter_start_time_hour_down)
        for _ in range(abs(change_hour)):
            self.r.click(hour_button)

        min_button = (_sel_backtest_parameter_start_time_min_up if change_min > 0
                      else _sel_backtest_parameter_start_time_min_down)
        for _ in range(abs(change_min)):
            self.r.click(min_button)

        # Close Entry Parameters
        self.r.wait(1)
        self.r.click(_sel_backtest_parameter_button)

        # Scroll up
        self.r.wait(1)
        self.r.dom(f'{self._SCROLL_CONTAINER}.scrollBy(0, -1000)')
        self.r.wait(1)

    def change_exit(self, stop_loss=None, target_profit=None):
        """Type new stop-loss and/or target-profit values into the exit panel.

        Args:
            stop_loss: value to type, or ``None`` to leave the field untouched.
            target_profit: value to type, or ``None`` to leave the field untouched.

        Only ``None`` means "do not change"; a value of ``0`` is typed like
        any other value.
        """
        _sel_backtest_parameter_button = '//*[@id="tour_bt_6"]/div/div/div[5]/button'
        _sel_backtest_parameter_stop_loss = '//*[@id="tour_bt_6"]/div/div/div[5]/div/div/div[2]/div/div/div/input'
        _sel_backtest_parameter_target_profit = '//*[@id="tour_bt_6"]/div/div/div[5]/div/div/div[3]/div/div/div/input'

        print(
            f'Changing stop loss to {stop_loss} and target profit to {target_profit}.')

        if stop_loss is None and target_profit is None:
            print('Nothing to change. Exiting.')
            return

        # Scroll down
        self.r.dom(f'{self._SCROLL_CONTAINER}.scrollBy(0, 1000)')

        # Open Exit Parameters
        self.r.wait(1)
        self.r.click(_sel_backtest_parameter_button)

        self.r.dom(f'{self._SCROLL_CONTAINER}.scrollBy(0, 1000)')

        # Change stop loss
        if stop_loss is not None:
            self.r.wait(1)
            self.r.type(_sel_backtest_parameter_stop_loss,
                        f'[clear]{stop_loss}')
            print(f'Changed stop loss to {stop_loss}.')

        # Change target profit
        if target_profit is not None:
            self.r.wait(1)
            self.r.type(_sel_backtest_parameter_target_profit,
                        f'[clear]{target_profit}')
            print(f'Changed target profit to {target_profit}.')

        # Close Exit Parameters
        self.r.wait(1)
        self.r.click(_sel_backtest_parameter_button)

        # Scroll up
        self.r.wait(1)
        self.r.dom(f'{self._SCROLL_CONTAINER}.scrollBy(0, -1000)')
        self.r.wait(1)

    def algo(self, uuid):
        """Navigate the browser to the backtest page of the algo ``uuid``."""
        print(f'Switched to algo:{uuid}')

        # go to strategy page
        self.r.url(f'https://www.streak.tech/backtests?algo_uuid={uuid}')

    def backtest(self, month, output=None, retry_count=3, backtest_wait=20, check_cache=False):
        """Run a backtest ending in ``month`` and save its transactions CSV.

        Args:
            month: target month formatted as ``"%B %Y"``.
            output: optional suffix; the file is saved as
                ``data/{month}_{output}.csv`` (or ``data/{month}.csv``).
            retry_count: number of download attempts.
            backtest_wait: seconds to wait before each download attempt.
            check_cache: when true, skip the run if the target file exists.

        The downloaded file is looked up in the current working directory as
        ``{file_id}_transaction_details.csv`` and moved to the target path.
        Failures are reported with ``print``; the method always returns ``None``.
        """
        _sel_menu_button = '//*[@id="authScreeens"]/div[2]/div/div/div/div[3]/button'

        # Download prep
        file_id = self.r.read('//*[@id="screen_header"]/div/div/p[2]')
        if output:
            filename = f'data/{month}_{output}.csv'
        else:
            filename = f'data/{month}.csv'

        # Check cache and skip if file found
        if check_cache:
            if os.path.exists(filename):
                print(f'{filename} already exists. Skipping.')
                return

        # Make sure the output directory exists; otherwise the rename below
        # fails with FileNotFoundError and looks like a missing download.
        os.makedirs(os.path.dirname(filename), exist_ok=True)

        # Clean existing file
        try:
            os.remove(filename)
        except OSError:
            pass

        # Get Month
        screen_month = self.r.read(self._SEL_CALENDAR_MONTH)
        print(f'Current month on screen:{screen_month}')

        self.r.wait(1)
        self.change_month(month)
        self.r.wait(1)

        # Validate month changed
        screen_month = self.r.read(self._SEL_CALENDAR_MONTH)
        if screen_month != month:
            print('Failed to change month. Exiting.')
            return

        # Run Backtest
        start_date = self.r.read(self._SEL_START_DATE_INPUT)
        end_date = self.r.read(self._SEL_STOP_DATE_INPUT)
        self.r.click('(//*[@id="tour_bt_6"]/div/div/div[8]/button)')
        print(f'Started backtest for {start_date} to {end_date}.')

        # Try download else it is streak error
        for i in range(retry_count):
            print(f'Trying download count {i + 1}.')

            # Let backtest run
            self.r.wait(backtest_wait)

            # click menu
            self.r.hover(_sel_menu_button)
            self.r.wait(1)
            self.r.click(_sel_menu_button)

            # download all transactions
            self.r.click('//*[@id="main"]/div[4]/div/div[4]/button')
            self.r.wait(2)

            try:
                os.rename(f'{file_id}_transaction_details.csv', filename)
                print(f'Saved file to {filename}')
                self.r.wait(1)
                return
            except FileNotFoundError:
                # Download not there yet; wait and try again.
                pass
        print(f'Download failed for {filename}. Exiting.')

    def backtest_year(self, year, output, check_cache=False):
        """Run ``backtest`` for each of the twelve months of ``year``."""
        for month in calendar.month_name[1:13]:
            self.backtest(month=f'{month} {year}', output=output,
                          check_cache=check_cache)

    def gridsearch(self, grid):
        """Run a full-year backtest for every row of ``grid``.

        Args:
            grid: DataFrame with columns ``year``, ``start_hour``,
                ``start_min``, ``stop_loss`` and ``target_profit``.
        """
        # itertuples keeps each column's own dtype; iterrows would upcast a
        # row mixing ints and floats to float (year 2020.0, hour 9.0).
        for params in grid.itertuples(index=False):
            # Set entry time
            self.change_entry(start_hour=params.start_hour,
                              start_min=params.start_min)
            # Set stop loss & target profit
            self.change_exit(stop_loss=params.stop_loss,
                             target_profit=params.target_profit)
            # Run backtest
            self.backtest_year(
                year=params.year, output=f'sl{params.stop_loss}tp{params.target_profit}et{params.start_hour}_{params.start_min}', check_cache=True)


In [None]:
# Parameter grid: one row per configuration to backtest (same settings, two years).
grid = pd.DataFrame(
    [
        (2020, 9, 20, 5, 300),
        (2021, 9, 20, 5, 300),
    ],
    columns=["year", "start_hour", "start_min", "stop_loss", "target_profit"],
)


In [None]:
# Start the automated browser session; the constructor waits for a manual login.
streak = Streak()

# Point the session at the strategy whose backtests will be run.
uuid = 'NGZlOGNkZjEtNTRkMC00ZmZmLTk1ZmMtZjBkMDk1NmYwNTk4'
streak.algo(uuid)


In [None]:
# Run a full-year backtest for every row of the parameter grid.
streak.gridsearch(grid)


In [None]:
# Cleanup
# `del` only drops this notebook's reference to the session; Streak.__del__
# (which closes the browser via r.close()) runs once no other reference to the
# object remains.
del streak
