## Búsqueda Local Estocástica
### Optimización de parámetros de indicadores técnicos

Este cuaderno muestra como utilizar búsqueda local estocástica para encontrar el mejor conjunto de parámetros del oscilador estocástico para maximizar el rendimiento que habría obtenido sobre un histórico de precios

In [None]:
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
import itertools

### Datos 
Trajabajeremos sobre la serie de precios de un activo. Ej. Inditex

In [None]:
import pickle
with open('../data/stock_data.pkl', 'rb') as handle:
    stock_data = pickle.load(handle)

In [None]:
stock_series = stock_data['ITX'].close

In [None]:
stock_series.plot()

___

**Oscilador estocástico**

In [None]:
def min_max_scale(s):
    """
    Calcula para un vector s su normalización en el rango [0, 1]
    """
    return (s[-1] - np.min(s)) / (np.max(s) - np.min(s))

In [None]:
def stochastic_osc(s, win, lag):
    """
    Calcula el valor del oscilador estocástico
     1. calcula la posición relativa del precio en una ventana temporal
     2. Suaviza la señal anterior haciendo una media móvil, con tamaño inferior al de la ventana
    """
    so_raw = s.rolling(window=win).apply(min_max_scale, raw=True)
    so = so_raw.rolling(window=lag).mean()
    return so  

In [None]:
def stochastic_osc_signal(s, win=14, lag=3, obought=0.8, osold=0.2):
    """
    Calcula el estado de inversión "dentro" (1) o "fuera" (0) siguiendo
    la señal del oscilador estocástico. Esto es:
    - Se compra cuando se cruza de forma ascendente el umbral de sobreventa
    - Se vende cuando se cruza de forma descendente el umbral de sobrecompra
    """
    states = pd.Series(np.zeros(s.shape[0]), index=s.index)
    so = stochastic_osc(s, win, lag)
    if so.iloc[0] > osold:
        states.iloc[:1] = 1
        curr = 1
    else:
        curr = 0
    
    for i in range(s.shape[0] - 2):
        if so.iloc[i] < osold and so.iloc[i+1] >= osold:
            curr = 1
        elif so.iloc[i] > obought and so.iloc[i+1] <= obought: 
            curr = 0
        states.iloc[i+2] = curr
    
    return states

def state_returns(price, states):
    """ Calcula para una serie y unos estados de 
    estar dentro fuera, cual es el retorno total
    correspondiente.
    Se debe tener el mismo Index de pandas 
    """
    log_ret = np.log(price).diff()
    log_ret.iloc[0] = 0
    
    in_rets = log_ret * states
    total_log_ret = in_rets.sum()
    simple_ret = np.exp(total_log_ret) - 1
    return simple_ret
    

In [None]:
def backtest_so_returns(vseries, win=14, lag=3, obought=0.8, osold=0.2):
    """
    A partir de una serie de precios, calcula el rendimiento del oscilador estocástico 
    con los parámetros correspondientes
    """
    f_states = stochastic_osc_signal(
        vseries, 
        win=win, 
        lag=lag,
        obought=obought,
        osold=osold)
    so_return = state_returns(vseries, f_states)
    return so_return

In [None]:
backtest_so_returns(stock_series)

### Búsqueda Aleatoria
Primero realizaremos una búsqueda aleatoria en un espacio de parámetros para ver las capacidades
de una exploración **sin un algoritmo** de búsqueda especializado.

In [None]:
obought_space = np.linspace(0.51,0.98,48)
obought_space

In [None]:
osold_space = np.linspace(0.02,0.49,48)
osold_space

In [None]:
win_space = np.arange(10,120)
win_space

In [None]:
lag_space = np.arange(1,50)
lag_space

In [None]:
# numero de estados posibles en el espacio de parametros que hemos definido
win_space.shape[0]*lag_space.shape[0]*obought_space.shape[0]*osold_space.shape[0]

In [None]:
# A partir del espacio de parámetros que hemos definido anteriormente
# podemos generar set de parametros aleatorios, solo con la condición
# que el lag sea menor que la ventana
def rand_state():
    win = np.random.choice(win_space)
    avail_lag = lag_space[lag_space < win]
    
    state =  {
        'win': win,
        'lag': np.random.choice(avail_lag), 
        'obought':np.random.choice(obought_space), 
        'osold':np.random.choice(osold_space)
    }
    return state

