In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from google.colab import files
import datetime
import matplotlib.pyplot as plt
import tensorflow as tf

In [None]:
# Ask the user to upload the CSV through the Colab file picker; the result
# maps each uploaded file name to its content.
dataset = files.upload()
# Work with the first uploaded file only.
filename = list(dataset)[0]

Saving PKO.WA (2004-2023).csv to PKO.WA (2004-2023) (4).csv


In [None]:
# Load the uploaded CSV into a DataFrame.
df = pd.read_csv(filepath_or_buffer=filename)

In [None]:
# Discard every row that has at least one missing value.
df = df.dropna(axis='index', how='any')

In [None]:
# Keep only the date and the closing price. Take an explicit copy so that
# later column assignments operate on an independent frame instead of a
# slice of the original (which triggers pandas' SettingWithCopyWarning).
df = df[['Date', 'Close']].copy()

In [None]:
def str_to_datetime(s):
  """Parse a 'YYYY-MM-DD' string into a ``datetime.datetime`` at midnight.

  Only the first three '-'-separated fields (year, month, day) are read;
  anything after the third field is ignored.
  """
  fields = s.split('-')
  year, month, day = (int(fields[position]) for position in range(3))
  return datetime.datetime(year, month, day)


In [None]:
# Convert the 'YYYY-MM-DD' strings to datetimes in one vectorised call
# instead of a Python-level function call per row. Unlike the row-wise
# string split, this also works when the column already holds datetimes,
# so re-running the cell does not fail.
df['Date'] = pd.to_datetime(df['Date'], format='%Y-%m-%d')

In [None]:
# Move the 'Date' column out of the columns and use it as the row index.
df = df.set_index('Date')

In [None]:
# Plot the whole closing-price history. The line carries a label so that
# plt.legend() has an entry to show (without one the legend is empty and
# matplotlib warns that no labelled artists were found).
plt.figure(figsize=(16,10))
plt.plot(df.index, df['Close'], label='Cena zamknięcia')
plt.title('Dane')
plt.xlabel('Daty')
plt.ylabel('Cena zamknięcia [PLN]')
plt.legend()

In [None]:
def df_to_windowed_df(dateframe, first_date_str, last_date_str, n):
  """Build a table of sliding windows over the 'Close' column.

  One output row is produced for every index date between ``first_date_str``
  and ``last_date_str`` (both inclusive). Each row holds the ``n`` closing
  prices that precede the target date ('Target-n' ... 'Target-1') and the
  closing price on the target date itself ('Target').

  Args:
    dateframe: DataFrame with a 'Close' column and a sorted datetime index.
    first_date_str: First target date, e.g. '2004-11-15'. Any value accepted
      by ``pd.Timestamp`` works.
    last_date_str: Last target date (inclusive), same format.
    n: Number of preceding closing prices stored per row.

  Returns:
    DataFrame with the columns 'Target Date', 'Target-n', ..., 'Target-1',
    'Target', or ``None`` (after emitting a warning) when fewer than ``n``
    rows precede the first target date.

  Raises:
    ValueError: If the first date is later than the last date.
  """
  import warnings

  first_date = pd.Timestamp(first_date_str)
  last_date = pd.Timestamp(last_date_str)
  if first_date > last_date:
    raise ValueError(
        f'first date {first_date_str} is later than last date {last_date_str}')

  index = dateframe.index
  dates = []
  X, Y = [], []

  target_date = first_date
  while True:
    # The target row plus the n rows right before it.
    df_subset = dateframe.loc[:target_date].tail(n + 1)

    if len(df_subset) != n + 1:
      warnings.warn(
          f'Window of size {n} is too large for date {target_date}; '
          'returning None')
      return None

    values = df_subset['Close'].to_numpy()
    dates.append(target_date)
    X.append(values[:-1])
    Y.append(values[-1])

    # Advance to the first index entry strictly after the current target.
    # Looking the position up directly (instead of scanning a 7-day slice)
    # guarantees progress, so gaps longer than a week or an end date that is
    # not an index entry can no longer cause an endless loop.
    next_position = index.searchsorted(target_date, side='right')
    if next_position >= len(index):
      break

    next_date = index[next_position]
    if next_date > last_date:
      break

    target_date = next_date

  X = np.array(X)

  columns = {'Target Date': dates}
  for i in range(n):
    columns[f'Target-{n-i}'] = X[:, i]
  columns['Target'] = Y

  return pd.DataFrame(columns)


