In [1]:
import polars as pl
import numpy as np
import torch
import math
import time
import heapq
from typing import List

In [2]:
df = pl.read_parquet('./support/tyc2-3.parquet')
print(df)

shape: (2_430_468, 3)
┌────────────┬────────────┬───────────┐
│ RAmdeg     ┆ DEmdeg     ┆ Vmag      │
│ ---        ┆ ---        ┆ ---       │
│ f32        ┆ f32        ┆ f32       │
╞════════════╪════════════╪═══════════╡
│ 2.317505   ┆ 2.231843   ┆ 12.146    │
│ 1.125582   ┆ 2.267394   ┆ 8.50638   │
│ 1.056865   ┆ 1.897829   ┆ 12.026111 │
│ 0.050598   ┆ 1.771443   ┆ 10.44927  │
│ …          ┆ …          ┆ …         │
│ 345.76767  ┆ -88.284042 ┆ 12.423479 │
│ 341.197632 ┆ -88.538872 ┆ 12.06899  │
│ 337.975433 ┆ -88.762932 ┆ 10.423201 │
│ 355.96582  ┆ -88.834259 ┆ 12.03012  │
└────────────┴────────────┴───────────┘


In [17]:
def convert_to_cartesian(distances, ra, dec):
    x = distances * torch.cos(dec) * torch.cos(ra)
    y = distances * torch.cos(dec) * torch.sin(ra)
    z = distances * torch.sin(dec)
    return torch.stack((x, y, z), dim=1)

def calculate_distances(coords):
    dist_matrix = torch.cdist(coords, coords)
    unique_distances = torch.unique(dist_matrix)
    return unique_distances[unique_distances > 0]

def distance_from_magnitude(m, M):
    return 10**((m - M + 5) / 5)

def distance_from_magnitude_tensor(m: torch.Tensor, M: torch.Tensor) -> torch.Tensor:
    return 10**((m - M + 5) / 5)

# one of the possibilities
def tetrahedron_score(coords):
    distances = calculate_distances(coords)
    print(distances)
    mean_distance = torch.mean(distances)
    std_dev = torch.std(distances)

    # Assuming std_dev is small enough, the score will be close to 1
    # Otherwise, it will be closer to 0
    score = math.exp(-std_dev.item() / mean_distance.item())
    return score

def square_score(tensor):
    """Return a measure of how close the points in a tensor are to forming a square,
    as well as the standard deviation of their brightness."""
    
    # Calculate pairwise distances based on the x and y coordinates (first two dimensions)
    spatial_distances = torch.pdist(tensor, p=2)
    print(spatial_distances)
    
    # Sort the distances
    sorted_distances = torch.sort(spatial_distances)[0]
    
    # Take the 4 smallest distances and compute their standard deviation
    std_of_smallest_4 = sorted_distances[:4].std().item()
    
    # # Calculate the standard deviation of the brightness (third dimension)
    # brightness_std = tensor[:, 2].std().item()
    
    return std_of_smallest_4

def measure_squareness_old(tensor):
    """
    :param points: A tensor of shape (4, 2) representing the 4 2D points
    :return: A float indicating the squareness. Closer to 1 means more square.
    """
    
    # Calculate pairwise distances based on the x and y coordinates (first two dimensions)
    spatial_distances = torch.pdist(tensor, p=2)

    # Sort the distances
    distances = torch.sort(spatial_distances)[0]
    
    # Mean of sides and diagonals
    mean_sides = torch.mean(distances[:4])
    mean_diagonal = torch.mean(distances[4:])
    
    # Squareness measure
    squareness = mean_diagonal / mean_sides
    
    # Normalize with sqrt(2) to get values closer to 1 for squares
    return abs(1 - (squareness / torch.sqrt(torch.tensor(2.0)).item()))

def measure_squareness(tensor):
    # Calculate pairwise distances based on the x and y coordinates (first two dimensions)
    spatial_distances = torch.pdist(tensor, p=2)

    # Sort the distances
    distances = torch.sort(spatial_distances)[0]
   
    # Compute the standard deviation for the four shortest distances and the two longest distances
    std_sides = torch.std(distances[:4])
    std_diagonal = torch.std(distances[4:])
    
    # Ideally, for a perfect square, the standard deviations would be 0
    # We use exp(-x) as a measure to get values close to 1 for low standard deviations
    side_uniformity = torch.exp(-std_sides)
    diagonal_uniformity = torch.exp(-std_diagonal)
    
    # Mean of sides and diagonals
    mean_sides = torch.mean(distances[:4])
    mean_diagonal = torch.mean(distances[4:])
    
    # Squareness measure based on side to diagonal ratio
    squareness_ratio = mean_sides / mean_diagonal
    
    # Combine all the measures
    # Normalize with sqrt(2) to get values closer to 1 for squares
    final_squareness = (squareness_ratio / torch.sqrt(torch.tensor(2.0)).item()) * side_uniformity * diagonal_uniformity
    
    return abs(1 - final_squareness.item())



    
