In [1]:
from datetime import datetime, timedelta
import numpy as np
import julian
from numpy import linalg
from scipy.linalg import sqrtm
from scipy.interpolate import KroghInterpolator


def parse_ephemeris_file(filename: str):
    
    with open(filename, 'r') as f:
        lines = f.readlines()
    
    headers = {}
    for i in range(3):
        line = lines[i].strip()
        if ':' in line:
            key, value = line.split(':', 1)
            headers[key] = value.strip()
        if ' ' in line:
            parts = line.split()
            for part in parts:
                if ':' in part:
                    key, value = part.split(':', 1)
                    headers[key] = value.strip()
    
    if lines[3].strip() != 'UVW':
        raise ValueError("Expected UVW frame specification")
    
    timestamps: List[datetime] = []
    positions: List[np.ndarray] = []
    velocities: List[np.ndarray] = []
    covariances_uwv: List[np.ndarray] = []
    covariances_eme: List[np.ndarray] = []
    
    i = 4
    while i < len(lines):
        state_parts = lines[i].strip().split()
        
        # More precise timestamp parsing
        timestamp_str = state_parts[0]
        year = int(timestamp_str[:4])
        doy = int(timestamp_str[4:7])
        hour = int(timestamp_str[7:9])
        minute = int(timestamp_str[9:11])
        second = int(timestamp_str[11:13])
        millisec = int(timestamp_str[14:17])
        
        timestamp = datetime(year, 1, 1).replace(hour=hour, minute=minute,
                                               second=second, microsecond=millisec*1000) + \
                   timedelta(days=doy-1)
        
        # Use np.float64 for higher precision
        pos = np.array([np.float64(x) for x in state_parts[1:4]])
        vel = np.array([np.float64(x) for x in state_parts[4:7]])
        
        cov_values = []
        for j in range(3):
            cov_values.extend([np.float64(x) for x in lines[i+1+j].strip().split()])
        
        cov_matrix_uwv = np.zeros((6, 6), dtype=np.float64)
        idx = 0
        for row in range(6):
            for col in range(row + 1):
                cov_matrix_uwv[row, col] = cov_values[idx]
                cov_matrix_uwv[col, row] = cov_values[idx]
                idx += 1
        
        
        timestamps.append(timestamp)
        positions.append(pos)
        velocities.append(vel)
        covariances_uwv.append(cov_matrix_uwv)
        
        i += 4
    
    return {
        'headers': headers,
        'timestamps': np.array(timestamps),
        'positions': np.array(positions, dtype=np.float64),
        'velocities': np.array(velocities, dtype=np.float64),
        'covariances': np.array(covariances_uwv, dtype=np.float64),
    }


def reconstruct_covariance_at_time(interpolated_points: np.ndarray):
    """
    Reconstruct the covariance matrix with improved numerical stability.
    """
    
    # Optimized Unscented Transform parameters
    n = 6
    alpha = np.float64(0.001)  # Reduced alpha for better numerical stability
    beta = np.float64(2.0)     # Optimal for Gaussian
    kappa = np.float64(3-n)    # Modified for better stability
    lambda_param = alpha * alpha * (n + kappa) - n

    # Precompute weights for efficiency and precision
    w0_m = lambda_param / (n + lambda_param)
    wn_m = np.float64(0.5) / (n + lambda_param)
    w0_c = w0_m + (1 - alpha * alpha + beta)
    wn_c = wn_m
        
    
    # Calculate mean state with improved precision
    mean_state = np.zeros(6, dtype=np.float64)
    mean_state = interpolated_points[0]
    #mean_state = w0_m * interpolated_points[0]
    #for i in range(1, len(interpolated_points)):
    #    mean_state += wn_m * interpolated_points[i]
    
    # Calculate covariance with improved numerical stability
    covariance = np.zeros((6, 6), dtype=np.float64)
    
    diff_0 = interpolated_points[0] - mean_state
    covariance = w0_c * np.outer(diff_0, diff_0)
    
    for i in range(1, len(interpolated_points)):
        diff = interpolated_points[i] - mean_state
        covariance += wn_c * np.outer(diff, diff)
        
    covariance = (covariance + covariance.T) / 2

    return mean_state, covariance


        
