In [1]:
%load_ext autoreload
%autoreload 2

from pathlib import Path
from collections import deque
from threading import Thread
import os
import logging
import time
import asyncio

if "freqtrade" not in os.listdir():
    import nest_asyncio
    os.chdir("..")
    nest_asyncio.apply()
    # Uncomment if you want to enable Freqtrade Logging
#     logging.basicConfig(format='%(asctime)-15s - %(message)s')
#     logging.root.setLevel(logging.WARNING)

from freqtrade.commands.data_commands import start_download_data
PATH_MOUNT = Path.cwd().parent / "mount"
PATH_LOCAL_DATADIR = PATH_MOUNT / "data"

In [2]:
class Task:
    
    def __init__(self, pair: str, timeframe: str):
        self.pair = [pair]
        self.timeframe = [timeframe]
        

class Downloader:
    
    def __init__(self, task_list: list, exchange: str, path_datadir: Path):
        self.task_queue = deque(task_list)
        self.print_queue = deque()
        self.exchange = exchange
        self.path_datadir = path_datadir
        print(f"Got {len(self.task_queue)} tasks.")
        
    def start(self, num_threads: int):
        print("Starting threads")
        threads = []
        
        for i in range(num_threads):
            thread = Thread(target=self._download_job, args=(i,), daemon=True)
            thread.start()
            threads.append(thread)
        
        print_thread = Thread(target=self._print_job, daemon=True)
        print_thread.start()
        threads.append(print_thread)
    
        [it.join for it in threads]
    
    def _download_job(self, thread_num: int):
        self._print(f"Starting worker #{thread_num}")
        asyncio.set_event_loop(asyncio.new_event_loop())
        
        while len(self.task_queue) > 0:
            time.sleep(1.0)
            try:
                task = self.task_queue.pop()
            except Exception as e:
                self._print(f"LOOP ERROR at Thread #{thread_num}: {e}.")
                continue
            args = {
                "timeframes": task.timeframe,
                "pairs": task.pair,
                "exchange": self.exchange,
                "days": 2000,
                "datadir": self.path_datadir / exchange,
                "verbosity": logging.ERROR,
            }
            self._print(
                f"Thread #{thread_num} - Download {task.pair} {task.timeframe}. Tasks left: {len(self.task_queue)}"
            )
            done = False
            while not done:
                # Fault Tolerance
                try:
                    start_download_data(args)
                    done = True
                except Exception as e:
                    self._print(f"DOWNLOAD ERROR at Thread #{thread_num}: {e}. ({task.pair} {task.timeframe})")
                    pass
                
    def _print(self, text: str):
        self.print_queue.append(text)

    def _print_job(self):
        while len(self.task_queue) > 0:
            while len(self.print_queue) > 0:
                print(self.print_queue.pop())

In [3]:
# Multi Threaded Download Data
pairs = [
    # Normal, High-cap LONG pairs
    "BTC/USDT","ETH/USDT","ADA/USDT","BNB/USDT","XRP/USDT","DOT/USDT","DOGE/USDT","ZEC/USDT","NANO/USDT","NEO/USDT",
    "UNI/USDT","BCH/USDT","LINK/USDT","LTC/USDT","MATIC/USDT","XLM/USDT","SOL/USDT","ETC/USDT","VET/USDT","THETA/USDT",
    "EOS/USDT","TRX/USDT","FIL/USDT","XMR/USDT","AAVE/USDT","MKR/USDT","ATOM/USDT","ALGO/USDT","CAKE/USDT","KSM/USDT",
    "LUNA/USDT","BTT/USDT","AVAX/USDT","COMP/USDT","DASH/USDT","DCR/USDT","EGLD/USDT","WAVES/USDT","YFI/USDT","XEM/USDT",
    "CHZ/USDT","SUSHI/USDT","HOT/USDT","ZIL/USDT","SNX/USDT","MANA/USDT","ENJ/USDT","HNT/USDT","BAT/USDT","NEAR/USDT",
    "QTUM/USDT","GRT/USDT","ONE/USDT","ONT/USDT","BAKE/USDT","BNT/USDT","ZRX/USDT","FTM/USDT","OMG/USDT","CELO/USDT",
    "ICX/USDT","ANKR/USDT","RVN/USDT","CRV/USDT", "FTT/USDT", "TFUEL/USDT",
    # Leveraged SHORT pairs
    "BTCDOWN/USDT", "BNBDOWN/USDT", "ETHDOWN/USDT", "AAVEDOWN/USDT", "XRPDOWN/USDT", "ADADOWN/USDT", "SUSHIDOWN/USDT",
    "DOTDOWN/USDT", "1INCHDOWN/USDT", "LINKDOWN/USDT", "UNIDOWN/USDT", "SXPDOWN/USDT", "EOSDOWN/USDT", "BCHDOWN/USDT",
    "YFIDOWN/USDT", "XLMDOWN/USDT", "FILDOWN/USDT", "TRXDOWN/USDT", "XTZDOWN/USDT", "LTCDOWN/USDT",
]

