In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from spectrumProcessorPlato import PowerSpectrumProcessor
from IterativeSNR import IterativeSNRAnalyzer
from noiseAddition import NoiseAddition
from bestFilters import BestFilters
from gaussianFitter import GaussianWrapper
from Spacings_Class import Spacings
from Exoplanets import ExoplanetAnalysis
from MassRadExtract import M_R_Est
from MassComparison import MassComparison
from RadiusComparison import StellarParameterComparison
import os

Add noise to the clean spectrums. Send to a new folder for F/S's algorithm to run

In [None]:
spectrum_dir = "algoSpectrum"

for filename in os.listdir(spectrum_dir):
    if filename.endswith(".pow"):
        filepath = os.path.join(spectrum_dir, filename)
        noise_sim = NoiseAddition(filepath, vmag_option="random", manual_vmag=None, num_vmags=6)
        noise_sim.add_noise() 

F/S's run all of the stars with the noise added. Output Yes/Maybe/No. Save all of the results to a csv. The Maybe's and No's are sent to a separate folder for W/L/F

In [None]:
noise_dir = "NosiyData"

analyzer = IterativeSNRAnalyzer(input_directory=r"NoisyData")
analyzer.process_files(percentlimit=14, noiselimit=10, greyarea=2, use_maybe_dir=False)

In [None]:
import pandas as pd
import re

def custom_sort(file_name):
    """Extracts spectrum number and vmag from the filename."""
    match = re.match(r"spectrum_(\d+)_cams24_vmag(\d+\.\d+)\.pow", file_name)
    if match:
        return int(match.group(1)), float(match.group(2))
    return (float('inf'), float('inf'))  # Put unmatched filenames at the end

def sort_csv(file_path):
    """Reads a CSV, sorts it, and replaces the original CSV."""
    df = pd.read_csv(file_path, header=None)  # No header

    df['sort_keys'] = df[0].apply(custom_sort)
    df = df.sort_values(by='sort_keys').drop(columns=['sort_keys'])

    # Save to CSV, overwriting the original file, no header
    df.to_csv(file_path, index=False, header=False)

# Example usage (replace 'your_file.csv' with your actual file path)
sort_csv("iterative analysis output//Results.csv")

Collcet the files from F/S's

In [None]:
noise_dir = "NoisyData"

for filename in os.listdir(noise_dir):
    if filename.endswith(".pow"):
        filepath = os.path.join(noise_dir, filename)
        filter = PowerSpectrumProcessor(filepath)
        filter.load_power_spectrum()
        filter.run_filtering()
        filter.run_baseline_subtraction()
        filter.save_results()

W's algorithm runs the Maybe's and No's

In [None]:
filter_dir = "filterOutput"

for filename in os.listdir(filter_dir):
    if filename.endswith(".csv"):
        filepath = os.path.join(filter_dir, filename)
        best_filters = BestFilters(filepath)
        best_filters.process_and_plot()

Save CSV, with best 3, for L/F to run their section.

In [None]:
import os
import csv

best_dir = "bestFilters"
output_csv = "Results.csv"
temp_csv = "Results_temp.csv"

# Read existing results into a list (since multiple rows may exist per filename)
results_list = []

