<a href="https://colab.research.google.com/github/zabihullahburhani/-Improving-the-Accuracy-of-Crude-Oil-Price-Prediction-Using-a-Transformer-Based-Hybrid-Approach/blob/main/Time_Series_using_my_Hybrid_appraoch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

# 1️⃣ تعریف مدل ترکیبی LSTM و Transformer
class LSTMTransformerModel(nn.Module):
    def __init__(self, input_dim, lstm_hidden_dim, d_model, n_heads, num_layers, seq_len, dropout=0.1):
        super(LSTMTransformerModel, self).__init__()

        # لایه LSTM
        self.lstm = nn.LSTM(input_dim, lstm_hidden_dim, num_layers=1, batch_first=True)

        # پروجکشن برای تبدیل خروجی LSTM به ابعاد d_model
        self.lstm_projection = nn.Linear(lstm_hidden_dim, d_model)

        # لایه‌های Transformer
        transformer_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_model * 4,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(transformer_layer, num_layers=num_layers)

        # لایه خروجی
        self.fc = nn.Linear(d_model, input_dim)
        self.seq_len = seq_len

    def forward(self, x):
        # عبور از LSTM
        lstm_out, _ = self.lstm(x)  # (batch_size, seq_len, lstm_hidden_dim)

        # پروجکشن به ابعاد d_model
        transformer_in = self.lstm_projection(lstm_out)  # (batch_size, seq_len, d_model)

        # عبور از Transformer
        transformer_out = self.transformer(transformer_in)  # (batch_size, seq_len, d_model)

        # پیش‌بینی با استفاده از لایه خطی
        out = self.fc(transformer_out[:, -1, :])  # فقط از آخرین timestep استفاده می‌کنیم
        return out

# 2️⃣ بارگذاری داده‌ها
def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data

# 3️⃣ پیش‌پردازش داده‌ها
def preprocess_data(data, seq_len=60, train_ratio=0.8):
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)

    X, y = [], []
    for i in range(len(data_scaled) - seq_len):
        X.append(data_scaled[i:i+seq_len])
        y.append(data_scaled[i+seq_len])

    X, y = np.array(X), np.array(y)

    train_size = int(len(X) * train_ratio)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]

    return X_train, y_train, X_test, y_test, scaler

# 4️⃣ آموزش و اعتبارسنجی مدل
def train_evaluate_model(X, y, k=10, epochs=20, lr=0.001, model_path="lstm_transformer_model.pth"):
    kf = KFold(n_splits=k, shuffle=True)
    rmse_scores, mse_scores, mae_scores = [], [], []

    best_model = None
    best_mse = float('inf')

    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"Fold {fold+1}/{k}")

        train_x, val_x = torch.tensor(X[train_idx], dtype=torch.float32), torch.tensor(X[val_idx], dtype=torch.float32)
        train_y, val_y = torch.tensor(y[train_idx], dtype=torch.float32), torch.tensor(y[val_idx], dtype=torch.float32)

        model = LSTMTransformerModel(
            input_dim=1,
            lstm_hidden_dim=64,
            d_model=64,
            n_heads=8,
            num_layers=2,
            seq_len=60
        )
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)

        for epoch in range(epochs):
            model.train()
            optimizer.zero_grad()
            predictions = model(train_x)
            loss = criterion(predictions.squeeze(), train_y)
            loss.backward()
            optimizer.step()

            if (epoch+1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

        model.eval()
        with torch.no_grad():
            val_predictions = model(val_x).detach().numpy()
            val_actual = val_y.numpy()

        mse = mean_squared_error(val_actual, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_actual, val_predictions)

        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)

        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")

        if mse < best_mse:
            best_mse = mse
            best_model = model
            torch.save(model.state_dict(), model_path)

    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")

    return best_model

# 5️⃣ پیش‌بینی و مقایسه
def predict_and_compare(model, X_train, y_train, X_test, y_test, scaler, prediction_steps=30):
    model.eval()
    with torch.no_grad():
        test_predictions = model(torch.tensor(X_test, dtype=torch.float32)).detach().numpy()
    test_actual = y_test

    test_predictions = scaler.inverse_transform(test_predictions.reshape(-1, 1))
    test_actual = scaler.inverse_transform(test_actual.reshape(-1, 1))

    future_predictions = []
    current_input = torch.tensor(X_test[-1], dtype=torch.float32).unsqueeze(0)

    for _ in range(prediction_steps):
        with torch.no_grad():
            prediction = model(current_input).detach().numpy()
        future_predictions.append(prediction[0, 0])

        new_input = np.roll(current_input.numpy(), -1, axis=1)
        new_input[0, -1] = prediction
        current_input = torch.tensor(new_input, dtype=torch.float32)

    future_predictions = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))

    plt.figure(figsize=(14, 6))
    plt.plot(range(len(test_actual)), test_actual, label="Actual Prices", color="blue")
    plt.plot(range(len(test_predictions)), test_predictions, label="Predicted Prices (Test Data)", color="green")
    plt.plot(range(len(test_actual), len(test_actual) + prediction_steps), future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Time Series Price Forecast and Comparison")
    plt.xlabel("Steps")
    plt.ylabel("Price")
    plt.show()

    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")

    return future_predictions

# 6️⃣ اجرای فرآیند
file_path = "Crude oil.csv"
data = load_data(file_path)
X_train, y_train, X_test, y_test, scaler = preprocess_data(data)

model = train_evaluate_model(X_train, y_train)
future_predictions = predict_and_compare(model, X_train, y_train, X_test, y_test, scaler)

above mode LSTM + Transformer

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

# 1️⃣ تعریف مدل ترکیبی CNN و Transformer
class CNNTransformerModel(nn.Module):
    def __init__(self, input_dim, cnn_channels, d_model, n_heads, num_layers, seq_len, dropout=0.1):
        super(CNNTransformerModel, self).__init__()

        # لایه CNN
        self.cnn = nn.Sequential(
            nn.Conv1d(in_channels=input_dim, out_channels=cnn_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv1d(in_channels=cnn_channels, out_channels=cnn_channels, kernel_size=3, padding=1),
            nn.ReLU()
        )

        # پروجکشن برای تبدیل خروجی CNN به ابعاد d_model
        self.cnn_projection = nn.Linear(cnn_channels, d_model)

        # لایه‌های Transformer
        transformer_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_model * 4,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(transformer_layer, num_layers=num_layers)

        # لایه خروجی
        self.fc = nn.Linear(d_model, input_dim)
        self.seq_len = seq_len

    def forward(self, x):
        # تغییر ابعاد برای CNN: (batch_size, input_dim, seq_len)
        x = x.permute(0, 2, 1)

        # عبور از CNN
        cnn_out = self.cnn(x)  # (batch_size, cnn_channels, seq_len)
        cnn_out = cnn_out.permute(0, 2, 1)  # (batch_size, seq_len, cnn_channels)

        # پروجکشن به ابعاد d_model
        transformer_in = self.cnn_projection(cnn_out)  # (batch_size, seq_len, d_model)

        # عبور از Transformer
        transformer_out = self.transformer(transformer_in)  # (batch_size, seq_len, d_model)

        # پیش‌بینی با استفاده از لایه خطی
        out = self.fc(transformer_out[:, -1, :])  # فقط از آخرین timestep استفاده می‌کنیم
        return out

# 2️⃣ بارگذاری داده‌ها
def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data

# 3️⃣ پیش‌پردازش داده‌ها
def preprocess_data(data, seq_len=60, train_ratio=0.8):
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)

    X, y = [], []
    for i in range(len(data_scaled) - seq_len):
        X.append(data_scaled[i:i+seq_len])
        y.append(data_scaled[i+seq_len])

    X, y = np.array(X), np.array(y)

    train_size = int(len(X) * train_ratio)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]

    return X_train, y_train, X_test, y_test, scaler

