Delete this notebook later. It's just for testing the contents of `train_parallel.py`.

In [1]:
import json
import papermill as pm
from multiprocessing import Process, cpu_count
import numpy as np
import time
from src.config import market_tickers
import os

In [2]:
# os.chdir(os.path.dirname(os.path.abspath(__file__)))

In [3]:
def split_list(original_list, n):
    ''' Partition `original_list` into `n` consecutive sub-lists.

    The input is converted to a NumPy array and divided with
    `np.array_split`, so the parts do not have to be of equal length.
    Because of the array round-trip, elements come back with the array's
    common type (e.g. ints mixed with floats are returned as floats).

    Returns a tuple containing `n` plain Python lists.
    '''
    chunks = np.array_split(np.array(original_list), n)
    return tuple(chunk.tolist() for chunk in chunks)

In [42]:
# Load the training configuration (path is relative to the current working directory).
with open('../src/config/train_config.json') as json_file:
    config = json.load(json_file)

# what to train: seeds, model families and markets
all_seeds = config['RANDOM_SEEDS']
all_base_names = config['MODEL_BASE_NAMES']
all_markets = config['MARKETS']

# training schedule
nb_episodes = config['NB_EPISODES']
save_every = config['SAVE_EVERY']

# gamma grids forwarded to the training notebook
gamma_trades = config['GAMMA_TRADES']
gamma_risks = config['GAMMA_RISKS']
gamma_holds = config['GAMMA_HOLDS']

# Worker count is hard-coded for this test; the configured value is bypassed.
nb_workers = 10  # config['NB_WORKERS']
# -1 means "use every available CPU"
nb_workers = cpu_count() if nb_workers == -1 else nb_workers
print('number of workers chosen:', nb_workers)

number of workers chosen: 10


In [43]:
# Display the parsed configuration dictionary for inspection.
config

{'RANDOM_SEEDS': [0],
 'MODEL_BASE_NAMES': ['RL_CNN'],
 'GAMMA_TRADES': [1, 7, 8, 9, 10, 11, 12, 50, 100],
 'GAMMA_RISKS': [1,
  2,
  3,
  6,
  10,
  18,
  32,
  56,
  100,
  178,
  316,
  562,
  1000,
  5000,
  10000],
 'GAMMA_HOLDS': [0.1, 1, 10, 100, 1000, 10000],
 'NB_WORKERS': 45,
 'NB_EPISODES': 200,
 'SAVE_EVERY': 200,
 'MARKETS': {'DOW_30': {'FROM': '2010-01-01', 'UNTIL': '2018-01-01'},
  'NIK_25': {'FROM': '2013-05-01', 'UNTIL': '2018-01-01'},
  'LA_40': {'FROM': '2010-03-01', 'UNTIL': '2014-12-01'}}}

In [44]:
 # split up seeds and gamma_risks into sets
# Seeds are kept in a single set for this experiment; the original rule is left for reference.
nb_seed_sets = 1  # if (nb_workers >= len(all_seeds)) else len(all_seeds)//nb_workers

# Number of gamma_risks sets: depends on whether there are at least as many
# workers as gamma_risks values.
if nb_workers >= len(gamma_risks):
    nb_gamma_risks_sets = nb_workers // len(all_markets)
else:
    nb_gamma_risks_sets = len(gamma_risks) // nb_workers

seed_sets = split_list(all_seeds, nb_seed_sets)
gamma_risks_sets = split_list(gamma_risks, nb_gamma_risks_sets)

# min amount of workers required for workload and config:
# one worker per (market, model, seed set, gamma_risks set) combination
min_workers = len(all_markets)
min_workers = min_workers * len(all_base_names)
min_workers = min_workers * len(seed_sets)
min_workers = min_workers * len(gamma_risks_sets)
assert min_workers <= nb_workers, (
    f"Number of workers allowed: {nb_workers}. Minimum number of workers required: {min_workers}. Allow more workers!)"
)


In [47]:
# Report the computed split in a single call; sep='\n' puts each item on its own line.
print(
    f'nb_seed_sets: {nb_seed_sets}',
    f'nb_gamma_risks_sets: {nb_gamma_risks_sets}',
    f'seed_sets: {seed_sets}',
    f'gamma_risks_sets: {gamma_risks_sets}',
    f'min_workers: {min_workers}',
    sep='\n',
)

