In [1]:
import traceback
from contextlib import contextmanager
from typing import Generator
from typing import Tuple

import numpy as np
import scipy
from astropy.io import fits
from psycopg2 import DatabaseError, sql
from psycopg2 import extensions
from psycopg2.extensions import cursor, register_adapter, AsIs
from psycopg2.pool import ThreadedConnectionPool
from scipy.optimize import curve_fit

# Register numpy -> postgres value adapter
register_adapter(np.float64, lambda float64: AsIs(float64))
register_adapter(np.int64, lambda int64: AsIs(int64))

# Create connection pool
pool = ThreadedConnectionPool(
    minconn=1,
    maxconn=10,
    host="localhost",  # see compose file
    user="radon",
    password="radon2023",
    port="5452",
    database="radon_sql"
)


@contextmanager
def postgres() -> Generator[cursor, None, None]:
    connection = pool.getconn()
    try:
        cursor = connection.cursor()
        try:
            yield cursor
        finally:
            cursor.close()
        connection.commit()
    except DatabaseError as error:
        print("Rolling back all transactions because of database error")
        print(f"Error: {error}")
        print(f"Traceback: {traceback.format_exc()}")
        connection.rollback()
    finally:
        pool.putconn(connection)

DETAIL:  The database was created using collation version 2.31, but the operating system provides version 2.36.
HINT:  Rebuild all objects in this database that use the default collation and run ALTER DATABASE radon_sql REFRESH COLLATION VERSION, or build PostgreSQL with the right library version.


In [2]:
from typing import List


class SineApproximation:
    def __init__(self, level: int, a: float, b: float, c: float, d: float, rmse: float):
        self.level = level
        self.a = a
        self.b = b
        self.c = c
        self.d = d
        self.rmse = rmse

    def __repr__(self):
        return f"SineApproximation(level={self.level}, a={self.a}, b={self.b}, c={self.c}, d={self.d}, rmse={self.rmse})"


class NormalDistribution:
    def __init__(self, window: int, mean: float, std_dev: float):
        self.window = window
        self.mean = mean
        self.std_dev = std_dev

    def __repr__(self):
        return f"NormalDistribution(window={self.window}, mean={self.mean}, std_dev={self.std_dev})"


class RadonBandResult:
    def __init__(self, band_name: str, degree: int, sine_approximations: List[SineApproximation], normal_distributions: List[NormalDistribution]):
        self.band_name = band_name
        self.degree = degree
        self.sine_approximations = sine_approximations
        self.normal_distributions = normal_distributions

    def __repr__(self):
        return f"RadonBandResult(band_name={self.band_name}, degree={self.degree}, sine_approximations={self.sine_approximations}, normal_distributions={self.normal_distributions})"


class RadonGalaxyResult:
    def __init__(self, source_id: str, bin_id: int, band_results: List[RadonBandResult]):
        self.source_id = source_id
        self.bin_id = bin_id
        self.band_results = band_results

    def __repr__(self):
        return f"RadonGalaxyResult(source_id={self.source_id}, bin_id={self.bin_id}, band_results={self.band_results})"


In [44]:
# Radon variables
THETAS = np.linspace(0, np.pi, 181)
SIN_THETAS = np.sin(THETAS)
COS_THETAS = np.cos(THETAS)
IGNORE_THRESHOLD = 0.00001

