# Neural Network 

Fingerprint: Coulomb

In [3]:
from Coulomb import *
from sklearn.model_selection import train_test_split
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import KFold  


ModuleNotFoundError: No module named 'Coulomb'

In [2]:
X_train,X_test,y_train,y_test = train_test_split(X,y,test_size=0.2,random_state=251)


In [3]:
# Standardize the features
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Normalize the target (hform)
target_scaler = MinMaxScaler()  # You can use StandardScaler if needed
y_train = target_scaler.fit_transform(y_train.reshape(-1, 1) if isinstance(y_train, np.ndarray) else y_train.to_numpy().reshape(-1, 1))
y_test = target_scaler.transform(y_test.reshape(-1, 1) if isinstance(y_test, np.ndarray) else y_test.to_numpy().reshape(-1, 1))

# Convert to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)



In [4]:
# Define the neural network
class RegressionNN(nn.Module):
    def __init__(self, input_dim):
        super(RegressionNN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 256)  # Increased neurons
        self.bn1 = nn.BatchNorm1d(256)  # Batch normalization
        self.fc2 = nn.Linear(256, 128)
        self.bn2 = nn.BatchNorm1d(128)
        self.fc3 = nn.Linear(128, 64)
        self.bn3 = nn.BatchNorm1d(64)
        self.fc4 = nn.Linear(64, 1)
        self.dropout = nn.Dropout(p=0.2)  # Dropout to reduce overfitting

    def forward(self, x):
        x = F.leaky_relu(self.bn1(self.fc1(x)))  # LeakyReLU activation
        x = self.dropout(x)
        x = F.leaky_relu(self.bn2(self.fc2(x)))
        x = self.dropout(x)
        x = F.leaky_relu(self.bn3(self.fc3(x)))
        x = self.dropout(x)
        x = self.fc4(x)
        return x



In [5]:

# Define cross-validation training loop with train_test_split
def cross_val_train(model_class, X_train, y_train, epochs, k_folds):
    kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
    fold_results = []
    best_val_loss = float('inf')
    best_model_state = None

    for fold, (train_idx, val_idx) in enumerate(kfold.split(X_train)):
        print(f"\nFold {fold + 1}/{k_folds}")

        # Use train_test_split to split the fold's training data
        X_fold_train, X_val, y_fold_train, y_val = train_test_split(
            X_train[train_idx], y_train[train_idx], test_size=0.2, random_state=42
        )

        # Initialize model, optimizer, scheduler
        model = model_class(X_fold_train.shape[1])
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)
        criterion = nn.MSELoss()

        for epoch in range(epochs):
            # Training phase
            model.train()
            optimizer.zero_grad()
            outputs = model(X_fold_train)
            loss = criterion(outputs, y_fold_train)
            loss.backward()
            optimizer.step()

            # Step the learning rate scheduler
            scheduler.step()

            # Evaluation phase
            model.eval()
            with torch.no_grad():
                val_outputs = model(X_val)
                val_loss = criterion(val_outputs, y_val)

            # Convert MSE to RMSE for better interpretability
            rmse = torch.sqrt(loss).item()
            val_rmse = torch.sqrt(val_loss).item()

            # Print RMSE every 50 epochs
            if (epoch + 1) % 500 == 0:
                print(f"Epoch [{epoch + 1}/{epochs}], RMSE: {rmse:.4f}, Val RMSE: {val_rmse:.4f}")

        # Store final validation loss for the fold
        fold_results.append(val_loss.item())

        # Save the model state if it's the best so far
        if val_loss.item() < best_val_loss:
            best_val_loss = val_loss.item()
            best_model_state = model.state_dict()

    # Print overall results
    print("\nCross-Validation Results:")
    print(f"Fold Losses: {fold_results}")
    print(f"Mean Validation Loss: {np.mean(fold_results):.4f}")
    print(f"Standard Deviation: {np.std(fold_results):.4f}")

    # Save the best model state
    torch.save(best_model_state, "best_model.pth")
    print("Best model saved as 'best_model.pth'.")

In [6]:
# Initialize loss function
criterion = nn.MSELoss()

# Perform cross-validation
cross_val_train(RegressionNN, X_train, y_train, epochs=1000, k_folds=5)

# Save the model
# Example usage: torch.save(model.state_dict(), "enhanced_regression_model.pth")





Fold 1/5
Epoch [500/1000], RMSE: 0.1842, Val RMSE: 0.1154
Epoch [1000/1000], RMSE: 0.1805, Val RMSE: 0.1152

Fold 2/5
Epoch [500/1000], RMSE: 0.1616, Val RMSE: 0.1088
Epoch [1000/1000], RMSE: 0.1570, Val RMSE: 0.1087

Fold 3/5
Epoch [500/1000], RMSE: 0.1830, Val RMSE: 0.1162
Epoch [1000/1000], RMSE: 0.1797, Val RMSE: 0.1162

Fold 4/5
Epoch [500/1000], RMSE: 0.1572, Val RMSE: 0.1166
Epoch [1000/1000], RMSE: 0.1578, Val RMSE: 0.1165

Fold 5/5
Epoch [500/1000], RMSE: 0.1575, Val RMSE: 0.1179
Epoch [1000/1000], RMSE: 0.1573, Val RMSE: 0.1176

Cross-Validation Results:
Fold Losses: [0.013268917798995972, 0.011825833469629288, 0.013494801707565784, 0.013582686893641949, 0.013836064375936985]
Mean Validation Loss: 0.0132
Standard Deviation: 0.0007
Best model saved as 'best_model.pth'.


