# Redes neuronales aplicadas a series financieras
Este notebook compara tres modelos distintos de redes neuronales aplicadas a la predicción de series temporales financieras. Incluye técnicas de normalización, ventanas deslizantes y distintas arquitecturas.

**Modelos incluidos:**
- LSTM clásico
- CNN 1D
- Comparación de RNN simple, LSTM y CNN 1D sobre una misma serie

## 1. Modelo LSTM para predicción bursátil

# Stock price prediction using LSTM neural network and Tensorflow
What do we need here:
1. Load data
2. Scale data for machine learning model
3. Setup neural network
4. Compile model
5. Train the neural network (fit the model)
6. Use the model for prediction
7. Draw the results chart

In [None]:
# Requirements
!pip install yahoo_fin

In [None]:
import numpy as np
import time as tm
import datetime as dt
import tensorflow as tf

# Data preparation
from yahoo_fin import stock_info as yf #acceder a información financiera y datos de acciones de Yahoo Finance de manera programática
from sklearn.preprocessing import MinMaxScaler #escalar o normalizar datos dentro de un rango específico
from collections import deque #estructura de datos similar a una lista, pero optimizada para realizar operaciones de inserción y eliminación en ambos extremos con mayor eficiencia

# AI
from keras.models import Sequential
from keras.layers import Dense, LSTM, Dropout

# Graphics library
import matplotlib.pyplot as plt

In [None]:
# SETTINGS

# Window size (sequence length): number of consecutive bars in each input sample
N_STEPS = 7

# Forecast horizons, in bars ahead of the last known close (1 = next bar);
# the prediction cell trains one model per horizon
LOOKUP_STEPS = [1, 2, 3]

# Stock ticker, GOOGL
STOCK = 'GOOGL'

# Download range as YYYY-MM-DD strings: from 1104 calendar days ago up to today
date_now = tm.strftime('%Y-%m-%d')
date_3_years_back = (dt.date.today() - dt.timedelta(days=1104)).strftime('%Y-%m-%d')

In [None]:
# LOAD DATA
# Price history for STOCK from yahoo_fin with interval = 1d (one bar per day),
# covering the range from date_3_years_back (1104 calendar days ago) to date_now
init_df = yf.get_data(
    STOCK,
    start_date=date_3_years_back,
    end_date=date_now,
    interval='1d')

In [None]:
init_df

In [None]:
# Discard every column the neural network does not use
unused_columns = ['open', 'high', 'low', 'adjclose', 'ticker', 'volume']
init_df = init_df.drop(columns=unused_columns)
# Mirror the index into a regular 'date' column
init_df['date'] = init_df.index

In [None]:
init_df

In [None]:
# Preliminary look at the data: the last 200 closing prices
recent_close = init_df['close'][-200:]

plt.style.use(style='ggplot')
plt.figure(figsize=(16, 10))
plt.plot(recent_close)
plt.ylabel('price')
plt.xlabel('days')
plt.legend([f'Actual price for {STOCK}'])
plt.show()

In [None]:
# Scale data for the ML engine.
# The 'close' column is reshaped to a single-feature 2-D array, fitted and
# transformed with a default MinMaxScaler, and stored in the new column
# 'scaled_close'. The fitted scaler is reused later to undo the scaling.
scaler = MinMaxScaler()
close_column = init_df['close'].values.reshape(-1, 1)
init_df['scaled_close'] = scaler.fit_transform(close_column)

In [None]:
init_df

In [None]:
# DATA PREPARATION
def PrepareData(days):
  """Build the training windows and the latest input window for one horizon.

  Works on a copy of the module-level ``init_df``; the original frame is not
  modified.

  Args:
    days: Number of rows ahead to predict. ``scaled_close`` is shifted up by
      this many rows to form the target column ``future``.

  Returns:
    A 4-tuple ``(df, last_sequence, X, Y)``:
      df: the copy of ``init_df`` with the ``future`` column added and the
        rows containing NaN removed.
      last_sequence: float32 array with the ``scaled_close`` values of the
        final sliding window followed by the last ``days`` values of the
        column (captured before the NaN rows were dropped).
      X: array of sliding windows of ``N_STEPS`` consecutive rows; every row
        carries ``scaled_close`` and ``date``.
      Y: array with the ``future`` value paired with the last row of each
        window.
  """
  feature_columns = ['scaled_close']

  df = init_df.copy()
  # Target column: the scaled close `days` rows later
  df['future'] = df['scaled_close'].shift(-days)
  # Keep the newest `days` values before their rows are dropped below
  newest_values = np.array(df[feature_columns].tail(days))
  # The shift leaves NaN targets in the last rows; remove them
  df.dropna(inplace=True)

  # A bounded deque acts as the sliding window: once full, every append
  # discards the oldest row
  sequences = deque(maxlen=N_STEPS)
  X, Y = [], []
  rows = df[feature_columns + ['date']].values
  targets = df['future'].values
  for entry, target in zip(rows, targets):
    sequences.append(entry)
    if len(sequences) == N_STEPS:
      X.append(np.array(sequences))
      Y.append(target)

  # Final window (feature columns only) followed by the newest values
  last_sequence = [row[:len(feature_columns)] for row in sequences]
  last_sequence = last_sequence + list(newest_values)
  last_sequence = np.array(last_sequence).astype(np.float32)

  return df, last_sequence, np.array(X), np.array(Y)