nb_seed_sets: 1
nb_gamma_risks_sets: 1
seed_sets: ([0],)
gamma_risks_sets: ([1, 2, 3, 6, 10, 18, 32, 56, 100, 178, 316, 562, 1000, 5000, 10000],)
min_workers: 3


In [48]:
# Value of the else-branch of the nb_gamma_risks_sets expression (fewer workers than gamma_risks values).
len(gamma_risks)//nb_workers

1

In [49]:
# Number of gamma_risks values; this is what nb_workers is compared against.
len(gamma_risks)

15

In [50]:
# Value of the if-branch of the nb_gamma_risks_sets expression (at least as many workers as gamma_risks values).
nb_workers//len(all_markets)

3

In [None]:





def train_rl(seeds, market_name, tickers, model_base_name, from_date, until_date,
             gamma_trades, gamma_risks, gamma_holds, nb_episodes, save_every):
    ''' Run the training template notebook once via papermill.

    Validates `market_name` and `model_base_name`, changes the working
    directory to `../../notebooks/` (relative to the current directory),
    then executes `train_template.ipynb` with all arguments passed through
    as notebook parameters. The executed copy is written to
    `slave_notebooks/`, named after the model, the market, the first
    gamma risk and the first seed. Start and finish messages (the latter
    with the elapsed time in seconds) are printed.

    Raises AssertionError when the market or model base name is not in
    the lists of accepted values.
    '''
    valid_markets = ('TEST_5', 'SP_11', 'DOW_30', 'NIK_25', 'LA_40', 'SP_500')
    valid_models = ('RL_CNN', 'RL_str_fcast', 'RL_all_inp')

    assert market_name in valid_markets, 'must choose a valid market name (or update valid list in assertion).'
    assert model_base_name in valid_models, 'must choose a valid model base name: "RL_CNN","RL_str_fcast", or "RL_all_inp".'

    # set path to notebooks
    notebooks_dir = os.path.abspath('../../notebooks/')
    os.chdir(notebooks_dir)

    start = time.time()
    print(f'\tstarting {model_base_name} on {market_name} [{from_date} - {until_date}] with seeds {seeds}.')

    # the output notebook is identified by the first risk / first seed of this worker's share
    output_path = f'slave_notebooks/{model_base_name}_{market_name}_risks_{gamma_risks[0]}_seeds_{seeds[0]}.ipynb'
    notebook_parameters = {
        'SEED_LIST': seeds,
        'TICKERS': tickers,
        'MARKET_NAME': market_name,
        'MODEL_BASE_NAME': model_base_name,
        'FROM': from_date,
        'UNTIL': until_date,
        'NB_EPISODES': nb_episodes,
        'SAVE_EVERY': save_every,
        'GAMMA_TRADES': gamma_trades,
        'GAMMA_RISKS': gamma_risks,
        'GAMMA_HOLDS': gamma_holds,
    }
    pm.execute_notebook(
        input_path='train_template.ipynb',
        output_path=output_path,
        parameters=notebook_parameters,
        progress_bar=True,
    )

    done_message = f'\tdone with {model_base_name} on {market_name} (risks:{gamma_risks[0]}.. seeds:{seeds[0]}..) in '
    elapsed = time.time() - start
    print(done_message, round(elapsed, 2), 'seconds.')