# Window length: how many preceding closing prices each sample contains.
n = 3

# Row n is the first one with n predecessors, so windowing starts there and
# runs through the final row of the data.
start = df.index[n]
end = df.index[-1]

start_date = start.strftime('%Y-%m-%d')
end_date = end.strftime('%Y-%m-%d')

windowed_df = df_to_windowed_df(df, start_date, end_date, n)


In [None]:
def windowed_df_to_date_X_y(windowed_dataframe):
  """Split a windowed frame into dates, model inputs and targets.

  The first column is returned as the dates, the last column as the targets
  and every column in between as the inputs, reshaped to
  (rows, window length, 1). Inputs and targets are cast to float32.
  """
  raw = windowed_dataframe.to_numpy()

  dates, features, targets = raw[:, 0], raw[:, 1:-1], raw[:, -1]

  # Append a trailing axis of size one to the (rows, window length) matrix.
  X = features[..., np.newaxis].astype(np.float32)
  Y = targets.astype(np.float32)

  return dates, X, Y

# Convert the windowed table into arrays and display their shapes.
dates, X, y = windowed_df_to_date_X_y(windowed_df)

tuple(array.shape for array in (dates, X, y))

((4849,), (4849, 30, 1), (4849,))

In [None]:
# RGB colour triples (components in the 0-1 range) shared by the plots below.
blue_color = (0.2, 0.4, 0.6)
green_color = (0.4, 0.7, 0.2)
red_color = (0.8, 0.2, 0.4)

In [None]:
# Number of most recent samples reserved for each of the validation and
# test sets.
size_of_set = 365

# Chronological split points: [0, a) train, [a, b) validation, [b, end) test.
b = len(dates) - size_of_set
a = b - size_of_set

dates_train, dates_val, dates_test = dates[:a], dates[a:b], dates[b:]
X_train, X_val, X_test = X[:a], X[a:b], X[b:]
y_train, y_val, y_test = y[:a], y[a:b], y[b:]

# Show the three subsets on one time axis, one colour per subset.
plt.figure(figsize=(16,10))
plt.plot(dates_train, y_train, color=blue_color, label='Zbiór uczący')
plt.plot(dates_val, y_val, color=green_color, label='Zbiór walidacyjny')
plt.plot(dates_test, y_test, color=red_color, label='Zbiór testowy')
plt.title('Dane podzielone na trzy zbiory')
plt.xlabel('Daty')
plt.ylabel('Cena zamknięcia [PLN]')

plt.legend()

## Funkcje pomocnicze do obliczania błędów

In [None]:
def calculate_mse(actual, predicted):
    """Return the element-wise squared errors between two array-likes.

    Despite the name no mean is taken: the result has one squared error per
    observation.
    """
    difference = np.asarray(actual) - np.asarray(predicted)
    return difference ** 2

def calculate_mae(actual_values, predicted_values):
    """Return the element-wise absolute errors between two array-likes.

    Despite the name no mean is taken: the result has one absolute error per
    observation.
    """
    difference = np.subtract(np.asarray(actual_values), np.asarray(predicted_values))
    return np.absolute(difference)

def mape(Y_actual,Y_Predicted):
    """Return the mean absolute percentage error, expressed in percent.

    Args:
        Y_actual: Array-like of observed values. A zero entry leads to a
            division by zero in the corresponding term.
        Y_Predicted: Array-like of predicted values, same length.

    Returns:
        The mean of ``|actual - predicted| / |actual|`` multiplied by 100.
    """
    # Convert first so plain Python lists work, as they do in
    # calculate_mse and calculate_mae.
    actual = np.asarray(Y_actual)
    predicted = np.asarray(Y_Predicted)
    return np.mean(np.abs((actual - predicted) / actual)) * 100