In [None]:
PrepareData(3) # 3 days

In [None]:
def GetTrainedModel(x_train, y_train, batch_size=8, epochs=80):
  """Build, compile and fit the stacked-LSTM regressor.

  Architecture: LSTM(60, returning sequences) -> Dropout(0.3) -> LSTM(120)
  -> Dropout(0.3) -> Dense(20) -> Dense(1), compiled with mean squared error
  loss and the Adam optimizer.

  Args:
    x_train: Training windows shaped (samples, timesteps, features). The
      model's input shape is taken from ``x_train.shape[1:]``.
    y_train: One target value per window.
    batch_size: Mini-batch size passed to ``fit``. Defaults to 8, the value
      that used to be hard-coded.
    epochs: Number of training epochs. Defaults to 80, the value that used
      to be hard-coded.

  Returns:
    The fitted model. Its summary is printed after training.
  """
  model = Sequential()
  # Input shape comes from the data, so the function does not depend on the
  # global window size
  model.add(LSTM(60, return_sequences=True, input_shape=tuple(x_train.shape[1:])))
  model.add(Dropout(0.3))
  model.add(LSTM(120, return_sequences=False))
  model.add(Dropout(0.3))
  model.add(Dense(20))
  model.add(Dense(1))

  model.compile(loss='mean_squared_error', optimizer='adam')

  model.fit(x_train, y_train,
            batch_size=batch_size,
            epochs=epochs,
            verbose=1)

  model.summary()

  return model

In [None]:
# GET PREDICTIONS
# One model is trained per horizon; each contributes a single forecast price.
feature_count = 1  # only 'scaled_close' is fed to the network
predictions = []

for step in LOOKUP_STEPS:
  df, last_sequence, x_train, y_train = PrepareData(step)
  # Keep the feature column(s) of every window row and cast to float32
  x_train = x_train[:, :, :feature_count].astype(np.float32)

  model = GetTrainedModel(x_train, y_train)

  # Newest N_STEPS values, shaped as a batch of one window
  last_sequence = last_sequence[-N_STEPS:][np.newaxis, ...]
  prediction = model.predict(last_sequence)
  # Back to the original price scale
  predicted_price = scaler.inverse_transform(prediction)[0, 0]

  predictions.append(round(float(predicted_price), 2))

In [None]:
# Report the forecasts only when at least one was produced
if predictions:
  # One "<price>$" entry per forecast, comma separated
  predictions_str = ', '.join(f'{price}$' for price in predictions)
  # The day count follows the number of forecasts instead of a fixed 3
  message = f'{STOCK} prediction for upcoming {len(predictions)} days ({predictions_str})'

  print(message)

In [None]:
# Execute model for the whole history range
copy_df = init_df.copy()
y_predicted = model.predict(x_train)

# model/x_train/y_train are the ones left over from the last training run, so
# the predictions cover fewer rows than copy_df: the first N_STEPS - 1 rows
# never complete a window and the trailing rows have no target. Both ends are
# padded with scaled target values so the new column matches the frame length.
# The pad sizes are derived from the data; the former hard-coded 6 and 3 only
# fit N_STEPS = 7 with a last horizon of 3.
head_pad = N_STEPS - 1
tail_pad = len(copy_df) - len(y_predicted) - head_pad
scaled_series = np.concatenate([
    np.expand_dims(y_train[:head_pad], axis=1),
    y_predicted,
    # explicit start index: y_train[-0:] would select the whole array
    np.expand_dims(y_train[len(y_train) - tail_pad:], axis=1),
])

