# Initial test with denoising autoencoders

In [None]:
import configparser
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, quantile_transform
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model

In [None]:
# Load the notebook settings from "Config.properties" (path is relative to the
# current working directory). RawConfigParser performs no value interpolation.
config = configparser.RawConfigParser()
# NOTE(review): read() skips files it cannot open instead of raising, so a
# missing file would only surface at the first config.get() call — confirm
# the file is present before running the rest of the notebook.
config.read("Config.properties")

In [None]:
# Input locations, both taken from the [path_variables] section of the config.
# Path of the CSV file that is later read into `wavelengths`.
wavelengths_path = config.get("path_variables", "wavelengthspath")
# Folder that is later scanned for the .csv data files.
datapath = config.get("path_variables", "datapath")

In [None]:
# Use the first row of the header-less wavelengths CSV as the wavelength grid,
# dropping its last value.
# NOTE(review): presumably the last field is an empty/trailing column — confirm
# why it has to be discarded.
wavelengths = pd.read_csv(wavelengths_path, header=None).iloc[0].values[:-1]

In [None]:
def get_csv_files(input_folder):
    """Return the paths of the regular ``.csv`` files directly inside a folder.

    Sub-folders are not traversed, and directories whose name happens to end
    in ``.csv`` are skipped. Paths are built by joining ``input_folder`` with
    each entry name, in the order reported by ``os.listdir``.

    Args:
        input_folder: Directory to scan.

    Returns:
        A list of ``input_folder``-prefixed paths, one per matching file.
    """
    csv_paths = []
    for entry in os.listdir(input_folder):
        full_path = os.path.join(input_folder, entry)
        if os.path.isfile(full_path) and entry.endswith(".csv"):
            csv_paths.append(full_path)
    return csv_paths

In [None]:
# Collect every .csv file found directly inside `datapath`.
reddening_files = get_csv_files(datapath)

In [None]:
def select_sources(files, percentage=0.5, index_col=None):
    """Randomly sample a fraction of the rows of each CSV file and combine them.

    Every file in ``files`` is read with ``pd.read_csv`` and
    ``ceil(len(file) * percentage)`` distinct rows are drawn from it with
    ``random.sample`` (so ``random.seed`` controls reproducibility). The
    samples of all files are concatenated into one DataFrame.

    Args:
        files: Iterable of CSV file paths.
        percentage: Fraction of each file's rows to keep. Values above 1 make
            ``random.sample`` raise ``ValueError``.
        index_col: Optional column name. When given and present in a file,
            that column becomes the index of the file's sample.

    Returns:
        A DataFrame with the sampled rows of all files, in file order and, within
        each file, in original row order. An empty DataFrame is returned when
        ``files`` is empty.
    """
    selected_frames = []
    for f in files:
        curr_df = pd.read_csv(f)
        curr_els = len(curr_df)
        random_numbers = random.sample(
            range(curr_els), int(np.ceil(curr_els * percentage))
        )

        selected_df = curr_df[curr_df.index.isin(random_numbers)]
        if index_col and index_col in selected_df.columns:
            selected_df = selected_df.set_index(index_col)
        selected_frames.append(selected_df)

    if not selected_frames:
        return pd.DataFrame()
    # Concatenate once at the end. Index labels are kept as they are, so when
    # `index_col` is not applied the per-file row numbers may repeat across files.
    return pd.concat(selected_frames)

In [None]:
# Restrict this run to the first file of the listing.
files_to_select = [reddening_files[0]]

In [None]:
# Randomly keep 5% of the rows (rounded up), indexed by "source_id" when that
# column is present in the file.
res_df = select_sources(files_to_select, index_col="source_id", percentage=0.05)

In [None]:
# Drop rows with missing values, then discard every row whose serialized
# spectrum contains the substring "inf" (presumably a non-finite flux value).
res_df = res_df.dropna()
# The boolean mask is negated directly; the former `== True` comparison was
# redundant because `~` is applied before `==`.
res_df = res_df[~res_df["redden_spectra"].str.contains("inf")]
res_df = res_df[~res_df["original_spectra"].str.contains("inf")]

In [None]:
def string_to_float_list(input_string):
    """Parse a serialized sequence of numbers into a NumPy float array.

    All ``(`` and ``)`` characters are removed, the remainder is split on
    commas and every piece is converted with ``float`` (which tolerates
    surrounding whitespace).

    Args:
        input_string: Text such as ``"(1.0, 2.5, -3)"``.

    Returns:
        A one-dimensional ``np.ndarray`` with the parsed values.

    Raises:
        ValueError: If any comma-separated piece is not a valid float.
    """
    without_parens = input_string.replace("(", "").replace(")", "")
    return np.array([float(token) for token in without_parens.split(",")])