# 4️⃣ آموزش و اعتبارسنجی مدل
def train_evaluate_model(X, y, k=10, epochs=20, lr=0.001, model_path="cnn_transformer_model.pth"):
    kf = KFold(n_splits=k, shuffle=True)
    rmse_scores, mse_scores, mae_scores = [], [], []

    best_model = None
    best_mse = float('inf')

    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"Fold {fold+1}/{k}")

        train_x, val_x = torch.tensor(X[train_idx], dtype=torch.float32), torch.tensor(X[val_idx], dtype=torch.float32)
        train_y, val_y = torch.tensor(y[train_idx], dtype=torch.float32), torch.tensor(y[val_idx], dtype=torch.float32)

        model = CNNTransformerModel(
            input_dim=1,
            cnn_channels=64,
            d_model=64,
            n_heads=8,
            num_layers=2,
            seq_len=60
        )
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)

        for epoch in range(epochs):
            model.train()
            optimizer.zero_grad()
            predictions = model(train_x)
            loss = criterion(predictions.squeeze(), train_y)
            loss.backward()
            optimizer.step()

            if (epoch+1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

        model.eval()
        with torch.no_grad():
            val_predictions = model(val_x).detach().numpy()
            val_actual = val_y.numpy()

        mse = mean_squared_error(val_actual, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_actual, val_predictions)

        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)

        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")

        if mse < best_mse:
            best_mse = mse
            best_model = model
            torch.save(model.state_dict(), model_path)

    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")

    return best_model

# 5️⃣ پیش‌بینی و مقایسه
def predict_and_compare(model, X_train, y_train, X_test, y_test, scaler, prediction_steps=30):
    model.eval()
    with torch.no_grad():
        test_predictions = model(torch.tensor(X_test, dtype=torch.float32)).detach().numpy()
    test_actual = y_test

    test_predictions = scaler.inverse_transform(test_predictions.reshape(-1, 1))
    test_actual = scaler.inverse_transform(test_actual.reshape(-1, 1))

    future_predictions = []
    current_input = torch.tensor(X_test[-1], dtype=torch.float32).unsqueeze(0)

    for _ in range(prediction_steps):
        with torch.no_grad():
            prediction = model(current_input).detach().numpy()
        future_predictions.append(prediction[0, 0])

        new_input = np.roll(current_input.numpy(), -1, axis=1)
        new_input[0, -1] = prediction
        current_input = torch.tensor(new_input, dtype=torch.float32)

    future_predictions = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))

    plt.figure(figsize=(14, 6))
    plt.plot(range(len(test_actual)), test_actual, label="Actual Prices", color="blue")
    plt.plot(range(len(test_predictions)), test_predictions, label="Predicted Prices (Test Data)", color="green")
    plt.plot(range(len(test_actual), len(test_actual) + prediction_steps), future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast and Comparison (CNN + Transformer)")
    plt.xlabel("Steps")
    plt.ylabel("Price")
    plt.show()

    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")

    return future_predictions

# 6️⃣ اجرای فرآیند
file_path = "Crude oil.csv"
data = load_data(file_path)
X_train, y_train, X_test, y_test, scaler = preprocess_data(data)

model = train_evaluate_model(X_train, y_train)
future_predictions = predict_and_compare(model, X_train, y_train, X_test, y_test, scaler)

code: cnn+transformer

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

# 1️⃣ تعریف مدل ترکیبی GRU و Attention
class GRUAttentionModel(nn.Module):
    def __init__(self, input_dim, gru_hidden_dim, seq_len, dropout=0.1):
        super(GRUAttentionModel, self).__init__()

        # لایه GRU
        self.gru = nn.GRU(input_dim, gru_hidden_dim, num_layers=1, batch_first=True)

        # مکانیزم توجه (Attention)
        self.attention = nn.Linear(gru_hidden_dim * 2, seq_len)  # برای محاسبه وزن‌های توجه
        self.context_vector = nn.Linear(gru_hidden_dim, gru_hidden_dim)  # برای تولید بردار زمینه

        # لایه خروجی
        self.fc = nn.Linear(gru_hidden_dim, input_dim)
        self.dropout = nn.Dropout(dropout)
        self.seq_len = seq_len

    def forward(self, x):
        # عبور از GRU
        gru_out, _ = self.gru(x)  # (batch_size, seq_len, gru_hidden_dim)
        gru_out = self.dropout(gru_out)

        # محاسبه وزن‌های توجه
        last_hidden = gru_out[:, -1, :]  # (batch_size, gru_hidden_dim)
        last_hidden_expanded = last_hidden.unsqueeze(1).repeat(1, self.seq_len, 1)  # (batch_size, seq_len, gru_hidden_dim)
        attention_input = torch.cat((gru_out, last_hidden_expanded), dim=2)  # (batch_size, seq_len, gru_hidden_dim * 2)
        attention_weights = torch.softmax(self.attention(attention_input), dim=1)  # (batch_size, seq_len, seq_len)

        # محاسبه بردار زمینه (Context Vector)
        context = torch.bmm(attention_weights.transpose(1, 2), gru_out)  # (batch_size, seq_len, gru_hidden_dim)
        context = self.context_vector(context[:, -1, :])  # (batch_size, gru_hidden_dim)

        # پیش‌بینی با استفاده از لایه خطی
        out = self.fc(context)  # (batch_size, input_dim)
        return out

# 2️⃣ بارگذاری داده‌ها
def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data

# 3️⃣ پیش‌پردازش داده‌ها
def preprocess_data(data, seq_len=60, train_ratio=0.8):
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)

    X, y = [], []
    for i in range(len(data_scaled) - seq_len):
        X.append(data_scaled[i:i+seq_len])
        y.append(data_scaled[i+seq_len])

    X, y = np.array(X), np.array(y)

    train_size = int(len(X) * train_ratio)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]

    return X_train, y_train, X_test, y_test, scaler

# 4️⃣ آموزش و اعتبارسنجی مدل
def train_evaluate_model(X, y, k=10, epochs=20, lr=0.001, model_path="gru_attention_model.pth"):
    kf = KFold(n_splits=k, shuffle=True)
    rmse_scores, mse_scores, mae_scores = [], [], []

    best_model = None
    best_mse = float('inf')

    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"Fold {fold+1}/{k}")

        train_x, val_x = torch.tensor(X[train_idx], dtype=torch.float32), torch.tensor(X[val_idx], dtype=torch.float32)
        train_y, val_y = torch.tensor(y[train_idx], dtype=torch.float32), torch.tensor(y[val_idx], dtype=torch.float32)

        model = GRUAttentionModel(
            input_dim=1,
            gru_hidden_dim=64,
            seq_len=60
        )
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)

        for epoch in range(epochs):
            model.train()
            optimizer.zero_grad()
            predictions = model(train_x)
            loss = criterion(predictions.squeeze(), train_y)
            loss.backward()
            optimizer.step()

            if (epoch+1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

        model.eval()
        with torch.no_grad():
            val_predictions = model(val_x).detach().numpy()
            val_actual = val_y.numpy()

        mse = mean_squared_error(val_actual, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_actual, val_predictions)

        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)

        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")

        if mse < best_mse:
            best_mse = mse
            best_model = model
            torch.save(model.state_dict(), model_path)

    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")

    return best_model

