In [5]:
# Importamos las librerías y módulos que usaremos
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from statsmodels.tsa.api import SimpleExpSmoothing
from scipy.optimize import curve_fit

# Análisis de tendencias

Primero leemos los archivos y examinamos la serie temporal

In [6]:
# Chequenado y guardando el nombre del archivo para leer
if os.path.exists('../../Data/googleStock/Google_Stock_Train (2010-2022).csv'):
    filename = '../../Data/googleStock/Google_Stock_Train (2010-2022).csv'
# leyendo con pandas
df = pd.read_csv(filename)  # esta línea abre el archivo, lo lee y crea el dataframe
print(df.dtypes)    # Siempre es importante saber qué tipo de dato es cada columna
df['Date'] = pd.to_datetime(df['Date'])
df.plot(x = 'Date',subplots = True)
df.head() # Mirando las primeras columnas

NameError: name 'filename' is not defined

In [None]:
def ma(t, x, ws= 5):
    ''' Funcion para calcular la media movil de una serie temporal (simetrica)
        Entradas:
            - t: timestamps
            - x: serie datos
            - ws: tamaño de la ventana
        Salidas:
            - t: timestamps
            - y: media movil
    '''

    assert ws%2 == 1, 'ws debe ser entero e impar' 
    n_samples = x.shape[0]
    n_windows = n_samples-ws
    tstamps = []
    y = np.zeros(n_windows)
    for i in range(0, n_windows):
        right = n_samples - i 
        left = n_samples - i - ws 
        y[n_windows - i - 1] = x[left:right].mean() 
        tstamps.insert(0,t[(left+right)//2])
    return tstamps, y


: 

In [None]:
# Analizamos la media movil 
ws = 21 # ancho de la ventana temporal
m_averaged = []
for c in list(df.columns)[1:]:
    t, d = ma(df['Date'],df[c].to_numpy(), ws=ws)
    m_averaged.append(d)
m_averaged = np.array(m_averaged).T
labels = list(df.columns)[1:]
print(t)

: 

In [None]:
fig, axs = plt.subplots(len(labels),1, figsize=(10,15))
for i in range(len(labels)):
    axs[i].plot(t, m_averaged[:,i], label = labels[i])
    axs[i].legend()
plt.show()

: 

## Analizando tendencias lineales

In [None]:
def lineal(x, a, b):
    return a*x + b

def cuadratica(x,a,b,c):
    return a*x**2 + b*x + c


: 

In [None]:
# Para fitear funciones necesitamos pasar los timestamps a algún formato numérico
# Es usual utilizar unix epochs para trabajar con numeros de punto flotante
print(f'antes {type(t[0])}')
# create test data
dates = pd.to_datetime(t)
# calculate unix datetime
t_epochs =(dates - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s')
print(f'despues {type(t_epochs[0])}')

: 

In [None]:
def fit_batch(abcisas, data, function):
    assert type(data) == type(np.array([])), 'data tiene que ser un vector de numpy'
    if data.shape[0] != data.size:
        n_series = data.shape[1]
    else:
        n_series = data.size
    opt_vals = []
    errors = []
    for i in range(n_series):
        popt, pcov = curve_fit(function, abcisas, data[:,i])
        opt_vals.append(popt)
        errors.append(np.diag(pcov))
    
    return opt_vals, errors

def mae(y, y_hat):
    return np.abs(y-y_hat).mean(axis = 0)

def mse(y, y_hat):
    return ((y-y_hat)**2).mean(axis = 0)

: 

In [None]:
opt_vals, errors = fit_batch(t_epochs, m_averaged, lineal)
print(f'parametros óptimos: {opt_vals}')
print(f'traza de la matriz de covarianza: {errors}')

: 

In [None]:
fig, axs = plt.subplots(len(labels),1, figsize=(5,10))
for i in range(len(labels)):
    axs[i].plot(df['Date'],df[labels[i]], label = labels[i] + '- media movil ' + str(ws))
    axs[i].plot(t,lineal(t_epochs,*opt_vals[i]), label = 'linear trend')
    axs[i].legend()


: 

In [None]:
# para evaluar el error del fiteo usamos MAE y MSE sobre todos los datos
# evaluamos la tendencia lineal en los valores de las distribuciones
dates = pd.to_datetime(df['Date'])
t_epochs =(dates - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s')

predict = np.array([lineal(t_epochs,*opt_vals[i]) for i in range(len(labels))])
print(f'MAE para ajuste lineal de la MA(21):\n{mae(df[labels],predict.T)}')
print(f'Sqrt(MSE) para ajuste lineal de la MA(21):\n{np.sqrt(mse(df[labels],predict.T))}')


: 

In [None]:
# Se puede hacer lo mismo a partir de un fiteo cuadrático

# create test data
dates = pd.to_datetime(t)
# calculate unix datetime
t_epochs =(dates - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s')
opt_vals, errors = fit_batch(t_epochs, m_averaged, cuadratica)
print(f'parametros óptimos: {opt_vals}')
print(f'traza de la matriz de covarianza: {errors}')

# el dibujito
fig, axs = plt.subplots(len(labels),1, figsize=(5,10))
for i in range(len(labels)):
    axs[i].plot(df['Date'],df[labels[i]], label = labels[i] + '- media movil ' + str(ws))
    axs[i].plot(t,cuadratica(t_epochs,*opt_vals[i]), label = 'linear trend')
    axs[i].legend()

# para evaluar el error del fiteo usamos MAE y MSE sobre todos los datos
dates = pd.to_datetime(df['Date'])
t_epochs =(dates - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s')

predict = np.array([cuadratica(t_epochs,*opt_vals[i]) for i in range(len(labels))])
print(f'MAE para ajuste lineal de la MA(21):\n{mae(df[labels],predict.T)}')
print(f'Sqrt(MSE) para ajuste lineal de la MA(21):\n{np.sqrt(mse(df[labels],predict.T))}')


: 

Se observa que los errores se reducen bastante. Se pueden ensayar diferentes tipos de funciones, aquí mostraremos otro truco, que consiste en tomar el logaritmo natural de la distribución para ajustarla

In [None]:
log_ma = np.log(m_averaged)
fig, axs = plt.subplots(len(labels),1, figsize=(5,10))
for i in range(len(labels)):
    axs[i].plot(t, log_ma[:,i], label = labels[i])
    axs[i].legend()

: 

En las figuras previas se puede ver que un ajuste lineal puede ser útil

In [None]:
# create test data
dates = pd.to_datetime(t)
# calculate unix datetime
t_epochs =(dates - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s')
opt_vals, errors = fit_batch(t_epochs, log_ma, lineal)
print(f'parametros óptimos: {opt_vals}')
print(f'traza de la matriz de covarianza: {errors}')

# el dibujito
fig, axs = plt.subplots(len(labels),1, figsize=(5,10))
for i in range(len(labels)):
    axs[i].plot(dates, log_ma[:,i], label = labels[i] + '- log media movil ' + str(ws))
    axs[i].plot(t,lineal(t_epochs,*opt_vals[i]), label = 'linear trend')
    axs[i].legend()

# para evaluar el error del fiteo usamos MAE y MSE sobre todos los datos
dates = pd.to_datetime(df['Date'])
t_epochs =(dates - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s')

predict = np.array([np.exp(lineal(t_epochs,*opt_vals[i])) for i in range(len(labels))])
print(f'MAE para ajuste lineal de la MA(21):\n{mae(df[labels],predict.T)}')
print(f'Sqrt(MSE) para ajuste lineal de la MA(21):\n{np.sqrt(mse(df[labels],predict.T))}')


: 

y se puede observar que este ajuste proporciona un error similar al obtenido con un ajuste cuadrático.

Finalmente, otra manera de extraer estacionalidad es a partir de derivadas. Para esto cabe recordar que la derivada de una función polinómica reduce en uno el orden del polinomio.

In [None]:
def deriv_fwd(x, y):
    return (y[1:]-y[:-1])/(x[1:]-x[:-1])

: 

In [None]:
# create test data
dates = pd.to_datetime(df['Date'])
# calculate unix datetime
t_epochs =(dates - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s')

derivs = np.array([deriv_fwd(t_epochs.to_numpy(), df[l].to_numpy()) for l in labels]).T
# el dibujito
fig, axs = plt.subplots(len(labels),1, figsize=(5,10))
for i in range(len(labels)):
    axs[i].plot(dates.to_numpy()[1:], derivs[:,i], label = labels[i] )
    axs[i].legend()

# predict = np.array([np.exp(lineal(t_epochs,*opt_vals[i])) for i in range(len(labels))])
# print(f'MAE para ajuste lineal de la MA(21):\n{mae(df[labels],predict.T)}')
# print(f'Sqrt(MSE) para ajuste lineal de la MA(21):\n{np.sqrt(mse(df[labels],predict.T))}')

: 

Las derivadas muestran un comportamiento plano, con una desviación estándar que varía en el tiempo. Se puede ajustar la tendencia lineal sacando la media de esta distribución, y luego hacer alguna transformación que permita ajustar la desviación estándar