In [1]:
import os
import random
import re
import sqlite3
import time
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import nbformat
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from sgp4.api import Satrec, WGS72, jday

In [2]:
from dateutil.relativedelta import relativedelta

In [3]:
# Settings needed to log into Space-Track and access the query page.
# Credentials are read from the environment so real secrets never have to be
# saved inside the notebook; the original placeholder strings stay as the
# fallback when the variables are not set.
USERNAME = os.environ.get("SPACETRACK_USERNAME", "EMAIL")
PASSWORD = os.environ.get("SPACETRACK_PASSWORD", "PASSWORD")

LOGIN_URL = "https://www.space-track.org/ajaxauth/login"

In [4]:
# Propagation settings shared by the cells below.
EARTH_RADIUS = 6371  # presumably km; not referenced in the visible cells - TODO confirm
SNAPSHOT_TIME = datetime(2025, 1, 20)  # naive datetime at midnight
DURATION_MINUTES = 24 * 60  # 24 hours
STEP_SECONDS = 60  # spacing between track samples
ORBIT_SUBSET = 5  # number of leading rows that get a full track

In [6]:
# -------------------------------------------------------------------
# 1) Log into Space-Track and retrieve TLEs
# -------------------------------------------------------------------

def fetch_tle_text(username, password, query_url, timeout=(30, 300)):
    """
    Log into Space-Track and download the response for ``query_url``.

    Parameters:
      username, password: credentials posted to LOGIN_URL as the
        ``identity`` / ``password`` form fields.
      query_url (str): fully built query URL to GET after logging in.
      timeout: passed to both HTTP calls as the ``requests`` timeout
        (seconds, or a ``(connect, read)`` tuple). Without it a stalled
        connection would block the caller indefinitely.

    Returns:
      str: the response body as text (3-line TLE text when ``query_url``
      requests ``format/3le``).

    Raises:
      requests.HTTPError: if the login or the query returns an error status.
      requests.Timeout: if either call exceeds ``timeout``.
    """
    credentials = {"identity": username, "password": password}

    # One session for both calls so the login cookie is reused by the query.
    with requests.Session() as session:
        login_resp = session.post(LOGIN_URL, data=credentials, timeout=timeout)
        login_resp.raise_for_status()

        resp = session.get(query_url, timeout=timeout)
        resp.raise_for_status()

        return resp.text

# -------------------------------------------------------------------
# 2) Parse raw TLE text into a DataFrame with Name, Line1, Line2
# -------------------------------------------------------------------

def parse_tle_to_dataframe(tle_text):
    """
    Parse 3-line TLE text into a DataFrame with columns Name, Line1, Line2.

    Blank lines are dropped and every remaining line is stripped. A record is
    accepted when three consecutive lines start with "0", "1" and "2"
    respectively. When a window does not match, the scan advances by ONE line
    (not three) so that a single missing or extra line does not misalign, and
    therefore discard, every record that follows it.

    Parameters:
      tle_text (str): raw text containing name / line 1 / line 2 triples.

    Returns:
      pandas.DataFrame: one row per accepted record; empty (with the three
      columns) when nothing could be parsed.
    """
    lines = [ln.strip() for ln in tle_text.strip().splitlines() if ln.strip()]

    if len(lines) % 3 != 0:
        print("Warning: The number of lines is not a multiple of 3. Some TLE entries may be incomplete.")

    satellites = []
    pos = 0
    while pos + 2 < len(lines):
        name_line, line1, line2 = lines[pos:pos + 3]

        if not name_line.startswith("0"):
            print(f"Unexpected format in name line: '{name_line}'. Skipping.")
            pos += 1  # resynchronise on the next line
            continue
        if not line1.startswith("1"):
            print(f"Unexpected format in Line 1: '{line1}'. Skipping.")
            pos += 1
            continue
        if not line2.startswith("2"):
            print(f"Unexpected format in Line 2: '{line2}'. Skipping.")
            pos += 1
            continue

        # Drop the leading "0" marker of the name line.
        satellites.append({"Name": name_line[1:].strip(), "Line1": line1, "Line2": line2})
        pos += 3

    return pd.DataFrame(satellites, columns=["Name", "Line1", "Line2"])

# -------------------------------------------------------------------
# 3) Extract extra TLE fields
# -------------------------------------------------------------------