In [None]:
def apply_gaussian(data, sigma=1):
    """Divide each spectrum by a Gaussian-smoothed copy of itself.

    Args:
        data: Iterable of one-dimensional spectra.
        sigma: Standard deviation passed to ``gaussian_filter1d``.

    Returns:
        A tuple ``(gaussian_data, smooth_data)`` of two lists with one entry
        per input spectrum: the spectrum divided by its smoothed version, and
        the smoothed version itself.
    """
    from scipy.ndimage import gaussian_filter1d

    # Pair every spectrum with its smoothed copy in a single pass over `data`.
    pairs = [
        (spectrum, gaussian_filter1d(spectrum, sigma=sigma)) for spectrum in data
    ]
    gaussian_data = [spectrum / smoothed for spectrum, smoothed in pairs]
    smooth_data = [smoothed for _, smoothed in pairs]
    return gaussian_data, smooth_data

In [None]:
def _parse_spectra(serialized_spectra):
    """Convert an iterable of serialized spectra strings into an array of float rows."""
    return np.array([string_to_float_list(x) for x in serialized_spectra])


def split_train_test(
    data_df, normalize=False, save_idx=False, gaussian=False, random_state=None
):
    """Split a spectra DataFrame into train/test sets of parsed float spectra.

    The index labels of ``data_df`` are split with ``train_test_split``; the
    rows belonging to each partition are then selected from the index-sorted
    frame and their ``"redden_spectra"`` / ``"original_spectra"`` strings are
    parsed with ``string_to_float_list``.

    Args:
        data_df: DataFrame with string columns ``"redden_spectra"`` and
            ``"original_spectra"``. Index labels are assumed to be unique: rows
            sharing a label are selected for every partition that contains it.
        normalize: When true, divide every spectrum by its own mean. The four
            data outputs are then lists of arrays rather than 2-D arrays.
        save_idx: When true, also return the sorted train and test index labels.
        gaussian: Only considered when ``normalize`` is false. Each data output
            is then replaced by the ``(gaussian_data, smooth_data)`` tuple
            returned by ``apply_gaussian`` with its default ``sigma``.
        random_state: Forwarded to ``train_test_split`` to make the split
            reproducible; ``None`` keeps the previous non-deterministic split.

    Returns:
        ``(train_noisy_values, train_values, test_noisy_values, test_values)``,
        followed by ``train_idx, test_idx`` when ``save_idx`` is true.
    """
    indexes = data_df.index.tolist()
    train_idx, test_idx = train_test_split(indexes, random_state=random_state)
    train_idx.sort()
    test_idx.sort()

    # Sort the rows as well so they line up with the sorted index lists.
    data_df = data_df.sort_index()
    train_data = data_df[data_df.index.isin(train_idx)]
    test_data = data_df[data_df.index.isin(test_idx)]

    train_noisy_values = _parse_spectra(train_data["redden_spectra"])
    train_values = _parse_spectra(train_data["original_spectra"])
    test_noisy_values = _parse_spectra(test_data["redden_spectra"])
    test_values = _parse_spectra(test_data["original_spectra"])

    if normalize:
        train_noisy_values = [x / np.mean(x) for x in train_noisy_values]
        train_values = [x / np.mean(x) for x in train_values]
        test_noisy_values = [x / np.mean(x) for x in test_noisy_values]
        test_values = [x / np.mean(x) for x in test_values]

    elif gaussian:
        # NOTE(review): apply_gaussian returns a 2-tuple, so in this mode every
        # output is a (ratio, smoothed) pair rather than a plain array — confirm
        # that callers expect this.
        train_noisy_values = apply_gaussian(train_noisy_values)
        train_values = apply_gaussian(train_values)
        test_noisy_values = apply_gaussian(test_noisy_values)
        test_values = apply_gaussian(test_values)

    if not save_idx:
        return train_noisy_values, train_values, test_noisy_values, test_values
    return (
        train_noisy_values,
        train_values,
        test_noisy_values,
        test_values,
        train_idx,
        test_idx,
    )

In [None]:
# Alternative call kept for reference: per-spectrum mean normalization, without
# returning the index labels.
# x_train_noisy, x_train, x_test_noisy, x_test = split_train_test(res_df, normalize=True)
# Active call: no normalization, and the sorted train/test index labels are
# returned too (save_idx=True) so rows can be traced back to `res_df`.
x_train_noisy, x_train, x_test_noisy, x_test, train_idx, test_idx = split_train_test(
    res_df, normalize=False, save_idx=True
)

In [None]:
# Overlay the first training pair (reddened input vs. original target) on the
# wavelength grid.
plt.plot(wavelengths, x_train_noisy[0], label="Redden")
plt.plot(wavelengths, x_train[0], label="Original")
plt.legend(loc="upper right")
# NOTE(review): plt.plot() without arguments draws nothing; presumably
# plt.show() was intended here — confirm.
plt.plot()

In [None]:
# Re-parse the spectra of the first training index label straight from
# `res_df` and plot them; presumably a sanity check that the curves match the
# first row of the split arrays plotted above — verify.
current_idx = train_idx[0]
original_spectra = string_to_float_list(res_df.loc[current_idx, "original_spectra"])
redden_spectra = string_to_float_list(res_df.loc[current_idx, "redden_spectra"])

plt.plot(wavelengths, redden_spectra, label="Redden")
plt.plot(wavelengths, original_spectra, label="Original")
plt.legend(loc="upper right")
# NOTE(review): plt.plot() without arguments draws nothing; presumably
# plt.show() was intended here — confirm.
plt.plot()