# Genetic Algorithms and Finance
Attempted implementation of https://drive.google.com/file/d/0B9-kA56h5JCMMW4tU25HRExWb1U/view?usp=sharing

## Data Gathering

In [60]:
from getKey import *
import quandl
import pandas as pd
quandl.ApiConfig.api_key = getKey('quandl.key')

#S&P 500 returns
#SPY data from yahoo finance https://finance.yahoo.com/quote/%5EGSPC/history?period1=-631130400&period2=1544248800&interval=1d&filter=history&frequency=1d
SPY = pd.read_csv('SPY.csv', parse_dates = True)
SPY["Returns"] = (SPY["Close"] - SPY["Close"].shift(1))/SPY["Close"].shift(1)
SPY["Date"] = pd.to_datetime(SPY["Date"])
data = SPY.set_index("Date")
data = data.dropna()
print(type(data.index[0]))
data.head()

<class 'pandas._libs.tslibs.timestamps.Timestamp'>


Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume,Returns
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1950-01-04,16.85,16.85,16.85,16.85,16.85,1890000,0.011405
1950-01-05,16.93,16.93,16.93,16.93,16.93,2550000,0.004748
1950-01-06,16.98,16.98,16.98,16.98,16.98,2010000,0.002953
1950-01-09,17.08,17.08,17.08,17.08,17.08,2520000,0.005889
1950-01-10,17.030001,17.030001,17.030001,17.030001,17.030001,2160000,-0.002927


In [61]:
#True on the index that a crosses over b
def crossover(a, b):
    a.dropna(inplace = True)
    if not isinstance(b, pd.Series):
        b = pd.Series(b, index = a.index)
    else:
        b.dropna(inplace = True)
    minDate = max(a.index[0], b.index[0])
    maxDate = min(a.index[-1], b.index[-1])
    a = a[minDate:maxDate]
    b = b[minDate:maxDate]
    previousPeriodLessThan = a.shift(1) < b.shift(1) #check if in the previous day, a was less than b
    nextPeriodGreaterThan = a > b #in the day after the previous day ("today") a is greater than b
    out = previousPeriodLessThan & nextPeriodGreaterThan
    return out

#True on the index that a crosses under b
def crossunder(a, b):
    a.dropna(inplace = True)
    if not isinstance(b, pd.Series):
        b = pd.Series(b, index = a.index)
    else:
        b.dropna(inplace = True)
    minDate = max(a.index[0], b.index[0])
    maxDate = min(a.index[-1], b.index[-1])
    a = a[minDate:maxDate]
    b = b[minDate:maxDate]
    previousPeriodLessThan = a.shift(1) > b.shift(1) #check if in the previous day, a was less than b
    nextPeriodGreaterThan = a < b #in the day after the previous day ("today") a is greater than b
    out = previousPeriodLessThan & nextPeriodGreaterThan
    return out

Indicators: SMA, MACD, Slow Stochastic, RSI, CCI,
Momentum Oscillator, Price Oscillator, Larry Williams, Bollinger Bands and OBV.

In [62]:
import talib
import pandas as pd
close = data.Close
high = data.High
low = data.Low
volume = data.Volume

indicators = []

class Indicator:
    def __init__(self, name, buy, sell):
        self.name = name
        self.buy = buy.dropna()
        self.sell = sell.dropna()
        self.minDate = buy.index[0]
        self.maxDate = buy.index[-1]
    
    def get(self, side):
        if side == "buy":
            return self.buy
        elif side == "sell":
            return self.sell
        else:
            raise("Side must be either buy or sell")

In [63]:
#SMA
SMA50 = talib.SMA(close, 50)
SMA200 = talib.SMA(close, 200)
buy = crossover(SMA50, SMA200)
sell = crossunder(SMA50, SMA200)
SMA = Indicator("SMA", buy, sell)
indicators.append(SMA)

In [64]:
#MACD
macd, macdsignal, macdhist = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
buy = crossover(macd, macdsignal)
sell = crossunder(macd, macdsignal)
MACD = Indicator("MACD", buy, sell)
indicators.append(MACD)

In [65]:
#RSI
rsi = talib.RSI(close, 14)
buy = crossover(rsi, 30)
sell = crossunder(rsi, 70)
RSI = Indicator("RSI", buy, sell)
indicators.append(RSI)

In [66]:
#CCI
CCI = talib.CCI(high, low, close, timeperiod=14)
buy = crossover(CCI, 100)
sell = crossunder(CCI, -100)
CCI = Indicator("CCI", buy, sell)
indicators.append(CCI)

In [67]:
#Momentum Oscillator
MOM = talib.MOM(close, 9)
buy = crossover(MOM, 100)
sell = crossunder(MOM, 100)
MOM = Indicator("MOM", buy, sell)
indicators.append(MOM)