# 5️⃣ پیش‌بینی و مقایسه
def predict_and_compare(model, X_train, y_train, X_test, y_test, scaler, prediction_steps=30):
    model.eval()
    with torch.no_grad():
        test_predictions = model(torch.tensor(X_test, dtype=torch.float32)).detach().numpy()
    test_actual = y_test

    test_predictions = scaler.inverse_transform(test_predictions.reshape(-1, 1))
    test_actual = scaler.inverse_transform(test_actual.reshape(-1, 1))

    future_predictions = []
    current_input = torch.tensor(X_test[-1], dtype=torch.float32).unsqueeze(0)

    for _ in range(prediction_steps):
        with torch.no_grad():
            prediction = model(current_input).detach().numpy()
        future_predictions.append(prediction[0, 0])

        new_input = np.roll(current_input.numpy(), -1, axis=1)
        new_input[0, -1] = prediction
        current_input = torch.tensor(new_input, dtype=torch.float32)

    future_predictions = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))

    plt.figure(figsize=(14, 6))
    plt.plot(range(len(test_actual)), test_actual, label="Actual Prices", color="blue")
    plt.plot(range(len(test_predictions)), test_predictions, label="Predicted Prices (Test Data)", color="green")
    plt.plot(range(len(test_actual), len(test_actual) + prediction_steps), future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast and Comparison (GRU + Attention)")
    plt.xlabel("Steps")
    plt.ylabel("Price")
    plt.show()

    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")

    return future_predictions

# 6️⃣ اجرای فرآیند
file_path = "Crude oil.csv"
data = load_data(file_path)
X_train, y_train, X_test, y_test, scaler = preprocess_data(data)

model = train_evaluate_model(X_train, y_train)
future_predictions = predict_and_compare(model, X_train, y_train, X_test, y_test, scaler)

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

# 1️⃣ تعریف مدل ترکیبی GRU و Transformer
class GRUTransformerModel(nn.Module):
    def __init__(self, input_dim, gru_hidden_dim, d_model, n_heads, num_layers, seq_len, dropout=0.1):
        super(GRUTransformerModel, self).__init__()

        # لایه GRU
        self.gru = nn.GRU(input_dim, gru_hidden_dim, num_layers=1, batch_first=True)

        # پروجکشن برای تبدیل خروجی GRU به ابعاد d_model
        self.gru_projection = nn.Linear(gru_hidden_dim, d_model)

        # لایه‌های Transformer
        transformer_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_model * 4,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(transformer_layer, num_layers=num_layers)

        # لایه خروجی
        self.fc = nn.Linear(d_model, input_dim)
        self.dropout = nn.Dropout(dropout)
        self.seq_len = seq_len

    def forward(self, x):
        # عبور از GRU
        gru_out, _ = self.gru(x)  # (batch_size, seq_len, gru_hidden_dim)
        gru_out = self.dropout(gru_out)

        # پروجکشن به ابعاد d_model
        transformer_in = self.gru_projection(gru_out)  # (batch_size, seq_len, d_model)

        # عبور از Transformer
        transformer_out = self.transformer(transformer_in)  # (batch_size, seq_len, d_model)

        # پیش‌بینی با استفاده از لایه خطی
        out = self.fc(transformer_out[:, -1, :])  # فقط از آخرین timestep استفاده می‌کنیم
        return out

# 2️⃣ بارگذاری داده‌ها
def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data

# 3️⃣ پیش‌پردازش داده‌ها
def preprocess_data(data, seq_len=60, train_ratio=0.8):
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)

    X, y = [], []
    for i in range(len(data_scaled) - seq_len):
        X.append(data_scaled[i:i+seq_len])
        y.append(data_scaled[i+seq_len])

    X, y = np.array(X), np.array(y)

    train_size = int(len(X) * train_ratio)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]

    return X_train, y_train, X_test, y_test, scaler

# 4️⃣ آموزش و اعتبارسنجی مدل
def train_evaluate_model(X, y, k=10, epochs=20, lr=0.001, model_path="gru_transformer_model.pth"):
    kf = KFold(n_splits=k, shuffle=True)
    rmse_scores, mse_scores, mae_scores = [], [], []

    best_model = None
    best_mse = float('inf')

    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"Fold {fold+1}/{k}")

        train_x, val_x = torch.tensor(X[train_idx], dtype=torch.float32), torch.tensor(X[val_idx], dtype=torch.float32)
        train_y, val_y = torch.tensor(y[train_idx], dtype=torch.float32), torch.tensor(y[val_idx], dtype=torch.float32)

        model = GRUTransformerModel(
            input_dim=1,
            gru_hidden_dim=64,
            d_model=64,
            n_heads=8,
            num_layers=2,
            seq_len=60
        )
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)

        for epoch in range(epochs):
            model.train()
            optimizer.zero_grad()
            predictions = model(train_x)
            loss = criterion(predictions.squeeze(), train_y)
            loss.backward()
            optimizer.step()

            if (epoch+1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

        model.eval()
        with torch.no_grad():
            val_predictions = model(val_x).detach().numpy()
            val_actual = val_y.numpy()

        mse = mean_squared_error(val_actual, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_actual, val_predictions)

        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)

        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")

        if mse < best_mse:
            best_mse = mse
            best_model = model
            torch.save(model.state_dict(), model_path)

    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")

    return best_model

# 5️⃣ پیش‌بینی و مقایسه
def predict_and_compare(model, X_train, y_train, X_test, y_test, scaler, prediction_steps=30):
    model.eval()
    with torch.no_grad():
        test_predictions = model(torch.tensor(X_test, dtype=torch.float32)).detach().numpy()
    test_actual = y_test

    test_predictions = scaler.inverse_transform(test_predictions.reshape(-1, 1))
    test_actual = scaler.inverse_transform(test_actual.reshape(-1, 1))

    future_predictions = []
    current_input = torch.tensor(X_test[-1], dtype=torch.float32).unsqueeze(0)

    for _ in range(prediction_steps):
        with torch.no_grad():
            prediction = model(current_input).detach().numpy()
        future_predictions.append(prediction[0, 0])

        new_input = np.roll(current_input.numpy(), -1, axis=1)
        new_input[0, -1] = prediction
        current_input = torch.tensor(new_input, dtype=torch.float32)

    future_predictions = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))

    plt.figure(figsize=(14, 6))
    plt.plot(range(len(test_actual)), test_actual, label="Actual Prices", color="blue")
    plt.plot(range(len(test_predictions)), test_predictions, label="Predicted Prices (Test Data)", color="green")
    plt.plot(range(len(test_actual), len(test_actual) + prediction_steps), future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast and Comparison (GRU + Transformer)")
    plt.xlabel("Steps")
    plt.ylabel("Price")
    plt.show()

    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")

    return future_predictions

# 6️⃣ اجرای فرآیند
file_path = "Crude oil.csv"
data = load_data(file_path)
X_train, y_train, X_test, y_test, scaler = preprocess_data(data)

model = train_evaluate_model(X_train, y_train)
future_predictions = predict_and_compare(model, X_train, y_train, X_test, y_test, scaler)

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

class GRUModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, seq_len):
        super(GRUModel, self).__init__()
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers=1, batch_first=True)
        self.fc = nn.Linear(hidden_dim, input_dim)
        self.seq_len = seq_len

    def forward(self, x):
        gru_out, _ = self.gru(x)
        out = self.fc(gru_out[:, -1, :])
        return out

def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data

def preprocess_data(data, seq_len=60, train_ratio=0.8):
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)
    X, y = [], []
    for i in range(len(data_scaled) - seq_len):
        X.append(data_scaled[i:i+seq_len])
        y.append(data_scaled[i+seq_len])
    X, y = np.array(X), np.array(y)
    train_size = int(len(X) * train_ratio)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]
    return X_train, y_train, X_test, y_test, scaler

def train_evaluate_model(X, y, k=10, epochs=20, lr=0.001, model_path="gru_model.pth"):
    kf = KFold(n_splits=k, shuffle=True)
    rmse_scores, mse_scores, mae_scores = [], [], []
    best_model, best_mse = None, float('inf')
    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"Fold {fold+1}/{k}")
        train_x, val_x = torch.tensor(X[train_idx], dtype=torch.float32), torch.tensor(X[val_idx], dtype=torch.float32)
        train_y, val_y = torch.tensor(y[train_idx], dtype=torch.float32), torch.tensor(y[val_idx], dtype=torch.float32)
        model = GRUModel(input_dim=1, hidden_dim=64, seq_len=60)
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)
        for epoch in range(epochs):
            model.train()
            optimizer.zero_grad()
            predictions = model(train_x)
            loss = criterion(predictions.squeeze(), train_y)
            loss.backward()
            optimizer.step()
            if (epoch+1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")
        model.eval()
        with torch.no_grad():
            val_predictions = model(val_x).detach().numpy()
            val_actual = val_y.numpy()
        mse = mean_squared_error(val_actual, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_actual, val_predictions)
        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)
        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")
        if mse < best_mse:
            best_mse = mse
            best_model = model
            torch.save(model.state_dict(), model_path)
    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")
    return best_model

