In [1]:
from skyfield.api import load
from skyfield.api import wgs84
import numpy as np
from hilbertcurve.hilbertcurve import HilbertCurve
import psycopg2
import os
from datetime import datetime

# Load TLE data
def get_starlink_satellites(url="https://celestrak.org/NORAD/elements/gp.php?GROUP=starlink&FORMAT=tle"):
    print("Loading TLE data...")
    satellites = load.tle_file(url)
    satellites = [{"sat":s} for s in satellites]
    print(f"Loaded {len(satellites)} Starlink satellites.")
    return satellites

# Connect to the PostgreSQL database
def connect_to_db():
    conn = psycopg2.connect(
        dbname="minecraftindex",
        user="kyjohnso",
        password="Password",
        host='localhost',  # Use the service name from docker-compose.yml
        port=5432
    )
    return conn

# Compute ECEF XYZ positions
def compute_ecef_positions(satellites):
    ts = load.timescale()
    now = ts.now()

    for sat in satellites:
        try:
            geocentric = sat["sat"].at(now)
            subpoint = wgs84.subpoint(geocentric)

            # Get ECEF XYZ coordinates
            sat.update({
                "scc": sat["sat"].model.satnum,
                "name": sat["sat"].name,
                "ecef": geocentric.position.m,
                "timestamp": now.utc_datetime()
            })
        except Exception as e:
            print(f"Error computing position for {sat.name}: {e}")

    return satellites

# Insert satellite data into the database
def ecef_points_to_hilbert(ecef,hilbert_dict):
    
    p=hilbert_dict.get("p",np.ceil(np.log2(hilbert_dict["world_size"]/hilbert_dict["cell_size"])))
    hilbert_dict["p"] = p
    ecef_scale = (
        ecef + hilbert_dict["world_size"]/2
    )/hilbert_dict["cell_size"]
    ecef_scale_int = ecef_scale.astype(np.int64)

    hilbert_curve = HilbertCurve(
        n=hilbert_dict.get("n",3),
        p=p
    )

    return hilbert_curve.distances_from_points(ecef_scale_int)

def hilbert_to_ecef_points(hilbert,hilbert_dict):
    p=hilbert_dict.get("p",np.ceil(np.log2(hilbert_dict["world_size"]/hilbert_dict["cell_size"])))
    
    hilbert_curve = HilbertCurve(
        n=hilbert_dict.get("n",3),
        p=p
    )
    
    ecef_scale = hilbert_curve.points_from_distances(hilbert)

    return np.array(ecef_scale) * hilbert_dict["cell_size"] - hilbert_dict["world_size"]/2

def add_hilbert_to_satellites(satellites,hilbert):
    for i in range(len(satellites)):
        satellites[i]["hilbert_index"]=hilbert[i]
    return satellites

def create_satellites_table(conn):
    cursor = conn.cursor()

    # Create the table if it doesn't exist
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS satellites (
            id SERIAL PRIMARY KEY,                   -- Auto-incrementing primary key
            satnum INT NOT NULL,                     -- Satellite number (cannot be NULL)
            ecef GEOMETRY(POINTZ, 4978) NOT NULL,    -- 3D ECEF coordinates (cannot be NULL)
            observation_time TIMESTAMP NOT NULL,     -- Observation timestamp (cannot be NULL)
            name TEXT NOT NULL,                      -- Satellite name (cannot be NULL)
            hilbert_index BIGINT NOT NULL,           -- Hilbert curve index (cannot be NULL),
            CONSTRAINT satnum_time_hilbert_unique UNIQUE (satnum, observation_time, hilbert_index) -- Optional uniqueness constraint
        );
    """)
    conn.commit()
    cursor.execute("""
            CREATE INDEX IF NOT EXISTS satnum_time_idx_compound
            ON satellites (satnum, observation_time, hilbert_index);
    """)
    conn.commit()
    cursor.execute("""
            CREATE INDEX IF NOT EXISTS time_idx_compound
            ON satellites (observation_time, hilbert_index);
    """)
    conn.commit()
    cursor.execute("""
            CREATE INDEX IF NOT EXISTS satnum_idx_compound
            ON satellites (satnum, hilbert_index);
    """)
    conn.commit()
    cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx
            ON satellites (hilbert_index);
    """)
    conn.commit()