In [68]:
#Price Oscillator
#Couldn't find

In [69]:
#Williams %R
Williams = talib.WILLR(high, low, close, timeperiod=14)
sell = crossover(Williams, -20)
buy = crossunder(Williams, -80)
Williams = Indicator("Williams %R", buy, sell)
indicators.append(Williams)

In [70]:
#Bollinger Bands
upperband, middleband, lowerband = talib.BBANDS(close, timeperiod=5, nbdevup=2, nbdevdn=2, matype=0)
buy = crossover(middleband, lowerband)
sell = crossunder(middleband, upperband)
bollinger = Indicator("Bollinger Bands", buy, sell)
indicators.append(bollinger)

In [71]:
#OBV
OBV = talib.OBV(close, volume)
OBV_signal = talib.SMA(OBV, 20)
buy = crossover(OBV, OBV_signal)
sell = crossunder(OBV, OBV_signal)
OBV = Indicator("OBV", buy, sell)
indicators.append(OBV)

In [72]:
minDate = max([indicator.buy.index[0] for indicator in indicators])
print(type(indicators[0].buy.index[0]))
maxDate = min([indicator.buy.index[-1] for indicator in indicators])
print("minDate " + str(minDate))
print("maxDate " + str(maxDate))
for i in range(len(indicators)):
    print(indicators[i].name)
    print(minDate)
    print(maxDate)
    print(indicators[i].buy.index[0])
    indicators[i].buy = indicators[i].buy[minDate:maxDate]
    indicators[i].sell = indicators[i].sell[minDate:maxDate]

data = data[minDate:maxDate]

<class 'pandas._libs.tslibs.timestamps.Timestamp'>
minDate 1950-10-19 00:00:00
maxDate 2018-12-07 00:00:00
SMA
1950-10-19 00:00:00
2018-12-07 00:00:00
1950-10-19 00:00:00
MACD
1950-10-19 00:00:00
2018-12-07 00:00:00
1950-02-21 00:00:00
RSI
1950-10-19 00:00:00
2018-12-07 00:00:00
1950-01-24 00:00:00
CCI
1950-10-19 00:00:00
2018-12-07 00:00:00
1950-01-23 00:00:00
MOM
1950-10-19 00:00:00
2018-12-07 00:00:00
1950-01-17 00:00:00
Williams %R
1950-10-19 00:00:00
2018-12-07 00:00:00
1950-01-23 00:00:00
Bollinger Bands
1950-10-19 00:00:00
2018-12-07 00:00:00
1950-01-10 00:00:00
OBV
1950-10-19 00:00:00
2018-12-07 00:00:00
1950-01-31 00:00:00


# Genetic Algorithm

