In [2]:
import pandas as pd
from datetime import datetime, timedelta
import georinex as gr
import xarray as xr
from datetime import datetime
import numpy as np

# Define target satellites and epochs
target_sats = ['G02', 'G06', 'G11', 'G14', 'G17', 'G19', 'G30', 'R11', 'R12', 'R22']
start_time = datetime(2022, 1, 20, 1, 0, 0)
end_time = datetime(2022, 1, 20, 7, 0, 0)
epochs = [start_time + timedelta(minutes=5*i) for i in range(72)]  # 01:00:00 to 07:00:00
epoch_times = [t.strftime('%H:%M:%S') for t in epochs]

In [3]:
rinex = gr.load('Data/C2110200.22O')
sp3 = gr.load('Data/COD0MGXFIN_20220200000_01D_05M_ORB.SP3')

In [4]:
# Function to parse IGS CLK file
def parse_clk_to_xarray(file_path):
    clk_data = []
    sv_list = []
    
    try:
        with open(file_path, 'r', encoding='ascii', errors='ignore') as f:
            lines = f.readlines()
            
            # Parse header for satellite list
            header = True
            for line in lines:
                if 'END OF HEADER' in line:
                    header = False
                    break
                if 'PRN LIST' in line:
                    parts = line.split()
                    sv_list.extend(parts[2:])  # Skip first two columns (e.g., 'C06', 'C07', ...)
            
            # Parse data lines for satellite clocks (AS)
            for line in lines:
                if not header and line.startswith('AS'):  # Satellite clock data
                    parts = line.split()
                    sat = parts[1]
                    if sat in sv_list:  # Only include satellites from PRN LIST
                        year, month, day, hour, minute, second = map(int, map(float, parts[2:8]))
                        clk_offset = float(parts[9])  # Clock bias in seconds
                        time = datetime(year, month, day, hour, minute, int(float(parts[7])))
                        clk_data.append({'sv': sat, 'time': time, 'clk': clk_offset})
            
            if not clk_data:
                raise ValueError("No satellite clock data (AS lines) found in CLK file.")
            
            # Convert to DataFrame
            df = pd.DataFrame(clk_data)
            df['time'] = pd.to_datetime(df['time'])
            
            # Pivot to wide format (time x sv)
            df_pivot = df.pivot(index='time', columns='sv', values='clk')
            
            # Convert to xarray with explicit coordinates
            clk_ds = xr.Dataset(
                data_vars={'clk': (['time', 'sv'], df_pivot.values)},
                coords={'time': df_pivot.index, 'sv': df_pivot.columns}
            )
            
            return clk_ds
    
    except Exception as e:
        print(f"Error parsing CLK file: {e}")
        return None

# Example usage with your file
clk_file = 'Data/COD0MGXFIN_20220200000_01D_30S_CLK.CLK'
clk_ds = parse_clk_to_xarray(clk_file)
clock = clk_ds


"""""
if clk_ds is not None:
    # Inspect
    print("CLK Dataset:")
    print(clk_ds)
    print("\nCLK Variables:", list(clk_ds.variables))
    print("CLK Dimensions:", clk_ds.dims)
    
    # Filter and interpolate to 5-minute epochs
    start_time = datetime(2022, 1, 20, 1, 0, 0)
    end_time = datetime(2022, 1, 20, 7, 0, 0)
    epochs = pd.date_range(start=start_time, end=end_time, freq='5min')
    clk_5min = clk_ds.interp(time=epochs)
    
    print("\nInterpolated CLK (01:00:00 sample):")
    print(clk_5min['clk'].sel(time='2022-01-20 01:00:00'))
else:
    print("Falling back to simulated CLK data.")
"""


'""\nif clk_ds is not None:\n    # Inspect\n    print("CLK Dataset:")\n    print(clk_ds)\n    print("\nCLK Variables:", list(clk_ds.variables))\n    print("CLK Dimensions:", clk_ds.dims)\n    \n    # Filter and interpolate to 5-minute epochs\n    start_time = datetime(2022, 1, 20, 1, 0, 0)\n    end_time = datetime(2022, 1, 20, 7, 0, 0)\n    epochs = pd.date_range(start=start_time, end=end_time, freq=\'5min\')\n    clk_5min = clk_ds.interp(time=epochs)\n    \n    print("\nInterpolated CLK (01:00:00 sample):")\n    print(clk_5min[\'clk\'].sel(time=\'2022-01-20 01:00:00\'))\nelse:\n    print("Falling back to simulated CLK data.")\n'

In [5]:
import numpy as np
import xarray as xr
import pandas as pd
from datetime import datetime, timedelta

# Constants
c = 299792458  # Speed of light (m/s)
initial_guess = np.array([4750000.0, -5050000.0, -350000.0, 0.0])  # [x, y, z, clock_bias]