def drop_satellites_table(connection):
    """
    Drops the satellites table if it exists in a PostgreSQL database.

    Args:
        connection (psycopg2.extensions.connection): Active database connection.

    Returns:
        None
    """
    drop_table_query = "DROP TABLE IF EXISTS satellites CASCADE;"
    
    try:
        with connection.cursor() as cursor:
            # Execute the drop table query
            cursor.execute(drop_table_query)
            # Commit the transaction
            connection.commit()
            print("Satellites table dropped successfully.")
    except Exception as e:
        connection.rollback()
        print(f"Error dropping satellites table: {e}")

def create_hilbert_points_table(connection):
    """
    Creates the hilbert_points table with hilbert_index as the primary key
    and a corresponding 3D point entry.

    Args:
        connection (psycopg2.extensions.connection): Active database connection.

    Returns:
        None
    """
    create_table_query = """
    CREATE TABLE IF NOT EXISTS hilbert_points (
        hilbert_index BIGINT PRIMARY KEY,       -- Hilbert index as the primary key
        point GEOMETRY(POINTZ, 4978) NOT NULL  -- 3D point (ECEF coordinates) with SRID 4978
    );
    """
    try:
        with connection.cursor() as cursor:
            # Execute the create table query
            cursor.execute(create_table_query)
            # Commit the transaction
            connection.commit()
            print("Hilbert points table created successfully.")
    except Exception as e:
        connection.rollback()
        print(f"Error creating hilbert points table: {e}")

def drop_hilbert_points_table(connection):
    """
    Drops the hilbert_points table if it exists in the database.

    Args:
        connection (psycopg2.extensions.connection): Active database connection.

    Returns:
        None
    """
    drop_table_query = "DROP TABLE IF EXISTS hilbert_points CASCADE;"
    
    try:
        with connection.cursor() as cursor:
            # Execute the drop table query
            cursor.execute(drop_table_query)
            # Commit the transaction
            connection.commit()
            print("Hilbert points table dropped successfully.")
    except Exception as e:
        connection.rollback()
        print(f"Error dropping hilbert points table: {e}")

def insert_hilbert_points(hilbert_points, connection):
    """
    Inserts a list of hilbert points into the hilbert_points table.

    Args:
        connection (psycopg2.extensions.connection): Active database connection.
        hilbert_points (list of dict): List of dictionaries containing:
            - hilbert (int): Hilbert index.
            - ecef (tuple): Tuple of ECEF coordinates (x, y, z) in meters.

    Returns:
        None
    """
    insert_query = """
    INSERT INTO hilbert_points (hilbert_index, point)
    VALUES (%s, ST_SetSRID(ST_MakePoint(%s, %s, %s), 4978))
    ON CONFLICT (hilbert_index) DO NOTHING;
    """
    try:
        with connection.cursor() as cursor:
            # Prepare data for batch insertion
            data = [
                (
                    point["hilbert"], 
                    str(point["ecef"][0]), 
                    str(point["ecef"][1]), 
                    str(point["ecef"][2]),
                )
                for point in hilbert_points
            ]
            # Execute batch insertion
            cursor.executemany(insert_query, data)
            # Commit the transaction
            connection.commit()
            print(f"{len(hilbert_points)} hilbert points inserted successfully.")
    except Exception as e:
        connection.rollback()
        print(f"Error inserting hilbert points: {e}")


def insert_satellites(satellites, connection):

    with connection.cursor() as cursor:
        # Insert the data
        for sat in satellites:
            insert_query = """
                INSERT INTO satellites (satnum, ecef, observation_time, name, hilbert_index)
                VALUES (%s, ST_SetSRID(ST_MakePoint(%s, %s, %s), 4978), %s, %s, %s);
                """
            cursor.execute(
                insert_query, 
                (
                    sat["scc"], 
                    str(sat["ecef"][0]), 
                    str(sat["ecef"][1]), 
                    str(sat["ecef"][2]), 
                    sat["timestamp"], 
                    sat["name"], 
                    sat["hilbert_index"],
                )
            )
        connection.commit()
    cursor.close()
    print("Satellite data inserted into the database.")

