In [2]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
import csv
import re
import seaborn as sns

# Setting the seed and making PyTorch deterministic.
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

def mean_absolute_percentage_error(y_true, y_pred): 
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

def create_dataset(dataset, look_back=1, time_steps=1):
    X, Y = [], []
    for i in range(len(dataset) - time_steps - look_back + 1):
        a = dataset[i:(i + time_steps * look_back), 0].reshape(time_steps, look_back)
        X.append(a)
        Y.append(dataset[i + time_steps * look_back - 1, 0])
    return np.array(X), np.array(Y)

def extract_lat_lon(filename):
    lat_str = filename.split("lat_")[1].split("_")[0]
    lon_str = filename.split("lon_")[1].split(".")[0]
    lat = float(lat_str)
    lon = float(lon_str)
    return lat, lon

def extract_date_from_filename(filename):
    date_match = re.search(r"(\d+)d", filename)
    if date_match:
        return date_match.group(1)
    return None

folder_path = 'csv-bias'

with open(f"Statistics for Taiwania 1  Pytorch Main Areas.csv", "w", newline="") as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow([f'Statistics for Taiwania'])
    
    for file_name in os.listdir(folder_path):
        if file_name.endswith(".csv"): 
            file_path = os.path.join(folder_path, file_name)
            print(f'Processing file: {file_path}')
            if 'dbiastg' in file_name:
                tag = 'dbiastg'
            else:
                tag = 'biastg'

            writer.writerow([file_name])           
            date = extract_date_from_filename(file_name)
            if date:
                writer.writerow(['date', date,tag])

            data = pd.read_csv(file_path, index_col='time', parse_dates=True)
            data = data[tag]

            if data.notnull().any():  
                data = data.asfreq('B').fillna(method='ffill') 
                data = data.dropna()
                scaler = MinMaxScaler(feature_range=(0, 1))
                data = scaler.fit_transform(data.values.reshape(-1, 1))

                train_size = int(len(data) * 0.8)
                test_size = len(data) - train_size
                train_data, test_data = data[0:train_size,:], data[train_size:len(data),:]

                look_back = 1
                time_steps = 3
                X_train, Y_train = create_dataset(train_data, look_back, time_steps)
                X_test, Y_test = create_dataset(test_data, look_back, time_steps)

                X_train = torch.tensor(X_train, dtype=torch.float32)
                Y_train = torch.tensor(Y_train, dtype=torch.float32)
                X_test = torch.tensor(X_test, dtype=torch.float32)
                Y_test = torch.tensor(Y_test, dtype=torch.float32)
                
            train_dataset = TensorDataset(X_train, Y_train)
            test_dataset = TensorDataset(X_test, Y_test)
            train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=False)
            test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

            class BiLSTM(nn.Module):
                def __init__(self, input_size, hidden_size, num_layers):
                    super(BiLSTM, self).__init__()
                    self.hidden_size = hidden_size
                    self.num_layers = num_layers
                    self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
                    self.fc = nn.Linear(hidden_size * 2, 1)

                def forward(self, x):
                    h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)
                    c0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)

                    out, _ = self.lstm(x, (h0, c0))
                    out = self.fc(out[:, -1, :])
                    return out.squeeze()

            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model = BiLSTM(input_size=look_back, hidden_size=4, num_layers=3).to(device)
            criterion = nn.MSELoss()
            optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

            num_epochs = 100
            model.train()
            for epoch in range(num_epochs):
                for inputs, targets in train_dataloader:
                    inputs = inputs.to(device)
                    targets = targets.to(device)

                    optimizer.zero_grad()
                    outputs = model(inputs)
                    loss = criterion(outputs, targets.squeeze())
                    loss.backward()
                    optimizer.step()
            
            torch.save(model.state_dict(), f'{file_name}_model.pth')
            model.eval()
            train_predict, test_predict = [], []
            with torch.no_grad():
                for inputs, targets in train_dataloader:
                    inputs = inputs.to(device)
                    outputs = model(inputs)
                    train_predict.append(outputs.detach().cpu().numpy().reshape(-1, 1))

                for inputs, targets in test_dataloader:
                    inputs = inputs.to(device)
                    outputs = model(inputs)
                    test_predict.append(outputs.detach().cpu().numpy().reshape(-1, 1))

            train_predict = np.concatenate(train_predict, axis=0)
            test_predict = np.concatenate(test_predict, axis=0)

            train_predict = scaler.inverse_transform(train_predict)
            Y_train = scaler.inverse_transform(Y_train.numpy().reshape(-1, 1))
            test_predict = scaler.inverse_transform(test_predict)
            Y_test = scaler.inverse_transform(Y_test.numpy().reshape(-1, 1))

            mae = mean_absolute_error(Y_test, test_predict)
            mse = mean_squared_error(Y_test, test_predict)
            rmse = np.sqrt(mse)
            mbe = np.mean(Y_test - test_predict)
            mape = mean_absolute_percentage_error(Y_test, test_predict)

            print(f"RMSE: {rmse:.2f}")
            writer.writerow(['RMSE', rmse])
            print(f"MAE: {mae:.2f}")
            writer.writerow(['MAE', mae])
            print(f"MBE: {mbe:.2f}")
            writer.writerow(['MBE', mbe])
            print(f"MAPE: {mape:.2f}%")
            writer.writerow(['MAPE', mape])

Processing file: csv-bias\biastg_lat_0.0_lon_180.0.csv
RMSE: 0.03
MAE: 0.02
MBE: 0.01
MAPE: 14.41%
Processing file: csv-bias\dbiastg_lat_0.0_lon_180.0.csv
RMSE: 0.00
MAE: 0.00
MBE: -0.00
MAPE: 1.85%