# go from tycho2 to xyz coords
def transform_radecmag_from_numpy(stars):
    torch_tensors = [torch.from_numpy(star) for star in stars]
    zeroes = torch.zeros(len(torch_tensors[2]))
    print("one ", torch_tensors)
    torch_tensors[2] = distance_from_magnitude_tensor(torch_tensors[2], zeroes)
    print("two", torch_tensors)
    coords = convert_to_cartesian(*torch_tensors)
    return coords

def global_normalize_tensor(tensor):
    """Normalize a tensor based on its global min and max values. Also works for multiple tensors"""
    global_min = torch.min(tensor)
    global_max = torch.max(tensor)
    
    normalized = (tensor - global_min) / (global_max - global_min)
    return normalized
    
def radec_normalize_tensor(tensors):
    """Normalize tensors based on their global min and max values, excluding the 3rd column."""

    # Concatenate tensors while excluding the 3rd column
    # Drop the 3rd column from each tensor
    tensor = tensors[:, :2]

    # Compute global min and max excluding the 3rd column
    global_min = torch.min(tensor)
    global_max = torch.max(tensor)

    # Normalize tensors using the computed global min and max
    normalized = (tensor - global_min) / (global_max - global_min)
    return normalized

def mag_score(tensor):
    # Computing the standard deviation
    # stdev = t[:, 2].std()
    max = tensor[:, 2].max()
    min = tensor[:, 2].min()
    return max - min
    
def stars_for_point_and_radius(df, point, radius, max_mag):
    """ point is in the corner, not the center """
    ra, dec = point
    minra = ra
    maxra = ra + radius
    mindec = dec
    maxdec = dec + radius
    return df.filter((pl.col("RAmdeg") < maxra) & (pl.col("RAmdeg") > minra) & (pl.col("DEmdeg") < maxdec) & (pl.col("DEmdeg") > mindec) & (pl.col("Vmag") <= max_mag))

def stars_for_center_and_radius(df, center, radius, max_mag):
    ra, dec = center
    minra = ra - radius/2
    maxra = ra + radius/2
    mindec = dec - radius/2
    maxdec = dec + radius/2
    return df.filter((pl.col("RAmdeg") < maxra) & (pl.col("RAmdeg") > minra) & (pl.col("DEmdeg") < maxdec) & (pl.col("DEmdeg") > mindec) & (pl.col("Vmag") <= max_mag))


def get_grid_points(min_dec=-90, max_dec=90):
    RA_values = [ra for ra in range(0, 361, 4)]  # Increment by 4 for a 2-degree radius
    Dec_values = [dec for dec in range(min_dec, max_dec+1, 4)]  # Increment by 4 for a 2-degree radius
    grid_points = [(ra, dec) for ra in RA_values for dec in Dec_values]
    return grid_points

def get_grid_point_by_idx(idx):
    gp = get_grid_points()
    return gp[idx]

def get_region(df, idx, radius, max_mag):
    center = get_grid_point_by_idx(idx)
    return stars_for_center_and_radius(df, center, radius, max_mag)

resultdf = pl.DataFrame({
    "score": pl.Float64,
    "region": pl.Int64,
    "item": []
})

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class ScoreItem:
    score: float
    region: int=field(compare=False)
    item: Any=field(compare=False)
    


## Code to process each region

In [14]:
from itertools import combinations
from heapq import heappush, heappushpop

first = lambda h: 2**h - 1      # H stands for level height
last = lambda h: first(h + 1)
level = lambda heap, h: heap[first(h):last(h)]
prepare = lambda e, field: str(e).center(field)
def hprint(heap, width=None):
    if width is None:
        width = max(len(str(e)) for e in heap)
    height = int(math.log(len(heap), 2)) + 1
    gap = ' ' * width
    for h in range(height):
        below = 2 ** (height - h - 1)
        field = (2 * below - 1) * width
        print(gap.join(prepare(e, field) for e in level(heap, h)))
        
def process(stars, region, point) -> List[ScoreItem]:
    tensor = torch.tensor(stars.to_numpy())
    print(tensor.shape)
    combs = list(combinations(range(len(stars)), 4))
    indices = torch.tensor(combs, dtype=torch.long)
    tensor_combinations = tensor[indices]
    heap = []
    for entry in tensor_combinations:
        norm_radec = radec_normalize_tensor(entry)
        b = mag_score(entry)
        if b > 1:
            continue
        a = measure_squareness(norm_radec)     
        score = a+b
        item = (a,b,entry)
        if len(heap) < 2:
            heappush(heap, ScoreItem(score, region, item))
        else:
            heappushpop(heap, ScoreItem(score, region, item))
    print(f"Processed {region=} - {point} for length {len(stars)} with {len(tensor_combinations)=}")
    return heap
        