# Undo the min-max scaling and flatten (n, 1) -> (n,)
y_predicted_transformed = np.squeeze(scaler.inverse_transform(scaled_series), axis=1)
copy_df['predicted_close'] = y_predicted_transformed

In [None]:
copy_df

In [None]:
# Add predicted results to the table: one new row per forecast, labelled with
# today, tomorrow and the day after tomorrow.
# Each row supplies four values — presumably matching the columns
# [close, date, scaled_close, predicted_close]; verify against copy_df.
date_now = dt.date.today()
date_tomorrow = date_now + dt.timedelta(days=1)
date_after_tomorrow = date_now + dt.timedelta(days=2)

forecast_dates = (date_now, date_tomorrow, date_after_tomorrow)
for position, forecast_date in enumerate(forecast_dates):
  copy_df.loc[forecast_date] = [predictions[position], f'{forecast_date}', 0, 0]

In [None]:
# Result chart: the last 150 rows, split into the historical part (first 147
# rows) and the tail of 4 rows that ends with the appended forecast rows
recent_rows = copy_df[-150:]
historical_rows = recent_rows.head(147)

plt.style.use(style='ggplot')
plt.figure(figsize=(16, 10))
plt.plot(historical_rows['close'])
plt.plot(historical_rows['predicted_close'], linewidth=1, linestyle='dashed')
plt.plot(recent_rows['close'].tail(4))
plt.ylabel('price')
plt.xlabel('days')
legend_entries = [
    f'Actual price for {STOCK}',
    f'Predicted price for {STOCK}',
    f'Predicted price for future 3 days',
]
plt.legend(legend_entries)
plt.show()

# RESUMEN
### Propósito del modelo
El objetivo es predecir los precios futuros de acciones (específicamente para el ticker GOOGL) utilizando datos históricos mediante una red neuronal LSTM implementada con TensorFlow/Keras.

### Pasos clave según el notebook
1. Carga de datos:
- Se utiliza la biblioteca yahoo_fin para extraer datos históricos de precios de acciones.
- Los datos abarcan los últimos 1104 días (alrededor de 3 años) con intervalos diarios (1d).
2. Preparación de los datos:
- Escalado: Los datos de precios se normalizan usando MinMaxScaler para ajustarlos a un rango entre 0 y 1.
- Ventanas deslizantes: Se preparan ventanas de datos de tamaño N_STEPS = 7 (una semana) para alimentar la red LSTM.
3. Arquitectura de la red neuronal:
- Utiliza Keras con un modelo Sequential:
- Capa LSTM inicial con 60 neuronas y return_sequences=True.
- Dropout del 30% después de cada capa LSTM para evitar sobreajuste.
- Segunda capa LSTM con 120 neuronas y return_sequences=False.
- Una capa densa intermedia con 20 neuronas.
- Una capa densa final con 1 neurona para predecir un valor único (el precio futuro).
- Función de pérdida: mean_squared_error.
- Optimizador: adam.
4. Entrenamiento del modelo:
- Se entrena el modelo con BATCH_SIZE = 8 y EPOCHS = 80.
5. Predicciones:
- El modelo realiza predicciones para los próximos días (LOOKUP_STEPS = [1, 2, 3]).
- Las predicciones escaladas se desescalan utilizando MinMaxScaler para obtener los valores originales.
6. Visualización de resultados:
- Se genera un gráfico que compara los precios reales con las predicciones del modelo (incluidas las proyecciones futuras).


## 2. Modelo CNN 1D aplicado al índice Shanghai Composite (000001.SS)

In [None]:
!pip install yfinance


In [None]:
import yfinance as yf

In [None]:
# Price history for Yahoo ticker 000001.SS over a fixed two-year date window
data = yf.download("000001.SS", start="2023-01-22",  end = "2025-01-22")

In [None]:
import matplotlib.pyplot as plt

# Plot the closing prices
plt.figure(figsize=(12, 6))
plt.plot(data['Close'], color='blue', label='Closing Price')

# Title and axis labels
plt.title('Shanghai Composite Index (Closing Prices)', fontsize=16)
plt.ylabel('Closing Price (CNY)', fontsize=14)
plt.xlabel('Date', fontsize=14)
plt.grid()
plt.legend()

# Show the figure
plt.show()

In [None]:
data


In [None]:
from sklearn.preprocessing import MinMaxScaler

# Drop rows with missing values
data = data.dropna()