def main():
    # Load satellite data
    sats = get_starlink_satellites()

    # Compute ECEF positions
    sats = compute_ecef_positions(sats)

    # compute hilbert index
    world_size = 80_000_000
    cell_size= 10_000
    
    hilbert_dict = {
        "world_size":world_size,
        "cell_size":cell_size,
        "n":3,
    }
    hilbert = ecef_points_to_hilbert(np.array([s["ecef"] for s in sats]),hilbert_dict)
    sats = add_hilbert_to_satellites(sats,hilbert)

    hilbert_ecef = hilbert_to_ecef_points(hilbert,hilbert_dict)
    hilbert_points = [{"hilbert":hilbert[i],"ecef":hilbert_ecef[i]} for i in range(len(hilbert))]
    
    # Connect to the database
    conn = connect_to_db()

    # Insert hilbert index into the database
    insert_hilbert_points(hilbert_points,conn)
    
    # Insert data into the database
    insert_satellites(sats, conn)

    # Close the connection
    conn.close()

In [3]:
def query_hilbert_points(connection, query_vector):
    """
    Query the database to find hilbert_index values where the dot product
    of (point - query_vector) with query_vector is positive.

    Args:
        connection: psycopg2 connection object.
        query_vector: Tuple of (x, y, z) coordinates for the query vector.

    Returns:
        List of hilbert_index values.
    """
    query = """
        WITH query_vector AS (
            SELECT %s AS x, %s AS y, %s AS z  -- Query vector components
        ),
        dot_product_results AS (
            SELECT 
                hilbert_index,
                (
                    (ST_X(point) - query_vector.x) * query_vector.x +
                    (ST_Y(point) - query_vector.y) * query_vector.y +
                    (ST_Z(point) - query_vector.z) * query_vector.z
                ) AS dot_product
            FROM hilbert_points, query_vector
        )
        SELECT hilbert_index
        FROM dot_product_results
        WHERE dot_product > 0;
    """
    x, y, z = query_vector
    with connection.cursor() as cursor:
        cursor.execute(query, (x, y, z))
        results = cursor.fetchall()
    return [row[0] for row in results]

In [44]:
def query_by_hilbert_index(hilbert_indices,connection):
    cursor = connection.cursor()
    
    # Create the SQL query
    query = """
        SELECT *
        FROM satellites
        WHERE hilbert_index = ANY(%s);
    """
    
    # Execute the query with the list of indices
    cursor.execute(query, (hilbert_indices,))
    
    # Fetch all matching rows
    results = cursor.fetchall()

    cursor.close()

    return results

In [58]:
def query_with_temp_table(hilbert_indices, conn):
    """
    Creates a temporary table, populates it with hilbert_index values, and performs a join
    with the satellites table to fetch matching rows.
    """
    try:
        cursor = conn.cursor()

        # Drop the temporary table if it exists
        cursor.execute("DROP TABLE IF EXISTS temp_hilbert_indices;")
        
        # Create a temporary table
        cursor.execute("""
            CREATE TEMPORARY TABLE temp_hilbert_indices (
                hilbert_index BIGINT
            ) ON COMMIT DROP;
        """)
        
        # Insert hilbert_index values into the temporary table
        insert_query = "INSERT INTO temp_hilbert_indices (hilbert_index) VALUES (%s)"
        cursor.executemany(insert_query, [(index,) for index in hilbert_indices])
        
        # Perform the join query
        query = """
            SELECT s.*
            FROM satellites s
            INNER JOIN temp_hilbert_indices t
            ON s.hilbert_index = t.hilbert_index;
        """
        cursor.execute(query)
        
        # Fetch the results
        results = cursor.fetchall()

        # Clean up
        cursor.close()

        return results

    except Exception as e:
        print(f"An error occurred: {e}")
        if cursor:
            cursor.close()
        raise