# Funkcje pomocnicze do wyświetlania wykresów

In [None]:
def display_error_figure(data, title):
  """Draw ``data`` as a line chart under the given title and show it."""
  _, axis = plt.subplots(figsize=(12, 8))
  axis.plot(data, color=blue_color)
  axis.set_title(title)
  axis.set_ylabel('Wartość błędu')
  axis.set_xlabel('Obserwacje')
  plt.show()

def display_error_boxplot(data, title):
  """Draw ``data`` as a box plot under the given title and show it."""
  _, axis = plt.subplots(figsize=(12, 8))
  axis.boxplot(data)
  axis.set_title(title)
  axis.set_ylabel('Wartość błędu')
  axis.set_xlabel('')
  plt.show()

def display_prediction_data(dates, predictions, actual, title):
  """Plot actual and predicted values against ``dates`` and show the figure.

  The actual series is drawn first (green), the predictions second (blue).
  """
  _, axis = plt.subplots(figsize=(12, 8))
  axis.plot(dates, actual, color=green_color, label='Rzeczywsite')
  axis.plot(dates, predictions, color=blue_color, label='Przewidywane')
  axis.legend()
  axis.set_xlabel('Daty')
  axis.set_ylabel('Cena zamknięcia [PLN]')
  axis.set_title(title)
  plt.show()

# Sieć neuronowa

In [None]:
import tensorflow as tf

# Seed the Python, NumPy and TensorFlow random generators so that weight
# initialisation and batch shuffling are repeatable between runs.
tf.keras.utils.set_random_seed(42)

# One LSTM layer over the (n, 1) price window, followed by three dense ReLU
# layers and a single linear output unit for the next closing price.
model = tf.keras.models.Sequential([
    tf.keras.layers.Input((n, 1)),
    tf.keras.layers.LSTM(32),
    tf.keras.layers.Dense(16, activation='relu'),
    tf.keras.layers.Dense(16, activation='relu'),
    tf.keras.layers.Dense(48, activation='relu'),
    tf.keras.layers.Dense(1),
])

# Train on MSE; MAE and MSE are additionally tracked per epoch in `history`.
model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), metrics=['mean_absolute_error', 'mean_squared_error'])

history = model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=20, batch_size=32)

In [None]:
# Run the trained network on each subset (test, train, validation - in that
# order) and flatten every result to a 1-D array.
test_predictions, train_predictions, val_predictions = (
    model.predict(inputs).flatten() for inputs in (X_test, X_train, X_val)
)

## Model metrics

In [None]:
# Plot every metric recorded during training, one line per metric.
pd.DataFrame.from_dict(history.history).plot(figsize=(12, 8))

In [None]:
# Validation MSE per epoch. Only the validation curve is drawn, so it is
# labelled directly; a two-entry legend would name this single curve as the
# training one.
plt.figure(figsize=(12,8))
plt.plot(history.history['val_mean_squared_error'], label='Walidacyjny')
plt.title('Model MSE')
plt.ylabel('Wartości błędu')
plt.xlabel('Epoka')
plt.legend()

In [None]:
# Training and validation MAE per epoch.
plt.figure(figsize=(12,8))
plt.plot(history.history['mean_absolute_error'], label='Uczący')
plt.plot(history.history['val_mean_absolute_error'], label='Walidacyjny')
plt.title('Model MAE')
plt.ylabel('Wartości błędu')
plt.xlabel('Epoka')
plt.legend()

In [None]:
# Training and validation loss per epoch.
plt.figure(figsize=(12,8))
plt.plot(history.history['loss'], label="training loss")
plt.plot(history.history['val_loss'], label="validation loss")
plt.title('Strata modelu')
plt.ylabel('Strata')
plt.xlabel('Epoka')
# The curves carry labels, but they are only rendered once a legend is drawn.
plt.legend()

In [None]:
# Same loss curves with the first epoch dropped ([1:]), so position 0 on the
# x axis corresponds to the second recorded epoch.
plt.figure(figsize=(12,8))
plt.plot(history.history['loss'][1:], label="training loss")
plt.plot(history.history['val_loss'][1:], label="validation loss")
plt.title('Strata modelu')
plt.ylabel('Strata')
plt.xlabel('Epoka')
# The curves carry labels, but they are only rendered once a legend is drawn.
plt.legend()