# Build binary labels from the change in closing price `days` rows ahead
def create_labels(data, days=10):
    """Label each row 1 if the close `days` rows later is higher, else 0.

    Args:
        data: Table-like object whose ``data['Close'].values`` is the
            sequence of closing prices.
        days: Look-ahead distance in rows.

    Returns:
        A list of ints. The first ``len(close) - days`` entries are real
        labels; ``days`` zeros are appended as placeholders for the final
        rows, which have no future price to compare against.
    """
    close_values = data['Close'].values
    labelled_rows = range(len(close_values) - days)
    labels = [
        1 if close_values[row + days] > close_values[row] else 0
        for row in labelled_rows
    ]
    # Placeholder zeros for the trailing rows without a valid label
    labels.extend([0] * days)
    return labels

# Label every row, then discard the trailing rows whose label is only the
# zero padding added by create_labels
LABEL_HORIZON = 10  # rows ahead; same value as the default `days` of create_labels
data['Label'] = create_labels(data, days=LABEL_HORIZON)

# .copy() makes the trimmed frame independent of the original, so the column
# assignment below is not a chained assignment on a slice
data = data.iloc[:-LABEL_HORIZON].copy()

# Min-max normalize the feature columns
# NOTE(review): the scaler is fitted on the full history, before the
# train/test split done in a later cell, so test-period minima and maxima
# influence the training inputs — consider fitting on the training rows only.
scaler = MinMaxScaler()
features = ['Open', 'High', 'Low', 'Close', 'Volume']
data[features] = scaler.fit_transform(data[features])

In [None]:
data

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split

# Build the model inputs (X) and labels (y) as NumPy arrays
X = data[features].values
y = data['Label'].values

# Sliding windows for the CNN, format (n_samples, n_timesteps, n_features).
# Each window of n_timesteps consecutive rows is paired with the label of the
# row immediately after it.
n_timesteps = 10  # time steps per window
window_starts = range(len(X) - n_timesteps)
X_cnn = np.array([X[start:start + n_timesteps] for start in window_starts])
y_cnn = np.array([y[start + n_timesteps] for start in window_starts])

# 80% training / 20% test; shuffle=False keeps the order, so the model is
# trained on earlier windows and tested on later ones
X_train, X_test, y_train, y_test = train_test_split(
    X_cnn, y_cnn, test_size=0.2, random_state=42, shuffle=False)

# Report the array shapes
print("Forma de X_train:", X_train.shape)  # (n_samples_train, n_timesteps, n_features)
print("Forma de y_train:", y_train.shape)  # (n_samples_train,)
print("Forma de X_test:", X_test.shape)
print("Forma de y_test:", y_test.shape)

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, LeakyReLU
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
import numpy as np

# Model hyperparameters
n_classes = 2           # binary classification
dropout_rate = 0.8      # fraction of units KEPT: the Dropout layer below drops 1 - dropout_rate
learning_rate = 0.0001  # Adam learning rate
batch_size = 1000       # batch size used when fitting

# Build the 1-D CNN.
# padding='same' is set on every Conv1D/MaxPooling1D because the input window
# is only 10 steps long. With the default 'valid' padding the length shrinks
# 10 -> 8 -> 4 -> 2 -> 1, and the third Conv1D (kernel_size=3) can no longer be
# applied, so the model fails to build. With 'same' the convolutions keep the
# length and the four pooling layers reduce it 10 -> 5 -> 3 -> 2 -> 1.
model = Sequential()

model.add(Conv1D(filters=32, kernel_size=3, strides=1, padding='same',
                 activation='relu', input_shape=(10, 5)))
model.add(MaxPooling1D(pool_size=2, padding='same'))
model.add(Conv1D(filters=64, kernel_size=3, strides=1, padding='same'))
model.add(LeakyReLU(alpha=0.01))
model.add(MaxPooling1D(pool_size=2, padding='same'))
model.add(Conv1D(filters=128, kernel_size=3, strides=1, padding='same'))
model.add(LeakyReLU(alpha=0.01))
model.add(MaxPooling1D(pool_size=2, padding='same'))
model.add(Conv1D(filters=256, kernel_size=3, strides=1, padding='same'))
model.add(LeakyReLU(alpha=0.01))
model.add(MaxPooling1D(pool_size=2, padding='same'))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(rate=1 - dropout_rate))
# Two-way softmax; paired with sparse categorical cross-entropy on integer labels
model.add(Dense(n_classes, activation='softmax'))

