In [None]:
import os

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import shap

from sklearn.preprocessing import MinMaxScaler, RobustScaler, OneHotEncoder
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from pycaret.regression import *
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import mean_absolute_error,mean_squared_error,r2_score
from scipy.stats import randint as sp_randint

In [None]:
# Location of the flight-price CSV. Set the FLIGHTS_CSV environment variable to
# point at a copy of the file on another machine; when it is unset, the original
# absolute path is used, so existing runs behave exactly as before.
DATA_PATH = os.environ.get(
    'FLIGHTS_CSV',
    '/Users/gim-yeon-u/Desktop/SejongUniv/2024-1/창의학기제2/flights_yeon2.csv',
)
data = pd.read_csv(DATA_PATH)

In [None]:
# Show the loaded frame as this cell's output.
data

In [None]:
# Remove every column whose header matches '^Unnamed', then show what is left.
# (Presumably these are index columns written out by an earlier to_csv — verify.)
data = data.drop(columns=data.columns[data.columns.str.contains('^Unnamed')])
data

In [None]:
# Keep only the rows that actually carry a Price value, then show the result.
data = data[data['Price'].notna()]
data

In [None]:
# Re-wrap `data` in a DataFrame and display it.
# NOTE(review): `data` already comes from pd.read_csv followed by DataFrame
# selections earlier in the notebook, so this constructor call looks redundant —
# confirm before removing.
data = pd.DataFrame(data)
data

In [None]:
# Build the model inputs: every column except Airline/Price is used as-is, and
# Airline is appended as a single integer-coded column at the end.
target = 'Price'

# Integer-code the airline names and shape them as an (n_rows, 1) column.
label_encoder = LabelEncoder()
airlines_encoded = label_encoder.fit_transform(data['Airline']).reshape(-1, 1)

# Remaining feature columns, and the regression target as an (n_rows, 1) column.
X_rest = data.drop(columns=['Airline', 'Price']).values
y = data[target].values.reshape(-1, 1)

# Feature matrix = untouched columns followed by the encoded airline column.
X = np.hstack((X_rest, airlines_encoded))

In [None]:
# Show the target array (the Price column reshaped to a single column).
y

In [None]:
# Show the assembled feature matrix (the label-encoded Airline codes are the last column).
X

In [None]:

# Split BEFORE scaling so the scalers learn their median/IQR statistics from the
# training rows only. Fitting them on the full dataset would leak information
# about the held-out rows into the features and target the model is trained on.
# The split uses the same test_size/random_state as before, so the same rows
# land in each partition.
X_train_raw, X_test_raw, y_train_raw, y_test_raw = train_test_split(
    X, y, test_size=0.2, random_state=42
)

scaler_X = RobustScaler()
scaler_y = RobustScaler()
X_train = scaler_X.fit_transform(X_train_raw)
y_train = scaler_y.fit_transform(y_train_raw)
# Held-out rows are transformed with the statistics learned from the training rows.
X_test = scaler_X.transform(X_test_raw)
y_test = scaler_y.transform(y_test_raw)

# Scaled copies of the complete dataset, kept under their original names for any
# later cell that still refers to them.
X_scaled = scaler_X.transform(X)
y_scaled = scaler_y.transform(y)

# float32 tensors for the network (torch.tensor copies and converts the arrays).
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# Training batches are reshuffled every epoch; evaluation batches keep row order.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=10, shuffle=False)


In [None]:
# Choose the compute device: Apple Metal (MPS) when this machine supports it,
# otherwise CUDA, otherwise the CPU. Hard-coding "mps" makes the first
# `.to(device)` call fail on any machine without Metal support.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

In [None]:
class TransformerModel(nn.Module):
    """Encoder-only transformer that maps one feature vector to `output_dim` values.

    Each input row is treated as a sequence of length one: it is projected to
    `model_dim`, run through the encoder stack of an `nn.Transformer`, and
    projected down to `output_dim`.

    Args:
        input_dim: Number of features per input row.
        model_dim: Embedding width used inside the transformer (`d_model`).
        num_heads: Number of attention heads (`nhead`).
        num_encoder_layers: Number of stacked encoder layers.
        output_dim: Number of values predicted per input row.
    """

    def __init__(self, input_dim, model_dim, num_heads, num_encoder_layers, output_dim):
        super().__init__()
        # Lift the raw features to the transformer's embedding width.
        self.input_linear = nn.Linear(in_features=input_dim, out_features=model_dim)
        # The full nn.Transformer is built with zero decoder layers; forward()
        # only ever calls its encoder.
        self.transformer = nn.Transformer(
            num_decoder_layers=0,
            num_encoder_layers=num_encoder_layers,
            nhead=num_heads,
            d_model=model_dim,
        )
        # Map the encoded representation to the prediction.
        self.output_linear = nn.Linear(in_features=model_dim, out_features=output_dim)

    def forward(self, x):
        """Run a (batch, input_dim) tensor through the model; returns (batch, output_dim)."""
        # Insert a length-1 sequence axis: (batch, features) -> (batch, 1, features).
        as_sequence = x.unsqueeze(dim=1)
        embedded = self.input_linear(as_sequence)
        # The encoder is not batch_first, so swap to (seq, batch, model_dim).
        encoded = self.transformer.encoder(embedded.permute(1, 0, 2))
        # Back to (batch, seq, model_dim), then drop the length-1 sequence axis.
        batch_first = encoded.permute(1, 0, 2).squeeze(dim=1)
        return self.output_linear(batch_first)