def predict_and_compare(model, X_test, y_test, scaler, prediction_steps=30):
    model.eval()
    with torch.no_grad():
        test_predictions = model(torch.tensor(X_test, dtype=torch.float32)).detach().numpy()
    test_actual = scaler.inverse_transform(y_test.reshape(-1, 1))
    test_predictions = scaler.inverse_transform(test_predictions.reshape(-1, 1))
    future_predictions = []
    current_input = torch.tensor(X_test[-1], dtype=torch.float32).unsqueeze(0)
    for _ in range(prediction_steps):
        with torch.no_grad():
            prediction = model(current_input).detach().numpy()
        future_predictions.append(prediction[0, 0])
        new_input = np.roll(current_input.numpy(), -1, axis=1)
        new_input[0, -1] = prediction
        current_input = torch.tensor(new_input, dtype=torch.float32)
    future_predictions = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))
    plt.figure(figsize=(14, 6))
    plt.plot(range(len(test_actual)), test_actual, label="Actual Prices", color="blue")
    plt.plot(range(len(test_predictions)), test_predictions, label="Predicted Prices (Test Data)", color="green")
    plt.plot(range(len(test_actual), len(test_actual) + prediction_steps), future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast (GRU)")
    plt.xlabel("Steps")
    plt.ylabel("Price")
    plt.show()
    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")
    return future_predictions

file_path = "Crude oil.csv"
data = load_data(file_path)
X_train, y_train, X_test, y_test, scaler = preprocess_data(data)
model = train_evaluate_model(X_train, y_train)
future_predictions = predict_and_compare(model, X_test, y_test, scaler)

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, seq_len):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=1, batch_first=True)
        self.fc = nn.Linear(hidden_dim, input_dim)
        self.seq_len = seq_len

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        out = self.fc(lstm_out[:, -1, :])
        return out

def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data

def preprocess_data(data, seq_len=60, train_ratio=0.8):
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)
    X, y = [], []
    for i in range(len(data_scaled) - seq_len):
        X.append(data_scaled[i:i+seq_len])
        y.append(data_scaled[i+seq_len])
    X, y = np.array(X), np.array(y)
    train_size = int(len(X) * train_ratio)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]
    return X_train, y_train, X_test, y_test, scaler

def train_evaluate_model(X, y, k=10, epochs=20, lr=0.001, model_path="lstm_model.pth"):
    kf = KFold(n_splits=k, shuffle=True)
    rmse_scores, mse_scores, mae_scores = [], [], []
    best_model, best_mse = None, float('inf')
    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"Fold {fold+1}/{k}")
        train_x, val_x = torch.tensor(X[train_idx], dtype=torch.float32), torch.tensor(X[val_idx], dtype=torch.float32)
        train_y, val_y = torch.tensor(y[train_idx], dtype=torch.float32), torch.tensor(y[val_idx], dtype=torch.float32)
        model = LSTMModel(input_dim=1, hidden_dim=64, seq_len=60)
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)
        for epoch in range(epochs):
            model.train()
            optimizer.zero_grad()
            predictions = model(train_x)
            loss = criterion(predictions.squeeze(), train_y)
            loss.backward()
            optimizer.step()
            if (epoch+1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")
        model.eval()
        with torch.no_grad():
            val_predictions = model(val_x).detach().numpy()
            val_actual = val_y.numpy()
        mse = mean_squared_error(val_actual, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_actual, val_predictions)
        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)
        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")
        if mse < best_mse:
            best_mse = mse
            best_model = model
            torch.save(model.state_dict(), model_path)
    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")
    return best_model

def predict_and_compare(model, X_test, y_test, scaler, prediction_steps=30):
    model.eval()
    with torch.no_grad():
        test_predictions = model(torch.tensor(X_test, dtype=torch.float32)).detach().numpy()
    test_actual = scaler.inverse_transform(y_test.reshape(-1, 1))
    test_predictions = scaler.inverse_transform(test_predictions.reshape(-1, 1))
    future_predictions = []
    current_input = torch.tensor(X_test[-1], dtype=torch.float32).unsqueeze(0)
    for _ in range(prediction_steps):
        with torch.no_grad():
            prediction = model(current_input).detach().numpy()
        future_predictions.append(prediction[0, 0])
        new_input = np.roll(current_input.numpy(), -1, axis=1)
        new_input[0, -1] = prediction
        current_input = torch.tensor(new_input, dtype=torch.float32)
    future_predictions = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))
    plt.figure(figsize=(14, 6))
    plt.plot(range(len(test_actual)), test_actual, label="Actual Prices", color="blue")
    plt.plot(range(len(test_predictions)), test_predictions, label="Predicted Prices (Test Data)", color="green")
    plt.plot(range(len(test_actual), len(test_actual) + prediction_steps), future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast (LSTM)")
    plt.xlabel("Steps")
    plt.ylabel("Price")
    plt.show()
    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")
    return future_predictions

file_path = "Crude oil.csv"
data = load_data(file_path)
X_train, y_train, X_test, y_test, scaler = preprocess_data(data)
model = train_evaluate_model(X_train, y_train)
future_predictions = predict_and_compare(model, X_test, y_test, scaler)

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

class TransformerModel(nn.Module):
    def __init__(self, input_dim, d_model, n_heads, num_layers, seq_len):
        super(TransformerModel, self).__init__()
        self.input_projection = nn.Linear(input_dim, d_model)
        self.positional_encoding = nn.Parameter(torch.zeros(1, seq_len, d_model))
        transformer_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=n_heads, dim_feedforward=d_model*4, batch_first=True)
        self.transformer = nn.TransformerEncoder(transformer_layer, num_layers=num_layers)
        self.fc = nn.Linear(d_model, input_dim)
        self.seq_len = seq_len

    def forward(self, x):
        x = self.input_projection(x) + self.positional_encoding
        x = self.transformer(x)
        out = self.fc(x[:, -1, :])
        return out

def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data

def preprocess_data(data, seq_len=60, train_ratio=0.8):
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)
    X, y = [], []
    for i in range(len(data_scaled) - seq_len):
        X.append(data_scaled[i:i+seq_len])
        y.append(data_scaled[i+seq_len])
    X, y = np.array(X), np.array(y)
    train_size = int(len(X) * train_ratio)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]
    return X_train, y_train, X_test, y_test, scaler

def train_evaluate_model(X, y, k=10, epochs=20, lr=0.001, model_path="transformer_model.pth"):
    kf = KFold(n_splits=k, shuffle=True)
    rmse_scores, mse_scores, mae_scores = [], [], []
    best_model, best_mse = None, float('inf')
    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"Fold {fold+1}/{k}")
        train_x, val_x = torch.tensor(X[train_idx], dtype=torch.float32), torch.tensor(X[val_idx], dtype=torch.float32)
        train_y, val_y = torch.tensor(y[train_idx], dtype=torch.float32), torch.tensor(y[val_idx], dtype=torch.float32)
        model = TransformerModel(input_dim=1, d_model=64, n_heads=8, num_layers=2, seq_len=60)
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)
        for epoch in range(epochs):
            model.train()
            optimizer.zero_grad()
            predictions = model(train_x)
            loss = criterion(predictions.squeeze(), train_y)
            loss.backward()
            optimizer.step()
            if (epoch+1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")
        model.eval()
        with torch.no_grad():
            val_predictions = model(val_x).detach().numpy()
            val_actual = val_y.numpy()
        mse = mean_squared_error(val_actual, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_actual, val_predictions)
        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)
        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")
        if mse < best_mse:
            best_mse = mse
            best_model = model
            torch.save(model.state_dict(), model_path)
    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")
    return best_model