# Compile the model
model.compile(optimizer=Adam(learning_rate=learning_rate),
              loss=SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

# Model summary
model.summary()

# Training is done separately with model.fit on:
# X_train: inputs shaped (n_samples, n_timesteps, n_features)
# y_train: labels shaped (n_samples,)

In [None]:
# Train the CNN for 40 epochs; validation_split=0.2 holds out 20% of the
# training data for validation. The returned history feeds the curves below.
history = model.fit(X_train, y_train,
                    batch_size=batch_size,
                    epochs=40,
                    validation_split=0.2)

In [None]:
# Evaluate the model on the test data and report loss and accuracy
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Pérdida en los datos de prueba: {loss:.4f}")
print(f"Exactitud en los datos de prueba: {accuracy:.4f}")


In [None]:
# Get predictions: per-class probabilities, then the most probable class
y_pred_prob = model.predict(X_test)
y_pred = y_pred_prob.argmax(axis=1)

print("Predicciones:", y_pred)

from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# Precision, recall and F1 on the test set
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)

metric_report = (("Precision", precision), ("Recall", recall), ("F1-Score", f1))
for metric_name, metric_value in metric_report:
    print(f"{metric_name}: {metric_value:.4f}")



In [None]:
import matplotlib.pyplot as plt

# Training curves: one figure for the loss, then one for the accuracy.
# Each entry: (training key, validation key, training label, validation label, y-axis label)
curves = (
    ('loss', 'val_loss',
     'Pérdida de entrenamiento', 'Pérdida de validación', 'Pérdida'),
    ('accuracy', 'val_accuracy',
     'Exactitud de entrenamiento', 'Exactitud de validación', 'Exactitud'),
)
for train_key, val_key, train_label, val_label, axis_label in curves:
    plt.plot(history.history[train_key], label=train_label)
    plt.plot(history.history[val_key], label=val_label)
    plt.xlabel('Épocas')
    plt.ylabel(axis_label)
    plt.legend()
    plt.show()


## 3. Comparación de RNN simple, LSTM y CNN 1D sobre datos financieros reales (AAPL)

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, SimpleRNN, Conv1D, Flatten
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Download the price history from Yahoo Finance and keep only the closing price
stock_symbol = "AAPL"  # any other ticker symbol can be used here
df = yf.download(stock_symbol, start="2020-01-01", end="2024-01-01")[['Close']]

# Preprocessing: scale the closing prices into the (0, 1) range
scaler = MinMaxScaler(feature_range=(0, 1))
df_scaled = scaler.fit_transform(df)

print(df_scaled.shape)

# Turn a series into supervised (window, next value) pairs for training
def create_sequences(data, seq_length):
    """Slice `data` into overlapping windows and their following values.

    Args:
        data: Indexable, sliceable sequence (for example a NumPy array).
        seq_length: Number of consecutive items in each window.

    Returns:
        A pair of NumPy arrays ``(X, y)``: ``X[k]`` is
        ``data[k:k + seq_length]`` and ``y[k]`` is ``data[k + seq_length]``.
        There are ``len(data) - seq_length`` pairs; both arrays are empty
        when that count is not positive.
    """
    window_starts = range(len(data) - seq_length)
    windows = [data[start:start + seq_length] for start in window_starts]
    next_values = [data[start + seq_length] for start in window_starts]
    return np.array(windows), np.array(next_values)

SEQ_LENGTH = 50  # Number of past rows in each input window (look-back)
X, y = create_sequences(df_scaled, SEQ_LENGTH)

# Shapes of the window array and of the target array
print(X.shape)
print(y.shape)



In [None]:

# Chronological split: the first 80% of the windows train, the rest test
split = int(0.8 * len(X))
X_train, y_train = X[:split], y[:split]
X_test, y_test = X[split:], y[split:]

# CNN input with an explicit trailing channel axis: (samples, steps, 1)
X_train_cnn = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
X_test_cnn = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))



# Define Models
def build_rnn():
    """Return a compiled regressor made of two stacked SimpleRNN layers.

    Input windows are (SEQ_LENGTH, 1). Both recurrent layers have 50 ReLU
    units; the first returns the full sequence so the second can consume it.
    A single Dense unit produces the prediction. Compiled with Adam
    (learning rate 0.001) and mean squared error loss.
    """
    model = Sequential()
    model.add(SimpleRNN(50, activation="relu", return_sequences=True,
                        input_shape=(SEQ_LENGTH, 1)))
    model.add(SimpleRNN(50, activation="relu"))
    model.add(Dense(1))
    model.compile(optimizer=Adam(learning_rate=0.001), loss="mse")
    return model

