In [2]:
import operator
import random
import numpy as np
import pandas as pd
from scipy.stats import pearsonr
from deap import base, creator, tools, gp
import operator
from functools import partial

In [11]:
# Load stock data 
data = pd.read_csv('../DownloadData/csv/OneMin_KLine.csv', index_col=0)
data['trade_time'] = pd.to_datetime(data['trade_time'])
data['security_code'] = data['security_code'].astype(str).str.zfill(6)

# Obtain  the return for the next n minutes
n = 5
data['new_ret'] = data.groupby('security_code')['ret'].shift(-n)
data = data[~data.isna().any(axis=1)]

In [12]:
infos = {}
cols = ['open', 'close', 'high', 'low']
for col in cols:
    infos[col] = pd.pivot_table(data, values=col, index=["trade_time"], columns=["security_code"])
    infos[col] = infos[col] / 1000000

In [18]:
# creating a primitive set
pset = gp.PrimitiveSet("MAIN", len(infos))
pset.addPrimitive(np.add, 2)
pset.addPrimitive(np.subtract, 2)
pset.addPrimitive(np.multiply, 2)
pset.addPrimitive(np.abs, 1)
pset.addPrimitive(np.negative, 1)
pset.addEphemeralConstant("randUni", partial(random.uniform, -1, 1))
pset.renameArguments(ARG0='open')
pset.renameArguments(ARG1='close')
pset.renameArguments(ARG2='high')
pset.renameArguments(ARG3='low')


In [27]:
# evaluate the fitness for each indivisual
# Define evaluation function based on IC
def evaluate_factor(individual, pset):
    print(individual)
    func = gp.compile(individual, pset)
    factor_values = func(*[infos[i] for i in cols])
    if not isinstance(factor_values, pd.DataFrame):
        return (-1,)
    
    df1 = pd.melt(factor_values.reset_index(), id_vars='trade_time', value_vars=factor_values.columns.tolist(), var_name='security_code', value_name='value', )
    df1.sort_values(by=['security_code', 'trade_time'], ascending=[True, True])
    df1 = df1[~df1['value'].isna()]
    
    df1 = pd.merge(df1, data[['trade_time', 'security_code', 'new_ret']], on=['trade_time', 'security_code'], how='inner')
    
    result = pearsonr(df1['value'], df1['new_ret'])
    return (abs(result[0]),)
    # expression = ''.join(individual)
    # factor_values = eval(expression, {'__builtins__': None}, data)  # Evaluate expression using data
    # ic_value, _ = pearsonr(factor_values, data['returns'])  # Calculate Pearson correlation coefficient (IC value)
    # return ic_value,
    

# DEAP setup
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
# creator.create("Individual", list, fitness=creator.FitnessMin)
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)



In [28]:
from collections import defaultdict
__type__ = object
def cxOnePoint(ind1, ind2):
    """Randomly select crossover point in each individual and exchange each
    subtree with the point as root between each individual.

    :param ind1: First tree participating in the crossover.
    :param ind2: Second tree participating in the crossover.
    :returns: A tuple of two trees.
    """
    if len(ind1) < 2 or len(ind2) < 2:
        # No crossover on single node tree
        return ind1, ind2

    # List all available primitive types in each individual
    types1 = defaultdict(list)
    types2 = defaultdict(list)
    if ind1.root.ret == __type__:
        # Not STGP optimization
        types1[__type__] = list(range(1, len(ind1)))
        types2[__type__] = list(range(1, len(ind2)))
        common_types = [__type__]
    else:
        for idx, node in enumerate(ind1[1:], 1):
            types1[node.ret].append(idx)
        for idx, node in enumerate(ind2[1:], 1):
            types2[node.ret].append(idx)
        common_types = set(types1.keys()).intersection(set(types2.keys()))

    if len(common_types) > 0:
        type_ = random.choice(list(common_types))

        index1 = random.choice(types1[type_])
        index2 = random.choice(types2[type_])

        slice1 = ind1.searchSubtree(index1)
        slice2 = ind2.searchSubtree(index2)
        ind1[slice1], ind2[slice2] = ind2[slice2], ind1[slice1]

    return ind1, ind2

In [29]:
def initIndividual(container, func, size):
    return container(gp.PrimitiveTree(func()) for _ in range(size))

toolbox = base.Toolbox()
toolbox.register("expr", gp.genGrow, pset=pset, min_=1, max_=2)
# toolbox.register("individual", initIndividual, creator.Individual, toolbox.expr, size=3)  # 假设我们创建3个特征
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)


toolbox.register("evaluate", evaluate_factor, pset=pset)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("mate", cxOnePoint)
toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)

In [30]:

def main():
    pop_size = 10
    n_gen = 2

    population = toolbox.population(n=pop_size)

    print("Start of evolution")

    # Evaluate the entire population
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit

    print("  Evaluated %i individuals" % len(population))

    for gen in range(n_gen):
        print("-- Generation %i --" % gen)

        # Select the next generation individuals
        offspring = toolbox.select(population, len(population))

        # Clone the selected individuals
        offspring = list(map(toolbox.clone, offspring))

        # Apply crossover and mutation on the offspring
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < 0.5:  # Crossover probability
                toolbox.mate(child1, child2)
                del child1.fitness.values
                del child2.fitness.values

        for mutant in offspring:
            if random.random() < 0.2:  # Mutation probability
                toolbox.mutate(mutant)
                del mutant.fitness.values

        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        print("  Evaluated %i individuals" % len(invalid_ind))

        # The population is entirely replaced by the offspring
        population[:] = offspring

        # Gather all the fitnesses in one list and print the stats
        fits = [ind.fitness.values[0] for ind in population]

        length = len(population)
        mean = np.mean(fits)
        std = np.std(fits)

        print("  Min %s" % min(fits))
        print("  Max %s" % max(fits))
        print("  Avg %s" % mean)
        print("  Std %s" % std)

    print("-- End of evolution --")

    best_ind = tools.selBest(population, 1)[0]
    print("Best individual is %s, %s" % (best_ind, best_ind.fitness.values))

if __name__ == "__main__":
    main()


Start of evolution
negative(add(-0.8771294668090275, close))
negative(subtract(-0.3919954687584184, high))
multiply(high, 0.7740584152458749)
add(high, high)
add(negative(high), -0.1287364409778311)
add(0.5455727598150109, subtract(high, -0.08538405452079578))
subtract(add(low, high), absolute(low))
absolute(add(open, low))
add(close, open)
absolute(high)
  Evaluated 10 individuals
-- Generation 0 --
negative(subtract(-0.3919954687584184, high))
add(high, high)
absolute(close)
add(high, open)
negative(subtract(-0.3919954687584184, -0.2336994004270132))
add(0.5455727598150109, subtract(high, -0.3919954687584184))
negative(subtract(-0.08538405452079578, high))
absolute(low)
  Evaluated 8 individuals
  Min -1.0
  Max 0.007790037826016881
  Avg -0.09301650379483599
  Std 0.30232783774272637
-- Generation 1 --
subtract(add(low, high), absolute(close))
absolute(add(low, close))
subtract(add(high, negative(negative(open))), absolute(low))
add(low, high)
add(0.5455727598150109, subtract(high, 