In [1]:
# Importing Libraries
import torch
import torch.nn as nn
import math


In [2]:
# Defining components

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, lmax, base=1000):
        super().__init__()
        self.d_model = d_model
        self.lmax = lmax
        self.base = base
        
        # Pre-compute position encodings
        position = torch.arange(0, lmax).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(base) / d_model))
        pe = torch.zeros(lmax, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, t):
        return self.pe[:t.size(1)]

class FeedForwardNetwork(nn.Module):
    def __init__(self, d_in, d_model):
        super().__init__()
        self.mlp = nn.Linear(d_in, d_model)

    def forward(self, x):
        return self.mlp(x)

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.mha = nn.MultiheadAttention(d_model, num_heads)

    def forward(self, x, key_padding_mask=None):
        return self.mha(x, x, x, key_padding_mask=key_padding_mask)[0]

class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, src_key_padding_mask=None):
        # Apply multi-head attention and add & norm
        x = self.norm1(x + self.mha(x, key_padding_mask=src_key_padding_mask))
        # Apply feed-forward network and add & norm
        x = self.norm2(x + self.ff(x))
        return x

class Encoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers):
        super().__init__()
        self.layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)])

    def forward(self, x, src_key_padding_mask=None):
        for layer in self.layers:
            x = layer(x, src_key_padding_mask)
        return x

class Decoder(nn.Module):
    def __init__(self, d_model, d_out):
        super().__init__()
        self.linear = nn.Linear(d_model, d_out)

    def forward(self, x):
        return self.linear(x)

class Astromer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers, lmax, base=1000):
        super().__init__()
        self.positional_encoding = PositionalEncoding(d_model, lmax, base)
        self.fnn = FeedForwardNetwork(1, d_model)
        self.encoder = Encoder(d_model, num_heads, d_ff, num_layers)
        self.decoder = Decoder(d_model, 1)

    def forward(self, times, magnitudes, lengths, mask_prob=0.15):
        # Generate positional encoding
        pe = self.positional_encoding(times)

        # Apply masking to magnitudes
        masked_magnitudes = self.mask_magnitudes(magnitudes, lengths, mask_prob)

        # Combine positional encoding and magnitudes
        x = pe.unsqueeze(0).expand(magnitudes.size(0), -1, -1) + self.fnn(masked_magnitudes.unsqueeze(-1))

        # Create padding mask
        padding_mask = self.create_padding_mask(lengths, times.size(1))

        # Pass through encoder
        encoded = self.encoder(x.transpose(0, 1), src_key_padding_mask=padding_mask)

        # Decode to reconstruct original signal
        reconstructed = self.decoder(encoded.transpose(0, 1)).squeeze(-1)

        return reconstructed

    def mask_magnitudes(self, magnitudes, lengths, mask_prob):
        # Create a mask with probability mask_prob
        mask = torch.rand_like(magnitudes) < mask_prob
        masked_magnitudes = magnitudes.clone()
        
        # Apply mask only to non-padded elements
        for i, length in enumerate(lengths):
            masked_magnitudes[i, :length][mask[i, :length]] = 0  # Set masked values to 0
        
        return masked_magnitudes

    def create_padding_mask(self, lengths, max_length):
        # Create a boolean mask where True values represent padding
        mask = torch.arange(max_length).expand(len(lengths), max_length) >= lengths.unsqueeze(1)
        return mask.to(lengths.device)


In [3]:
# Model setup 

# Hyperparameters for the Astromer model
d_model = 128    # Dimension of the model
num_heads = 4    # Number of attention heads
d_ff = 256       # Dimension of the feed-forward network
num_layers = 3   # Number of encoder layers
lmax = 100       # Maximum sequence length
base = 1000      # Base for positional encoding

# Initialize the Astromer model with the defined hyperparameters
model = Astromer(d_model, num_heads, d_ff, num_layers, lmax, base)

# Set up the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


I just realized I do not really have any light curve data.

In [4]:
# More imports
import torch.utils.data as data
import numpy as np
import pandas as pd
import os
from sklearn.model_selection import train_test_split