def predict_and_compare(model, X_test, y_test, scaler, prediction_steps=30):
    model.eval()
    with torch.no_grad():
        test_predictions = model(torch.tensor(X_test, dtype=torch.float32)).detach().numpy()
    test_actual = scaler.inverse_transform(y_test.reshape(-1, 1))
    test_predictions = scaler.inverse_transform(test_predictions.reshape(-1, 1))
    future_predictions = []
    current_input = torch.tensor(X_test[-1], dtype=torch.float32).unsqueeze(0)
    for _ in range(prediction_steps):
        with torch.no_grad():
            prediction = model(current_input).detach().numpy()
        future_predictions.append(prediction[0, 0])
        new_input = np.roll(current_input.numpy(), -1, axis=1)
        new_input[0, -1] = prediction
        current_input = torch.tensor(new_input, dtype=torch.float32)
    future_predictions = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))
    plt.figure(figsize=(14, 6))
    plt.plot(range(len(test_actual)), test_actual, label="Actual Prices", color="blue")
    plt.plot(range(len(test_predictions)), test_predictions, label="Predicted Prices (Test Data)", color="green")
    plt.plot(range(len(test_actual), len(test_actual) + prediction_steps), future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast (Transformer)")
    plt.xlabel("Steps")
    plt.ylabel("Price")
    plt.show()
    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")
    return future_predictions

file_path = "Crude oil.csv"
data = load_data(file_path)
X_train, y_train, X_test, y_test, scaler = preprocess_data(data)
model = train_evaluate_model(X_train, y_train)
future_predictions = predict_and_compare(model, X_test, y_test, scaler)

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt
import math

class ProbSparseAttention(nn.Module):
    def __init__(self, d_model, n_heads, factor=5):
        super(ProbSparseAttention, self).__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.factor = factor
        self.query_projection = nn.Linear(d_model, d_model)
        self.key_projection = nn.Linear(d_model, d_model)
        self.value_projection = nn.Linear(d_model, d_model)
        self.out_projection = nn.Linear(d_model, d_model)

    def forward(self, Q, K, V, mask=None):
        batch_size, seq_len, _ = Q.size()
        Q = self.query_projection(Q).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        K = self.key_projection(K).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        V = self.value_projection(V).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        _, indices = scores.topk(self.factor, dim=-1)
        sparse_scores = torch.zeros_like(scores).scatter_(-1, indices, scores)
        attn = torch.softmax(sparse_scores, dim=-1)
        out = torch.matmul(attn, V)
        out = out.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        return self.out_projection(out)

class Informer(nn.Module):
    def __init__(self, input_dim, d_model, n_heads, num_layers, seq_len, pred_len):
        super(Informer, self).__init__()
        self.input_projection = nn.Linear(input_dim, d_model)
        self.positional_encoding = nn.Parameter(torch.zeros(1, max(seq_len, pred_len), d_model))
        self.encoder_layers = nn.ModuleList([nn.ModuleList([ProbSparseAttention(d_model, n_heads), nn.LayerNorm(d_model), nn.Linear(d_model, d_model*4), nn.ReLU(), nn.Linear(d_model*4, d_model), nn.LayerNorm(d_model)]) for _ in range(num_layers)])
        self.decoder_projection = nn.Linear(input_dim, d_model)
        self.decoder_layers = nn.ModuleList([nn.ModuleList([ProbSparseAttention(d_model, n_heads), nn.LayerNorm(d_model), ProbSparseAttention(d_model, n_heads), nn.LayerNorm(d_model), nn.Linear(d_model, d_model*4), nn.ReLU(), nn.Linear(d_model*4, d_model), nn.LayerNorm(d_model)]) for _ in range(num_layers)])
        self.output_projection = nn.Linear(d_model, input_dim)
        self.seq_len = seq_len
        self.pred_len = pred_len

    def forward(self, x_enc, x_dec):
        enc_input = self.input_projection(x_enc) + self.positional_encoding[:, :x_enc.size(1), :]
        for attn, norm1, ff1, relu, ff2, norm2 in self.encoder_layers:
            attn_output = attn(enc_input, enc_input, enc_input)
            enc_input = norm1(enc_input + attn_output)
            ff_output = ff2(relu(ff1(enc_input)))
            enc_input = norm2(enc_input + ff_output)
        dec_input = self.decoder_projection(x_dec) + self.positional_encoding[:, :x_dec.size(1), :]
        for attn1, norm1, attn2, norm2, ff1, relu, ff2, norm3 in self.decoder_layers:
            dec_input = norm1(dec_input + attn1(dec_input, dec_input, dec_input))
            dec_input = norm2(dec_input + attn2(dec_input, enc_input, enc_input))
            ff_output = ff2(relu(ff1(dec_input)))
            dec_input = norm3(dec_input + ff_output)
        output = self.output_projection(dec_input[:, -self.pred_len:, :])
        return output

def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data

def preprocess_data(data, seq_len=60, train_ratio=0.8):
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)
    X, y = [], []
    for i in range(len(data_scaled) - seq_len):
        X.append(data_scaled[i:i+seq_len])
        y.append(data_scaled[i+seq_len])
    X, y = np.array(X), np.array(y)
    train_size = int(len(X) * train_ratio)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]
    return X_train, y_train, X_test, y_test, scaler

def train_evaluate_model(X, y, k=10, epochs=20, lr=0.001, model_path="informer_model.pth"):
    kf = KFold(n_splits=k, shuffle=True)
    rmse_scores, mse_scores, mae_scores = [], [], []
    best_model, best_mse = None, float('inf')
    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"Fold {fold+1}/{k}")
        train_x, val_x = torch.tensor(X[train_idx], dtype=torch.float32), torch.tensor(X[val_idx], dtype=torch.float32)
        train_y, val_y = torch.tensor(y[train_idx], dtype=torch.float32), torch.tensor(y[val_idx], dtype=torch.float32)
        model = Informer(input_dim=1, d_model=64, n_heads=8, num_layers=2, seq_len=60, pred_len=1)
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)
        for epoch in range(epochs):
            model.train()
            optimizer.zero_grad()
            predictions = model(train_x, train_x)
            loss = criterion(predictions.squeeze(), train_y)
            loss.backward()
            optimizer.step()
            if (epoch+1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")
        model.eval()
        with torch.no_grad():
            val_predictions = model(val_x, val_x).detach().numpy()
            val_actual = val_y.numpy()
        mse = mean_squared_error(val_actual, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_actual, val_predictions)
        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)
        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")
        if mse < best_mse:
            best_mse = mse
            best_model = model
            torch.save(model.state_dict(), model_path)
    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")
    return best_model