# Geometry matrix computation
def compute_geometry_matrix(rec_pos, sat_pos):
    G = np.zeros((len(sat_pos), 4))
    for i, pos in enumerate(sat_pos):
        dx = pos[0] - rec_pos[0]
        dy = pos[1] - rec_pos[1]
        dz = pos[2] - rec_pos[2]
        r = np.sqrt(dx**2 + dy**2 + dz**2)
        if r < 1e-10: r = 1e-10
        G[i, 0] = -dx / r
        G[i, 1] = -dy / r
        G[i, 2] = -dz / r
        G[i, 3] = c
    return G

# Predicted pseudoranges computation
def compute_predicted_pseudoranges(rec_pos, sat_pos, sat_clocks, rec_clock):
    predicted = np.zeros(len(sat_pos))
    for i, pos in enumerate(sat_pos):
        r = np.sqrt((pos[0] - rec_pos[0])**2 + (pos[1] - rec_pos[1])**2 + (pos[2] - rec_pos[2])**2)
        predicted[i] = r + c * (rec_clock - sat_clocks[i])
    return predicted

# Least squares position solution
def position_least_squares(pseudoranges, sat_positions, sat_clock_biases, initial_guess, max_iter=10, tol=0.1):
    x = initial_guess.copy()
    for iteration in range(max_iter):
        predicted = compute_predicted_pseudoranges(x[:3], sat_positions, sat_clock_biases, x[3] / c)
        residuals = pseudoranges - predicted
        G = compute_geometry_matrix(x[:3], sat_positions)
        rank = np.linalg.matrix_rank(G)
        if rank < 4:
            print(f"Iteration {iteration + 1}: Rank = {rank} (insufficient satellites)")
            return None, None
        delta_x, _, _, _ = np.linalg.lstsq(G, residuals, rcond=None)
        x += delta_x
        if np.linalg.norm(delta_x[:3]) < tol:
            print(f"Converged after {iteration + 1} iterations")
            break
    return x[:3], x[3] / c

# Main processing function for xarray data
def process_epochs(rinex_ds, sp3_ds, clk_ds, start_time, end_time, interval_minutes=5):
    results = []
    
    # Generate time range
    current_time = start_time
    while current_time <= end_time:
        # Extract RINEX pseudoranges (C1) for the current epoch
        rinex_epoch = rinex_ds.sel(time=current_time, method='nearest')
        pseudoranges = rinex_epoch['C1'].values
        valid_svs = rinex_epoch['sv'].values[~np.isnan(pseudoranges)]
        pseudoranges = pseudoranges[~np.isnan(pseudoranges)]
        
        if len(pseudoranges) < 4:
            print(f"Skipping {current_time} - insufficient valid pseudoranges ({len(pseudoranges)})")
            current_time += timedelta(minutes=interval_minutes)
            continue
        
        # Find common satellites across RINEX, SP3, and CLK
        sp3_svs = sp3_ds['sv'].values
        clk_svs = clk_ds['sv'].values
        common_svs = np.intersect1d(np.intersect1d(valid_svs, sp3_svs), clk_svs)
        
        if len(common_svs) < 4:
            print(f"Skipping {current_time} - insufficient common satellites ({len(common_svs)})")
            current_time += timedelta(minutes=interval_minutes)
            continue
        
        # Filter data to common satellites
        rinex_mask = np.isin(valid_svs, common_svs)
        pseudoranges = pseudoranges[rinex_mask]
        valid_svs = valid_svs[rinex_mask]
        
        # Extract SP3 satellite positions for common satellites
        sp3_epoch = sp3_ds.sel(time=current_time, method='nearest')
        sat_positions = sp3_epoch['position'].sel(sv=common_svs, ECEF=['x', 'y', 'z']).values * 1000  # km to m
        
        # Extract CLK satellite clock biases for common satellites
        clk_epoch = clk_ds.sel(time=current_time, method='nearest')
        sat_clock_biases = clk_epoch['clk'].sel(sv=common_svs).values
        
        # Verify data alignment
        if len(sat_positions) != len(pseudoranges) or len(sat_clock_biases) != len(pseudoranges):
            print(f"Skipping {current_time} - data mismatch (sats: {len(sat_positions)}, pr: {len(pseudoranges)}, clk: {len(sat_clock_biases)})")
            current_time += timedelta(minutes=interval_minutes)
            continue
        
        # Calculate position
        estimated_pos, clock_bias = position_least_squares(
            pseudoranges, sat_positions, sat_clock_biases, initial_guess
        )
        
        if estimated_pos is not None:
            r = np.sqrt(np.sum(estimated_pos**2))
            altitude = (r - 6378000) / 1000  # Earth radius ~6378 km
            
            result = {
                'timestamp': current_time,
                'x': estimated_pos[0],
                'y': estimated_pos[1],
                'z': estimated_pos[2],
                'clock_bias_us': clock_bias * 1e6,
                'distance_from_center': r,
                'altitude_km': altitude
            }
            results.append(result)
        else:
            print(f"Failed to compute position for {current_time}")
        
        current_time += timedelta(minutes=interval_minutes)
    
    # Convert results to a pandas DataFrame
    return pd.DataFrame(results)