In [None]:
rstate = rand_state()
rstate 

In [None]:
# rendimiento sobre los parámetros aleatorios
backtest_so_returns(stock_series, **rstate)

Generaremos estados n aleatorios y guardaremos el máximo obtenido hasta el momento

In [None]:
n = 200
best_return = -1
for i in range(n):
    i_state = rand_state()
    i_return = backtest_so_returns(stock_series, **i_state)
    if i_return > best_return:
        best_return = i_return
        print (f'{i}: return:{best_return} -->{i_state}')
    

___

### Búsqueda Local Estocástica
En este apartado utilizaremos técnicas de SLS para mejorar la exploración sobre 
el conjunto de parámetros

Primero tenemos que definir una función que calcula la "vencidad" de un nodo, esto es,
el conjunto de estados a los que podríamos movernos en un paso de la búsqueda

In [None]:
def sucesores(params):
    min_win, max_win=10, 120
    min_lag, max_lag=1, 50
    min_ob, max_ob=0.51,0.98
    min_os, max_os=0.02,0.49
    
    
     # win=14, lag=3, obought=0.8, osold=0.2
    succ = []
    s = params.copy()
    if s['win'] < max_win:
        s['win'] += 1
        succ.append(s)
        
    s = params.copy()
    if s['win'] > min_win:
        s['win'] -= 1
        succ.append(s)
        
    s = params.copy()
    if s['lag'] < max_lag and s['lag']+1 < s['win']:
        s['lag'] += 1
        succ.append(s)
        
    s = params.copy()
    if s['lag'] < max_lag:
        s['lag'] -= 1
        succ.append(s)
    
    s = params.copy()
    if s['obought'] < max_ob:
        s['obought'] += 0.02
        succ.append(s)
    
    s = params.copy()
    if s['obought'] > min_ob:
        s['obought'] -= 0.02
        succ.append(s)
    
    s = params.copy()
    if s['osold'] < max_os:
        s['osold'] += 0.02
        succ.append(s)
    
    s = params.copy()
    if s['osold'] > min_os:
        s['osold'] -= 0.02
        succ.append(s)
        
    return succ

In [None]:
root = {
    'win':15,
    'lag':3, 
    'obought':0.8, 
    'osold':0.2
}

In [None]:
sucesores(root)

In [None]:
def iterative_improvement_so(vseries, params, niter=30):
    state = params
    hbase = backtest_so_returns(vseries, **state)
    i = 0
    while i < niter:
        i = i +1
        succ = sucesores(state)
        h_vals = [backtest_so_returns(vseries, **c_i) for c_i in succ]
        best_idx = h_vals.index(max(h_vals))
        best_succ = succ[best_idx]
        best_h = h_vals[best_idx]
        #print(f"best_h {round(best_h,3)}")
        if best_h > hbase:
            state = best_succ
            hbase = best_h
            print (f'{i}: return:{best_h} -->{best_succ}')
        else:
            #print("no better successor")
            break
            
    return state, hbase


In [None]:
init_state = rand_state()
iterative_improvement_so(stock_series, init_state, niter=30)

In [None]:
def sortedrestart_iimprovements_so(vseries, restarts=20):
    init_states = [rand_state() for i in range(restarts)]  
    h_vals = [backtest_so_returns(vseries, **c_i) for c_i in init_states]
    h_sorted_idx = sorted(range(len(h_vals)), 
                          key=lambda k: h_vals[k],
                          reverse=True)
    
    global_best = -1 

    for i, idx_sorted in enumerate(h_sorted_idx):
        iparams = init_states[idx_sorted]
        print(f"New restart {i}...")
        best_restart, h_restart = iterative_improvement_so(stock_series, iparams, 10)
        
        if h_restart > global_best:
            global_best = h_restart
            global_state = best_restart
            print (f'{i}: Global improvement, return:{h_restart} -->{best_restart}')
        
    return global_state, global_best


In [None]:
res_res = sortedrestart_iimprovements_so(stock_series, restarts=25)    


In [None]:
res_res