def generate_and_propagate_sigma_points(data):
    """
    Generate and propagate sigma points with improved numerical stability.
    """
    try:
        # Use high precision for Julian date conversion
        julian_dates = np.array([julian.to_jd(ts) for ts in data['timestamps']], dtype=np.float64)
        
        state_vectors = np.hstack((data['positions'], data['velocities']))
        covariances = data['covariances']
        
        sigma_points_dict = {}
        
        # Optimized Unscented Transform parameters
        n = 6
        alpha = np.float64(0.001)  # Reduced alpha for better numerical stability
        beta = np.float64(2.0)     # Optimal for Gaussian
        kappa = np.float64(3-n)    # Modified for better stability
        lambda_param = alpha * alpha * (n + kappa) - n
        
        # Precompute weights for efficiency and precision
        w0_m = lambda_param / (n + lambda_param)
        wn_m = np.float64(0.5) / (n + lambda_param)
        w0_c = w0_m + (1 - alpha * alpha + beta)
        wn_c = wn_m
        
        weights = {
            'mean': {'w0': w0_m, 'wn': wn_m},
            'covariance': {'w0': w0_c, 'wn': wn_c}
        }
        
        for idx, (jd, timestamp) in enumerate(zip(julian_dates, data['timestamps'])):
            try:
                state_vector = state_vectors[idx]
                covariance = covariances[idx]
                
                # Ensure symmetry of covariance matrix
                covariance = (covariance + covariance.T) / 2
                
                # Scale covariance matrix with improved numerical stability
                scaled_cov = (n+lambda_param) * covariance
                
                # Try Cholesky first, fall back to modified sqrtm if needed
                try:
                    L = np.linalg.cholesky(scaled_cov)
                except np.linalg.LinAlgError:
                    L = sqrtm(scaled_cov)
                
                sigma_0 = state_vector
                sigma_n = sigma_0[:, np.newaxis] + L
                sigma_2n = sigma_0[:, np.newaxis] - L
                
                all_sigma_points = np.vstack([
                                    sigma_0,
                                    sigma_n.T,
                                    sigma_2n.T
                                ])
                
                sigma_points_dict[jd] = {
                    'sigma_points': all_sigma_points,
                    'weights': weights,
                    'epoch': timestamp,
                    'state_vector': state_vector,
                    'covariance': covariance
                    }
                
            except Exception as e:
                print(f"Warning: Failed to process timestamp {timestamp}: {str(e)}")
                continue
        
        if not sigma_points_dict:
            raise ValueError("No sigma points were successfully generated")
        
        return sigma_points_dict
    
    except Exception as e:
        print(f"Error in generate_and_propagate_sigma_points: {str(e)}")
        return None



In [2]:

def create_chunked_krogh_interpolator(x: np.ndarray, y: np.ndarray, 
                                    chunk_size: int = 14, 
                                    overlap: int = 8):
    """
    Create a chunked Krogh interpolation function to avoid numerical instability
    when interpolating over many points.
    
    Args:
        x: Independent variable values (e.g., times)
        y: Dependent variable values to interpolate
        chunk_size: Number of points to use in each interpolation chunk
        overlap: Number of points to overlap between chunks
        
    Returns:
        List of interpolators and their valid ranges
    """
    if len(x) <= chunk_size:
        # If data is smaller than chunk size, just use a single interpolator
        interp = KroghInterpolator(x, y)
        return [{'interpolator': interp, 'range': (x[0], x[-1])}]
    
    # Split data into overlapping chunks
    interpolators = []
    
    i = 0
    while i < len(x):
        end_idx = min(i + chunk_size, len(x))
        chunk_x = x[i:end_idx]
        chunk_y = y[i:end_idx]
        
        # Create interpolator for this chunk
        interp = KroghInterpolator(chunk_x, chunk_y)
        
        # Record the valid range for this chunk (slightly narrower than the actual chunk)
        # to ensure smooth transitions between chunks
        if i == 0:
            # First chunk - use from beginning to just before end
            valid_range = (chunk_x[0], chunk_x[-2] if len(chunk_x) > 2 else chunk_x[-1])
        elif end_idx == len(x):
            # Last chunk - use from just after start to end
            valid_range = (chunk_x[1] if len(chunk_x) > 1 else chunk_x[0], chunk_x[-1])
        else:
            # Middle chunks - use middle portion, leaving overlap areas for adjacent chunks
            valid_range = (chunk_x[1] if len(chunk_x) > 1 else chunk_x[0], 
                          chunk_x[-2] if len(chunk_x) > 2 else chunk_x[-1])
        
        interpolators.append({'interpolator': interp, 'range': valid_range})
        
        # Move to next chunk with overlap, ensuring we make progress
        i = max(end_idx - overlap, i + 1)
    
    return interpolators
    
    # Create a function that selects the appropriate chunk and interpolates

