In [1]:
# =========================================================================
# === NOTEBOOK CONFIGURATION ===
# Set this to False to quickly load the best parameters for the autoencoder.
# Set this to True to run a full Optuna study to find new best parameters.
# RUN_FULL_OPTUNA_STUDY = True
# =========================================================================

In [2]:
from google.colab import drive
drive.mount('/content/drive')
!pip install -U optuna
!pip install -U plotly
!pip install -U kaleido
!plotly_get_chrome
import pandas as pd
import numpy as np
import requests
import gzip
import json
import io
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import optuna
from plotly.io import show
from datetime import datetime
from optuna.trial import TrialState
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset

Mounted at /content/drive
Collecting optuna
  Downloading optuna-4.5.0-py3-none-any.whl.metadata (17 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Downloading optuna-4.5.0-py3-none-any.whl (400 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m400.9/400.9 kB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorlog-6.9.0-py3-none-any.whl (11 kB)
Installing collected packages: colorlog, optuna
Successfully installed colorlog-6.9.0 optuna-4.5.0
Collecting plotly
  Downloading plotly-6.3.0-py3-none-any.whl.metadata (8.5 kB)
Downloading plotly-6.3.0-py3-none-any.whl (9.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.8/9.8 MB[0m [31m56.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: plotly
  Attempting uninstall: plotly
    Found existing installation: plotly 5.24.1
    Uninstalling plotly-5.24.1:
      Successfully uninstalled plotly-5.24.1
Successfully installed 

In [3]:
# --- Dataset Download  ---
def download_hyg_dataset():
    """
    Attempts to download the gzipped CSV file from a list of URLs.

    Returns:
        pd.DataFrame: A pandas DataFrame of the HYG data if successful, None otherwise.
    """
    # URLs to official public HYG data set repository and backup copy hosted on Google Drive.
    HYG_URLS = ['https://codeberg.org/astronexus/hyg/media/branch/main/data/hyg/CURRENT/hyg_v42.csv.gz',
                'https://drive.google.com/uc?export=download&id=1U2apsUPjQR_DllzF74y-pV3KjVTK3FJW']

    hyg_file = None
    print("\nStarting data pipeline: Attempting to download HYG star data...")

    for url in HYG_URLS:
        try:
            print(f"Trying URL: {url}")
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            hyg_file = io.BytesIO(response.content)
            hyg_df = pd.read_csv(hyg_file, compression='gzip')
            print("Download successful.")
            return hyg_df
        except requests.exceptions.RequestException as e:
            print(f"Error downloading from {url}: {e}")
            print("Trying next URL...")
        except Exception as e:
            print(f"An unexpected error occurred during data processing: {e}")
            return None

    print("\nAll download attempts failed. Please check your internet connection or the URLs.")
    return None

# Download the data
df = download_hyg_dataset()


Starting data pipeline: Attempting to download HYG star data...
Trying URL: https://codeberg.org/astronexus/hyg/media/branch/main/data/hyg/CURRENT/hyg_v42.csv.gz
Download successful.


In [4]:
# --- Dataset Inspection ---
# Set pandas display options to see all columns
pd.set_option('display.max_columns', None)

print("\n--- HYG Dataset Head (First 5 Rows) ---")
print(df.head())

print("\n--- Dataset Info ---")
df.info()

unique_spect_count = df['spect'].nunique(dropna=True)
print(f"\nThere are {unique_spect_count} unique spectral types in the dataset.")

# Count the occurrences of each spectral type
spect_counts = df['spect'].value_counts()

# This will show spectral types with the lowest counts
rarest_spect_types = spect_counts.tail(10)
print("\n--- Rarest Spectral Types Sample ---")
print(rarest_spect_types)

# Get a summary of how many types appear only a few times
types_with_one_occurrence = (spect_counts == 1).sum()
types_with_two_occurrences = (spect_counts == 2).sum()

print(f"\nNumber of spectral types that appear only once: {types_with_one_occurrence}")
print(f"Number of spectral types that appear only twice: {types_with_two_occurrences}")


--- HYG Dataset Head (First 5 Rows) ---
   id  hip        hd  hr   gl   bf proper        ra        dec      dist  \
0   0  NaN       NaN NaN  NaN  NaN    Sol  0.000000   0.000000    0.0000   
1   1  1.0  224700.0 NaN  NaN  NaN    NaN  0.000060   1.089009  219.7802   
2   2  2.0  224690.0 NaN  NaN  NaN    NaN  0.000283 -19.498840   47.9616   
3   3  3.0  224699.0 NaN  NaN  NaN    NaN  0.000335  38.859279  442.4779   
4   4  4.0  224707.0 NaN  NaN  NaN    NaN  0.000569 -51.893546  134.2282   

     pmra  pmdec   rv    mag  absmag spect     ci           x         y  \
0    0.00   0.00  0.0 -26.70   4.850   G2V  0.656    0.000005  0.000000   
1   -5.20  -1.88  0.0   9.10   2.390    F5  0.482  219.740502  0.003449   
2  181.21  -0.93  0.0   9.27   5.866   K3V  0.999   45.210918  0.003365   
3    5.24  -2.91  0.0   6.61  -1.619    B9 -0.019  344.552785  0.030213   
4   62.85   0.16  0.0   8.06   2.421   F0V  0.370   82.835513  0.012476   

            z            vx        vy            vz

In [5]:
# --- 2. Data Preprocessing and Feature Selection ---

print(f"Original dataset size: {len(df)}")

# For simplicity, only use stars with non-null values for the chosen features.
features = ['absmag', 'ci', 'spect']
df_filtered = df.dropna(subset=features)
print(f"Filtered dataset size after dropping rows with missing features: {len(df_filtered)}")

# Prepare numerical and categorical data
numerical_features = df_filtered[['absmag', 'ci']]

# Scale the numerical features
scaler = StandardScaler()
scaled_numerical_features = scaler.fit_transform(numerical_features)

# Split the data into training and validation sets
# We split the numerical features, the encoded spectral types, and the star info
X_num_train, X_num_val = train_test_split(scaled_numerical_features, test_size=0.2, random_state=42)

# Convert the NumPy arrays to PyTorch tensors
X_num_train = torch.tensor(X_num_train, dtype=torch.float32)
X_num_val = torch.tensor(X_num_val, dtype=torch.float32)

# Create TensorDatasets for training and validation
# Note that we are passing both the numerical and categorical tensors
train_dataset = TensorDataset(X_num_train)
val_dataset = TensorDataset(X_num_val)

# Create DataLoaders for training and validation
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=False)

print(f"\nTraining set size: {len(X_num_train)} samples")
print(f"Validation set size: {len(X_num_val)} samples")

Original dataset size: 119626
Filtered dataset size after dropping rows with missing features: 115368

Training set size: 92294 samples
Validation set size: 23074 samples


In [6]:
def print_model_parameters(model):
    """
    Prints the name and shape of each parameter in a PyTorch model.
    """
    print("\n--- Model Parameters ---")
    for name, param in model.named_parameters():
        if param.requires_grad:
            print(f"Layer: {name:<25} | Shape: {list(param.shape)}")
    print("------------------------\n")

def save_checkpoint(trial, model, best_val_loss, file_path):
    """
    Saves a dictionary containing the model's state, hyperparameters, and trial info.

    Args:
        trial (optuna.trial.Trial): The current Optuna trial object.
        model (torch.nn.Module): The model to save.
        best_val_loss (float): The best validation loss achieved so far.
        file_path (str): The full path for the checkpoint file.
    """
    # Create the checkpoint dictionary
    checkpoint = {
        'trial_number': trial.number,
        'hyperparameters': trial.params,
        'model_state_dict': model.state_dict(),
        'val_loss': best_val_loss
    }

    # Save the checkpoint to a uniquely named file
    torch.save(checkpoint, file_path)
    print(f"Checkpoint saved for Trial {trial.number} with a new best Val Loss of {best_val_loss:.4f} to {file_path}")

def save_study_metadata(study: optuna.study.Study, file_path: str):
    """
    Saves the metadata of an Optuna study to a JSON file.

    Args:
        study (optuna.study.Study): The Optuna study object to save.
        file_path (str): The full path to the output JSON file in Google Drive.
    """
    try:
        # Get the current time for the log
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # Extract essential information from the best trial
        best_trial = study.best_trial
        best_params = best_trial.params
        best_value = best_trial.value

        # Create a dictionary to hold the metadata
        metadata = {
            "study_name": study.study_name,
            "timestamp": timestamp,
            "best_value": best_value,
            "best_params": best_params,
            "state": str(study.best_trial.state),
            "user_attributes": study.best_trial.user_attrs,
            "study_direction": str(study.direction),
            "trials_completed": len(study.trials)
        }

        # Convert the dictionary to a JSON string
        json_metadata = json.dumps(metadata, indent=4, sort_keys=True)

        # Write the JSON string to the specified file
        with open(file_path, 'w') as f:
            f.write(json_metadata)

        print(f"Study metadata successfully saved to: {file_path}")

    except Exception as e:
        print(f"An error occurred while saving study metadata: {e}")

In [7]:
class Autoencoder(nn.Module):
    def __init__(self,
                 num_numerical_features: int,
                 n_encoder_layers: int,
                 n_hidden_neurons: int,
                 dropout_rate: float,
                 latent_dim: int=2):

        super(Autoencoder, self).__init__()

        # Save this for the forward pass
        self.num_numerical_features = num_numerical_features

        # Dynamically build the encoder
        encoder_layers = []
        # Store layer dimensions to mirror in the decoder
        layer_dims = [num_numerical_features] # Start with the number of numerical features as input
        for i in range(n_encoder_layers):
            # The input dimension for the first encoder layer is num_numerical_features
            input_dim = layer_dims[-1]
            # The output dimension is calculated based on n_hidden_neurons and latent_dim
            output_dim = n_hidden_neurons // (2**i) if i < n_encoder_layers - 1 else latent_dim
            encoder_layers.append(nn.Linear(input_dim, output_dim))
            encoder_layers.append(nn.ReLU())
            encoder_layers.append(nn.Dropout(dropout_rate))
            layer_dims.append(output_dim)
        self.encoder = nn.Sequential(*encoder_layers)

        # Dynamically build the decoder by reversing the layer dimensions
        decoder_layers = []
        # The decoder starts from the latent dimension
        reversed_dims = list(reversed(layer_dims))
        for i in range(len(reversed_dims) - 1):
            decoder_layers.append(nn.Linear(reversed_dims[i], reversed_dims[i+1]))
            # Add ReLU for all but the final layer
            if i < len(reversed_dims) - 2:
                decoder_layers.append(nn.ReLU())

        self.decoder = nn.Sequential(*decoder_layers)


    def forward(self, x_num):
        # Pass the tensor through the encoder to get the latent representation
        latent_coords = self.encoder(x_num)

        # Pass the latent representation through the decoder to get a reconstruction
        reconstruction = self.decoder(latent_coords)

        # The decoder now reconstructs the original numerical features directly
        return latent_coords, reconstruction

In [9]:
# --- 5. Main Execution and Export ---
def objective(trial, device, study_name):

    # Hyperparameters to be tuned by Optuna
    learning_rate = trial.suggest_float("learning_rate", 1e-4, 1e-2, log=True)
    dropout_rate = trial.suggest_float("dropout_rate", 0.0, 0.05)
    n_encoder_layers = trial.suggest_int("n_encoder_layers", 3, 4)
    n_hidden_neurons = trial.suggest_int("n_hidden_neurons", 64, 192, step=16)

    # Define the model with the suggested hyperparameters
    model = Autoencoder(
            num_numerical_features=len(features) - 1,  # 'absmag', 'ci' - 'spect'
            n_encoder_layers=n_encoder_layers,
            n_hidden_neurons=n_hidden_neurons,
            dropout_rate=dropout_rate
            ).to(device)

    # Define the optimizer and scheduler
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=20)

    print_model_parameters(model)

    # Training Loop with Early Stopping
    patience = 25
    epochs_no_improve = 0
    best_val_loss = float('inf')
    epochs = 100 # Set a reasonable max number of epochs per trial

    for epoch in range(epochs):
        # Training
        model.train()
        for batch in train_dataloader:
            batch = batch[0].to(device)

            optimizer.zero_grad()

            # Corrected forward pass with new returns
            latent_coords, reconstructed_num = model(batch)

            numerical_loss = F.mse_loss(batch, reconstructed_num)
            loss = numerical_loss
            loss.backward()
            optimizer.step()

        # Validation
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for batch in val_dataloader:
                batch = batch[0].to(device)

                # Corrected forward pass with new returns
                latent_coords, reconstructed_num = model(batch)

                val_loss = F.mse_loss(batch, reconstructed_num)

                total_val_loss += val_loss.item() * batch.size(0)

        avg_val_loss = total_val_loss / len(val_dataloader)

        # Early Stopping and Pruning
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            epochs_no_improve = 0
            checkpoint_path = f'/content/drive/MyDrive/Checkpoints/{study_name}_trial_{trial.number}_best.pth'
            os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
            save_checkpoint(trial, model, best_val_loss, checkpoint_path)

        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Trial {trial.number}: Early stopping triggered.")
                break

        # Report the loss to Optuna
        trial.report(avg_val_loss, epoch)

        # Prune the trial if it's not performing well
        if trial.should_prune():
            print(f"Trial {trial.number}: Pruning trial at epoch {epoch + 1}.")
            raise optuna.exceptions.TrialPruned()

        print(f"Trial {trial.number}, Epoch [{epoch + 1}/{epochs}], Val Loss: {avg_val_loss:.4f}")

    return best_val_loss

if __name__ == '__main__':
    study_name = 'AE_HRParams_Latent_Wide_r2'
    # Set up the device (CPU or GPU)
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print("GPU is available and will be used.")
    else:
        device = torch.device("cpu")
        print("GPU not available, using CPU.")
    # Hyperparameter Tuning with Optuna
    # This will find the best hyperparameters by running multiple trials.
    # The 'objective' function is defined separately and contains the training loop.
    study = optuna.create_study(
            direction="minimize",
            storage=f"sqlite:////content/drive/MyDrive/Data/{study_name}.db",
            study_name=f"{study_name}",
            load_if_exists=True)
    study.optimize(lambda trial: objective(trial, device, study_name), n_trials=5)

    # Call the function to save the metadata
    output_path = f'/content/drive/MyDrive/Logs/{study_name}.json'
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    save_study_metadata(study, output_path)

GPU not available, using CPU.


[I 2025-09-22 00:14:34,160] Using an existing study with name 'AE_HRParams_Latent_Wide_r2' instead of creating a new one.



--- Model Parameters ---
Layer: encoder.0.weight          | Shape: [144, 2]
Layer: encoder.0.bias            | Shape: [144]
Layer: encoder.3.weight          | Shape: [72, 144]
Layer: encoder.3.bias            | Shape: [72]
Layer: encoder.6.weight          | Shape: [36, 72]
Layer: encoder.6.bias            | Shape: [36]
Layer: encoder.9.weight          | Shape: [2, 36]
Layer: encoder.9.bias            | Shape: [2]
Layer: decoder.0.weight          | Shape: [36, 2]
Layer: decoder.0.bias            | Shape: [36]
Layer: decoder.2.weight          | Shape: [72, 36]
Layer: decoder.2.bias            | Shape: [72]
Layer: decoder.4.weight          | Shape: [144, 72]
Layer: decoder.4.bias            | Shape: [144]
Layer: decoder.6.weight          | Shape: [2, 144]
Layer: decoder.6.bias            | Shape: [2]
------------------------



[I 2025-09-22 00:14:46,952] Trial 50 pruned. 


Checkpoint saved for Trial 50 with a new best Val Loss of 0.3739 to /content/drive/MyDrive/Checkpoints/AE_HRParams_Latent_Wide_r2_trial_50_best.pth
Trial 50: Pruning trial at epoch 1.

--- Model Parameters ---
Layer: encoder.0.weight          | Shape: [128, 2]
Layer: encoder.0.bias            | Shape: [128]
Layer: encoder.3.weight          | Shape: [64, 128]
Layer: encoder.3.bias            | Shape: [64]
Layer: encoder.6.weight          | Shape: [32, 64]
Layer: encoder.6.bias            | Shape: [32]
Layer: encoder.9.weight          | Shape: [2, 32]
Layer: encoder.9.bias            | Shape: [2]
Layer: decoder.0.weight          | Shape: [32, 2]
Layer: decoder.0.bias            | Shape: [32]
Layer: decoder.2.weight          | Shape: [64, 32]
Layer: decoder.2.bias            | Shape: [64]
Layer: decoder.4.weight          | Shape: [128, 64]
Layer: decoder.4.bias            | Shape: [128]
Layer: decoder.6.weight          | Shape: [2, 128]
Layer: decoder.6.bias            | Shape: [2]
------

[I 2025-09-22 00:17:45,068] Trial 51 pruned. 


Trial 51: Pruning trial at epoch 32.

--- Model Parameters ---
Layer: encoder.0.weight          | Shape: [144, 2]
Layer: encoder.0.bias            | Shape: [144]
Layer: encoder.3.weight          | Shape: [72, 144]
Layer: encoder.3.bias            | Shape: [72]
Layer: encoder.6.weight          | Shape: [36, 72]
Layer: encoder.6.bias            | Shape: [36]
Layer: encoder.9.weight          | Shape: [2, 36]
Layer: encoder.9.bias            | Shape: [2]
Layer: decoder.0.weight          | Shape: [36, 2]
Layer: decoder.0.bias            | Shape: [36]
Layer: decoder.2.weight          | Shape: [72, 36]
Layer: decoder.2.bias            | Shape: [72]
Layer: decoder.4.weight          | Shape: [144, 72]
Layer: decoder.4.bias            | Shape: [144]
Layer: decoder.6.weight          | Shape: [2, 144]
Layer: decoder.6.bias            | Shape: [2]
------------------------



[I 2025-09-22 00:17:50,574] Trial 52 pruned. 


Checkpoint saved for Trial 52 with a new best Val Loss of 3.6427 to /content/drive/MyDrive/Checkpoints/AE_HRParams_Latent_Wide_r2_trial_52_best.pth
Trial 52: Pruning trial at epoch 1.

--- Model Parameters ---
Layer: encoder.0.weight          | Shape: [128, 2]
Layer: encoder.0.bias            | Shape: [128]
Layer: encoder.3.weight          | Shape: [64, 128]
Layer: encoder.3.bias            | Shape: [64]
Layer: encoder.6.weight          | Shape: [32, 64]
Layer: encoder.6.bias            | Shape: [32]
Layer: encoder.9.weight          | Shape: [2, 32]
Layer: encoder.9.bias            | Shape: [2]
Layer: decoder.0.weight          | Shape: [32, 2]
Layer: decoder.0.bias            | Shape: [32]
Layer: decoder.2.weight          | Shape: [64, 32]
Layer: decoder.2.bias            | Shape: [64]
Layer: decoder.4.weight          | Shape: [128, 64]
Layer: decoder.4.bias            | Shape: [128]
Layer: decoder.6.weight          | Shape: [2, 128]
Layer: decoder.6.bias            | Shape: [2]
------

[I 2025-09-22 00:17:55,385] Trial 53 pruned. 


Checkpoint saved for Trial 53 with a new best Val Loss of 0.2138 to /content/drive/MyDrive/Checkpoints/AE_HRParams_Latent_Wide_r2_trial_53_best.pth
Trial 53: Pruning trial at epoch 1.

--- Model Parameters ---
Layer: encoder.0.weight          | Shape: [112, 2]
Layer: encoder.0.bias            | Shape: [112]
Layer: encoder.3.weight          | Shape: [56, 112]
Layer: encoder.3.bias            | Shape: [56]
Layer: encoder.6.weight          | Shape: [28, 56]
Layer: encoder.6.bias            | Shape: [28]
Layer: encoder.9.weight          | Shape: [2, 28]
Layer: encoder.9.bias            | Shape: [2]
Layer: decoder.0.weight          | Shape: [28, 2]
Layer: decoder.0.bias            | Shape: [28]
Layer: decoder.2.weight          | Shape: [56, 28]
Layer: decoder.2.bias            | Shape: [56]
Layer: decoder.4.weight          | Shape: [112, 56]
Layer: decoder.4.bias            | Shape: [112]
Layer: decoder.6.weight          | Shape: [2, 112]
Layer: decoder.6.bias            | Shape: [2]
------

[I 2025-09-22 00:18:01,349] Trial 54 pruned. 


Checkpoint saved for Trial 54 with a new best Val Loss of 0.4693 to /content/drive/MyDrive/Checkpoints/AE_HRParams_Latent_Wide_r2_trial_54_best.pth
Trial 54: Pruning trial at epoch 1.
Study metadata successfully saved to: /content/drive/MyDrive/Logs/AE_HRParams_Latent_Wide_r2.json


In [10]:
fig_1 = optuna.visualization.plot_optimization_history(study)
fig_1.write_image(f'/content/drive/MyDrive/Plots/{study_name}_history.png')
show(fig_1)
fig_2 = optuna.visualization.plot_param_importances(study)
fig_2.write_image(f'/content/drive/MyDrive/Plots/{study_name}_importances.png')
show(fig_2)
fig_3 = optuna.visualization.plot_parallel_coordinate(study)
fig_3.write_image(f'/content/drive/MyDrive/Plots/{study_name}_parallel.png')
show(fig_3)

In [14]:
# Get the best trial's parameters and value
best_params = study.best_trial.params
best_loss = study.best_trial.value
best_trial = study.best_trial.number
best_weights_url = f'/content/drive/MyDrive/Checkpoints/{study_name}_trial_{best_trial}_best.pth'
best_weights = torch.load(best_weights_url)['model_state_dict']
print(f'\nBest study trial:', best_trial)
print(f"Best validation loss: {best_loss}")
print(f'Best hyperparameters found by Optuna:', best_params)

# 2. Re-instantiate and Train the Best Model
# We create a new model with the best parameters found by Optuna
best_model = Autoencoder(
             num_numerical_features=len(features)-1,
             n_encoder_layers=best_params['n_encoder_layers'],
             n_hidden_neurons=best_params['n_hidden_neurons'],
             dropout_rate=best_params['dropout_rate'])

# Load the best model weights that were saved during the tuning process
best_model.load_state_dict(best_weights)

# The best model is already trained. No need to re-train.
print(f"Best model loaded from '{best_weights_url}'")

# 3. Generate Coordinates for ALL Stars
# This is a crucial step to get the full star map, not just the validation set.
print("Generating 2D coordinates for all stars.")
best_model.eval() # Set model to evaluation mode

# Re-create the full tensor datasets for a clean, deterministic output.
scaled_numerical_features = scaler.fit_transform(df_filtered[['absmag', 'ci']])

full_dataset = TensorDataset(torch.tensor(scaled_numerical_features, dtype=torch.float32))
full_dataloader = DataLoader(full_dataset, batch_size=64, shuffle=False)

all_coords = []
with torch.no_grad():
    for batch in full_dataloader:
        coords, _ = best_model(batch[0])
        all_coords.extend(coords.numpy().tolist())


Best study trial: 47
Best validation loss: 0.0010102221465261958
Best hyperparameters found by Optuna: {'learning_rate': 0.0012580304169680257, 'dropout_rate': 0.0002484875027900211, 'n_encoder_layers': 2, 'n_hidden_neurons': 112}
Best model loaded from '/content/drive/MyDrive/Checkpoints/AE_HRParams_Latent_Wide_r2_trial_47_best.pth'
Generating 2D coordinates for all stars.


In [15]:
def process_spectral_type(spect_string):
    """
    Cleans and simplifies the spectral type string for visualization.

    Args:
        spect_string (str): The raw spectral type string.

    Returns:
        str: The processed spectral type string (e.g., 'G2').
    """
    if pd.isna(spect_string):
        return 'UNKNOWN'

    spect_string = spect_string.upper().strip()

    # Handle stars with multiple values (e.g., 'F3/F5V') by taking the first one
    if '/' in spect_string:
        spect_string = spect_string.split('/')[0]

    # Take the first two characters.
    simplified = spect_string[:2]

    # Check if the second character is not a digit.
    if len(simplified) < 2 or not simplified[1].isdigit():
        # Assign a default of '5' for stars with no number (e.g., 'M', 'K', 'O')
        simplified = simplified[0] + '5'

    return simplified

In [16]:
def export_star_data_to_csv_gz(df_filtered, all_coords, output_path):
    """
    Combines filtered star data with latent space coordinates and exports it
    to a compressed CSV file.

    This function removes unnecessary fields and includes only the data
    required for the front-end visualization, resulting in a significantly
    smaller and more efficient file.

    Args:
        df_filtered (pd.DataFrame): DataFrame containing filtered star data
                                     (e.g., from the HYG database).
        all_coords (list of lists): The latent space coordinates for each star.
        output_path (str): The full path to save the gzipped CSV file.
    """
    # 1. Create a DataFrame for the latent space coordinates
    latent_df = pd.DataFrame(all_coords, columns=['latent_x', 'latent_y',])

    # Reset index of the filtered DataFrame for a clean merge
    df_filtered = df_filtered.reset_index(drop=True)

    # 2. Combine the original star data with the new latent space data
    combined_df = pd.concat([df_filtered, latent_df], axis=1)

    # 3. Process the 'spect' column using the new function
    combined_df['spect'] = combined_df['spect'].apply(process_spectral_type)

    # 4. Select and reorder only the essential columns
    essential_df = combined_df[[
        'id',
        'latent_x',
        'latent_y',
        'x',
        'y',
        'z',
        'absmag',
        'spect'
    ]]

    # 5. Round the floating-point numbers to reduce file size
    essential_df = essential_df.round({
        'latent_x': 4,
        'latent_y': 4,
        'x': 4,
        'y': 4,
        'z': 4,
        'absmag': 4
    })

    # 6. Save the DataFrame directly to a gzipped CSV file
    essential_df.to_csv(output_path, index=False, compression='gzip')

    print(f"Essential star data saved to '{output_path}'")

export_star_data_to_csv_gz(df_filtered, all_coords, f'/content/drive/MyDrive/Data/{study_name}_front.csv.gz')

Essential star data saved to '/content/drive/MyDrive/Data/AE_HRParams_Latent_Wide_r2_front.csv.gz'
