In [32]:
import sys 
sys.path.append("..")
import re 
import threading
from concurrent.futures import ThreadPoolExecutor
from tools25 import *

In [33]:
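# 1. Read the alphas that need to be simulated (backtested).
# 2. Simulate them with a fixed number of threads, recording every outcome in a file as it finishes.
# Output files (the prefix is the alpha template id, e.g. alpha_template_1):
# <alpha_template_id>_alpha_complete_list.txt      alpha expressions whose simulation completed
# <alpha_template_id>_alpha_fail_list.txt          alpha expressions whose simulation failed
# <alpha_template_id>_alpha_error_list.txt         alpha expressions not simulated for other reasons
# <alpha_template_id>_complete_alpha_id_list.txt   alpha ids of the completed simulations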

In [34]:
# Project root: the parent of the directory the notebook is running in.
dir_path = os.path.dirname(os.getcwd())

import logging

# The log lives in <project root>/log, i.e. the same place the relative path '../log'
# pointed to. Create the directory first: logging.basicConfig opens the log file
# immediately and raises FileNotFoundError when its parent directory is missing.
log_dir = os.path.join(dir_path, 'log')
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
    # One log file per calendar day.
    filename=os.path.join(log_dir, 'simulate_run_%s.log' % (datetime.datetime.now().strftime('%Y%m%d'))),
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)


In [None]:
def get_simulated_alpha_list(alpha_template_id):
    """
    Collect the alpha expressions already processed for one alpha template.

    Reads ``<alpha_template_id>_alpha_complete_list.txt`` and
    ``<alpha_template_id>_alpha_fail_list.txt`` from ``<dir_path>/data`` (in that
    order, skipping whichever file does not exist) and returns their lines with
    surrounding whitespace stripped.

    Args:
        alpha_template_id: file-name prefix identifying the alpha template.

    Returns:
        list of str: one entry per line found in the existing files; an empty
        list when neither file exists.
    """
    already_simulated = []
    for suffix in ('_alpha_complete_list.txt', '_alpha_fail_list.txt'):
        candidate_path = os.path.join(dir_path, 'data', alpha_template_id + suffix)
        if not os.path.exists(candidate_path):
            continue
        with open(candidate_path, 'r') as handle:
            raw_lines = handle.readlines()
        already_simulated.extend(raw_line.strip() for raw_line in raw_lines)
    return already_simulated

In [None]:
# 1. Load the alphas that still need to be simulated.
# Only one alpha_template_id can be processed per run: single_simulate later appends
# its results to the files of a single alpha_template_id.
input_file_list = ['alpha_template_1_padding_list.txt']
for input_file in input_file_list:
    template_match = re.match(r'^(alpha_template_\d+)_([^_]+)_([^_]+)', input_file)
    if template_match is None:
        # Fail with a readable message instead of AttributeError on None.groups().
        raise ValueError(f"unexpected input file name: {input_file!r}")
    alpha_template_id = template_match.group(1)
    input_file_path = os.path.join(dir_path, 'data', input_file)
    with open(input_file_path, 'r') as f:
        # One JSON document per line. Parse each line exactly once and skip blank
        # lines (e.g. a trailing newline), which json.loads would reject.
        candidate_alpha_list = [json.loads(line) for line in f if line.strip()]
    simulated_alpha_list = get_simulated_alpha_list(alpha_template_id)
    # Set membership keeps the filter linear; testing tens of thousands of
    # expressions against a list is quadratic.
    simulated_alpha_set = set(simulated_alpha_list)
    alpha_list = [alpha for alpha in candidate_alpha_list if alpha['regular'] not in simulated_alpha_set]
    print(f"读取到 {len(alpha_list)} 个alpha表达式")
    print(alpha_list[:2])

读取到 82656 个alpha表达式
[{'type': 'REGULAR', 'settings': {'instrumentType': 'EQUITY', 'region': 'USA', 'universe': 'TOP3000', 'delay': 1, 'decay': 1, 'neutralization': 'SUBINDUSTRY', 'truncation': 0.08, 'pasteurization': 'ON', 'unitHandling': 'VERIFY', 'nanHandling': 'ON', 'language': 'FASTEXPR', 'visualization': False}, 'regular': 'group_rank(ts_rank(assets, 252), market)'}, {'type': 'REGULAR', 'settings': {'instrumentType': 'EQUITY', 'region': 'USA', 'universe': 'TOP3000', 'delay': 1, 'decay': 1, 'neutralization': 'SUBINDUSTRY', 'truncation': 0.08, 'pasteurization': 'ON', 'unitHandling': 'VERIFY', 'nanHandling': 'ON', 'language': 'FASTEXPR', 'visualization': False}, 'regular': 'group_rank(ts_rank(assets, 252), industry)'}]


In [None]:
# 2. 用指定线程数回测alpha同时把回测完成的和失败的都保存到文件 

# Single lock shared by every thread that appends to the result files.
file_lock = threading.Lock()

def write_to_file(file_path, content):
    """
    Append ``content`` followed by a newline to ``file_path``.

    ``file_lock`` is held for the whole open/write/close sequence, so only one
    thread at a time can be writing through this function.
    """
    with file_lock, open(file_path, 'a+') as handle:
        handle.write(content + '\n')

def single_simulate(alpha, alpha_fail_file, alpha_complete_file, complete_alpha_id_file, alpha_error_file):
    """
    Submit one alpha for simulation, wait for the result and record the outcome.

    Args:
        alpha: simulation payload, sent as the JSON body of the POST request. Its
            'regular' entry (the alpha expression) is what gets written to the
            result files.
        alpha_fail_file: path that receives the expression when the final status
            is anything other than "COMPLETE".
        alpha_complete_file: path that receives the expression when the status is
            "COMPLETE".
        complete_alpha_id_file: path that receives the alpha id of a completed
            simulation.
        alpha_error_file: path that receives the expression when every attempt
            raised an exception.

    An exception during an attempt triggers a re-login; the simulation is tried at
    most twice. The function always sleeps one second before returning.
    """
    # The session is rebound after a failure. Without this declaration the
    # assignment below would make `s` local to the function, so the very first
    # `s.post` would always raise UnboundLocalError and burn one of the attempts.
    global s

    max_attempts = 2
    for attempt in range(1, max_attempts + 1):
        try:
            simulation_response = s.post('https://api.worldquantbrain.com/simulations', json=alpha)
            simulation_progress_url = simulation_response.headers['Location']

            # Poll the progress URL until the server stops sending Retry-After.
            while True:
                simulation_progress = s.get(simulation_progress_url)
                retry_after = simulation_progress.headers.get("Retry-After")
                if retry_after is None:
                    break
                sleep(float(retry_after))

            response = simulation_progress.json()
            status = response.get("status", 0)
            if status != "COMPLETE":
                write_to_file(alpha_fail_file, alpha['regular'])
                # Use .get(): a failed simulation may carry no 'alpha' or 'message'
                # entry, and a KeyError here would re-run an alpha that has already
                # been recorded as failed.
                logging.warning(f"alpha expression: {alpha['regular']}, alpha id: {response.get('alpha')}, status: {status}, message: {response.get('message')} \n {simulation_progress_url}")
            else:
                # Read the id before writing anything so a malformed response
                # cannot leave a half-recorded result behind.
                alpha_id = response["alpha"]
                write_to_file(alpha_complete_file, alpha['regular'])
                write_to_file(complete_alpha_id_file, alpha_id)  # consumed by the later submit-check step
            break
        except Exception as e:
            if attempt == max_attempts:
                # Record the failure before re-logging in, so a failing login()
                # cannot lose the record.
                logging.error(f"{alpha['regular']} \n Error occurred: {e}")
                write_to_file(alpha_error_file, alpha['regular'])
            try:
                # Refresh the shared session for the retry and for later alphas.
                s = login()
            except Exception as login_error:
                logging.error(f"Re-login failed: {login_error}")
    sleep(1)


In [None]:
process_num = 3  # number of worker threads
sample_size = 10000  # upper bound on the number of alphas simulated in this run
# alpha_list_sub = alpha_list[: 10]  # small slice for a quick test run
# random.sample raises ValueError when asked for more items than the population
# holds, so cap the sample at the number of alphas that are actually left.
alpha_list_sub = random.sample(alpha_list, min(sample_size, len(alpha_list)))

alpha_fail_file = os.path.join(dir_path, 'data', alpha_template_id + '_alpha_fail_list.txt')  # expressions whose simulation failed
alpha_complete_file = os.path.join(dir_path, 'data', alpha_template_id + '_alpha_complete_list.txt')  # expressions whose simulation completed
complete_alpha_id_file = os.path.join(dir_path, 'data', alpha_template_id + '_complete_alpha_id_list.txt')  # alpha ids of completed simulations
alpha_error_file = os.path.join(dir_path, 'data', alpha_template_id + '_alpha_error_list.txt')  # expressions not simulated for other reasons

# Simulate the sampled alphas in parallel.
with ThreadPoolExecutor(max_workers=process_num) as executor:
    futures = [executor.submit(single_simulate, alpha, alpha_fail_file, alpha_complete_file, complete_alpha_id_file, alpha_error_file) for alpha in alpha_list_sub]

# Leaving the with-block waits for every worker. An exception that escaped
# single_simulate is only stored on its future, so log it here instead of
# letting it disappear.
for alpha, future in zip(alpha_list_sub, futures):
    worker_error = future.exception()
    if worker_error is not None:
        logging.error(f"{alpha['regular']} \n Unhandled worker error: {worker_error}")