def predict_and_compare(model, X_test, y_test, scaler, prediction_steps=30):
    model.eval()
    with torch.no_grad():
        test_predictions = model(torch.tensor(X_test, dtype=torch.float32), torch.tensor(X_test, dtype=torch.float32)).detach().numpy()
    test_actual = scaler.inverse_transform(y_test.reshape(-1, 1))
    test_predictions = scaler.inverse_transform(test_predictions.reshape(-1, 1))
    future_predictions = []
    current_input = torch.tensor(X_test[-1], dtype=torch.float32).unsqueeze(0)
    for _ in range(prediction_steps):
        with torch.no_grad():
            prediction = model(current_input, current_input).detach().numpy()
        future_predictions.append(prediction[0, 0])
        new_input = np.roll(current_input.numpy(), -1, axis=1)
        new_input[0, -1] = prediction
        current_input = torch.tensor(new_input, dtype=torch.float32)
    future_predictions = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))
    plt.figure(figsize=(14, 6))
    plt.plot(range(len(test_actual)), test_actual, label="Actual Prices", color="blue")
    plt.plot(range(len(test_predictions)), test_predictions, label="Predicted Prices (Test Data)", color="green")
    plt.plot(range(len(test_actual), len(test_actual) + prediction_steps), future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast (Informer)")
    plt.xlabel("Steps")
    plt.ylabel("Price")
    plt.show()
    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")
    return future_predictions

file_path = "Crude oil.csv"
data = load_data(file_path)
X_train, y_train, X_test, y_test, scaler = preprocess_data(data)
model = train_evaluate_model(X_train, y_train)
future_predictions = predict_and_compare(model, X_test, y_test, scaler)

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

class AutoCorrelation(nn.Module):
    def __init__(self, d_model, n_heads, factor=5):
        super(AutoCorrelation, self).__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.factor = factor
        self.query_projection = nn.Linear(d_model, d_model)
        self.key_projection = nn.Linear(d_model, d_model)
        self.value_projection = nn.Linear(d_model, d_model)
        self.out_projection = nn.Linear(d_model, d_model)

    def forward(self, Q, K, V):
        batch_size, seq_len, _ = Q.size()
        Q = self.query_projection(Q).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        K = self.key_projection(K).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        V = self.value_projection(V).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        Q_fft = torch.fft.rfft(Q, dim=2)
        K_fft = torch.fft.rfft(K, dim=2)
        scores = torch.fft.irfft(Q_fft * torch.conj(K_fft), dim=2, n=seq_len)
        _, indices = scores.topk(self.factor, dim=-1)
        sparse_scores = torch.zeros_like(scores).scatter_(-1, indices, scores)
        attn = torch.softmax(sparse_scores, dim=-1)
        out = torch.matmul(attn, V)
        out = out.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        return self.out_projection(out)

class SeriesDecomposition(nn.Module):
    def __init__(self, kernel_size=25):
        super(SeriesDecomposition, self).__init__()
        self.avg_pool = nn.AvgPool1d(kernel_size, stride=1, padding=kernel_size//2)

    def forward(self, x):
        trend = self.avg_pool(x.permute(0, 2, 1)).permute(0, 2, 1)
        seasonal = x - trend
        return trend, seasonal

class Autoformer(nn.Module):
    def __init__(self, input_dim, d_model, n_heads, num_layers, seq_len, pred_len):
        super(Autoformer, self).__init__()
        self.input_projection = nn.Linear(input_dim, d_model)
        self.decomposition = SeriesDecomposition()
        self.encoder_layers = nn.ModuleList([nn.ModuleList([AutoCorrelation(d_model, n_heads), nn.LayerNorm(d_model), nn.Linear(d_model, d_model*4), nn.ReLU(), nn.Linear(d_model*4, d_model), nn.LayerNorm(d_model)]) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([nn.ModuleList([AutoCorrelation(d_model, n_heads), nn.LayerNorm(d_model), AutoCorrelation(d_model, n_heads), nn.LayerNorm(d_model), nn.Linear(d_model, d_model*4), nn.ReLU(), nn.Linear(d_model*4, d_model), nn.LayerNorm(d_model)]) for _ in range(num_layers)])
        self.output_projection = nn.Linear(d_model, input_dim)
        self.seq_len = seq_len
        self.pred_len = pred_len

    def forward(self, x_enc, x_dec):
        trend, seasonal = self.decomposition(x_enc)
        enc_input = self.input_projection(seasonal)
        for attn, norm1, ff1, relu, ff2, norm2 in self.encoder_layers:
            attn_output = attn(enc_input, enc_input, enc_input)
            enc_input = norm1(enc_input + attn_output)
            ff_output = ff2(relu(ff1(enc_input)))
            enc_input = norm2(enc_input + ff_output)
        dec_input = self.input_projection(x_dec)
        for attn1, norm1, attn2, norm2, ff1, relu, ff2, norm3 in self.decoder_layers:
            dec_input = norm1(dec_input + attn1(dec_input, dec_input, dec_input))
            dec_input = norm2(dec_input + attn2(dec_input, enc_input, enc_input))
            ff_output = ff2(relu(ff1(dec_input)))
            dec_input = norm3(dec_input + ff_output)
        output = self.output_projection(dec_input[:, -self.pred_len:, :]) + trend[:, -self.pred_len:, :]
        return output

def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data

def preprocess_data(data, seq_len=60, train_ratio=0.8):
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)
    X, y = [], []
    for i in range(len(data_scaled) - seq_len):
        X.append(data_scaled[i:i+seq_len])
        y.append(data_scaled[i+seq_len])
    X, y = np.array(X), np.array(y)
    train_size = int(len(X) * train_ratio)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]
    return X_train, y_train, X_test, y_test, scaler

def train_evaluate_model(X, y, k=10, epochs=20, lr=0.001, model_path="autoformer_model.pth"):
    kf = KFold(n_splits=k, shuffle=True)
    rmse_scores, mse_scores, mae_scores = [], [], []
    best_model, best_mse = None, float('inf')
    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"Fold {fold+1}/{k}")
        train_x, val_x = torch.tensor(X[train_idx], dtype=torch.float32), torch.tensor(X[val_idx], dtype=torch.float32)
        train_y, val_y = torch.tensor(y[train_idx], dtype=torch.float32), torch.tensor(y[val_idx], dtype=torch.float32)
        model = Autoformer(input_dim=1, d_model=64, n_heads=8, num_layers=2, seq_len=60, pred_len=1)
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)
        for epoch in range(epochs):
            model.train()
            optimizer.zero_grad()
            predictions = model(train_x, train_x)
            loss = criterion(predictions.squeeze(), train_y)
            loss.backward()
            optimizer.step()
            if (epoch+1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")
        model.eval()
        with torch.no_grad():
            val_predictions = model(val_x, val_x).detach().numpy()
            val_actual = val_y.numpy()
        mse = mean_squared_error(val_actual, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_actual, val_predictions)
        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)
        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")
        if mse < best_mse:
            best_mse = mse
            best_model = model
            torch.save(model.state_dict(), model_path)
    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")
    return best_model

def predict_and_compare(model, X_test, y_test, scaler, prediction_steps=30):
    model.eval()
    with torch.no_grad():
        test_predictions = model(torch.tensor(X_test, dtype=torch.float32), torch.tensor(X_test, dtype=torch.float32)).detach().numpy()
    test_actual = scaler.inverse_transform(y_test.reshape(-1, 1))
    test_predictions = scaler.inverse_transform(test_predictions.reshape(-1, 1))
    future_predictions = []
    current_input = torch.tensor(X_test[-1], dtype=torch.float32).unsqueeze(0)
    for _ in range(prediction_steps):
        with torch.no_grad():
            prediction = model(current_input, current_input).detach().numpy()
        future_predictions.append(prediction[0, 0])
        new_input = np.roll(current_input.numpy(), -1, axis=1)
        new_input[0, -1] = prediction
        current_input = torch.tensor(new_input, dtype=torch.float32)
    future_predictions = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))
    plt.figure(figsize=(14, 6))
    plt.plot(range(len(test_actual)), test_actual, label="Actual Prices", color="blue")
    plt.plot(range(len(test_predictions)), test_predictions, label="Predicted Prices (Test Data)", color="green")
    plt.plot(range(len(test_actual), len(test_actual) + prediction_steps), future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast (Autoformer)")
    plt.xlabel("Steps")
    plt.ylabel("Price")
    plt.show()
    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")
    return future_predictions

file_path = "Crude oil.csv"
data = load_data(file_path)
X_train, y_train, X_test, y_test, scaler = preprocess_data(data)
model = train_evaluate_model(X_train, y_train)
future_predictions = predict_and_compare(model, X_test, y_test, scaler)

In [None]:
import numpy as np
import pandas as pd
from prophet import Prophet
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data = data.rename(columns={'Date': 'ds', 'Close': 'y'})
    return data

def train_evaluate_model(data, k=10):
    rmse_scores, mse_scores, mae_scores = [], [], []
    kf = KFold(n_splits=k, shuffle=False)  # shuffle=False برای حفظ ترتیب زمانی
    best_model, best_mse = None, float('inf')
    for fold, (train_idx, val_idx) in enumerate(kf.split(data)):
        print(f"Fold {fold+1}/{k}")
        train_data, val_data = data.iloc[train_idx], data.iloc[val_idx]
        model = Prophet(yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=False)
        model.fit(train_data)
        future_dates = model.make_future_dataframe(periods=30)
        forecast = model.predict(future_dates)
        val_predictions = forecast.tail(len(val_data))['yhat'].values
        val_actual = val_data['y'].values
        mse = mean_squared_error(val_actual, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_actual, val_predictions)
        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)
        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")
        if mse < best_mse:
            best_mse = mse
            best_model = model
    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")
    return best_model