def interpolate_sigma_pointsKI(sigma_points_dict):
    """
    Create high-precision interpolation splines for sigma point trajectories.
    """
    # Get and sort Julian dates with high precision
    julian_dates = np.array(sorted(sigma_points_dict.keys()), dtype=np.float64)
    
    n_sigma_points = 13
    
    # Initialize with explicit types
    positions_by_point = [[] for _ in range(n_sigma_points)]
    velocities_by_point = [[] for _ in range(n_sigma_points)]
    
    # Extract position and velocity components with high precision
    for jd in julian_dates:
        sigma_points = sigma_points_dict[jd]['sigma_points'].astype(np.float64)
        for i in range(n_sigma_points):
            positions_by_point[i].append(sigma_points[i][:3])
            velocities_by_point[i].append(sigma_points[i][3:])
    
    # Convert to numpy arrays with high precision
    positions_by_point = [np.array(pos, dtype=np.float64) for pos in positions_by_point]
    velocities_by_point = [np.array(vel, dtype=np.float64) for vel in velocities_by_point]
    
    position_splines = []
    velocity_splines = []
    
    
    for i in range(n_sigma_points):
        # Position splines
        pos_splines_i = []
        for j in range(3):
            pos_data = positions_by_point[i][:, j]
            valid_mask = np.isfinite(pos_data)
            if np.any(valid_mask):
                # Use not-a-knot cubic splines for better accuracy
                spline = create_chunked_krogh_interpolator(julian_dates[valid_mask], 
                                    pos_data[valid_mask],
                                   chunk_size= 14, 
                                    overlap= 8)
                pos_splines_i.append(spline)
            else:
                pos_splines_i.append(None)
        position_splines.append(pos_splines_i)
        
        # Velocity splines
        vel_splines_i = []
        for j in range(3):
            vel_data = velocities_by_point[i][:, j]
            valid_mask = np.isfinite(vel_data)
            if np.any(valid_mask):
                spline = create_chunked_krogh_interpolator(julian_dates[valid_mask], 
                                    vel_data[valid_mask],
                                   chunk_size= 14, 
                                    overlap= 8)
                vel_splines_i.append(spline)
            else:
                vel_splines_i.append(None)
        velocity_splines.append(vel_splines_i)
    
    return {
        'positions': position_splines,
        'velocities': velocity_splines,
        'time_range': (julian_dates[0], julian_dates[-1])
    }


def get_interpolated_sigma_points_KI(interpolated_splines,
                                  julian_date):
    """
    Get interpolated sigma points with high precision using Krogh interpolation.
    
    Selects the optimal interpolator when in overlap regions - preferring 
    interpolators where the point is in the middle of their range rather
    than at the edges.
    """
    start_time, end_time = interpolated_splines['time_range']
    if not (start_time <= julian_date <= end_time):
        raise ValueError(f"Requested time {julian_date} is outside the interpolation range "
                        f"[{start_time}, {end_time}]")
    
    n_sigma_points = 13
    interpolated_points = np.zeros((n_sigma_points, 6), dtype=np.float64)
    
    for i in range(n_sigma_points):
        # Interpolate positions with high precision
        for j in range(3):
            if interpolated_splines['positions'][i][j] is not None:
                splines = interpolated_splines['positions'][i][j]
                
                # Find all applicable splines for this time
                applicable_splines = []
                for idx, spline_info in enumerate(splines):
                    lower, upper = spline_info['range']
                    if lower <= julian_date <= upper:
                        # Calculate how central the point is within this spline's range
                        # (0.5 means it's in the middle, 0 or 1 means it's at an edge)
                        centrality = (julian_date - lower) / (upper - lower) 
                        # Prefer points that are more central (closer to 0.5)
                        score = 1 - abs(centrality - 0.5) * 2  # Converts to 0-1 scale where 1 is most central
                        applicable_splines.append((idx, score, spline_info))
                
                # If we found applicable splines, use the most central one
                if applicable_splines:
                    # Sort by score (descending)
                    applicable_splines.sort(key=lambda x: x[1], reverse=True)
                    best_spline = applicable_splines[0][2]
                    interpolated_points[i, j] = best_spline['interpolator'](julian_date)
                else:
                    # If no spline's range contains this point, use the closest one
                    if julian_date < start_time:
                        interpolated_points[i, j] = splines[0]['interpolator'](julian_date)
                    else:
                        interpolated_points[i, j] = splines[-1]['interpolator'](julian_date)
        
        # Interpolate velocities with high precision - using the same approach
        for j in range(3):
            if interpolated_splines['velocities'][i][j] is not None:
                splines = interpolated_splines['velocities'][i][j]
                
                # Find all applicable splines for this time
                applicable_splines = []
                for idx, spline_info in enumerate(splines):
                    lower, upper = spline_info['range']
                    if lower <= julian_date <= upper:
                        # Calculate how central the point is within this spline's range
                        centrality = (julian_date - lower) / (upper - lower) 
                        # Prefer points that are more central
                        score = 1 - abs(centrality - 0.5) * 2
                        applicable_splines.append((idx, score, spline_info))
                
                # If we found applicable splines, use the most central one
                if applicable_splines:
                    applicable_splines.sort(key=lambda x: x[1], reverse=True)
                    best_spline = applicable_splines[0][2]
                    interpolated_points[i, j+3] = best_spline['interpolator'](julian_date)
                else:
                    # If no spline's range contains this point, use the closest one
                    if julian_date < start_time:
                        interpolated_points[i, j+3] = splines[0]['interpolator'](julian_date)
                    else:
                        interpolated_points[i, j+3] = splines[-1]['interpolator'](julian_date)
    
    return interpolated_points

