# Asteroid Class Prediction Leveraging Parallelization Techniques 

<div style="text-align: right">  
<strong>Uday Kiran Dasari</strong> 
</div>
<br/>
<center>
<img src="Asteroid.jpg" width=700 />
</center>
<br/>

**Link to Dataset used**: [**Asteroid Dataset**](https://www.kaggle.com/datasets/sakhawat18/asteroid-dataset/data)

### Dataset Description

- **SPK-ID:** Object primary SPK-ID
- **Object ID:** Object internal database ID
- **Object fullname:** Object full name/designation
- **pdes:** Object primary designation
- **name:** Object IAU name
- **NEO:** Near-Earth Object (NEO) flag
- **PHA:** Potentially Hazardous Asteroid (PHA) flag
- **H:** Absolute magnitude parameter
- **Diameter:** Object diameter (from equivalent sphere), in km
- **Albedo:** Geometric albedo
- **Diameter_sigma:** 1-sigma uncertainty in object diameter, in km
- **Orbit_id:** Orbit solution ID
- **Epoch:** Epoch of osculation in modified Julian day form
- **Equinox:** Equinox of reference frame
- **e:** Eccentricity
- **a:** Semi-major axis, in au
- **q:** Perihelion distance, in au
- **i:** Inclination; angle with respect to x-y ecliptic plane
- **tp:** Time of perihelion passage (TDB)
- **moid_ld:** Earth Minimum Orbit Intersection Distance, in lunar distances (LD), as the `_ld` suffix indicates
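Several of the orbital elements listed above are not independent. For a closed elliptical orbit, the perihelion distance follows from the semi-major axis and eccentricity as q = a(1 - e), and the aphelion distance as Q = a(1 + e). The sketch below shows that relation; the function names and the sample values are illustrative assumptions and are not taken from the dataset.

```python
def perihelion_distance(a_au, e):
    """Perihelion distance q = a * (1 - e), in au, for an elliptical orbit."""
    if not 0.0 <= e < 1.0:
        raise ValueError("formula assumes a closed orbit with 0 <= e < 1")
    return a_au * (1.0 - e)


def aphelion_distance(a_au, e):
    """Aphelion distance Q = a * (1 + e), in au, for an elliptical orbit."""
    if not 0.0 <= e < 1.0:
        raise ValueError("formula assumes a closed orbit with 0 <= e < 1")
    return a_au * (1.0 + e)


# Illustrative values only (hypothetical, not a row from the dataset)
a_example, e_example = 2.0, 0.5
print(perihelion_distance(a_example, e_example))  # 1.0
print(aphelion_distance(a_example, e_example))    # 3.0
```

A check like this could be used to sanity-test the `a`, `e`, and `q` columns against each other before training.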

In [1]:
# Basic imports
import os
import time
import joblib
import numpy as np
import pandas as pd
from tqdm import tqdm    
    
# Visualization imports
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns

# Standard-library multiprocessing
import multiprocessing
    
# Scikit-learn imports
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
    
# Dask-related imports
import dask
import dask.array as da
import dask.dataframe as dd
from dask_ml.impute import SimpleImputer
from dask.distributed import config
from dask_ml.model_selection import train_test_split
    
# Run Dask workers as non-daemonic processes so they are allowed to spawn child processes
dask.config.set({'distributed.worker.daemon': False})
    
# PyTorch imports
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset
import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.multiprocessing as mp

In [2]:
def dataloading(path='./dataset.csv'):
    astro_ds = dd.read_csv(path)
    # Drop irrelevant columns early in the process
    columns_to_drop = ['id', 'full_name', 'pdes', 'name', 'prefix', 'neo', 'pha', 'orbit_id', 'equinox']
    astro_ds = astro_ds.drop(columns_to_drop, axis=1)
    print("Data Loaded and dropped irrelavant columns!!")
    return astro_ds

def nullvalue_handling(astro_ds):
    # Prepare features for imputation (excluding the target 'class')
    X = astro_ds.drop(['class'], axis=1)
    y = astro_ds['class']
    # Applying SimpleImputer from Dask-ML to handle missing values in parallel
    imputer = SimpleImputer(strategy='median')

    # fit_transform returns a lazy Dask DataFrame; the imputed values are not materialized yet
    X_imputed = imputer.fit_transform(X)
    # Dask evaluates lazily, so compute() is called below to materialize the results

    with tqdm(total=2, desc="Computing DataFrames") as pbar:
        X_imputed = X_imputed.compute()
        pbar.update(1)  # Update progress after computing X_imputed
        y = y.compute()
        pbar.update(1)  # Update progress after computing y
    print("Null Value Handling Done!")
    return X_imputed,y

def feature_importance(X_imputed,y):
    # Split data into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(X_imputed, y, test_size=0.2, shuffle=True, random_state=42)
    # Initialize the ExtraTreesClassifier
    # We can adjust n_estimators, max_depth, and other parameters as needed
    etc = ExtraTreesClassifier(n_estimators=100, random_state=42, n_jobs=-1)
    
    # To leverage parallel computation with multiprocessing when performing model fitting
    with joblib.parallel_backend('multiprocessing'):
        #Fit the model
        etc.fit(X_train, y_train)
    # Retrieve the fitted model's feature importances (they are not printed)
    feature_importances = etc.feature_importances_
    
    threshold = np.mean(feature_importances)  # Threshold: mean importance across all features
    
    # Selecting features with importance greater than the threshold
    selected_features = [feature for feature, importance in zip(X_train.columns, feature_importances) if importance > threshold]
    
    # Include 'class' in selected features
    selected_features.append('class')
    
    # Filter the original Dask DataFrame to include only selected features
    # NOTE: this reads the module-level `astro_ds` created in the main block, not a function argument
    astro_ds_filtered = astro_ds[selected_features]
    print("Feature Extraction done!")
    return astro_ds_filtered


def dataprep(X_train,X_test,y_train,y_test):
    # Scaling the data using scikit-learn's StandardScaler (fit on the training split only)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    # Encoding the target label using LabelEncoder
    encoder = LabelEncoder()
    y_train_encoded = encoder.fit_transform(y_train)
    y_test_encoded = encoder.transform(y_test)
    print("Scaling and Encoding Done!")
    return X_train_scaled,X_test_scaled,y_train_encoded,y_test_encoded

def tensordataset(X_train, y_train, X_test, y_test):
    # Convert test and train sets to tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)

    y_train_tensor = torch.tensor(y_train, dtype=torch.long)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)
    
    # Build the training and validation TensorDatasets (the test split serves as the validation set)
    train_dataset=TensorDataset(X_train_tensor,y_train_tensor)
    val_dataset=TensorDataset(X_test_tensor,y_test_tensor)
    input_size=X_train_tensor.shape[1]
    
    print(f"Tensor Conversion and dataset prep done!!Input Size:{input_size}")
    return train_dataset,val_dataset,input_size

def plot_metrics(train_losses, val_losses, train_accuracies, val_accuracies):
    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label='Training Accuracy')
    plt.plot(val_accuracies, label='Validation Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy (%)')
    plt.legend()

    plt.show()
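
The selection rule in `feature_importance` keeps only the features whose importance is strictly greater than the mean importance. The sketch below isolates that rule in plain Python; the helper name `select_above_mean`, the feature names, and the importance values are hypothetical and were not produced by the `ExtraTreesClassifier` in this notebook.

```python
from statistics import mean


def select_above_mean(names, importances):
    """Return the names whose importance is strictly above the mean importance."""
    if len(names) != len(importances):
        raise ValueError("names and importances must have the same length")
    threshold = mean(importances)
    return [name for name, imp in zip(names, importances) if imp > threshold]


# Hypothetical importances that sum to 1.0, so the mean is 0.2
names = ["H", "e", "a", "q", "i"]
importances = [0.10, 0.30, 0.25, 0.05, 0.30]
selected = select_above_mean(names, importances)
print(selected)  # ['e', 'a', 'i']
```

Because the comparison is strict, a feature sitting exactly at the mean is dropped, and the number of retained features depends on how skewed the importances are.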

In [3]:
class SimpleNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)  # Linear layer mapping input to hidden layer
        self.relu = nn.ReLU()  # ReLU activation function
        self.fc2 = nn.Linear(hidden_size, num_classes)  # Linear layer mapping hidden layer to output classes

    def forward(self, x):
        out = self.fc1(x)  # Pass input through first linear layer
        out = self.relu(out)  # Apply ReLU activation function
        out = self.fc2(out)  # Pass through second linear layer to get class scores
        return out

class RobustNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(RobustNN, self).__init__()
        self.layer1 = nn.Linear(input_size, hidden_size)  # First linear layer
        self.bn1 = nn.BatchNorm1d(hidden_size)  # Batch normalization for first hidden layer
        self.relu = nn.ReLU()  # ReLU activation function
        self.dropout = nn.Dropout(0.5)  # Dropout for regularization with 50% probability
        self.layer2 = nn.Linear(hidden_size, hidden_size // 2)  # Second linear layer reducing size
        self.bn2 = nn.BatchNorm1d(hidden_size // 2)  # Batch normalization for second hidden layer
        self.layer3 = nn.Linear(hidden_size // 2, num_classes)  # Final linear layer to output classes

    def forward(self, x):
        x = self.layer1(x)  # Pass input through the first layer
        x = self.bn1(x)  # Apply batch normalization
        x = self.relu(x)  # Apply ReLU activation
        x = self.dropout(x)  # Apply dropout
        x = self.layer2(x)  # Pass through the second linear layer
        x = self.bn2(x)  # Apply batch normalization
        x = self.relu(x)  # Apply ReLU activation
        x = self.layer3(x)  # Output layer to get class scores
        return x
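
To get a feel for how small these two networks are, the sketch below counts their learnable parameters by hand. It assumes the sizes that appear elsewhere in this notebook (`input_size=11` from the recorded output, `hidden_size=13` and `num_classes=13` from the `train` function); the helper names are hypothetical. Each `nn.Linear` contributes a weight matrix plus a bias vector, and each `nn.BatchNorm1d` contributes a learnable scale and shift per feature (its running statistics are buffers, not parameters).

```python
def linear_params(n_in, n_out):
    """Weights (n_in * n_out) plus biases (n_out) of a fully connected layer."""
    return n_in * n_out + n_out


def batchnorm_params(n_features):
    """Learnable scale and shift of a batch-norm layer (running stats excluded)."""
    return 2 * n_features


def simplenn_param_count(input_size, hidden_size, num_classes):
    return linear_params(input_size, hidden_size) + linear_params(hidden_size, num_classes)


def robustnn_param_count(input_size, hidden_size, num_classes):
    half = hidden_size // 2
    return (
        linear_params(input_size, hidden_size)
        + batchnorm_params(hidden_size)
        + linear_params(hidden_size, half)
        + batchnorm_params(half)
        + linear_params(half, num_classes)
    )


print(simplenn_param_count(11, 13, 13))  # 338
print(robustnn_param_count(11, 13, 13))  # 369
```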

In [4]:
def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '65150'
    torch.distributed.init_process_group('nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)  # set the current CUDA device using the rank
    print(f"[Rank {rank}] Initialization complete. Using world size {world_size}.")

def cleanup():
    dist.destroy_process_group()
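
The `setup` function hard-codes `MASTER_PORT` and the `nccl` backend, which can fail if the port is already in use or if no GPU is present. The sketch below shows one possible way to make both choices at run time using only the standard library. The helper names `find_free_port` and `choose_backend` are hypothetical, and the approach is an assumption rather than part of the original notebook.

```python
import socket


def find_free_port(host="127.0.0.1"):
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        return s.getsockname()[1]


def choose_backend(cuda_available):
    """NCCL targets GPU communication; Gloo also works on CPU-only machines."""
    return "nccl" if cuda_available else "gloo"


port = find_free_port()
print(f"MASTER_PORT candidate: {port}")
print(choose_backend(cuda_available=False))  # gloo
```

The port should be picked once in the parent process and passed to every rank, since each rank must agree on the same value. There is also a small window in which another process could claim the port between this check and the call to `init_process_group`.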

In [5]:
def train(rank, world_size, input_size, train_dataset, val_dataset, batch_size=32, num_epochs=1):
    setup(rank, world_size)  # Initialize DDP environment

    # Initialize model and move it to the specified device (GPU)
    model = RobustNN(input_size=input_size, hidden_size=13, num_classes=13).to(rank)
    # Wrapped in DDP to synchronize gradients
    ddp_model = DDP(model, device_ids=[rank])
    
    # Create samplers and loaders for training and validation datasets
    train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank, shuffle=False)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, sampler=train_sampler, pin_memory=False, num_workers=0)
    val_sampler = DistributedSampler(val_dataset, num_replicas=world_size, rank=rank, shuffle=False)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, sampler=val_sampler, pin_memory=False, num_workers=0)
    
    # Setup optimizer and loss function
    optimizer = optim.Adam(ddp_model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    # Metrics to keep track of progress
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []
    epoch_times = []
    
    total_start_time = time.time()
    # Training loop
    for epoch in tqdm(range(num_epochs)):
        epoch_start_time = time.time()
        train_sampler.set_epoch(epoch)  # Reseeds per-epoch shuffling; has no effect here because the sampler uses shuffle=False
        train_loss = 0.0
        correct = 0
        total = 0
        ddp_model.train()  # Set model to training mode

        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(rank), labels.to(rank)  # Move data to the correct device
            optimizer.zero_grad()  # Clear gradients
            outputs = ddp_model(inputs)  # Forward pass
            loss = criterion(outputs, labels)  # Compute loss
            loss.backward()  # Backpropagate error
            optimizer.step()  # Update weights

            train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Log training metrics
        epoch_loss = train_loss / len(train_loader)
        train_losses.append(epoch_loss)
        train_accuracies.append(100 * correct / total)
        print(f"[Rank {rank}] Epoch {epoch+1} average loss: {epoch_loss}")

        # Validation step
        ddp_model.eval()  # Set model to evaluation mode
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(rank), labels.to(rank)
                outputs = ddp_model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Log validation metrics
        val_losses.append(val_loss / len(val_loader))
        val_accuracies.append(100 * correct / total)
        print(f"[Rank {rank}] Epoch {epoch+1} validation loss: {val_loss / len(val_loader)}")
        print(f"[Rank {rank}] Epoch {epoch+1} validation accuracy: {100 * correct / total:.4f}")

        ddp_model.train()  # Set model back to training mode
        epoch_time = time.time() - epoch_start_time
        epoch_times.append(epoch_time)
        print(f"Rank {rank} and Epoch {epoch+1} Time Taken:{epoch_time:.2f} seconds")

    total_training_time = time.time() - total_start_time
    print(f"Rank {rank}: Total training time: {total_training_time:.2f} seconds")
    
    cleanup()  # Clean up DDP setup
    #if rank == 0:  # Plot metrics if this is the main process
    #    plot_metrics(train_losses, val_losses, train_accuracies, val_accuracies)

    print("Training Done!")
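
Each rank in `train` sees only its own slice of the data. With `shuffle=False` and the default `drop_last=False`, `DistributedSampler` pads the index list so it divides evenly by the world size and then gives each rank every `world_size`-th index. The sketch below mimics that partitioning in plain Python; `shard_indices` is a hypothetical helper, and it assumes the dataset has at least as many samples as there are ranks.

```python
import math


def shard_indices(dataset_len, world_size, rank):
    """Mimic DistributedSampler's partitioning for shuffle=False, drop_last=False."""
    if dataset_len < world_size:
        raise ValueError("sketch assumes dataset_len >= world_size")
    num_samples = math.ceil(dataset_len / world_size)
    total_size = num_samples * world_size
    indices = list(range(dataset_len))
    # Pad by repeating indices from the start so every rank gets the same count
    indices += indices[: total_size - dataset_len]
    # Each rank takes a strided slice of the padded index list
    return indices[rank:total_size:world_size]


for r in range(4):
    print(r, shard_indices(10, 4, r))
# 0 [0, 4, 8]
# 1 [1, 5, 9]
# 2 [2, 6, 0]
# 3 [3, 7, 1]
```

Two consequences follow. The per-rank losses and accuracies printed by `train` describe only that rank's shard, and padded samples are counted twice when the dataset size is not a multiple of the world size. With the single GPU shown in the recorded output, rank 0 simply receives every index.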

In [6]:
if __name__ == '__main__':
    # Load dataset from CSV
    astro_ds = dataloading(path='./dataset.csv')
    
    # Handle null values and separate features and labels
    X_imputed, y = nullvalue_handling(astro_ds)
    
    # Keep only the features whose ExtraTrees importance exceeds the mean importance
    astro_ds_filtered = feature_importance(X_imputed, y)
    
    # Re-impute missing values on the filtered feature set
    X_imputed, y = nullvalue_handling(astro_ds_filtered)
    
    # Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X_imputed, y, test_size=0.2, shuffle=True, random_state=42)
    
    # Prepare data by scaling features and encoding labels
    X_train_scaled, X_test_scaled, y_train_encoded, y_test_encoded = dataprep(X_train, X_test, y_train, y_test)
    
    # Convert datasets to tensor datasets suitable for PyTorch
    train_dataset, val_dataset, input_size = tensordataset(X_train_scaled, y_train_encoded, X_test_scaled, y_test_encoded)

    # Start timing the distributed training setup
    start_time = time.time()
    
    # Determine the number of GPUs available
    world_size = torch.cuda.device_count()
    print(f"World Size: {world_size}")
    
    processes = []
    # Create a separate process for each GPU to handle training
    for rank in range(world_size):
        p = torch.multiprocessing.Process(target=train, args=(rank, world_size, input_size, train_dataset, val_dataset, 32, 15))
        p.start()
        processes.append(p)
    
    # Wait for all processes to finish
    for p in processes:
        p.join()

    # Calculate and print the total elapsed time for training
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Total Elapsed time: {elapsed_time:.2f} seconds")


Data Loaded and dropped irrelavant columns!!


Computing DataFrames: 100%|██████████| 2/2 [00:03<00:00,  1.97s/it]


Null Value Handling Done!
Feature Extraction done!


Computing DataFrames: 100%|██████████| 2/2 [00:03<00:00,  1.71s/it]


Null Value Handling Done!
Scaling and Encoding Done!
Tensor Conversion and dataset prep done!!Input Size:11
World Size: 1
[Rank 0] Initialization complete. Using world size 1.


  0%|          | 0/15 [00:00<?, ?it/s]

[Rank 0] Epoch 1 average loss: 0.3031605256538253
[Rank 0] Epoch 1 validation loss: 0.24297098016456214
[Rank 0] Epoch 1 validation accuracy: 92.0237
Rank 0 and Epoch 1 Time Taken:58.58 seconds


  7%|▋         | 1/15 [00:58<13:40, 58.58s/it]

[Rank 0] Epoch 2 average loss: 0.2462173060830692
[Rank 0] Epoch 2 validation loss: 0.23976676132090974
[Rank 0] Epoch 2 validation accuracy: 92.1317
Rank 0 and Epoch 2 Time Taken:83.05 seconds


 13%|█▎        | 2/15 [02:21<15:48, 72.98s/it]

[Rank 0] Epoch 3 average loss: 0.24186384169942293
[Rank 0] Epoch 3 validation loss: 0.2350219040668374
[Rank 0] Epoch 3 validation accuracy: 92.0623
Rank 0 and Epoch 3 Time Taken:62.00 seconds


 20%|██        | 3/15 [03:23<13:35, 67.97s/it]

[Rank 0] Epoch 4 average loss: 0.23940203913037145
[Rank 0] Epoch 4 validation loss: 0.23801030514810306
[Rank 0] Epoch 4 validation accuracy: 92.0680
Rank 0 and Epoch 4 Time Taken:57.85 seconds


 27%|██▋       | 4/15 [04:21<11:43, 63.97s/it]

[Rank 0] Epoch 5 average loss: 0.23639223494190312
[Rank 0] Epoch 5 validation loss: 0.261679640406128
[Rank 0] Epoch 5 validation accuracy: 91.9397
Rank 0 and Epoch 5 Time Taken:59.01 seconds


 33%|███▎      | 5/15 [05:20<10:21, 62.19s/it]

[Rank 0] Epoch 6 average loss: 0.2333888474295245
[Rank 0] Epoch 6 validation loss: 0.31872281319758833
[Rank 0] Epoch 6 validation accuracy: 90.1901
Rank 0 and Epoch 6 Time Taken:58.58 seconds


 40%|████      | 6/15 [06:19<09:08, 60.96s/it]

[Rank 0] Epoch 7 average loss: 0.23009176170804754
[Rank 0] Epoch 7 validation loss: 0.3217087206883103
[Rank 0] Epoch 7 validation accuracy: 90.3758
Rank 0 and Epoch 7 Time Taken:58.20 seconds


 47%|████▋     | 7/15 [07:17<08:00, 60.06s/it]

[Rank 0] Epoch 8 average loss: 0.22806045590040072
[Rank 0] Epoch 8 validation loss: 0.30172603170607726
[Rank 0] Epoch 8 validation accuracy: 91.2464
Rank 0 and Epoch 8 Time Taken:58.25 seconds


 53%|█████▎    | 8/15 [08:15<06:56, 59.48s/it]

[Rank 0] Epoch 9 average loss: 0.22800579647325442
[Rank 0] Epoch 9 validation loss: 0.3463103831078011
[Rank 0] Epoch 9 validation accuracy: 90.9043
Rank 0 and Epoch 9 Time Taken:58.45 seconds


 60%|██████    | 9/15 [09:13<05:54, 59.16s/it]

[Rank 0] Epoch 10 average loss: 0.22632484066371136
[Rank 0] Epoch 10 validation loss: 0.3506210121694418
[Rank 0] Epoch 10 validation accuracy: 90.1828
Rank 0 and Epoch 10 Time Taken:58.23 seconds


 67%|██████▋   | 10/15 [10:12<04:54, 58.87s/it]

[Rank 0] Epoch 11 average loss: 0.22597560106673745
[Rank 0] Epoch 11 validation loss: 0.37038494929389293
[Rank 0] Epoch 11 validation accuracy: 90.4165
Rank 0 and Epoch 11 Time Taken:58.29 seconds


 73%|███████▎  | 11/15 [11:10<03:54, 58.69s/it]

[Rank 0] Epoch 12 average loss: 0.22481657928580262
[Rank 0] Epoch 12 validation loss: 0.3871530116265389
[Rank 0] Epoch 12 validation accuracy: 90.2840
Rank 0 and Epoch 12 Time Taken:58.25 seconds


 80%|████████  | 12/15 [12:08<02:55, 58.56s/it]

[Rank 0] Epoch 13 average loss: 0.22538005609670475
[Rank 0] Epoch 13 validation loss: 0.39002264216212684
[Rank 0] Epoch 13 validation accuracy: 90.4901
Rank 0 and Epoch 13 Time Taken:58.31 seconds


 87%|████████▋ | 13/15 [13:07<01:56, 58.49s/it]

[Rank 0] Epoch 14 average loss: 0.22445262109146003
[Rank 0] Epoch 14 validation loss: 0.38923822485760506
[Rank 0] Epoch 14 validation accuracy: 90.2094
Rank 0 and Epoch 14 Time Taken:57.90 seconds


 93%|█████████▎| 14/15 [14:04<00:58, 58.31s/it]

[Rank 0] Epoch 15 average loss: 0.2235642375779405
[Rank 0] Epoch 15 validation loss: 0.42239211968142687
[Rank 0] Epoch 15 validation accuracy: 90.4624
Rank 0 and Epoch 15 Time Taken:58.33 seconds


100%|██████████| 15/15 [15:03<00:00, 60.22s/it]


Rank 0: Total training time: 903.32 seconds
Training Done!
Total Elapsed time: 909.73 seconds
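
In the log above, the training loss keeps falling while the validation loss reaches its minimum at epoch 3 and rises afterwards, a pattern consistent with overfitting. The sketch below replays the logged validation losses (rounded to five decimals) through a simple early-stopping rule. The helper `early_stop_epoch` and the patience of 3 epochs are hypothetical choices, not part of the original training loop.

```python
# Validation losses copied from the log above, rounded to five decimals
val_losses = [
    0.24297, 0.23977, 0.23502, 0.23801, 0.26168,
    0.31872, 0.32171, 0.30173, 0.34631, 0.35062,
    0.37038, 0.38715, 0.39002, 0.38924, 0.42239,
]


def early_stop_epoch(losses, patience):
    """Return (best_epoch, stop_epoch), both 1-indexed.

    Training stops once `patience` consecutive epochs fail to improve
    on the best validation loss seen so far.
    """
    best_loss = float("inf")
    best_epoch = 0
    waited = 0
    for epoch, loss in enumerate(losses, start=1):
        if loss < best_loss:
            best_loss, best_epoch, waited = loss, epoch, 0
        else:
            waited += 1
            if waited >= patience:
                return best_epoch, epoch
    return best_epoch, len(losses)


best_epoch, stop_epoch = early_stop_epoch(val_losses, patience=3)
print(best_epoch, stop_epoch)  # 3 6
```

Under this rule the run would have stopped after epoch 6 and kept the epoch 3 weights, at roughly 58 to 83 seconds per epoch according to the log.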