In [73]:
class TradingRule:
    #A trading rule has 5 indicators, 5 directions, and 4 operators = 14 components
    NO_INDICATORS = 5
    BOOLEAN_OPERATORS = ["and", "or", "xor"]
    
    def setIndicators(self):
        self.indicators = np.random.randint(len(indicators), size=TradingRule.NO_INDICATORS)
        return
    
    def printIndicators(self):
        for i in self.indicators:
            print(indicators[i].name)
    
    def setBooleanOperators(self):
        self.operators = np.random.randint(len(TradingRule.BOOLEAN_OPERATORS), size=TradingRule.NO_INDICATORS-1)
        self.operatorsStr = []
        for i in self.operators:
            self.operatorsStr.append(TradingRule.BOOLEAN_OPERATORS[i])
        return
    
    def setIndicatorDirections(self):
        self.indicatorDirections = np.random.randint(2, size=TradingRule.NO_INDICATORS)
        self.indicatorDirectionsStr = []
        for i in range(TradingRule.NO_INDICATORS):
            direction = self.indicatorDirections[i]
            if direction == 0:
                self.indicatorDirectionsStr.append("sell")
            elif direction == 1:
                self.indicatorDirectionsStr.append("buy")
            else:
                print(direction)
                raise("Indicator Direction isn't 0 or 1")
        return

    def __init__(self, side):
        self.side = side
        self.setIndicators()
        self.setBooleanOperators()
        self.setIndicatorDirections()
        return
    
    def print(self):
        out = ""
        for i in range(TradingRule.NO_INDICATORS):
            if i < TradingRule.NO_INDICATORS-1:
                out = out + indicators[self.indicators[i]].name + "-" + self.indicatorDirectionsStr[i] + " " + self.operatorsStr[i] + " "
            else:
                out = out + indicators[self.indicators[i]].name + "-" + self.indicatorDirectionsStr[i]
        print(out)
        return
    
    def generateSignals(self):
        out = indicators[self.indicators[0]].get(self.indicatorDirectionsStr[0])
        for i in range(len(self.operatorsStr)):
            operator = self.operatorsStr[i]
            if operator == "and":
                out = out & indicators[self.indicators[1+i]].get(self.indicatorDirectionsStr[1+i])
            elif operator == "or":
                out = out | indicators[self.indicators[1+i]].get(self.indicatorDirectionsStr[1+i])
            elif operator == "xor":
                out = out ^ indicators[self.indicators[1+i]].get(self.indicatorDirectionsStr[1+i])
            else:
                raise("Invalid operator")
        self.signal = out
        return
    
    def getGenotype(self):
        genotype = []
        genotype.append(self.indicators[0])
        genotype.append(self.indicatorDirections[0])
        for i in range(len(self.operators)):
            genotype.append(self.operators[i])
            genotype.append(self.indicators[1+i])
            genotype.append(self.indicatorDirections[1+i])
        return genotype
    
    def createFromGenotype(self, genotype):
        self.indicators = []
        self.indicatorDirections = []
        self.operators = []
        for i in range(TradingRule.NO_INDICATORS-1):
            self.indicators.append(genotype[3*i] % len(indicators))
            self.indicatorDirections.append(genotype[3*i + 1] % 2)
            self.operators.append(genotype[3*i + 2] % len(TradingRule.BOOLEAN_OPERATORS))
        self.indicators.append(genotype[-2] % len(indicators))
        self.indicatorDirections.append(genotype[-1] % 2)
        
        #Create string representations of indicator directions and boolean operators
        self.indicatorDirectionsStr = []
        for i in range(TradingRule.NO_INDICATORS):
            direction = self.indicatorDirections[i]
            if direction == 0:
                self.indicatorDirectionsStr.append("sell")
            elif direction == 1:
                self.indicatorDirectionsStr.append("buy")
            else:
                print(direction)
                raise("Indicator Direction isn't 0 or 1")
                
        self.operatorsStr = []
        for i in self.operators:
            self.operatorsStr.append(TradingRule.BOOLEAN_OPERATORS[i])
            
        self.generateSignals()
        return

In [74]:
class Strategy:
    start_index = 100
    length = 3*255
    START_DATE = data.index[start_index]
    END_DATE = data.index[start_index + length]
    def __init__(self):
        return
    
    def randomize(self):
        self.buyRule = TradingRule("buy")
        self.buyRule.generateSignals()
        self.sellRule = TradingRule("sell")
        self.sellRule.generateSignals()
        
    def printBuyRule(self):
        self.buyRule.print()
        return
    
    def printSellRule(self):
        self.sellRule.print()
        return
    
    def printRules(self):
        self.printBuyRule()
        self.printSellRule()
        return
    
    def evaluate(self):
        performance = pd.DataFrame(index = data[Strategy.START_DATE:Strategy.END_DATE].index, columns = ["Daily Returns", "Cumulative Returns"])
        performance["Daily Returns"][Strategy.START_DATE] = 0
        performance["Cumulative Returns"][Strategy.START_DATE] = 0
        holding = False
        for i in range(len(data[Strategy.START_DATE:Strategy.END_DATE])-1):
            today = performance.index[i]
            tomorrow = performance.index[i+1]
            #NOTE: my convention for the returns column is that the returns on todays date represent the returns from 24 hours ago
            # to now. Therefore, when a stock is bought, TOMORROW'S returns are set.
            if holding:
                #check for sell
                if self.sellRule.signal[today]:
                    performance["Daily Returns"].loc[tomorrow] = 0
                    holding = False
                else:
                    performance["Daily Returns"].loc[tomorrow] = data["Returns"][tomorrow]
            else:
                #check for buy
                if self.buyRule.signal[today]:
                    performance["Daily Returns"].loc[tomorrow] = data["Returns"][tomorrow]
                    holding = True
                else:
                    performance["Daily Returns"].loc[tomorrow] = 0
            
            performance["Cumulative Returns"].loc[tomorrow] = (1+performance["Daily Returns"].loc[tomorrow])*(1+performance["Cumulative Returns"][today]) - 1
        self.performance = performance.dropna()
        return
    
    def evaluateFitness(self):
        if self.performance["Daily Returns"].std() == 0:
            return False
        else:
            sharpe = (255 ** 0.5)*self.performance["Daily Returns"].mean()/self.performance["Daily Returns"].std()
            self.fitness = sharpe
            return True
        
    def getGenotype(self):
        return self.buyRule.getGenotype() + self.sellRule.getGenotype()
    
    def createFromGenotype(self, genotype):
        self.buyRule = TradingRule("buy")
        self.buyRule.createFromGenotype(genotype[:int(len(genotype)/2)])
        self.sellRule = TradingRule('sell')
        self.sellRule.createFromGenotype(genotype[int(len(genotype)/2):])
        self.evaluate()
        return

