# Improved Analysis for Better R² Score

In [2]:
import os
import sys
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F
from torch.optim.lr_scheduler import ReduceLROnPlateau, CosineAnnealingLR
from sklearn.feature_selection import VarianceThreshold, SelectKBest, f_regression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.ensemble import RandomForestRegressor
import xlsxwriter

if 'xlsxwriter' not in sys.modules:
    !pip install xlsxwriter

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def resolve_path_gdrive(relativePath):
    if os.path.exists('/content/drive'):
        return '/content/drive/MyDrive/work/gdrive-workspaces/git/nn_catalyst/' + relativePath
    else:
        from utils import get_project_root
        return get_project_root() + "/" + relativePath

# Load the data
descriptors_path = 'descriptors.csv'
targets_path = 'compiled_data.csv'

descriptors_df = pd.read_csv(resolve_path_gdrive(descriptors_path))
targets_df = pd.read_csv(resolve_path_gdrive(targets_path))

# Select target column (dipole_n)
selected_cols = 5
targets_df = targets_df.iloc[:, [0, selected_cols]]

ModuleNotFoundError: No module named 'utils'

In [None]:
# Keep only numeric columns
descriptors_numeric = descriptors_df.select_dtypes(include=['number'])
targets_numeric = targets_df.select_dtypes(include=['number'])

# Merge the numeric dataframes on the common label column
numeric_data = pd.merge(descriptors_numeric, targets_numeric, left_on='Label', right_on='mol_num')
numeric_data = numeric_data.drop(columns=['Label', 'mol_num'])

# Separate features and targets
X = numeric_data.iloc[:, :-1]
y = numeric_data.iloc[:, -1]

# Apply variance threshold
selector = VarianceThreshold(threshold=0.01)
X_high_variance = selector.fit_transform(X)

# Apply feature selection
k_best = 500  # You can adjust this value
selector = SelectKBest(score_func=f_regression, k=k_best)
X_selected = selector.fit_transform(X_high_variance, y)

# Convert to numpy arrays
X = X_selected
y = y.values.reshape(-1, 1)

# Split the data into training, validation, and test sets
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Standardize the data
scaler_X = StandardScaler().fit(X_train)
scaler_y = StandardScaler().fit(y_train)

X_train = scaler_X.transform(X_train)
X_val = scaler_X.transform(X_val)
X_test = scaler_X.transform(X_test)

y_train = scaler_y.transform(y_train)
y_val = scaler_y.transform(y_val)
y_test = scaler_y.transform(y_test)

# Convert the data to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32, device=device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32, device=device)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32, device=device)
y_val_tensor = torch.tensor(y_val, dtype=torch.float32, device=device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32, device=device)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32, device=device)

# Create DataLoader for batch processing
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
class ImprovedRegressionNetwork(nn.Module):
    def __init__(self, input_dim, hidden_dim1, hidden_dim2, hidden_dim3, output_dim, dropout_rate=0.3):
        super(ImprovedRegressionNetwork, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim1)
        self.bn1 = nn.BatchNorm1d(hidden_dim1)
        self.fc2 = nn.Linear(hidden_dim1, hidden_dim2)
        self.bn2 = nn.BatchNorm1d(hidden_dim2)
        self.fc3 = nn.Linear(hidden_dim2, hidden_dim3)
        self.bn3 = nn.BatchNorm1d(hidden_dim3)
        self.fc4 = nn.Linear(hidden_dim3, output_dim)
        self.dropout = nn.Dropout(dropout_rate)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.bn1(self.fc1(x)))
        x = self.dropout(x)
        x = self.relu(self.bn2(self.fc2(x)))
        x = self.dropout(x)
        x = self.relu(self.bn3(self.fc3(x)))
        x = self.dropout(x)
        x = self.fc4(x)
        return x

def train_and_evaluate():
    model = ImprovedRegressionNetwork(X_train.shape[1], 1024, 512, 256, 1).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-5)
    scheduler = CosineAnnealingLR(optimizer, T_max=100, eta_min=1e-6)

    best_val_loss = float('inf')
    patience = 20
    patience_counter = 0
    num_epochs = 300

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs).squeeze()
            loss = criterion(outputs, targets.squeeze())
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        scheduler.step()
        
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, targets in val_loader:
                outputs = model(inputs).squeeze()
                loss = criterion(outputs, targets.squeeze())
                val_loss += loss.item()
        val_loss /= len(val_loader)
        
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {running_loss/len(train_loader):.4f}, Val Loss: {val_loss:.4f}')
        
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print('Early stopping triggered')
                break

    model.load_state_dict(torch.load('best_model.pth'))
    return model

# Train the model
model = train_and_evaluate()

# Evaluate the model
model.eval()
predictions = []
actuals = []
with torch.no_grad():
    for inputs, targets in test_loader:
        outputs = model(inputs).squeeze()
        predictions.extend(outputs.cpu().numpy())
        actuals.extend(targets.squeeze().cpu().numpy())

predictions = scaler_y.inverse_transform(np.array(predictions).reshape(-1, 1)).flatten()
actuals = scaler_y.inverse_transform(np.array(actuals).reshape(-1, 1)).flatten()

r2 = r2_score(actuals, predictions)
rmse = np.sqrt(mean_squared_error(actuals, predictions))
mae = mean_absolute_error(actuals, predictions)

print(f'R² Score: {r2:.4f}')
print(f'RMSE: {rmse:.4f}')
print(f'MAE: {mae:.4f}')

# Create Excel file with results
output_path = 'improved_model_predictions.xlsx'
writer = pd.ExcelWriter(output_path, engine='xlsxwriter')
workbook = writer.book
worksheet = workbook.add_worksheet('Results')

results_df = pd.DataFrame({'Actual': actuals, 'Predicted': predictions})
results_df.to_excel(writer, sheet_name='Results', index=False)

chart = workbook.add_chart({'type': 'scatter'})
chart.add_series({
    'name': 'Predictions vs Actuals',
    'categories': ['Results', 1, 0, len(actuals), 0],
    'values': ['Results', 1, 1, len(predictions), 1],
    'marker': {'type': 'circle', 'size': 7},
})

chart.set_title({'name': 'Predictions vs Actuals'})
chart.set_x_axis({'name': 'Actual Values'})
chart.set_y_axis({'name': 'Predicted Values'})
worksheet.insert_chart('E2', chart)

worksheet.write('A1', 'Actual')
worksheet.write('B1', 'Predicted')
worksheet.write('D1', 'Metrics')
worksheet.write('D2', 'R²')
worksheet.write('D3', 'RMSE')
worksheet.write('D4', 'MAE')
worksheet.write('E2', r2)
worksheet.write('E3', rmse)
worksheet.write('E4', mae)

writer.close()

print(f'Results saved to {output_path}')