def predict_and_compare(model, data, prediction_steps=30):
    future_dates = model.make_future_dataframe(periods=prediction_steps)
    forecast = model.predict(future_dates)
    test_data = data.tail(len(forecast) - prediction_steps)
    test_actual = test_data['y'].values
    test_predictions = forecast.tail(len(test_data))['yhat'].values
    future_predictions = forecast.tail(prediction_steps)['yhat'].values
    plt.figure(figsize=(14, 6))
    plt.plot(test_data.index, test_actual, label="Actual Prices", color="blue")
    plt.plot(test_data.index, test_predictions, label="Predicted Prices (Test Data)", color="green")
    plt.plot(forecast.tail(prediction_steps).index, future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast (Prophet)")
    plt.xlabel("Date")
    plt.ylabel("Price")
    plt.show()
    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")
    return future_predictions

file_path = "Crude oil.csv"
data = load_data(file_path)
model = train_evaluate_model(data)
future_predictions = predict_and_compare(model, data)

In [None]:
import numpy as np
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data['Close']

def train_evaluate_model(data, k=10, order=(5,1,0)):
    rmse_scores, mse_scores, mae_scores = [], [], []
    kf = KFold(n_splits=k, shuffle=False)
    best_model, best_mse = None, float('inf')
    for fold, (train_idx, val_idx) in enumerate(kf.split(data)):
        print(f"Fold {fold+1}/{k}")
        train_data, val_data = data.iloc[train_idx], data.iloc[val_idx]
        model = ARIMA(train_data, order=order)
        model_fit = model.fit()
        val_predictions = model_fit.forecast(steps=len(val_data))
        mse = mean_squared_error(val_data, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_data, val_predictions)
        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)
        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")
        if mse < best_mse:
            best_mse = mse
            best_model = model_fit
    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")
    return best_model

def predict_and_compare(model, data, prediction_steps=30):
    test_actual = data.tail(len(data) - len(model.arima_resid)).values
    test_predictions = model.forecast(steps=len(test_actual))
    future_predictions = model.forecast(steps=prediction_steps)
    plt.figure(figsize=(14, 6))
    plt.plot(data.index[-len(test_actual):], test_actual, label="Actual Prices", color="blue")
    plt.plot(data.index[-len(test_actual):], test_predictions, label="Predicted Prices (Test Data)", color="green")
    future_dates = pd.date_range(start=data.index[-1], periods=prediction_steps + 1, freq='D')[1:]
    plt.plot(future_dates, future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast (ARIMA)")
    plt.xlabel("Date")
    plt.ylabel("Price")
    plt.show()
    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")
    return future_predictions

file_path = "Crude oil.csv"
data = load_data(file_path)
model = train_evaluate_model(data)
future_predictions = predict_and_compare(model, data)

In [None]:
import numpy as np
import pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data['Close']

def train_evaluate_model(data, k=10, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12)):
    rmse_scores, mse_scores, mae_scores = [], [], []
    kf = KFold(n_splits=k, shuffle=False)
    best_model, best_mse = None, float('inf')
    for fold, (train_idx, val_idx) in enumerate(kf.split(data)):
        print(f"Fold {fold+1}/{k}")
        train_data, val_data = data.iloc[train_idx], data.iloc[val_idx]
        model = SARIMAX(train_data, order=order, seasonal_order=seasonal_order)
        model_fit = model.fit(disp=False)
        val_predictions = model_fit.forecast(steps=len(val_data))
        mse = mean_squared_error(val_data, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_data, val_predictions)
        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)
        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")
        if mse < best_mse:
            best_mse = mse
            best_model = model_fit
    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")
    return best_model

def predict_and_compare(model, data, prediction_steps=30):
    test_actual = data.tail(len(data) - len(model.resid)).values
    test_predictions = model.forecast(steps=len(test_actual))
    future_predictions = model.forecast(steps=prediction_steps)
    plt.figure(figsize=(14, 6))
    plt.plot(data.index[-len(test_actual):], test_actual, label="Actual Prices", color="blue")
    plt.plot(data.index[-len(test_actual):], test_predictions, label="Predicted Prices (Test Data)", color="green")
    future_dates = pd.date_range(start=data.index[-1], periods=prediction_steps + 1, freq='D')[1:]
    plt.plot(future_dates, future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast (SARIMA)")
    plt.xlabel("Date")
    plt.ylabel("Price")
    plt.show()
    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")
    return future_predictions

file_path = "Crude oil.csv"
data = load_data(file_path)
model = train_evaluate_model(data)
future_predictions = predict_and_compare(model, data)

In [None]:
import numpy as np
import pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data['Close'], pd.DataFrame(index=data.index)  # فرض می‌کنیم متغیر خارجی نداریم

def train_evaluate_model(data, exog, k=10, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12)):
    rmse_scores, mse_scores, mae_scores = [], [], []
    kf = KFold(n_splits=k, shuffle=False)
    best_model, best_mse = None, float('inf')
    for fold, (train_idx, val_idx) in enumerate(kf.split(data)):
        print(f"Fold {fold+1}/{k}")
        train_data, val_data = data.iloc[train_idx], data.iloc[val_idx]
        train_exog, val_exog = exog.iloc[train_idx], exog.iloc[val_idx]
        model = SARIMAX(train_data, exog=train_exog, order=order, seasonal_order=seasonal_order)
        model_fit = model.fit(disp=False)
        val_predictions = model_fit.forecast(steps=len(val_data), exog=val_exog)
        mse = mean_squared_error(val_data, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_data, val_predictions)
        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)
        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")
        if mse < best_mse:
            best_mse = mse
            best_model = model_fit
    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")
    return best_model

def predict_and_compare(model, data, exog, prediction_steps=30):
    test_actual = data.tail(len(data) - len(model.resid)).values
    test_predictions = model.forecast(steps=len(test_actual), exog=exog.tail(len(test_actual)))
    future_predictions = model.forecast(steps=prediction_steps)
    plt.figure(figsize=(14, 6))
    plt.plot(data.index[-len(test_actual):], test_actual, label="Actual Prices", color="blue")
    plt.plot(data.index[-len(test_actual):], test_predictions, label="Predicted Prices (Test Data)", color="green")
    future_dates = pd.date_range(start=data.index[-1], periods=prediction_steps + 1, freq='D')[1:]
    plt.plot(future_dates, future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast (SARIMAX)")
    plt.xlabel("Date")
    plt.ylabel("Price")
    plt.show()
    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")
    return future_predictions

file_path = "Crude oil.csv"
data, exog = load_data(file_path)
model = train_evaluate_model(data, exog)
future_predictions = predict_and_compare(model, data, exog)

In [None]:
import numpy as np
import pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

def load_data(file_path):
    data = pd.read_csv(file_path, usecols=['Date', 'Close'])
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)
    return data['Close']