In [7]:
# Load the best model and train it on the full training set
def train_on_full_data(model_class, X_train, y_train, X_test, y_test, criterion, epochs=100):
    model = model_class(X_train.shape[1])
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

    # Load the best model state
    model.load_state_dict(torch.load("best_model.pth"))

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(X_train)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()

        scheduler.step()

        if (epoch + 1) % 100 == 0:
            rmse = torch.sqrt(loss).item()
            print(f"Epoch [{epoch + 1}/{epochs}], RMSE: {rmse:.4f}")

    print("Training on full dataset completed.")

    # Evaluate on test set
    model.eval()
    with torch.no_grad():
        test_outputs = model(X_test)
        test_loss = criterion(test_outputs, y_test)
        test_rmse = torch.sqrt(test_loss).item()
        print(f"\nTest RMSE: {test_rmse:.4f}")

    # Save the final model
    torch.save(model.state_dict(), "final_model.pth")
    print("Final model saved as 'final_model.pth'.")

# Train the best model on the full training set and evaluate on test set
train_on_full_data(RegressionNN, X_train, y_train, X_test, y_test, criterion, epochs=1000)

  model.load_state_dict(torch.load("best_model.pth"))


Epoch [100/1000], RMSE: 0.1154
Epoch [200/1000], RMSE: 0.1105
Epoch [300/1000], RMSE: 0.1091
Epoch [400/1000], RMSE: 0.1096
Epoch [500/1000], RMSE: 0.1085
Epoch [600/1000], RMSE: 0.1095
Epoch [700/1000], RMSE: 0.1089
Epoch [800/1000], RMSE: 0.1094
Epoch [900/1000], RMSE: 0.1094
Epoch [1000/1000], RMSE: 0.1087
Training on full dataset completed.

Test RMSE: 0.1088
Final model saved as 'final_model.pth'.


# Prepare submission to Kaggle

In [8]:
cmats_test = cmats_train
cmats_test = np.zeros((len(test),max_number_of_atoms**2))
for i,atoms in enumerate(test.atoms):
    if i%1000 == 0:
        print(i)
    cmats_test[i,:] = cm.create(atoms)
print(len(cmats_test))

0
1000
2000
3000
4000


In [12]:
X_test_kaggle = pd.DataFrame(data=cmats_test, index=test.id)
X_test_kaggle_scaled = scaler.transform(X_test_kaggle)
X_test_kaggle_scaled.shape

(4000, 400)

In [13]:
# Prepare Kaggle submission
def prepare_kaggle_submission(model, X_test_scaled, test_ids, target_scaler, output_file):
    # Convert standardized X_test_kaggle to PyTorch tensor
    X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)

    # Predict with the trained model
    model.eval()
    with torch.no_grad():
        y_pred_scaled = model(X_test_tensor).numpy()

    # Descend the scaled predictions to the original scale
    y_pred_original = target_scaler.inverse_transform(y_pred_scaled)

    # Create the submission DataFrame
    submission_df = pd.DataFrame({
        "id": test_ids,
        "hform": y_pred_original.flatten()  # Ensure hform is a 1D array
    })

    # Save to CSV
    submission_df.to_csv(output_file, index=False)
    print(f"Submission file saved as: {output_file}")

In [14]:
# File path for submission
output_file = "submission_test.csv"

# Ensure the final model is loaded
final_model = RegressionNN(X_train.shape[1])
final_model.load_state_dict(torch.load("final_model.pth"))

# Prepare the Kaggle submission
prepare_kaggle_submission(final_model, X_test_kaggle_scaled, test.id, target_scaler, output_file)

Submission file saved as: submission_test.csv


  final_model.load_state_dict(torch.load("final_model.pth"))


In [16]:
# Evaluate submission preparation on known test data
def evaluate_submission_pipeline(model, X_test, y_test, target_scaler):
    # Convert PyTorch tensor back to numpy for scaling compatibility
    X_test_numpy = X_test.numpy()
    
    # Predict using the trained model
    model.eval()
    with torch.no_grad():
        y_pred_scaled = model(X_test).numpy()

    # Descend predictions back to original scale
    y_pred_original = target_scaler.inverse_transform(y_pred_scaled)
    y_test_original = target_scaler.inverse_transform(y_test.numpy())

    # Calculate RMSE on original scale
    rmse = np.sqrt(np.mean((y_pred_original - y_test_original) ** 2))
    print(f"RMSE on original scale: {rmse:.4f}")

    # Display some predictions vs. actuals
    comparison_df = pd.DataFrame({
        "Actual": y_test_original.flatten(),
        "Predicted": y_pred_original.flatten()
    }).head(10)
    print("\nSample Predictions vs Actuals:")
    print(comparison_df)

    return comparison_df, rmse

# Ensure the final model is loaded
final_model = RegressionNN(X_train.shape[1])
final_model.load_state_dict(torch.load("final_model.pth"))

# Evaluate submission pipeline on known data
comparison_df, rmse = evaluate_submission_pipeline(final_model, X_test, y_test, target_scaler)


RMSE on original scale: 0.5914

Sample Predictions vs Actuals:
     Actual  Predicted
0 -0.033324  -0.432615
1 -1.398727  -1.353785
2 -1.187238  -1.172646
3 -0.948828  -0.948329
4  0.066191  -0.031278
5 -1.202062  -1.796983
6 -0.741397  -0.465481
7 -3.545803  -1.047422
8 -0.285965  -0.537511
9 -1.561886  -1.168741


  final_model.load_state_dict(torch.load("final_model.pth"))


[0.11333410441875458]
[0.11041968315839767]