try:
    with open(output_csv, 'r', newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        fieldnames = reader.fieldnames
        for row in reader:
            results_list.append(row)  # Store each row in a list
except FileNotFoundError:
    print(f"Error: {output_csv} not found. Ensure the file exists.")
    fieldnames = ["filename", "gaussian", "spacings", "nu_max", "error nu_max", "delta nu", "error delta nu"]

# Process new data and update matching rows
for filename in os.listdir(best_dir):
    if filename.endswith(".csv"):
        filepath = os.path.join(best_dir, filename)
        print(f"Processing file: {filepath}")
        
        try:
            gw = GaussianWrapper(filepath)
            gauss_status, nu_max, envelope_indices, best_fit_no = gw.run_wrapper()
        except Exception as e:
            print(f"Error processing GaussianWrapper for {filename}: {e}")
            continue
        
        if len(nu_max) < 2:
            print(f"Warning: nu_max has insufficient elements in file {filename}: {nu_max}")
            nu_max = [None, None]  # Default values to avoid index errors
        
        try:
            s = Spacings(filepath, envelope_indices, best_fit_no)
            spacing_status, delta_nu, error = s.spacings()
        except Exception as e:
            print(f"Error processing Spacings for {filename}: {e}")
            spacing_status, delta_nu, error = None, None, None

        base_filename = os.path.splitext(filename)[0]  # Strip .csv
        
        # Find matching row(s) in Results.csv
        updated = False
        for row in results_list:
            row_base = os.path.splitext(row["filename"])[0]  # Strip .pow from Results.csv entry

            if row_base == base_filename:
                print(f"Updating entry for {filename}")
                # Update only missing values
                row["gaussian"] = row["gaussian"] or gauss_status
                row["spacings"] = row["spacings"] or spacing_status
                row["nu_max"] = row["nu_max"] or nu_max[0]
                row["error nu_max"] = row["error nu_max"] or nu_max[1]
                row["delta nu"] = row["delta nu"] or delta_nu
                row["error delta nu"] = row["error delta nu"] or error
                updated = True

        if not updated:
            print(f"Warning: No matching entry found for {filename} in {output_csv}")

# Write updated results back to the CSV file
try:
    with open(temp_csv, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results_list)
    os.replace(temp_csv, output_csv)
    print(f"Updated results saved to {output_csv}")
except Exception as e:
    print(f"Error writing to CSV file: {e}")

In [None]:
csvpath = 'Results.csv'
spectra_folder_path = 'NoisyData'

processor = M_R_Est(csvpath, spectra_folder_path, ExoplanetAnalysis)
processor.run()

In [None]:
import pandas as pd
import os
import re

def extract_pow_data(filepath):
    """
    Extracts nu max, delta nu, actual mass, and actual radius from a .pow file.

    Args:
        filepath (str): The path to the .pow file.

    Returns:
        dict: A dictionary containing the extracted data, or None if not found.
              Keys: 'nu_max', 'delta_nu', 'actual_mass', 'actual_radius'.
    """
    data = {}
    try:
        with open(filepath, 'r', encoding='latin-1') as f:
            for line in f:
                line = line.strip()
                if line.startswith('#'):
                    parts = line[1:].strip().split(':')
                    if len(parts) == 2:
                        key = parts[0].strip()
                        value_str = parts[1].strip().split()[0]  # Take only the first word as value
                        try:
                            if key == 'nu_max':
                                data['nu_max'] = float(value_str)
                            elif key == 'delta_nu':
                                data['delta_nu'] = float(value_str)
                            elif key == 'Mact':
                                data['actual_mass'] = float(value_str)
                            elif key == 'Radius':
                                data['actual_radius'] = float(value_str)
                        except ValueError:
                            print(f"Warning: Could not convert value to float for key '{key}' in line: {line} in {filepath}")
    except FileNotFoundError:
        print(f"Error: File not found: {filepath}")
        return None
    return data

def add_pow_data_to_csv(csv_filepath, pow_files_directory):
    """
    Reads a CSV file, extracts data from corresponding .pow files,
    and adds the data as new columns to the CSV.

    Args:
        csv_filepath (str): The path to the CSV file.
        pow_files_directory (str): The directory containing the .pow files.
    """
    try:
        df = pd.read_csv(csv_filepath)
    except FileNotFoundError:
        print(f"Error: CSV file not found: {csv_filepath}")
        return

    nu_max_list = []
    delta_nu_list = []
    actual_mass_list = []
    actual_radius_list = []

    for index, row in df.iterrows():
        pow_filename = row['filename']  # Get the filename directly from the 'filename' column
        pow_filepath = os.path.join(pow_files_directory, pow_filename)

        extracted_data = extract_pow_data(pow_filepath)

        if extracted_data:
            nu_max_list.append(extracted_data.get('nu_max'))
            delta_nu_list.append(extracted_data.get('delta_nu'))
            actual_mass_list.append(extracted_data.get('actual_mass'))
            actual_radius_list.append(extracted_data.get('actual_radius'))
        else:
            nu_max_list.append(None)
            delta_nu_list.append(None)
            actual_mass_list.append(None)
            actual_radius_list.append(None)

    df['actual_nu_max'] = nu_max_list  # Changed column name to avoid potential confusion with 'nu_max' guess
    df['actual_delta_nu'] = delta_nu_list # Changed column name
    df['actual_mass'] = actual_mass_list
    df['actual_radius'] = actual_radius_list

    df.to_csv(csv_filepath, index=False)
    print(f"Successfully added data from .pow files to {csv_filepath}")

# Example usage (assuming you have a 'Results.csv' with a 'filename' column
# and a directory named 'NoisyData' containing your .pow files)
csv_file = 'Results.csv'
pow_dir = 'NoisyData'

add_pow_data_to_csv(csv_file, pow_dir)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Load the CSV file
file_path = "Results.csv"  # Update with your actual file path
df = pd.read_csv(file_path)

# Drop rows with any missing values
df = df.dropna()

# Define the columns to compare and their corresponding error columns
comparison_columns = {
    "nu_max": ("actual_nu_max", "error nu_max"),
    "delta nu": ("actual_delta_nu", "error delta nu"),
    "mass guess": ("actual_mass", "mass error"),
    "radius guess": ("actual_radius", "radius error")
}

# Ensure 'gaussian' and 'spacing' columns exist and filter only rows where both are True
if "gaussian" in df.columns and "spacings" in df.columns:
    df_filtered = df[(df["gaussian"] == True) & (df["spacings"] == True)].copy()  # Filter the dataframe and create a copy to avoid SettingWithCopyWarning
    # Create subplots
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    axes = axes.flatten()

    # Generate scatter plots with y=x reference line and error bars
    for i, (obtained, (actual, error)) in enumerate(comparison_columns.items()):
        ax = axes[i]
        x = df_filtered[actual]
        y = df_filtered[obtained]
        y_err = df_filtered[error]  # Get the error values

        # Scatter plot with error bars
        ax.errorbar(x, y, fmt='x', color='blue', label='Values', alpha=0.7, capsize=3)  # added yerr

        # y = x reference line
        min_val = min(x.min(), y.min())
        max_val = max(x.max(), y.max())
        buffer = (max_val - min_val) * 0.05  # 5% buffer to avoid points at the edges
        line_range = np.linspace(min_val - buffer, max_val + buffer, 100)
        ax.plot(line_range, line_range, color='red', linestyle='dashed', label='Optimal fit')

        # Set x and y limits
        ax.set_xlim(min_val - buffer, max_val + buffer)
        ax.set_ylim(min_val - buffer, max_val + buffer)

        # Labels and title
        ax.set_xlabel(f"Actual {obtained}")
        ax.set_ylabel(f"Obtained {obtained}")
        ax.set_title(f"{obtained} vs. Actual {obtained}")
        ax.legend()
        ax.grid(True)

    # Adjust layout and show plot
    plt.tight_layout()
    plt.show()

else:
    print("Columns 'gaussian' or 'spacing' not found. Exiting.")
    exit()