# Define time range for January 20, 2022
start_time = datetime(2022, 1, 20, 1, 5, 0)
end_time = datetime(2022, 1, 20, 7, 0, 0)

# Process all epochs (replace with your actual xarray variables: rinex, sp3, clock)
results_df = process_epochs(rinex, sp3, clock, start_time, end_time, interval_minutes=5)

# Display results as a pandas DataFrame
if not results_df.empty:
    print("\nC211 Satellite Positions (ECEF) from 01:05:00 to 07:00:00 on 2022-01-20:")
    # Format the DataFrame for nicer output
    pd.set_option('display.float_format', '{:.2f}'.format)
    print(results_df.to_string(index=False))
else:
    print("No valid positions computed")

Converged after 4 iterations
Converged after 4 iterations
Converged after 5 iterations
Converged after 4 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 4 iterations
Converged after 4 iterations
Converged after 4 iterations
Converged after 4 iterations
Converged after 4 iterations
Converged after 4 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 6 iterations
Converged after 6 iterations
Converged after 5 iterations
Converged after 5 iterations
Converged after 6 iterations
Converged after 6 iterations
Converged after 6 iterations
Converged afte

In [8]:
import numpy as np
import pandas as pd
from scipy.signal import find_peaks

# Constants
mu = 3.986004418e14  # Earth's gravitational parameter (m^3/s^2)

# Theoretical period using Kepler's Third Law
def calculate_theoretical_period(avg_distance):
    a = avg_distance  # Semi-major axis in meters (assuming circular orbit)
    T_seconds = 2 * np.pi * np.sqrt(a**3 / mu)  # Period in seconds
    T_minutes = T_seconds / 60  # Convert to minutes
    return T_seconds, T_minutes

# Actual period using data (based on radial distance peaks)
def calculate_actual_period(df):
    # Convert timestamps to seconds since start for numerical analysis
    time_seconds = (df['timestamp'] - df['timestamp'].iloc[0]).dt.total_seconds().values
    radial_distance = df['distance_from_center'].values
    
    # Find peaks in radial distance (indicating orbital cycles)
    peaks, _ = find_peaks(radial_distance, distance=10)  # Min distance ~50 min (10 epochs at 5 min)
    
    if len(peaks) < 2:
        print("Warning: Not enough peaks detected for period estimation.")
        return None, None
    
    # Calculate average time between peaks
    peak_times = time_seconds[peaks]
    periods = np.diff(peak_times)
    avg_period_seconds = np.mean(periods)
    avg_period_minutes = avg_period_seconds / 60
    
    return avg_period_seconds, avg_period_minutes

# Assuming your DataFrame is named 'results_df'
# Replace 'results_df' with your actual DataFrame name if different

# Calculate theoretical period
avg_distance = results_df['distance_from_center'].mean()
theoretical_seconds, theoretical_minutes = calculate_theoretical_period(avg_distance)

# Calculate actual period
actual_seconds, actual_minutes = calculate_actual_period(results_df)

# Print comparison
print("\nOrbital Period Comparison for C211 Satellite:")
print(f"Average Distance from Earth's Center: {avg_distance:.2f} m")
print("\nTheoretical Period (Newton's Law):")
print(f"  {theoretical_seconds:.2f} seconds")
print(f"  {theoretical_minutes:.2f} minutes")
print("\nActual Period (Estimated from Data):")
if actual_seconds is not None:
    print(f"  {actual_seconds:.2f} seconds")
    print(f"  {actual_minutes:.2f} minutes")
    print(f"\nDifference (Actual - Theoretical):")
    print(f"  {(actual_seconds - theoretical_seconds):.2f} seconds")
    print(f"  {(actual_minutes - theoretical_minutes):.2f} minutes")
else:
    print("  Could not estimate actual period due to insufficient data peaks.")


Orbital Period Comparison for C211 Satellite:
Average Distance from Earth's Center: 6917263.94 m

Theoretical Period (Newton's Law):
  5725.49 seconds
  95.42 minutes

Actual Period (Estimated from Data):
  5800.00 seconds
  96.67 minutes

Difference (Actual - Theoretical):
  74.51 seconds
  1.24 minutes