In [5]:
# Generate synthetic light curve data
def generate_synthetic_data(num_samples, max_length, min_length=50):
    all_times = []
    all_magnitudes = []
    
    for _ in range(num_samples):
        # Randomly choose a length for each light curve between min_length and max_length
        length = np.random.randint(min_length, max_length + 1)
        
        # Generate MJD (Modified Julian Date) times
        # Start from January 1, 2000 (MJD 51544) and span about 10 years
        start_mjd = 51544
        end_mjd = start_mjd + 3652  # Approximately 10 years
        times = np.sort(np.random.uniform(start_mjd, end_mjd, length))
        
        # Generate magnitudes based on MJD times
        # Use a sine wave with some added Gaussian noise to simulate periodic variability
        magnitudes = np.sin((times - start_mjd) / 50) + np.random.normal(0, 0.1, length)
        
        all_times.append(times)
        all_magnitudes.append(magnitudes)
    
    return all_times, all_magnitudes

# Create a custom Dataset for light curves
class LightCurveDataset(data.Dataset):
    def __init__(self, times, magnitudes, max_length):
        self.times = times
        self.magnitudes = magnitudes
        self.max_length = max_length
    
    def __len__(self):
        return len(self.times)
    
    def __getitem__(self, idx):
        time = self.times[idx]
        magnitude = self.magnitudes[idx]
        
        # Pad sequences to max_length
        padded_time = torch.zeros(self.max_length)
        padded_magnitude = torch.zeros(self.max_length)
        
        length = len(time)
        padded_time[:length] = torch.tensor(time, dtype=torch.float32)
        padded_magnitude[:length] = torch.tensor(magnitude, dtype=torch.float32)
        
        return padded_time, padded_magnitude, length

# Function to create DataLoaders for training and testing
def create_dataloaders(batch_size=32, num_samples=1000, max_length=100, test_size=0.2):
    # Generate synthetic data
    times, magnitudes = generate_synthetic_data(num_samples, max_length)
    
    # Split data into training and testing sets
    times_train, times_test, mag_train, mag_test = train_test_split(times, magnitudes, test_size=test_size, random_state=42)
    
    # Create datasets
    train_dataset = LightCurveDataset(times_train, mag_train, max_length)
    test_dataset = LightCurveDataset(times_test, mag_test, max_length)
    
    # Create data loaders
    train_loader = data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    return train_loader, test_loader

# Save synthetic data to CSV file
def save_synthetic_data(times, magnitudes, filename):
    data_dir = "../Data"
    os.makedirs(data_dir, exist_ok=True)
    
    # Create a DataFrame with sample_id, time_mjd, and magnitude columns
    df = pd.DataFrame({
        "sample_id": np.repeat(range(len(times)), [len(t) for t in times]),
        "time_mjd": np.concatenate(times),
        "magnitude": np.concatenate(magnitudes)
    })
    
    # Save the DataFrame to a CSV file
    output_path = os.path.join(data_dir, filename)
    df.to_csv(output_path, index=False)
    print(f"Saved synthetic data to {output_path}")



In [6]:
# Generate and save synthetic data
num_samples = 1000
max_length = 100
times, magnitudes = generate_synthetic_data(num_samples, max_length)
save_synthetic_data(times, magnitudes, "synthetic_light_curves.csv")

# Create DataLoaders
train_loader, test_loader = create_dataloaders(batch_size=32, num_samples=num_samples, max_length=max_length)

Saved synthetic data to ../Data/synthetic_light_curves.csv