def extract_launch_year(line1):
    """
    Extract the 4-digit launch year from TLE Line 1 (international designator).

    TLE lines are fixed-column, so the two-digit launch year is read from
    columns 10-11 (string indices 9:11) rather than by splitting on
    whitespace: when the designator is blank, whitespace splitting would pick
    up the epoch field instead and return a wrong year.

    Parameters:
      line1 (str): TLE line 1, starting at column 1 (no leading whitespace).

    Returns:
      int | None: 19xx for two-digit years >= 57, otherwise 20xx; None (after
      printing a message) when the field is blank, missing or not numeric.
    """
    try:
        yy = int(line1[9:11])
    except (IndexError, ValueError, TypeError) as e:
        print(f"Error extracting launch year from Line1: '{line1}'. Error: {e}")
        return None

    # TLE convention: two-digit years >= 57 belong to the 1900s, the rest to the 2000s.
    century = 1900 if yy >= 57 else 2000
    return century + yy

def extract_tle_epoch(line1):
    """
    Convert the epoch found in TLE Line 1 into a naive ``datetime``.

    The first ``YYDDD.DDDD...`` token in the line is taken as the epoch: a
    two-digit year followed by a fractional day of year (day 1.0 is
    January 1st, 00:00). Two-digit years >= 57 map to 19xx, others to 20xx.

    Returns:
      datetime | None: the epoch, or None when no such token is present or
      any error occurs (the error is printed).
    """
    try:
        found = re.search(r'(\d{2})(\d{3}\.\d+)', line1)
        if found is None:
            return None

        yy_text, doy_text = found.groups()
        yy = int(yy_text)
        doy = float(doy_text)

        century = 1900 if yy >= 57 else 2000
        start_of_year = datetime(century + yy, 1, 1)

        # Whole days are counted from Jan 1 (hence the -1); the fractional
        # part of the day is applied as seconds.
        return start_of_year + timedelta(
            days=int(doy) - 1,
            seconds=(doy % 1) * 86400
        )
    except Exception as e:
        print(f"Error extracting epoch: {e}")
        return None

# -------------------------------------------------------------------
# 4) Propagate orbits via SGP4 (x, y, z in km)
# -------------------------------------------------------------------

def jday(year, mon, day, hr, minute, sec):
    """
    Split a calendar date/time into (Julian date at 00:00, fraction of day).

    NOTE: this definition shadows the ``jday`` imported from ``sgp4.api`` at
    the top of the notebook; every later call in this notebook resolves to
    this function.

    Returns:
      tuple[float, float]: ``jd`` for midnight of the given date (ends in
      .5) and ``frac``, the elapsed part of that day in [0, 1).
    """
    # Integer terms of the date-to-Julian-date formula, truncated with int().
    year_shift = int((mon + 9) / 12.0)
    year_term = int((7 * (year + year_shift)) * 0.25)
    month_term = int(275 * mon / 9.0)
    jd = 367.0 * year - year_term + month_term + day + 1721013.5

    # Time of day expressed in hours, then scaled to a fraction of a day.
    hours = hr + minute/60.0 + sec/3600.0
    return jd, hours / 24.0

def get_satellite_position(line1, line2, when):
    """
    Propagate one TLE to the datetime ``when`` and return its position.

    Returns:
      The position vector ``r`` produced by ``Satrec.sgp4`` (described in
      this notebook as ECI x, y, z in km), or None when SGP4 reports a
      non-zero error code or any exception is raised. Both failure paths
      print a message instead of raising.
    """
    try:
        satrec = Satrec.twoline2rv(line1, line2, WGS72)

        # Fold microseconds into the seconds argument before converting.
        seconds = when.second + when.microsecond * 1e-6
        jd, fr = jday(when.year, when.month, when.day, when.hour, when.minute, seconds)

        status, position, _velocity = satrec.sgp4(jd, fr)
        if status == 0:
            return position  # [x, y, z] in km

        print(f"SGP4 Error {status} for Satellite: {satrec.satnum}")
        return None
    except Exception as e:
        print(f"Exception during propagation: {e}")
        return None

def get_satellite_tracks(line1, line2, start_datetime, duration_minutes=90, step_seconds=30):
    """
    Sample a satellite's position at a fixed cadence over a time window.

    Samples are taken every ``step_seconds`` from ``start_datetime`` through
    ``start_datetime + duration_minutes`` (both ends included when the
    duration is a multiple of the step).

    Returns:
      tuple[list, list, list]: x, y and z coordinates (km), one entry per
      sample; a sample for which SGP4 reports an error is stored as NaN in
      all three lists.
    """
    satrec = Satrec.twoline2rv(line1, line2, WGS72)
    sample_count = int((duration_minutes * 60) / step_seconds) + 1
    missing = (np.nan, np.nan, np.nan)

    track_x, track_y, track_z = [], [], []
    for k in range(sample_count):
        moment = start_datetime + timedelta(seconds=k * step_seconds)
        jd, fr = jday(
            moment.year, moment.month, moment.day,
            moment.hour, moment.minute, moment.second + moment.microsecond * 1e-6
        )
        status, position, _velocity = satrec.sgp4(jd, fr)

        # Keep the three lists aligned even when a sample fails.
        x, y, z = position if status == 0 else missing
        track_x.append(x)
        track_y.append(y)
        track_z.append(z)

    return track_x, track_y, track_z