In [4]:
try:
    filename = 'SpaceX/MEME_47375_STARLINK-2094_0371242_Operational_1423140180_UNCLASSIFIED.txt'
    
    # Parse ephemeris file with validation
    print(f"Parsing ephemeris file: {filename}")
    data = parse_ephemeris_file(filename)
    if data is None:
        raise ValueError("Failed to parse ephemeris file")  

except Exception as e:
    print(f"Error in main: {str(e)}")
    raise



Parsing ephemeris file: SpaceX/MEME_47375_STARLINK-2094_0371242_Operational_1423140180_UNCLASSIFIED.txt


In [5]:
# Generate sigma points with validation
sigma_points_dict = generate_and_propagate_sigma_points(data)
if sigma_points_dict is None:
    raise ValueError("Failed to generate sigma points")

# Create interpolation splines
interpolated_splinesKI = interpolate_sigma_pointsKI(sigma_points_dict)


In [6]:
#just to get JDs
julian_dates = np.array([julian.to_jd(ts) for ts in data['timestamps']], dtype=np.float64)


In [7]:
#boundary of data 
jd_start=julian_dates[0]
jd_end=julian_dates[-1]

#any values between jd_start and jd_end
jd=julian_dates[0]+1/86400

# METHOD 2: Krogh interpolation (using sigma points)
interpolated_points = get_interpolated_sigma_points_KI(interpolated_splinesKI,jd )
interp_mean, interp_cov = reconstruct_covariance_at_time(interpolated_points)
print(interp_cov)

[[ 5.36212576e-07 -3.89360380e-07  2.24578081e-10  9.22153844e-10
  -4.70590643e-10  8.41867405e-13]
 [-3.89360380e-07  7.83264118e-07  6.12165133e-12 -9.04725435e-10
   4.12498662e-10 -3.22523119e-13]
 [ 2.24578081e-10  6.12165133e-12  1.26446529e-06 -9.82477462e-15
   1.60555565e-13  1.70171911e-09]
 [ 9.22153844e-10 -9.04725435e-10 -9.82477462e-15  2.09318813e-12
  -8.26358766e-13  6.18798637e-16]
 [-4.70590643e-10  4.12498662e-10  1.60555565e-13 -8.26358766e-13
   5.02552868e-13 -2.21761651e-16]
 [ 8.41867405e-13 -3.22523119e-13  1.70171911e-09  6.18798637e-16
  -2.21761651e-16  5.42035153e-12]]


In [8]:
#boundary of data 
jd_start=julian_dates[0]
jd_end=julian_dates[-1]

#any values between jd_start and jd_end
jd=julian_dates[0]

# METHOD 2: Krogh interpolation (using sigma points)
interpolated_points = get_interpolated_sigma_points_KI(interpolated_splinesKI,jd )
interp_mean, interp_cov = reconstruct_covariance_at_time(interpolated_points)
print(interp_cov)

[[ 5.35222150e-07 -3.88257092e-07  2.23386181e-10  9.20279894e-10
  -4.69679110e-10  8.42965307e-13]
 [-3.88257092e-07  7.81584930e-07  6.88366829e-12 -9.02407155e-10
   4.11433007e-10 -3.23976066e-13]
 [ 2.23386181e-10  6.88366829e-12  1.26106628e-06 -9.95123677e-15
   1.58979792e-13  1.69781253e-09]
 [ 9.20279894e-10 -9.02407155e-10 -9.95123677e-15  2.08934260e-12
  -8.24578970e-13  6.23262868e-16]
 [-4.69679110e-10  4.11433007e-10  1.58979792e-13 -8.24578970e-13
   5.01737831e-13 -2.26327602e-16]
 [ 8.42965307e-13 -3.23976066e-13  1.69781253e-09  6.23262868e-16
  -2.26327602e-16  5.42395019e-12]]