In [7]:
def train(model, optimizer, train_loader, test_loader, num_epochs):
    # Define loss function
    criterion = nn.MSELoss(reduction='none')
    
    # Initialize best test loss and model saving parameters
    best_test_loss = float('inf')
    model_dir = '../Models'
    os.makedirs(model_dir, exist_ok=True)
    best_model_path = os.path.join(model_dir, 'base-astromer.pth')
    
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        for times, magnitudes, lengths in train_loader:
            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward pass
            reconstructed = model(times, magnitudes, lengths)
            
            # Calculate loss only for non-padded elements
            loss = criterion(reconstructed, magnitudes)
            mask = torch.arange(magnitudes.size(1)).expand(magnitudes.size(0), magnitudes.size(1)) < lengths.unsqueeze(1)
            loss = (loss * mask.float()).sum() / mask.sum()
            
            # Backward pass and optimize
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        
        # Calculate average training loss for the epoch
        train_loss /= len(train_loader)
        
        # Evaluation phase
        model.eval()
        test_loss = 0.0
        with torch.no_grad():
            for times, magnitudes, lengths in test_loader:
                # Forward pass
                reconstructed = model(times, magnitudes, lengths)
                
                # Calculate loss only for non-padded elements
                loss = criterion(reconstructed, magnitudes)
                mask = torch.arange(magnitudes.size(1)).expand(magnitudes.size(0), magnitudes.size(1)) < lengths.unsqueeze(1)
                loss = (loss * mask.float()).sum() / mask.sum()
                test_loss += loss.item()
        
        # Calculate average test loss for the epoch
        test_loss /= len(test_loader)
        
        # Print epoch results
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
        
        # Save the model if it has the best test loss so far
        if test_loss < best_test_loss:
            best_test_loss = test_loss
            torch.save(model.state_dict(), best_model_path)
            print(f"New best model saved to {best_model_path}")
    
    print(f"Training completed. Best model saved with test loss: {best_test_loss:.4f}")


In [8]:
# Train the model
train(model, optimizer, train_loader, test_loader, num_epochs=10)

Epoch 1/10, Train Loss: 0.8101, Test Loss: 0.1632
New best model saved to ../Models/base-astromer.pth
Epoch 2/10, Train Loss: 0.0994, Test Loss: 0.0839
New best model saved to ../Models/base-astromer.pth
Epoch 3/10, Train Loss: 0.0810, Test Loss: 0.0824
New best model saved to ../Models/base-astromer.pth
Epoch 4/10, Train Loss: 0.0783, Test Loss: 0.0767
New best model saved to ../Models/base-astromer.pth
Epoch 5/10, Train Loss: 0.0759, Test Loss: 0.0769
Epoch 6/10, Train Loss: 0.0762, Test Loss: 0.0759
New best model saved to ../Models/base-astromer.pth
Epoch 7/10, Train Loss: 0.0762, Test Loss: 0.0760
Epoch 8/10, Train Loss: 0.0769, Test Loss: 0.0761
Epoch 9/10, Train Loss: 0.0766, Test Loss: 0.0751
New best model saved to ../Models/base-astromer.pth
Epoch 10/10, Train Loss: 0.0745, Test Loss: 0.0741
New best model saved to ../Models/base-astromer.pth
Training completed. Best model saved with test loss: 0.0741


In [9]:
# Test inference time
import time

def test_inference_time(model, test_loader, num_runs=100):
    """
    Measure the average inference time per sample for a given model.

    Args:
        model: The neural network model to test.
        test_loader: DataLoader containing the test dataset.
        num_runs (int): Number of inference runs to average over (default: 100).
    """
    # Set model to evaluation mode
    model.eval()
    # Get the device (CPU or GPU) that the model is on
    device = next(model.parameters()).device
    total_time = 0
    total_samples = 0
    
    with torch.no_grad():  # Disable gradient computation for inference
        for times, magnitudes, lengths in test_loader:
            # Move input data to the same device as the model
            times, magnitudes, lengths = times.to(device), magnitudes.to(device), lengths.to(device)
            batch_size = times.size(0)
            
            # Perform a warm-up run to ensure GPU is ready
            _ = model(times, magnitudes, lengths)
            
            # Timed runs
            start_time = time.time()
            for _ in range(num_runs):
                _ = model(times, magnitudes, lengths)
            end_time = time.time()
            
            # Calculate average inference time for this batch
            inference_time = (end_time - start_time) / num_runs
            total_time += inference_time * batch_size
            total_samples += batch_size
    
    # Calculate overall average inference time per sample
    avg_inference_time = total_time / total_samples
    print(f"Average inference time per sample: {avg_inference_time*1000:.2f} ms")

# Test inference time using the trained model and test data loader
test_inference_time(model, test_loader)


Average inference time per sample: 20.86 ms