def train_evaluate_model(data, k=10):
    rmse_scores, mse_scores, mae_scores = [], [], []
    kf = KFold(n_splits=k, shuffle=False)
    best_model, best_mse = None, float('inf')
    for fold, (train_idx, val_idx) in enumerate(kf.split(data)):
        print(f"Fold {fold+1}/{k}")
        train_data, val_data = data.iloc[train_idx], data.iloc[val_idx]
        model = ExponentialSmoothing(train_data, seasonal_periods=12, trend='add', seasonal='add')
        model_fit = model.fit()
        val_predictions = model_fit.forecast(len(val_data))
        mse = mean_squared_error(val_data, val_predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(val_data, val_predictions)
        mse_scores.append(mse)
        rmse_scores.append(rmse)
        mae_scores.append(mae)
        print(f"Fold {fold+1} - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")
        if mse < best_mse:
            best_mse = mse
            best_model = model_fit
    print("\nFinal Cross-Validation Results:")
    print(f"Average MSE: {np.mean(mse_scores):.4f}")
    print(f"Average RMSE: {np.mean(rmse_scores):.4f}")
    print(f"Average MAE: {np.mean(mae_scores):.4f}")
    return best_model

def predict_and_compare(model, data, prediction_steps=30):
    test_actual = data.tail(len(data) - len(model.resid)).values
    test_predictions = model.forecast(len(test_actual))
    future_predictions = model.forecast(prediction_steps)
    plt.figure(figsize=(14, 6))
    plt.plot(data.index[-len(test_actual):], test_actual, label="Actual Prices", color="blue")
    plt.plot(data.index[-len(test_actual):], test_predictions, label="Predicted Prices (Test Data)", color="green")
    future_dates = pd.date_range(start=data.index[-1], periods=prediction_steps + 1, freq='D')[1:]
    plt.plot(future_dates, future_predictions, label="Future Predictions", color="red")
    plt.legend()
    plt.title("Crude Oil Price Forecast (Holt-Winters)")
    plt.xlabel("Date")
    plt.ylabel("Price")
    plt.show()
    mse_test = mean_squared_error(test_actual, test_predictions)
    rmse_test = np.sqrt(mse_test)
    mae_test = mean_absolute_error(test_actual, test_predictions)
    print("\nTest Data Evaluation:")
    print(f"MSE: {mse_test:.4f}, RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}")
    return future_predictions

file_path = "Crude oil.csv"
data = load_data(file_path)
model = train_evaluate_model(data)
future_predictions = predict_and_compare(model, data)

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from prophet import Prophet
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

# 📌 بارگذاری داده‌ها
file_path = "Crude oil.csv"
df = pd.read_csv(file_path, parse_dates=["Date"], index_col="Date")

# 📌 نرمال‌سازی داده‌ها
scaler = MinMaxScaler()
df["Close"] = scaler.fit_transform(df[["Close"]])

# 📌 تقسیم داده‌ها به داده‌های آموزشی و تست
train_data = df.iloc[:-365]
test_data = df.iloc[-365:]

# 📌 تعریف تابع نمایش نتایج
def plot_results(actual, predicted, title):
    plt.figure(figsize=(12, 6))
    plt.plot(actual.index, actual, label="Actual Prices", color="blue")
    plt.plot(test_data.index, predicted, label=f"Predicted Prices ({title})", color="red")
    plt.legend()
    plt.title(f"Crude Oil Price Prediction - {title}")
    plt.xlabel("Date")
    plt.ylabel("Price")
    plt.show()

# 📌 1️⃣ روش GRU
class GRUModel(nn.Module):
    def __init__(self, input_dim=1, hidden_dim=64, num_layers=2):
        super(GRUModel, self).__init__()
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        out, _ = self.gru(x)
        return self.fc(out[:, -1, :])

def train_gru():
    model = GRUModel().cuda()
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    train_x = torch.tensor(train_data.values[:-1].reshape(-1, 1, 1), dtype=torch.float32).cuda()
    train_y = torch.tensor(train_data.values[1:].reshape(-1, 1), dtype=torch.float32).cuda()

    dataset = TensorDataset(train_x, train_y)
    loader = DataLoader(dataset, batch_size=32, shuffle=True)

    for epoch in range(10):
        for x_batch, y_batch in loader:
            optimizer.zero_grad()
            y_pred = model(x_batch)
            loss = criterion(y_pred, y_batch)
            loss.backward()
            optimizer.step()

    model.eval()
    predictions = []
    last_x = torch.tensor(train_data.values[-1].reshape(1, 1, 1), dtype=torch.float32).cuda()

    for _ in range(365):
        with torch.no_grad():
            y_pred = model(last_x).cpu().numpy()
        predictions.append(y_pred[0, 0])
        last_x = torch.tensor(y_pred.reshape(1, 1, 1), dtype=torch.float32).cuda()

    return scaler.inverse_transform(np.array(predictions).reshape(-1, 1))

# 📌 2️⃣ روش LSTM
class LSTMModel(nn.Module):
    def __init__(self, input_dim=1, hidden_dim=64, num_layers=2):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        out, _ = self.lstm(x)
        return self.fc(out[:, -1, :])

def train_lstm():
    model = LSTMModel().cuda()
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    train_x = torch.tensor(train_data.values[:-1].reshape(-1, 1, 1), dtype=torch.float32).cuda()
    train_y = torch.tensor(train_data.values[1:].reshape(-1, 1), dtype=torch.float32).cuda()

    dataset = TensorDataset(train_x, train_y)
    loader = DataLoader(dataset, batch_size=32, shuffle=True)

    for epoch in range(10):
        for x_batch, y_batch in loader:
            optimizer.zero_grad()
            y_pred = model(x_batch)
            loss = criterion(y_pred, y_batch)
            loss.backward()
            optimizer.step()

    model.eval()
    predictions = []
    last_x = torch.tensor(train_data.values[-1].reshape(1, 1, 1), dtype=torch.float32).cuda()

    for _ in range(365):
        with torch.no_grad():
            y_pred = model(last_x).cpu().numpy()
        predictions.append(y_pred[0, 0])
        last_x = torch.tensor(y_pred.reshape(1, 1, 1), dtype=torch.float32).cuda()

    return scaler.inverse_transform(np.array(predictions).reshape(-1, 1))

# 📌 3️⃣ روش Prophet
def train_prophet():
    df_prophet = df.reset_index()[["Date", "Close"]].rename(columns={"Date": "ds", "Close": "y"})
    model = Prophet()
    model.fit(df_prophet)
    future = model.make_future_dataframe(periods=365)
    forecast = model.predict(future)
    return forecast["yhat"].values[-365:]

# 📌 4️⃣ روش ARIMA
def train_arima():
    model = ARIMA(train_data, order=(5,1,0))
    fitted_model = model.fit()
    forecast = fitted_model.forecast(steps=365)
    return forecast

# 📌 5️⃣ روش SARIMA
def train_sarima():
    model = SARIMAX(train_data, order=(2,1,2), seasonal_order=(1,1,1,12))
    fitted_model = model.fit()
    forecast = fitted_model.forecast(steps=365)
    return forecast

# 📌 6️⃣ روش Holt-Winters
def train_holt_winters():
    model = ExponentialSmoothing(train_data, trend="add", seasonal="add", seasonal_periods=12)
    fitted_model = model.fit()
    forecast = fitted_model.forecast(steps=365)
    return forecast



In [None]:
# 📌 اجرای مدل‌ها و نمایش نتایج
methods = {
    "GRU": train_gru,
    "LSTM": train_lstm,
    "Prophet": train_prophet,
    "ARIMA": train_arima,
    "SARIMA": train_sarima,
    "Holt-Winters": train_holt_winters
}

for name, method in methods.items():
    print(f"\n🔹 Running {name} Model...")
    predictions = method()
    plot_results(test_data["Close"], predictions, name)