## Błędy

In [None]:
# Per-observation squared errors of the network on each subset.
train_mse_errors, val_mse_errors, test_mse_errors = (
    calculate_mse(actual, predicted)
    for actual, predicted in (
        (y_train, train_predictions),
        (y_val, val_predictions),
        (y_test, test_predictions),
    )
)

In [None]:
# Per-observation absolute errors of the network on each subset.
train_mae_errors, val_mae_errors, test_mae_errors = (
    calculate_mae(actual, predicted)
    for actual, predicted in (
        (y_train, train_predictions),
        (y_val, val_predictions),
        (y_test, test_predictions),
    )
)

In [None]:
from sklearn.metrics import mean_squared_error

# Mean squared error of the network on each subset.
train_mse, val_mse, test_mse = (
    mean_squared_error(actual, predicted)
    for actual, predicted in (
        (y_train, train_predictions),
        (y_val, val_predictions),
        (y_test, test_predictions),
    )
)

train_mse, val_mse, test_mse

(0.5269312, 0.40351763, 4.1429653)

In [None]:
from sklearn.metrics import mean_absolute_error

# Mean absolute error of the network on each subset.
train_mae, val_mae, test_mae = (
    mean_absolute_error(actual, predicted)
    for actual, predicted in (
        (y_train, train_predictions),
        (y_val, val_predictions),
        (y_test, test_predictions),
    )
)

train_mae, val_mae, test_mae

(0.5330532, 0.55410874, 1.4257317)

In [None]:
from sklearn.metrics import mean_absolute_percentage_error

# Mean absolute percentage error of the network on each subset.
# NOTE(review): scikit-learn reports this as a fraction rather than in
# percent (0.015 means 1.5 %) - confirm before quoting the numbers.
train_mape, val_mape, test_mape = (
    mean_absolute_percentage_error(actual, predicted)
    for actual, predicted in (
        (y_train, train_predictions),
        (y_val, val_predictions),
        (y_test, test_predictions),
    )
)

train_mape, val_mape, test_mape

(0.015348279, 0.016157018, 0.035638202)

## Wykresy błędów

### MSE

In [None]:
# Squared-error line charts for the three subsets, separated by blank lines.
display_error_figure(data=train_mse_errors, title='Błędy średniokwadratowe zbioru uczącego')
print()
display_error_figure(data=val_mse_errors, title='Błędy średniokwadratowe zbioru walidacyjnego')
print()
display_error_figure(data=test_mse_errors, title='Błędy średniokwadratowe zbioru testowego')

In [None]:
# Squared-error box plots for the three subsets, separated by blank lines.
display_error_boxplot(data=train_mse_errors, title='Błędy średniokwadratowe uczącego')
print('')
display_error_boxplot(data=val_mse_errors, title='Błędy średniokwadratowe walidacyjnego')
print('')
display_error_boxplot(data=test_mse_errors, title='Błędy średniokwadratowe zbioru testowego')

### MAE

In [None]:
# Absolute-error line charts for the three subsets, separated by blank lines.
display_error_figure(data=train_mae_errors, title='Błędy bezwzględne zbioru uczącego')
print('')
display_error_figure(data=val_mae_errors, title='Błędy bezwzględne zbioru walidacyjnego')
print('')
display_error_figure(data=test_mae_errors, title='Błędy bezwzględne zbioru testowego')

In [None]:
# Absolute-error box plots for the three subsets, separated by blank lines.
display_error_boxplot(data=train_mae_errors, title='Błędy bezwzględne zbioru uczącego')
print('')
display_error_boxplot(data=val_mae_errors, title='Błędy bezwzględne zbioru walidacyjnego')
print('')
display_error_boxplot(data=test_mae_errors, title='Błędy bezwzględne zbioru testowego')

## Wykresy przewidywanych wartości