In [62]:
def query_with_temp_table_indexed(hilbert_indices, conn):
    """
    Creates a temporary table with an index on hilbert_index, populates it, 
    and performs a join with the satellites table to fetch matching rows.
    """
    try:
        cursor = conn.cursor()

        # Drop the temporary table if it exists
        cursor.execute("DROP TABLE IF EXISTS temp_hilbert_indices;")
        
        # Create a temporary table
        cursor.execute("""
            CREATE TEMPORARY TABLE temp_hilbert_indices (
                hilbert_index BIGINT
            ) ON COMMIT DROP;
        """)

        # Create an index on hilbert_index in the temporary table
        cursor.execute("""
            CREATE INDEX temp_hilbert_index_idx ON temp_hilbert_indices (hilbert_index);
        """)
        
        # Insert hilbert_index values into the temporary table
        insert_query = "INSERT INTO temp_hilbert_indices (hilbert_index) VALUES (%s)"
        cursor.executemany(insert_query, [(index,) for index in hilbert_indices])
        
        # Perform the join query
        query = """
            SELECT s.*
            FROM satellites s
            INNER JOIN temp_hilbert_indices t
            ON s.hilbert_index = t.hilbert_index;
        """
        cursor.execute(query)
        
        # Fetch the results
        results = cursor.fetchall()

        # Clean up
        cursor.close()

        return results

    except Exception as e:
        print(f"An error occurred: {e}")
        if cursor:
            cursor.close()
        raise


In [60]:
conn = connect_to_db()

In [55]:
sub_idx = query_hilbert_points(conn, [6000000,10000,2000000])
len(sub_idx)

6223

In [64]:
sub_idx = sorted(sub_idx)

In [41]:
len(sub_idx)

10960

In [42]:
len(set(sub_idx))

10960

In [56]:
%%timeit
rows = query_by_hilbert_index(sub_idx, conn)

3.58 s ± 90.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [61]:
%%timeit
rows_join = query_with_temp_table(sub_idx, conn)

3.18 s ± 61.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [66]:
rows_join_index = query_with_temp_table_indexed(sub_idx, conn)

In [69]:
len(rows_join_index)

879470

In [74]:
config = {
    "hilbert":hilbert_dict,
    "postgis":{
        "dbname":"minecraftindex",
        "user":"kyjohnso",
        "password":"Password",
        "host":'localhost',  # change to postgis if running in a container
        "port":5432,
    },
    "satellite":{
        "tle_url":"https://celestrak.org/NORAD/elements/gp.php?GROUP=starlink&FORMAT=tle",
        "query_interval_seconds":10,
    }
}  

In [80]:
import datetime, pytz

ModuleNotFoundError: No module named 'pytz'

In [81]:
datetime.datetime(2024,5,5).replace(tzinfo=datetime.UTC)

datetime.datetime(2024, 5, 5, 0, 0, tzinfo=datetime.timezone.utc)

In [76]:
80000000/100000

800.0

In [75]:
import yaml
with open("./starlink_config.yml","w") as f:
    f.write(
        yaml.safe_dump(config)
    )


In [None]:
sats = get_starlink_satellites()

In [None]:
sats[0]

In [None]:
sats = compute_ecef_positions(sats)
sats[0]

In [15]:
world_size = 80_000_000
cell_size= 100_000

hilbert_dict = {
    "world_size":world_size,
    "cell_size":cell_size,
    "n":3,
}
np.ceil(np.log2(hilbert_dict["world_size"]/hilbert_dict["cell_size"]))
# hilbert_dict

np.float64(10.0)

In [19]:
((350000+6700000)/100000)**3

350402.625

In [None]:
hilbert = ecef_points_to_hilbert(np.array([s["ecef"] for s in sats]),hilbert_dict)

In [None]:
hilbert_ecef = hilbert_to_ecef_points(hilbert,hilbert_dict)
hilbert_points = [{"hilbert":hilbert[i],"ecef":hilbert_ecef[i]} for i in range(len(hilbert))]
hilbert_points[0]

In [None]:
sats = add_hilbert_to_satellites(sats,hilbert)
sats[0]

In [22]:
create_satellites_table(conn)
create_hilbert_points_table(conn)

Hilbert points table created successfully.


In [21]:
drop_satellites_table(conn)
drop_hilbert_points_table(conn)

Satellites table dropped successfully.
Hilbert points table dropped successfully.


In [None]:
insert_hilbert_points(hilbert_points,conn)

In [None]:
len(hilbert_points)

In [None]:
query = f"SELECT COUNT(*) FROM hilbert_points;"
cursor = conn.cursor()
cursor.execute(query)
cursor.fetchone()

In [None]:
insert_satellites(sats,conn)

In [None]:
query = f"SELECT COUNT(*) FROM satellites;"
cursor = conn.cursor()
cursor.execute(query)
cursor.fetchone()

In [None]:
len(sats)