# -------------------------------------------------------------------
# 5) Parameterized function to pull TLE data for any query URL
# -------------------------------------------------------------------

def pull_tle_data_for_query(username, password, query_url, snapshot_time=None):
    """
    Fetch TLE data from a custom query URL, parse it, and propagate it.

    Parameters:
      username, password: Space-Track credentials, forwarded to fetch_tle_text.
      query_url (str): full Space-Track query URL.
      snapshot_time (datetime | None): instant at which positions are
        computed and at which the sample tracks start. Defaults to the
        module-level SNAPSHOT_TIME, which preserves the previous behaviour.
        NOTE(review): the recorded notebook output shows "SGP4 Error 1" for
        nearly every object in the early months, presumably because
        SNAPSHOT_TIME lies decades after those element sets' epochs; passing
        a time close to the queried epoch range may avoid that - confirm.

    Returns:
      filtered_df (DataFrame): columns Name, Line1, Line2, Launch Year, Epoch
      xs, ys, zs (list): positions at the snapshot time, one entry per row of
        filtered_df (NaN where propagation failed)
      launch_years (list): launch year per row (NaN where propagation failed)
      orbits_data (list of dicts): track data for the first ORBIT_SUBSET satellites
    """
    if snapshot_time is None:
        snapshot_time = SNAPSHOT_TIME

    # 1) Fetch
    tle_text = fetch_tle_text(username, password, query_url=query_url)

    # 2) Parse
    df = parse_tle_to_dataframe(tle_text)

    # 3) Enrich with Launch Year & Epoch
    df['Launch Year'] = df['Line1'].apply(extract_launch_year)
    df['Epoch'] = df['Line1'].apply(extract_tle_epoch)

    # For historical data, we won't filter by recency
    filtered_df = df

    # 4) Get x, y, z for each satellite at the single snapshot time.
    #    Failed propagations are recorded as NaN so the lists stay aligned
    #    with the DataFrame rows.
    xs, ys, zs = [], [], []
    launch_years = []
    for _, row in filtered_df.iterrows():
        pos = get_satellite_position(row['Line1'], row['Line2'], snapshot_time)
        if pos is not None:
            xs.append(pos[0])
            ys.append(pos[1])
            zs.append(pos[2])
            launch_years.append(row['Launch Year'])
        else:
            xs.append(np.nan)
            ys.append(np.nan)
            zs.append(np.nan)
            launch_years.append(np.nan)

    # 5) Generate orbital tracks for a subset (optional).
    #    This is separate "time series" data.
    subset_df = filtered_df.head(ORBIT_SUBSET)
    orbits_data = []
    for idx, row in subset_df.iterrows():
        track_x, track_y, track_z = get_satellite_tracks(
            row['Line1'], row['Line2'],
            snapshot_time,
            duration_minutes=DURATION_MINUTES,
            step_seconds=STEP_SECONDS
        )
        orbits_data.append({
            'Index': idx,
            'Name': row['Name'],
            'Launch Year': row['Launch Year'],
            'X_track': track_x,
            'Y_track': track_y,
            'Z_track': track_z
        })

    return filtered_df, xs, ys, zs, launch_years, orbits_data

# -------------------------------------------------------------------
# 6) Utility function to build monthly Space-Track URLs
# -------------------------------------------------------------------

def build_monthly_query_url(start_date, end_date, request_class="gp"):
    """
    Build a Space-Track query URL for element sets whose EPOCH lies between
    start_date and end_date, ordered by EPOCH ascending, in 3-line format.

    Parameters:
      start_date, end_date: objects with ``strftime`` (date or datetime);
        only the calendar date (YYYY-MM-DD) is used.
      request_class (str): Space-Track request class placed in the URL.
        Defaults to "gp", the class this notebook has always queried (the
        earlier docstring mentioned tle_archive, which the URL never used).
        NOTE(review): per the Space-Track API docs "gp" appears to hold only
        the newest element set per object, while "gp_history" holds the
        historical ones - confirm which is wanted for a monthly archive.

    Returns:
      str: the complete query URL.
    """
    start_str = start_date.strftime("%Y-%m-%d")
    end_str = end_date.strftime("%Y-%m-%d")

    base_url = f"https://www.space-track.org/basicspacedata/query/class/{request_class}/"
    # Using range syntax: EPOCH/START--END
    return (
        f"{base_url}"
        f"EPOCH/{start_str}--{end_str}/"
        "orderby/EPOCH%20asc/format/3le"
    )