In [None]:
# Actual versus predicted closing prices for each subset.
display_prediction_data(dates=dates_train, predictions=train_predictions, actual=y_train, title='Zbiór uczący')
print('')
display_prediction_data(dates=dates_val, predictions=val_predictions, actual=y_val, title='Zbiór walidacyjny')
print('')
display_prediction_data(dates=dates_test, predictions=test_predictions, actual=y_test, title='Zbiór testowy')

In [None]:
# Validation and test subsets on one time axis: observed values next to the
# network's predictions (the training subset is not drawn).
plt.figure(figsize=(12,8))

plt.plot(dates_val, y_val, label='Validation')
plt.plot(dates_val, val_predictions, label='Validation prediction')

plt.plot(dates_test, y_test, label='Test')
plt.plot(dates_test, test_predictions, label='Test prediction')

plt.xlabel('Daty')
plt.ylabel('Cena zamknięcia [PLN]')

plt.legend()
plt.title('Observations vs Predictions')

# ARIMA

In [None]:
pip install pmdarima

In [None]:
from pmdarima.arima import auto_arima

# Stepwise search for a seasonal ARIMA on the training targets, with
# seasonal period m=6 and seasonal differencing order D=1. The search
# progress is printed (trace); failing candidate fits are ignored.
model_arima = auto_arima(
    y=y_train,
    seasonal=True,
    m=6,
    D=1,
    stepwise=True,
    trace=True,
    error_action='ignore',
    suppress_warnings=True,
)

In [None]:
# Display the summary report of the model returned by auto_arima.
model_arima.summary()

In [None]:
# Show the shapes of the training targets and inputs. Both are combined into
# one final expression so both are displayed, without printing the whole
# input array.
y_train.shape, X_train.shape

In [None]:
# Drop the trailing size-one feature axis so the inputs become 2-D
# (samples, window length) matrices usable as exogenous regressors.
# Reshaping to the first two dimensions gives the same result as indexing
# the last axis, but leaves an already 2-D array unchanged, so the cell can
# be re-run without raising an IndexError.
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1])
X_val = X_val.reshape(X_val.shape[0], X_val.shape[1])
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1])

In [None]:
# Refit the selected ARIMA on the training targets, now with the price
# windows passed as exogenous regressors.
model_arima.fit(X=X_train, y=y_train)

In [None]:
# Training subset: use the in-sample predictions. `predict` forecasts the
# periods that FOLLOW the training data, so comparing its output with
# y_train would pair every training target with a value for a later period.
# NOTE(review): the first few in-sample values of a differenced model may be
# poorly defined - confirm whether they should be excluded from the metrics.
train_forecast = np.asarray(model_arima.predict_in_sample(X=X_train)).flatten()

# Validation and test follow the training data back to back, so forecast
# both horizons in one pass and split the result. Forecasting the test
# subset on its own would place it directly after the training data, i.e.
# on the validation period.
holdout_forecast = np.asarray(
    model_arima.predict(
        n_periods=len(X_val) + len(X_test),
        X=np.concatenate([X_val, X_test]),
    )
).flatten()
validation_forecast = holdout_forecast[:len(X_val)]
test_forecast = holdout_forecast[len(X_val):]

## Błędy

In [None]:
# Per-observation absolute errors of the ARIMA forecasts on each subset.
arima_train_mae_errors, arima_val_mae_errors, arima_test_mae_errors = (
    calculate_mae(actual, forecast)
    for actual, forecast in (
        (y_train, train_forecast),
        (y_val, validation_forecast),
        (y_test, test_forecast),
    )
)

In [None]:
# Per-observation squared errors of the ARIMA forecasts on each subset.
# The training errors use the ARIMA forecast (train_forecast), not the
# neural network's train_predictions.
arima_train_mse_errors = calculate_mse(y_train, train_forecast)
arima_val_mse_errors = calculate_mse(y_val, validation_forecast)
arima_test_mse_errors = calculate_mse(y_test, test_forecast)

In [None]:
from sklearn.metrics import mean_squared_error

# Mean squared error of the ARIMA forecasts on each subset.
arima_train_mse, arima_val_mse, arima_test_mse = (
    mean_squared_error(actual, forecast)
    for actual, forecast in (
        (y_train, train_forecast),
        (y_val, validation_forecast),
        (y_test, test_forecast),
    )
)