# Define the circle mask
SHAPE = (40, 40)
CENTER = [dim // 2 for dim in SHAPE]
RADIUS = SHAPE[0] // 2
Y_COORDS, X_COORDS = np.ogrid[-CENTER[0]:SHAPE[0] - CENTER[0], -CENTER[1]:SHAPE[1] - CENTER[1]]
MASK = X_COORDS * X_COORDS + Y_COORDS * Y_COORDS <= RADIUS * RADIUS

# Normal distribution fitting variables
DISTRIBUTION_WINDOWS = [15, 30, 60, 90]


def radon_transform(image: np.ndarray) -> np.ndarray:
    """ Assuming image is already circle-masked """
    sinogram = np.zeros((40, len(THETAS)))
    for i, (sin_theta, cos_theta) in enumerate(zip(SIN_THETAS, COS_THETAS)):
        matrix = np.array([
            [cos_theta, sin_theta, -20 * (cos_theta + sin_theta - 1)],
            [-sin_theta, cos_theta, -20 * (cos_theta - sin_theta - 1)],
            [0, 0, 1]
        ])
        y_coords, x_coords = np.indices((40, 40))
        homogeneous_coords = np.stack((x_coords, y_coords, np.ones_like(x_coords)), axis=-1)
        homogeneous_coords_2d = homogeneous_coords.reshape((-1, 3))
        transformed_coords = np.dot(homogeneous_coords_2d, matrix.T)
        transformed_coords_2d = transformed_coords[:, :2] / transformed_coords[:, 2, np.newaxis]
        transformed_coords_3d = transformed_coords_2d.reshape((40, 40, 2))
        interpolated_image = scipy.ndimage.map_coordinates(image, transformed_coords_3d.transpose((2, 0, 1)))
        sums = np.sum(interpolated_image, axis=0)
        sinogram[:, i] = sums
    return sinogram


def filter_mean(x, threshold):
    index = int((1 - threshold) * x.shape[0])
    return np.mean(np.sort(x, axis=0)[index:], axis=0)


def filter_sinogram(sinogram):
    """ Returns a dictionary of filtered sinograms, with the key being the filter level """
    return {
        0: np.max(sinogram, axis=0),
        0.05: filter_mean(sinogram, 0.05),
        0.1: filter_mean(sinogram, 0.1),
        0.2: filter_mean(sinogram, 0.2),
        0.3: filter_mean(sinogram, 0.3)
    }


def expand_data(sinogram_1d: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """ Expands 1D sinogram data from [0, 180] to [-180, 180 * 2] degrees (wraps) """
    extended_data = np.concatenate((sinogram_1d[:-1], sinogram_1d, sinogram_1d[1:]))
    n = len(sinogram_1d)
    return np.arange(-(n - 1), (n - 1) * 2 + 1), extended_data


def sine_function(x, a, b, c, d):
    return a * np.sin(b * x + c) + d


def fit_sine(xs, ys):
    ff = np.fft.fftfreq(len(xs), (xs[1] - xs[0]))  # assume uniform spacing
    Fyy = abs(np.fft.fft(ys))
    guess_freq = abs(ff[np.argmax(Fyy[1:]) + 1])  # excluding the zero frequency "peak", which is related to offset
    guess_amp = np.std(ys) * 2.0 ** 0.5
    guess_offset = np.mean(ys)
    guess = np.array([guess_amp, 2.0 * np.pi * guess_freq, 0.0, guess_offset])
    return curve_fit(sine_function, xs, ys, p0=guess)

In [50]:
def process(metadata_entry) -> RadonGalaxyResult:
    source_id, bin_id = metadata_entry
    print(f"Processing galaxy b{bin_id}/{source_id}...")

    # Fetch FITS file from disk
    fits_path = f"/home/neo/data/fits/b{bin_id}/{source_id}.fits"
    with fits.open(fits_path) as hdu_list:
        data = hdu_list[0].data
        hdu_list.close()

    # Apply radon transform to get max rotations for each band
    band_results: List[RadonBandResult] = []
    for i, band in enumerate("griz"):
        print(f"Processing band {band}...")

        # Check if there is any data (threshold) in this channel
        # - or if there are any NadNs, we want to ignore NaNs because it can propagate to the sinogram
        if not data[i].any():
            print("WARNING: No band data exists, skipping")
            continue
        elif np.any(np.isnan(data[i])):
            print("WARNING: Band data contains NaNs, skipping")
            continue

        # Perform radon transform
        sinogram = radon_transform(data[i] * MASK)
        if not sinogram.any():
            print("WARNING: No sinogram data exists, skipping")
            continue
        elif np.any(np.isnan(sinogram)):
            print("WARNING: Sinogram data contains NaNs, skipping")
            continue

        # Compute rotations & errors
        offset, rotation = np.unravel_index(sinogram.argmax(), sinogram.shape)

        # Apply filters to sinogram
        filtered_sinogram_map = filter_sinogram(sinogram)

        # Calculate sine approximation at each filter level
        xs = range(sinogram.shape[1])
        sine_approximations: List[SineApproximation] = []
        for level, sinogram_1d in filtered_sinogram_map.items():
            try:
                expanded_xs, expanded_ys = expand_data(sinogram_1d)
                popt, _ = fit_sine(expanded_xs, expanded_ys)
                sin_ys = sine_function(xs, *popt)
                rmse = np.sqrt(np.mean((sinogram_1d - sin_ys) ** 2))
                sine_approximations.append(SineApproximation(level, *popt, rmse))
            except:
                print(f"WARNING: Failed to fit sine approximation for level {level}, skipping")

        # Fit a normal distribution to each window
        normal_distributions: List[NormalDistribution] = []
        expanded_xs, expanded_ys = expand_data(filtered_sinogram_map[0])  # 0 = max filter
        for window in DISTRIBUTION_WINDOWS:
            # Mask the data to only include the window
            peak_mask = (expanded_xs >= rotation - window) & (expanded_xs <= rotation + window)
            xs_near_peak, ys_near_peak = expanded_xs[peak_mask], expanded_ys[peak_mask]

            # Normalize the intensity
            ys_near_peak_normalized = ys_near_peak / np.sum(ys_near_peak)

            # Calculated weighted mean and standard deviation
            mean = np.average(xs_near_peak, weights=ys_near_peak_normalized)
            std = np.sqrt(np.average((xs_near_peak - mean) ** 2, weights=ys_near_peak_normalized))

            # Check NaNs
            if np.isnan(mean) or np.isnan(std):
                print(f"WARNING: Normal distribution mean or std is NaN, skipping window {window}")
            else:
                # Append to list
                normal_distributions.append(NormalDistribution(window, mean, std))

        band_results.append(RadonBandResult(band, rotation, sine_approximations, normal_distributions))

    return RadonGalaxyResult(source_id, bin_id, band_results)


In [51]:
x = process((2268328776282890496, 2214))

Processing galaxy b2214/2268328776282890496...
Processing band g...
Processing band r...
Processing band i...
Processing band z...


In [43]:
x

RadonGalaxyResult(source_id=2268328776282890496, bin_id=2214, band_results=[RadonBandResult(band_name=g, degree=52, sine_approximations=[SineApproximation(level=0, a=0.017276303819657945, b=0.07048883728001312, c=-1.582068057268492, d=0.37810722549743264, rmse=0.019302179092066395), SineApproximation(level=0.05, a=0.01534398000295663, b=0.0705834940854214, c=-1.6308896943753057, d=0.371066351139268, rmse=0.01712145676945295), SineApproximation(level=0.1, a=0.013084716315371775, b=0.034351900317254255, c=-0.6101084221578754, d=0.34671930373454735, rmse=0.012744036397377362), SineApproximation(level=0.2, a=0.0110226392162924, b=0.03519138717009286, c=-0.9190209019376827, d=0.2882810685913787, rmse=0.008181744542696505), SineApproximation(level=0.3, a=0.008520532284098058, b=0.035361866485611466, c=-0.9950304228230398, d=0.23445723280892353, rmse=0.004112752780471211)], normal_distributions=[NormalDistribution(window=15, mean=51.58144951725376, std_dev=8.779594083478013), NormalDistributi

In [41]:
with postgres() as cursor:
    # insert_result(cursor, x)
    cursor.execute("""
    SELECT * FROM sine_approximations;
    """)
    results = cursor.fetchall()
    print(results)

[]


In [16]:
x.band_results[0].sine_approximations

[SineApproximation(level=0, a=0.0, b=0.011614020900516796, c=0.0, d=0.0, rmse=0.0),
 SineApproximation(level=0.05, a=0.0, b=0.011614020900516796, c=0.0, d=0.0, rmse=0.0),
 SineApproximation(level=0.1, a=0.0, b=0.011614020900516796, c=0.0, d=0.0, rmse=0.0),
 SineApproximation(level=0.2, a=0.0, b=0.011614020900516796, c=0.0, d=0.0, rmse=0.0),
 SineApproximation(level=0.3, a=0.0, b=0.011614020900516796, c=0.0, d=0.0, rmse=0.0)]

In [29]:
temp_data[0][1] = 3

In [42]:
# check if temp_data has nans
np.all(temp_data - 0.000000000001 > 0)

False

In [32]:
np.max(temp_data)

3.0

In [9]:
temp_data.max()

0.007988871