def build_lstm():
    """Return a compiled regressor made of two stacked LSTM layers.

    Input windows are (SEQ_LENGTH, 1). Both LSTM layers have 50 ReLU units;
    the first returns the full sequence so the second can consume it. A
    single Dense unit produces the prediction. Compiled with Adam (learning
    rate 0.001) and mean squared error loss.
    """
    model = Sequential()
    model.add(LSTM(50, activation="relu", return_sequences=True,
                   input_shape=(SEQ_LENGTH, 1)))
    model.add(LSTM(50, activation="relu"))
    model.add(Dense(1))
    model.compile(optimizer=Adam(learning_rate=0.001), loss="mse")
    return model

def build_cnn():
    """Return a compiled 1-D convolutional regressor.

    Input windows are (SEQ_LENGTH, 1). One Conv1D layer (64 filters, kernel
    size 3, ReLU) is flattened and followed by Dense(50, ReLU) and a single
    Dense output unit. Compiled with Adam (learning rate 0.001) and mean
    squared error loss.
    """
    model = Sequential()
    model.add(Conv1D(filters=64, kernel_size=3, activation="relu",
                     input_shape=(SEQ_LENGTH, 1)))
    model.add(Flatten())
    model.add(Dense(50, activation="relu"))
    model.add(Dense(1))
    model.compile(optimizer=Adam(learning_rate=0.001), loss="mse")
    return model

# Train models (built in the order RNN, LSTM, CNN, then fitted in the same order)
rnn_model = build_rnn()
lstm_model = build_lstm()
cnn_model = build_cnn()

# Identical training settings for the three models; the test split doubles as
# validation data
fit_options = dict(epochs=20, batch_size=16, verbose=1)
history_rnn = rnn_model.fit(X_train, y_train, validation_data=(X_test, y_test), **fit_options)
history_lstm = lstm_model.fit(X_train, y_train, validation_data=(X_test, y_test), **fit_options)
history_cnn = cnn_model.fit(X_train_cnn, y_train, validation_data=(X_test_cnn, y_test), **fit_options)

# Predictions, mapped back to the original price scale
y_pred_rnn = scaler.inverse_transform(rnn_model.predict(X_test))
y_pred_lstm = scaler.inverse_transform(lstm_model.predict(X_test))
y_pred_cnn = scaler.inverse_transform(cnn_model.predict(X_test_cnn))
y_test_actual = scaler.inverse_transform(y_test)


def _mae_and_mse(predicted):
    # Error metrics of one model against the actual test prices
    return (mean_absolute_error(y_test_actual, predicted),
            mean_squared_error(y_test_actual, predicted))


# Compute metrics
mae_rnn, mse_rnn = _mae_and_mse(y_pred_rnn)
mae_lstm, mse_lstm = _mae_and_mse(y_pred_lstm)
mae_cnn, mse_cnn = _mae_and_mse(y_pred_cnn)

# Plot predictions vs actual prices
plt.figure(figsize=(12, 6))
plt.plot(y_test_actual, label="Actual Price", color='black')
prediction_curves = (
    (y_pred_rnn, "RNN Prediction", "dashed"),
    (y_pred_lstm, "LSTM Prediction", "dotted"),
    (y_pred_cnn, "CNN Prediction", "dashdot"),
)
for curve, curve_label, curve_style in prediction_curves:
    plt.plot(curve, label=curve_label, linestyle=curve_style)
plt.legend()
plt.title("Stock Price Prediction Comparison")
plt.xlabel("Time")
plt.ylabel("Stock Price")
plt.show()

# Display results: one row per model
results_df = pd.DataFrame(
    [
        ("RNN", mae_rnn, mse_rnn),
        ("LSTM", mae_lstm, mse_lstm),
        ("CNN", mae_cnn, mse_cnn),
    ],
    columns=["Model", "MAE", "MSE"],
)

print(results_df)


## 4. Comparación visual y conclusiones

In [None]:
# Optional example: visually compare the predictions of section 3
# (uses y_test_actual so every curve is on the original price scale)
# import matplotlib.pyplot as plt
# plt.plot(y_test_actual, label='Real')
# plt.plot(y_pred_lstm, label='LSTM')
# plt.plot(y_pred_cnn, label='CNN')
# plt.plot(y_pred_rnn, label='RNN')
# plt.legend()
# plt.title('Comparación de modelos sobre datos financieros')
# plt.show()