## Code to save progress

In [15]:
def add_to_result_and_save(resultdf, heap: List[ScoreItem], filename):
    for entry in heap:
        thisdf = pl.DataFrame({
            "score": entry.score, 
            "region": entry.region, 
            "score1": entry.item[0],
            "score2": entry.item[1],
            "stars": [entry.item[2].tolist()],
        })
        if resultdf.is_empty():
            resultdf = thisdf
        else:
            resultdf = pl.concat([resultdf, thisdf])
    resultdf.write_parquet(filename)
    return resultdf

## Generate a grid of regions and process each region

In [None]:
grid_points = get_grid_points(-63,63)
print("Total grid points is:", len(grid_points))
global_heap=[]
result_filename = 'result_squareness.parquet'
resultdf = pl.DataFrame()
try:
    resultdf = pl.read_parquet(result_filename)
except:
    print("no previoous results")
    pass
print(f"Loaded results: {resultdf.head()}")

def process_regions(resultdf, grid, start=0, end=0):
    if end == 0:
        end = len(grid)
    
    zipped_list = list(zip(range(len(grid)), grid))
    grid_points = zipped_list[start:end]
    
    for idx, point in grid_points:
        stars = stars_for_point_and_radius(df, point=point, radius=2, max_mag=10)
        if len(stars) > 0:
            best = heapq.nsmallest(1, global_heap)
            beststr = f"{best[0].score:.5f}" if best else 0
            print(f"\n\n---- Region: {idx}, {len(stars)} stars, best is {beststr} global_heap is now {len(global_heap)} --------")
            heap = process(stars, idx, point)
            if heap:
                hprint(heap if heap else []) 
            else:
                print("no results")
            print("************************")
            for h in heap:
                heappush(global_heap, h)
            # add_result(resultdf, top_10)
            resultdf = add_to_result_and_save(resultdf, heap, result_filename)
            print(resultdf.sort("score").head(5) if not resultdf.is_empty() else "no score")
    #time.sleep(10)
process_regions(resultdf, grid_points, start=0)

Total grid points is: 2912
no previoous results
Loaded results: shape: (0, 0)
┌┐
╞╡
└┘


---- Region: 0, 11 stars, best is 0 global_heap is now 0 --------
torch.Size([11, 3])
Processed region=0 - (0, -63) for length 11 with len(tensor_combinations)=330
                                                                                                                                                                                                                                                      ScoreItem(score=tensor(1.0284), region=0, item=(0.6410225033760071, tensor(0.3874), tensor([[  0.6367, -61.3918,   9.6756],
        [  0.8915, -61.7731,   9.9925],
        [  0.3719, -62.0458,   9.9809],
        [  0.7321, -62.8722,   9.6050]])))                                                                                                                                                                                                                                                      
ScoreIte

In [10]:
a = [[352.9835510253906, -61.950279235839844, 9.794389724731445], [352.44195556640625, -61.28193664550781, 9.73677921295166], [352.8981628417969, -62.92292022705078, 9.376359939575195], [353.7196960449219, -62.507789611816406, 9.891069412231445]]
square_score(radec_normalize_tensor(torch.Tensor(a)))

tensor([0.0021, 0.0023, 0.0022, 0.0041, 0.0042, 0.0022])


0.00011394966713851318

In [11]:



# Test
points = torch.tensor([[0, 0], [0, 1], [1, 0], [1, 1]], dtype=torch.float32)
print(measure_squareness(points))  # This should be close to 1 for a square
print(measure_squareness(torch.tensor(a)))  # This should be close to 1 for a square



tensor(0.)
tensor(0.2734)


In [None]:
# importing mplot3d toolkits
from mpl_toolkits import mplot3d
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import gridspec
import torch
fig = plt.figure(figsize=(12, 6))
gs = gridspec.GridSpec(1, 2, width_ratios=[1, 1])  # set width ratio for each subplot
coords = torch.tensor(a)
# First subplot with one viewing angle
ax1 = fig.add_subplot(gs[0], projection='3d')
ax1.scatter(coords[:, 0], coords[:, 1], coords[:, 2], c='green')
ax1.view_init(30, 30)  # Set elevation and azimuth
ax1.set_title('View 1')
ax1.set_xlabel('X')
ax1.set_ylabel('Y')
ax1.set_zlabel('Z')

# Second subplot with another viewing angle
ax2 = fig.add_subplot(gs[1], projection='3d')
ax2.scatter(coords[:, 0], coords[:, 1], coords[:, 2], c='green')
ax2.view_init(30, 120)  # Different elevation and azimuth
ax2.set_title('View 2')
ax2.set_xlabel('X')
ax2.set_ylabel('Y')
ax2.set_zlabel('Z')

plt.suptitle('xyz projection for stars from different views')
plt.tight_layout()
plt.show()