# Seed torch's global RNG so weight initialisation and the DataLoader shuffle
# order are repeatable between runs (42 matches the random_state used for the
# train/test split). NOTE(review): some accelerator kernels may still be
# non-deterministic — confirm if bit-exact reruns are required.
torch.manual_seed(42)

model = TransformerModel(input_dim=X_train_tensor.shape[1], model_dim=128, num_heads=2, num_encoder_layers=2, output_dim=1).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.SmoothL1Loss()

# The held-out tensors never change, so copy them to the device once instead of
# once per epoch.
X_eval_device = X_test_tensor.to(device)
y_eval_device = y_test_tensor.to(device)

num_epochs = 50
for epoch in range(num_epochs):
    # --- one pass over the shuffled training batches ---
    model.train()
    total_loss = 0
    for batch_x, batch_y in train_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)

        optimizer.zero_grad()
        predictions = model(batch_x)
        loss = criterion(predictions, batch_y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    avg_loss = total_loss / len(train_loader)

    # --- monitor the loss on the held-out split (reported as "Validation Loss") ---
    model.eval()
    with torch.no_grad():
        val_predictions = model(X_eval_device)
        val_loss = criterion(val_predictions, y_eval_device)

    print(f'Epoch {epoch+1}, Training Loss: {avg_loss:.4f}, Validation Loss: {val_loss.item():.4f}')

In [None]:
# Final hold-out evaluation: predict the test tensors with gradients disabled and
# report MSE / RMSE. Both arrays are in the scaled target units produced by
# scaler_y, so the printed errors are not in price units.
model.eval()
with torch.no_grad():
    y_test_np = y_test_tensor.cpu().numpy()
    predictions = model(X_test_tensor.to(device)).cpu().numpy()

mse = mean_squared_error(y_true=y_test_np, y_pred=predictions)
rmse = np.sqrt(mse)
print(f"Test MSE: {mse:.4f}, RMSE: {rmse:.4f}")

In [None]:
def calculate_mape(y_true, y_pred):
    """Return the mean absolute percentage error of `y_pred` against `y_true`, in percent.

    Args:
        y_true: Array-like of reference values.
        y_pred: Array-like of predicted values.

    Returns:
        mean(|y_true - y_pred| / max(|y_true|, eps)) * 100 as a NumPy float.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    # Pair the values element by element. Without flattening, an (n, 1) target
    # and an (n,) prediction would broadcast into an (n, n) error matrix.
    # Inputs of different sizes (e.g. a scalar prediction) still broadcast.
    if y_true.size == y_pred.size:
        y_true = y_true.reshape(-1)
        y_pred = y_pred.reshape(-1)
    # Guard only the denominator against zero. Shifting y_true itself by eps
    # would also shift the numerator, so a perfect prediction of a zero target
    # would score as 100% error.
    denominator = np.maximum(np.abs(y_true), np.finfo(float).eps)
    mape = np.mean(np.abs(y_true - y_pred) / denominator) * 100
    return mape

model.eval()

with torch.no_grad():
    predictions = model(X_test_tensor.to(device)).cpu().numpy()
    y_test_np = y_test_tensor.cpu().numpy()

# MAPE divides by the true value, so it has to be computed in the original price
# units. The tensors hold RobustScaler-transformed targets, which are centred on
# the median and therefore sit around zero, so a percentage error on them is
# meaningless. Undo the target scaling first.
predictions_price = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test_price = scaler_y.inverse_transform(y_test_np.reshape(-1, 1))

mape = calculate_mape(y_test_price, predictions_price)
print(f"Mean Absolute Percentage Error (MAPE): {mape:.2f}%")

In [None]:
# SHAP feature attributions for the trained network (shap is imported in the
# notebook's import cell).
model.eval()
background_data = X_train_tensor[:300].to(device)
test_data = X_test_tensor[:150].to(device)
test_labels = y_test_tensor[:150].to(device)
explainer = shap.GradientExplainer(model, background_data)
shap_values = explainer.shap_values(test_data)

# The inputs are 2-D (samples, features), so the attributions must end up as a
# (samples, features) matrix too. Averaging over axis 1 would collapse the
# feature axis and leave summary_plot with vectors instead of matrices.
# NOTE(review): depending on the shap version, the single-output result may
# carry an extra length-1 axis or arrive as a one-element list — the reshape
# below only drops such singleton axes; confirm against the installed version.
shap_values = np.asarray(shap_values)
if shap_values.size != test_data.numel():
    raise ValueError(
        f"Unexpected SHAP output shape {shap_values.shape} for inputs of shape {tuple(test_data.shape)}"
    )
shap_values = shap_values.reshape(tuple(test_data.shape))
test_numpy = test_data.cpu().numpy()

feature_names = [
    'SearchYear', 'SearchMonth', 'SearchDay', 'FlightYear', 'FlightMonth',
    'FlightDay', 'IsFrom', 'Day_left', 'DepartureTime', 'ArrivalTime',
    'AirborneTime', 'Airline_encoded'
]

# The names are hard-coded, so fail early with a clear message if they do not
# line up with the feature columns.
if len(feature_names) != test_numpy.shape[1]:
    raise ValueError(
        f"feature_names has {len(feature_names)} entries but the inputs have {test_numpy.shape[1]} features"
    )

print(f"Adjusted SHAP values shape: {shap_values.shape}")
print(f"Adjusted test data shape: {test_numpy.shape}")
print(f"Feature names count: {len(feature_names)}")

# Beeswarm view of per-sample attributions, then mean-|SHAP| bar ranking.
shap.summary_plot(shap_values, features=test_numpy, feature_names=feature_names)
shap.summary_plot(shap_values, features=test_numpy, feature_names=feature_names, plot_type='bar')