# -------------------------------------------------------------------
# 7) Main loop: fetch monthly TLE data, THROTTLED
# -------------------------------------------------------------------

def main():
    """
    Walk month by month from 2000-01-01 up to the current time, pulling the
    TLEs for each month and collecting them into one DataFrame.

    Each month's rows carry the parsed TLE columns plus X, Y, Z (position at
    SNAPSHOT_TIME, the same instant for every month) and Snapshot_Time.
    One request is issued per month, followed by a pause of about a minute.

    Returns:
      pandas.DataFrame: all months concatenated with a fresh index.
    """
    monthly_frames = []
    cursor = datetime(2000, 1, 1)
    stop_at = datetime.now()

    while cursor < stop_at:
        month_start = cursor.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        following_month = month_start + relativedelta(months=1)

        # ----- Query this month; launch years and tracks are not used here -----
        df, xs, ys, zs, _launch_years, _orbits_data = pull_tle_data_for_query(
            USERNAME,
            PASSWORD,
            build_monthly_query_url(month_start, following_month),
        )
        print(f"Month: {month_start.strftime('%Y-%m')} -> TLE DF size: {len(df)}")

        # Attach the snapshot positions (and the time they refer to) to a
        # copy of the month's TLE table, then keep it for the final concat.
        monthly_frames.append(
            df.assign(X=xs, Y=ys, Z=zs, Snapshot_Time=SNAPSHOT_TIME)
        )

        # ------------- IMPORTANT THROTTLING -------------
        time.sleep(60 + random.random())  # to keep well below 30 requests/min

        cursor = following_month

    if not monthly_frames:
        # Edge case: nothing was fetched, so return an empty, correctly shaped frame.
        return pd.DataFrame(columns=["Name","Line1","Line2","Launch Year",
                                     "Epoch","X","Y","Z","Snapshot_Time"])

    return pd.concat(monthly_frames, ignore_index=True)

In [7]:
# Run the full monthly download (one request per month, throttled).
big_df = main()

print("Total rows in final DataFrame:", len(big_df))
# For demonstration, let's save it as CSV:
big_df.to_csv("all_tle_snapshots.csv", index=False)
print("Saved all TLE snapshots to CSV: all_tle_snapshots.csv")

# If you also want an SQL approach:
print("Creating SQLite table from the DataFrame...")
conn = sqlite3.connect("tle_snapshots.db")
try:
    big_df.to_sql("satellite_positions", conn, if_exists="replace", index=False)
finally:
    # Always release the database handle, even when the write fails.
    conn.close()
print("Wrote 'satellite_positions' table to tle_snapshots.db")

SGP4 Error 1 for Satellite: 24620
SGP4 Error 1 for Satellite: 23575
SGP4 Error 1 for Satellite: 7847
SGP4 Error 1 for Satellite: 22564
SGP4 Error 1 for Satellite: 5999
SGP4 Error 1 for Satellite: 25663
SGP4 Error 1 for Satellite: 25796
SGP4 Error 1 for Satellite: 1962
SGP4 Error 1 for Satellite: 7842
SGP4 Error 1 for Satellite: 24240
SGP4 Error 1 for Satellite: 24111
SGP4 Error 1 for Satellite: 26044
SGP4 Error 1 for Satellite: 7969
SGP4 Error 1 for Satellite: 24368
SGP4 Error 1 for Satellite: 25950
SGP4 Error 1 for Satellite: 22346
SGP4 Error 1 for Satellite: 24024
SGP4 Error 1 for Satellite: 26055
SGP4 Error 1 for Satellite: 20759
SGP4 Error 1 for Satellite: 13964
SGP4 Error 1 for Satellite: 16885
SGP4 Error 1 for Satellite: 25220
Month: 2000-01 -> TLE DF size: 23
SGP4 Error 1 for Satellite: 24614
SGP4 Error 1 for Satellite: 25176
SGP4 Error 1 for Satellite: 26068
SGP4 Error 1 for Satellite: 25858
SGP4 Error 1 for Satellite: 24331
SGP4 Error 1 for Satellite: 22890
SGP4 Error 1 for Sa