In [75]:
import numpy as np
def mate(a, b):
    #single crossover point
    a_genotype = a.getGenotype()
    b_genotype = b.getGenotype()
    if len(a_genotype) != len(b_genotype):
        raise("Unequal genotype lengths")
    crossover = np.random.randint(len(a_genotype)-2) + 1
    offspring_genotype = a_genotype[:crossover] + b_genotype[crossover:]
    #Mutations
    for i in range(len(offspring_genotype)):
        length = len(offspring_genotype)
        probability = np.random.randint(length)
        if(probability == 0):
            offspring_genotype[i] = np.random.randint(100)
    offspring = Strategy()
    offspring.createFromGenotype(offspring_genotype)
    return offspring

In [76]:
import numpy as np
import sys, math
np.random.RandomState(seed = 69)

class Population:
    PROPORTION_TO_KILL = 0.5
    STARTING_POPULATION = 20
    CARRYING_CAPACITY = STARTING_POPULATION
    CHILDREN_PER_MATE = 2

    def __init__(self):
        self.members = []
        return
    
    def addMember(self, member):
        self.members.append(member)
        return
    
    def getPopulationFitness(self):
        fitness = {}
        for member in self.members:
            fitness[member] = member.fitness
        return np.mean(list(fitness.values()))
    
    def kill(self):
        self.members.sort(key = lambda x: x.fitness, reverse = True)
        cutoff = math.floor(len(self.members)*Population.PROPORTION_TO_KILL)
        self.members = self.members[:cutoff]
        return
    
    def chooseParents(self):
        for i in range(len(self.members)):
            self.members[i].evaluateFitness()
        self.members.sort(key = lambda x: x.fitness, reverse = False)
        fitness_rank = [0]
        for i in range(1, len(self.members)):
            fitness_rank.append(i+fitness_rank[i-1])
        parent_a = np.random.randint(fitness_rank[-1]+1)
        parent_b = parent_a
        while(parent_b == parent_a):
            parent_b = np.random.randint(fitness_rank[-1]+1)
        for i in range(len(fitness_rank)):
            if parent_a <= fitness_rank[i]:
                parent_a = self.members[i]
                break
        for i in range(len(fitness_rank)):
            if parent_b <= fitness_rank[i]:
                parent_b = self.members[i]
                break
        return parent_a, parent_b
    
    def mate(self):
        while(len(self.members) < Population.CARRYING_CAPACITY):
            for i in range(Population.CHILDREN_PER_MATE):
                parent_a, parent_b = self.chooseParents()
                offspring = mate(parent_a, parent_b)
                if offspring.evaluateFitness():
                    self.members.append(offspring)
        return
    
    def killAndMate(self):
        self.kill()
        self.mate()
        return

In [77]:
import pandas as pd
import matplotlib.pyplot as plt
class GeneticAlgorithm:
    def __init__(self):
        pool = Population()
        while(len(pool.members) < Population.STARTING_POPULATION):
            #print(len(pool.members))
            strat = Strategy()
            strat.randomize()
            strat.evaluate()
            if strat.evaluateFitness():
                pool.members.append(strat)
        self.population = pool
        return
    
    def train(self, generations):
        self.results = pd.DataFrame(columns = ["Fitness", "Population"])
        for generation in range(generations):
            print("Generation " + str(generation))
            print("Fitness: " + str(self.population.getPopulationFitness()))
            self.results = self.results.append({'Fitness': self.population.getPopulationFitness(), 'Population': len(self.population.members)}, ignore_index=True)
            self.population.killAndMate()
        return
    
    def plotFitness(self):
        plt.figure(figsize = (15,7))
        plt.plot(self.results.index, self.results["Fitness"], 'bo')
        plt.ylabel("Sharpe Ratio")
        plt.xlabel("Generation")
        return
            
            

In [None]:
algo = GeneticAlgorithm()
algo.train(100)
algo.plotFitness()

Generation 0
Fitness: 0.7979133133883141


In [79]:
algo.population.members.sort(key=lambda x: x.fitness)
print([x.fitness for x in algo.population.members])

[0.8691803897535855, 1.7930038110192033, 2.0421516265992774, 2.091432333846705, 2.302535084384994, 2.33745060433605, 2.33745060433605, 2.33745060433605, 2.33745060433605, 2.33745060433605, 2.33745060433605, 2.33745060433605, 2.33745060433605, 2.33745060433605, 2.33745060433605, 2.33745060433605, 2.33745060433605, 2.516059136351204, 2.567531654774619, 2.567531654774619]
