## Import the necessary libraries

In [1]:
import pandas as pd
import numpy as np
from scipy.fft import fft, ifft
from scipy.signal.windows import hann
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, mean_squared_log_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import TimeSeriesSplit
import matplotlib.pyplot as plt
%matplotlib inline

## Load data

In [2]:
# Load and preprocess data
data = pd.read_csv(r'C:\Users\trieu\OneDrive\Documents\HK2_2024\IS403\IS403_BA\IS403_O22_HTCL_2\Dataset\AMV Historical Data.csv')
df = data.copy()
df['Date'] = pd.to_datetime(df['Date'])
df = df.sort_values('Date')
df.set_index('Date', inplace=True)
df = df.replace(',', '', regex=True)

In [3]:
# Select 'Price' column and handle missing values
price_data = df['Price'].astype(float).interpolate()

In [4]:
# Remove outliers (simple Z-score method)
z_scores = np.abs((price_data - price_data.mean()) / price_data.std())
price_data = price_data[z_scores < 3]

In [5]:
# Scale data
scaler = MinMaxScaler(feature_range=(0,1))
data_scaled = scaler.fit_transform(price_data.values.reshape(-1, 1))

In [6]:
# Split data
train_size = int(len(data_scaled) * 0.7)
train_data = data_scaled[:train_size]
test_data = data_scaled[train_size:]

In [7]:
# Perform FFT with windowing
def fft_transform(data, num_frequencies=10):
    n = len(data)
    window = hann(n)
    fft_result = fft(data * window)
    frequencies = np.fft.fftfreq(n)
    
    # Keep only top frequencies
    top_indices = np.argsort(np.abs(fft_result))[-num_frequencies:]
    filtered_fft = np.zeros_like(fft_result)
    filtered_fft[top_indices] = fft_result[top_indices]
    
    return filtered_fft, frequencies



In [8]:
# Inverse FFT
def ifft_transform(fft_result):
    return np.real(ifft(fft_result))

In [11]:
# Modify predict_with_trend function
def predict_with_trend(data, fft_result, days, window_size=30):
    # Linear trend
    x = np.arange(window_size)
    trend = np.polyfit(x, data[-window_size:].flatten(), 1)
    trend_func = np.poly1d(trend)
    
    # FFT prediction
    n = len(data)
    if len(fft_result) != n:
        fft_result, _ = fft_transform(data.flatten(), num_frequencies=best_num_freq)
    
    extended_fft = np.zeros(window_size + days, dtype=complex)
    extended_fft[:window_size] = fft_result[-window_size:]
    fft_pred = ifft_transform(extended_fft)[-days:]
    
    # Combine trend and FFT
    trend_pred = trend_func(np.arange(window_size, window_size+days))
    combined_pred = trend_pred + fft_pred
    return np.maximum(combined_pred, 0)  # Ensure non-negative values


In [12]:
# Predict for test data using sliding window
window_size = 30
step_size = 7
filtered_test = []

for i in range(0, len(test_data), step_size):
    if i + window_size > len(train_data):
        break
    window_data = np.concatenate((train_data[-window_size:], test_data[:i]))
    window_fft, _ = fft_transform(window_data.flatten(), num_frequencies=best_num_freq)
    pred = predict_with_trend(window_data, window_fft, step_size, window_size)
    filtered_test.extend(pred)

filtered_test = np.array(filtered_test)[:len(test_data)]

NameError: name 'best_num_freq' is not defined

In [None]:
# Predict future values
future_data = np.concatenate((train_data, test_data))
future_fft, _ = fft_transform(future_data.flatten(), num_frequencies=best_num_freq)
y_next_30_days = predict_with_trend(future_data, future_fft, 30, window_size)
y_next_60_days = predict_with_trend(future_data, future_fft, 60, window_size)
y_next_90_days = predict_with_trend(future_data, future_fft, 90, window_size)

In [None]:
# Calculate error metrics
test_mape = mean_absolute_percentage_error(test_data, filtered_test.reshape(-1, 1))
test_mse = mean_squared_error(test_data, filtered_test.reshape(-1, 1))
test_rmse = np.sqrt(test_mse)
test_msle = mean_squared_log_error(test_data, filtered_test.reshape(-1, 1))

print(f"MAPE on Test set: {test_mape}")
print(f"RMSE on Test set: {test_rmse}")
print(f"MSLE on Test set: {test_msle}")

In [None]:
# Inverse transform predictions
y_next_30_days = scaler.inverse_transform(y_next_30_days.reshape(-1, 1)).flatten()
y_next_60_days = scaler.inverse_transform(y_next_60_days.reshape(-1, 1)).flatten()
y_next_90_days = scaler.inverse_transform(y_next_90_days.reshape(-1, 1)).flatten()

# Print predictions
print("Predicted next 30 days:")
print(y_next_30_days)
print("\nPredicted next 60 days:")
print(y_next_60_days)
print("\nPredicted next 90 days:")
print(y_next_90_days)

In [None]:
# Create DataFrames for plotting
train_data_df = pd.DataFrame(scaler.inverse_transform(train_data), index=df.index[:train_size], columns=['Price'])
test_data_df = pd.DataFrame(scaler.inverse_transform(test_data), index=df.index[train_size:], columns=['Price'])
filtered_test_df = pd.DataFrame(scaler.inverse_transform(filtered_test.reshape(-1, 1)), index=df.index[train_size:], columns=['Predict Test'])
next_30_days_df = pd.DataFrame(y_next_30_days, index=pd.date_range(start=df.index[-1] + pd.DateOffset(days=1), periods=30), columns=['Next 30 Days'])
next_60_days_df = pd.DataFrame(y_next_60_days[30:], index=pd.date_range(start=df.index[-1] + pd.DateOffset(days=31), periods=30), columns=['Next 60 Days'])
next_90_days_df = pd.DataFrame(y_next_90_days[60:], index=pd.date_range(start=df.index[-1] + pd.DateOffset(days=61), periods=30), columns=['Next 90 Days'])

# Plot results
plt.figure(figsize=(12, 8))
plt.plot(train_data_df.index, train_data_df['Price'], label='Train')
plt.plot(test_data_df.index, test_data_df['Price'], label='Test')
plt.plot(filtered_test_df.index, filtered_test_df['Predict Test'], label='Predict Test')
plt.plot(next_30_days_df.index, next_30_days_df['Next 30 Days'], label='Next 30 Days')
plt.plot(next_60_days_df.index, next_60_days_df['Next 60 Days'], label='Next 60 Days')
plt.plot(next_90_days_df.index, next_90_days_df['Next 90 Days'], label='Next 90 Days')
plt.legend()
plt.grid()
plt.title("DHT Price Data With Ratio 7:3 (Optimized FFT)")
plt.xlabel("Date")
plt.ylabel("Price value")
plt.savefig('FFT_73_DHT_Optimized.png')
plt.show()