if __name__ == '__main__':
    # Script entry point: reads the training config, splits seeds and
    # gamma_risks into sets, and starts one train_rl process per
    # (market, model, seed set, gamma_risks set) combination.

    # change dir to where this file is located 
    # so the context is the same no matter where it's run from
    # NOTE(review): `__file__` is typically undefined inside a notebook kernel,
    # so this block is presumably only meant to run as a script -- confirm.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))

    # start timer
    #main_start = time.time()

    # exist_ok avoids the check-then-create race and is a no-op when the
    # directory is already there
    os.makedirs('../../notebooks/slave_notebooks', exist_ok=True)

    # read in config_file
    with open('../config/train_config.json') as json_file:
        config = json.load(json_file)
    all_seeds = config['RANDOM_SEEDS']
    all_base_names = config['MODEL_BASE_NAMES']
    nb_episodes = config['NB_EPISODES']
    save_every = config['SAVE_EVERY']
    all_markets = config['MARKETS']
    gamma_trades = config['GAMMA_TRADES']
    gamma_risks = config['GAMMA_RISKS']
    gamma_holds = config['GAMMA_HOLDS']
    nb_workers = config['NB_WORKERS']
    # -1 means "use every available CPU"
    if nb_workers == -1:
        nb_workers = cpu_count()
    print('number of workers chosen:',nb_workers)

    # split up seeds and gamma_risks into sets
    nb_seed_sets = 1 if (nb_workers >= len(all_seeds)) else len(all_seeds)//nb_workers
    nb_gamma_risks_sets = 1 if (nb_workers >= len(gamma_risks)) else len(gamma_risks)//nb_workers
    seed_sets = split_list(all_seeds, nb_seed_sets)
    gamma_risks_sets = split_list(gamma_risks, nb_gamma_risks_sets)

    # min amount of workers required for workload and config
    # (one process is started per combination below)
    min_workers = len(all_markets) * len(all_base_names) * len(seed_sets) * len(gamma_risks_sets)
    assert min_workers<=nb_workers, f"Number of workers allowed: {nb_workers}. Minimum number of workers required: {min_workers}. Allow more workers!)"

    processes = []
    # start training in separate processes
    for market_name, dates in all_markets.items(): # for all markets
        # ticker list is looked up by naming convention: <MARKET_NAME>_TICKER
        tickers = getattr(market_tickers, market_name+'_TICKER')
        for mod_name in all_base_names: # for all models
            for seed_set in seed_sets: # for all seed sets
                for gamma_risks_set in gamma_risks_sets: # for all gamma_risks sets
                    proc = Process(target=train_rl, args=(
                        seed_set,
                        market_name,
                        tickers,
                        mod_name,
                        dates['FROM'],
                        dates['UNTIL'],
                        gamma_trades,
                        gamma_risks_set,
                        gamma_holds,
                        nb_episodes,
                        save_every))
                    processes.append(proc)
                    proc.start()

    # wait for every worker to finish
    for p in processes:
        p.join()

    #print()
    #print('all workers done in', round(time.time() - main_start,2), 'seconds.')

In [1]:
import numpy as np

def make_int(x):
    """ if x is an integer (e.g. 2.0), return its integer value without trailing zeroes (e.g. 2)

    Accepts a single value or a list/tuple of values. For a list or tuple
    the conversion is applied to every element (nested lists/tuples
    included) and a list is returned.

    Values that are not whole numbers are returned unchanged. Values that
    have no `is_integer` method (e.g. strings, or plain ints before
    Python 3.12) are also returned unchanged instead of raising
    AttributeError.
    """
    if isinstance(x, (list, tuple)):
        return [make_int(item) for item in x]

    # Not every value exposes `is_integer`; look it up defensively.
    is_integer = getattr(x, 'is_integer', None)
    if callable(is_integer) and is_integer():
        return int(x)
    return x

def split_list(original_list, n):
    ''' Divide `original_list` into a tuple of `n` sub-lists.

    The splitting is delegated to `np.array_split` on an array built from
    the input, and each resulting piece is converted back to a plain list.
    Elements therefore take the array's common type (ints mixed with
    floats come back as floats).
    '''
    as_array = np.array(original_list)
    pieces = np.array_split(as_array, n)
    return tuple(piece.tolist() for piece in pieces)

In [2]:
# Mixed int/float sample list, split into 2 parts to inspect the sub-list sizes and element types.
my_list = [0.1, 1, 10, 100, 1000, 10000, 100000]
sub_lists = split_list(my_list, 2)
sub_lists

([0.1, 1.0, 10.0, 100.0], [1000.0, 10000.0, 100000.0])

In [4]:
# Split into 7 sub-lists and apply make_int to each, turning whole-number floats back into ints.
tuple([make_int(lst) for lst in split_list(my_list, 7)])

([0.1], [1], [10], [100], [1000], [10000], [100000])

In [5]:
# The 7-way split without make_int, for comparison.
split_list(my_list, 7)

([0.1], [1.0], [10.0], [100.0], [1000.0], [10000.0], [100000.0])