# NOTE: DON'T USE 1 MONTH (1M BIG M), it will OVERWRITE 1 MINUTE!!!
timeframes = ["1m", "3m", "5m", "15m", "30m", "1h", "2h", "4h", "6h", "12h", "6h", "8h", "1d", "3d", "1w"]
exchange = "binance"
num_threads = len(pairs) + 5

tasks = []

# Prepare the job
for tf in timeframes:
    for pair in pairs:
        new_task = Task(pair, tf)
        tasks.append(new_task)

downloader = Downloader(tasks, exchange, PATH_LOCAL_DATADIR)
downloader.start(num_threads)

Got 1290 tasks.
Starting threads
Starting worker #90
Starting worker #89
Starting worker #88
Starting worker #87
Starting worker #86
Starting worker #85
Starting worker #84
Starting worker #83
Starting worker #82
Starting worker #81
Starting worker #80
Starting worker #79
Starting worker #78
Starting worker #77
Starting worker #76
Starting worker #75
Starting worker #74
Starting worker #73
Starting worker #72
Starting worker #71
Starting worker #70
Starting worker #69
Starting worker #68
Starting worker #67
Starting worker #66
Starting worker #65
Starting worker #64
Starting worker #63
Starting worker #62
Starting worker #61
Starting worker #60
Starting worker #59
Starting worker #58
Starting worker #57
Starting worker #56
Starting worker #55
Starting worker #54
Starting worker #53
Starting worker #52
Starting worker #51
Starting worker #50
Starting worker #49
Starting worker #48
Starting worker #47
Starting worker #46
Starting worker #45
Starting worker #44
Starting worker #43
Startin

  _data['date'] = _data['date'].astype(np.int64) // 1000 // 1000


Thread #28 - Download ['ENJ/USDT'] ['3d']. Tasks left: 1164
Thread #70 - Download ['HNT/USDT'] ['3d']. Tasks left: 1165
Thread #10 - Download ['BAT/USDT'] ['3d']. Tasks left: 1166
Thread #71 - Download ['MANA/USDT'] ['3d']. Tasks left: 1163
Thread #4 - Download ['SNX/USDT'] ['3d']. Tasks left: 1162
Thread #6 - Download ['WAVES/USDT'] ['3d']. Tasks left: 1155
Thread #16 - Download ['YFI/USDT'] ['3d']. Tasks left: 1156
Thread #78 - Download ['XEM/USDT'] ['3d']. Tasks left: 1157
Thread #13 - Download ['CHZ/USDT'] ['3d']. Tasks left: 1158
Thread #21 - Download ['SUSHI/USDT'] ['3d']. Tasks left: 1159
Thread #38 - Download ['HOT/USDT'] ['3d']. Tasks left: 1160
Thread #1 - Download ['ZIL/USDT'] ['3d']. Tasks left: 1161
Thread #52 - Download ['EGLD/USDT'] ['3d']. Tasks left: 1154
Thread #41 - Download ['DASH/USDT'] ['3d']. Tasks left: 1152
Thread #87 - Download ['DCR/USDT'] ['3d']. Tasks left: 1153
Thread #82 - Download ['AVAX/USDT'] ['3d']. Tasks left: 1150
Thread #72 - Download ['COMP/USDT']