arima_train_mse, arima_val_mse, arima_test_mse

In [None]:
from sklearn.metrics import mean_absolute_error

# Mean absolute error of the ARIMA forecasts on each subset.
arima_train_mae, arima_val_mae, arima_test_mae = (
    mean_absolute_error(actual, forecast)
    for actual, forecast in (
        (y_train, train_forecast),
        (y_val, validation_forecast),
        (y_test, test_forecast),
    )
)

arima_train_mae, arima_val_mae, arima_test_mae

In [None]:
from sklearn.metrics import mean_absolute_percentage_error

# Mean absolute percentage error of the ARIMA forecasts on each subset.
arima_train_mape, arima_val_mape, arima_test_mape = (
    mean_absolute_percentage_error(actual, forecast)
    for actual, forecast in (
        (y_train, train_forecast),
        (y_val, validation_forecast),
        (y_test, test_forecast),
    )
)

arima_train_mape, arima_val_mape, arima_test_mape

### MSE

In [None]:
# ARIMA squared-error line charts for the three subsets.
display_error_figure(data=arima_train_mse_errors, title='Błędy średniokwadratowe zbioru uczącego')
print()
display_error_figure(data=arima_val_mse_errors, title='Błędy średniokwadratowe zbioru walidacyjnego')
print()
display_error_figure(data=arima_test_mse_errors, title='Błędy średniokwadratowe zbioru testowego')

In [None]:
# ARIMA squared-error box plots for the three subsets.
display_error_boxplot(data=arima_train_mse_errors, title='Błędy średniokwadratowe zbioru uczącego')
print('')
display_error_boxplot(data=arima_val_mse_errors, title='Błędy średniokwadratowe zbioru walidacyjnego')
print('')
display_error_boxplot(data=arima_test_mse_errors, title='Błędy średniokwadratowe zbioru testowego')

### MAE

In [None]:
# ARIMA absolute-error line charts for the three subsets. The arima_* error
# arrays are plotted here; the unprefixed *_mae_errors arrays hold the
# neural network's errors.
display_error_figure(arima_train_mae_errors, 'Błędy bezwzględne zbioru uczącego')
print()
display_error_figure(arima_val_mae_errors, 'Błędy bezwzględne zbioru walidacyjnego')
print()
display_error_figure(arima_test_mae_errors, 'Błędy bezwzględne zbioru testowego')

In [None]:
# ARIMA absolute-error box plots for the three subsets. The arima_* error
# arrays are plotted here; the unprefixed *_mae_errors arrays hold the
# neural network's errors.
display_error_boxplot(arima_train_mae_errors, 'Błędy bezwzględne zbioru uczącego')
print()
display_error_boxplot(arima_val_mae_errors, 'Błędy bezwzględne zbioru walidacyjnego')
print()
display_error_boxplot(arima_test_mae_errors, 'Błędy bezwzględne zbioru testowego')

## Wykresy przewidywanych wartości

In [None]:
# Actual closing prices versus ARIMA forecasts for each subset.
display_prediction_data(dates=dates_train, predictions=train_forecast, actual=y_train, title='Zbiór uczący')
print('')
display_prediction_data(dates=dates_val, predictions=validation_forecast, actual=y_val, title='Zbiór walidacyjny')
print('')
display_prediction_data(dates=dates_test, predictions=test_forecast, actual=y_test, title='Zbiór testowy')

In [None]:
# All three subsets on one time axis: observed values next to the ARIMA
# forecasts.
plt.figure(figsize=(12,8))

plt.plot(dates_train, y_train, label='Train')
plt.plot(dates_train, train_forecast, label='Train prediction')

plt.plot(dates_val, y_val, label='Validation')
plt.plot(dates_val, validation_forecast, label='Validation prediction')

plt.plot(dates_test, y_test, label='Test')
plt.plot(dates_test, test_forecast, label='Test prediction')

plt.xlabel('Daty')
plt.ylabel('Cena zamknięcia [PLN]')

plt.legend()
plt.title('Observations vs Predictions')