# Анализ рентгеновских транзиентов СРГ

In [None]:
import itertools
from enum import Enum
from pathlib import Path
import warnings
from typing import Optional

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from sklearnex import patch_sklearn
from tqdm.auto import tqdm

np.random.seed(42)


# Charts configurations
np.set_printoptions(precision=3)
sns.set('talk', 'whitegrid', 'deep', font_scale=1.0,
        rc={"lines.linewidth": 2, 'grid.linestyle': '--'})
pd.set_option('display.max_rows', 400, 'display.max_columns', None, 'display.max_colwidth', 100)
plt.rcParams.update({'font.size': 24})
warnings.filterwarnings('ignore')

%matplotlib inline
%config InlineBackend.figure_format = 'retina'


# Sklearn acceleration
patch_sklearn()


# Enums
TRANSIENT_SURVEY = Enum("TRANSIENT_SURVEY",
                        ["NONE", "eXVAR", "TDEs2", "FDS4", "TDEs4", "QPE", "eXVAGN3", "TDEs5", "TDEs2r7"])
TRANSIENT_CLASS = Enum("TRANSIENT_CLASS", ["NONE", "TDE", "QSO"])


# Paths
ASSEMBLED_DATA_PATH = Path("assembled_data/")
SRG_DATA_PATH = Path("srg_data/")
TRITON_DATA_PATH = Path("triton_data/")


ArrayOrSeries = np.ndarray or pd.Series


class Preconditions:
    """Класс со статичными методами для проверки условий.

    Если условие не выполняется, выбрасывается исключение ValueError
    с заданным сообщением.
    """

    @staticmethod
    def check(condition: bool, message: str):
        if not condition:
            raise ValueError(message)

    @staticmethod
    def check_pre(precondition: bool, condition: bool, message: str):
        if not precondition:
            return

        Preconditions.check(condition, message)

## Подготовка данных

Чтение данных из системы разметки

In [None]:
def mag_ab_from_flux_nanomagies(flux):
    return 22.5 - 2.5 * np.log10(flux)


def mag_ab_from_flux_jy(flux):
    return -2.5 * np.log10(flux / 3631)


def asinhmag_dm(flux: ArrayOrSeries, *, flux_err: ArrayOrSeries = None,
                flux_ivar: ArrayOrSeries = None, dm: float = 0):
    """Calculate asinh magnitude with dm shift from fluxes and errors or ivars.

    :param flux: flux [nanomaggies]
    :param flux_ivar: inverse variance of flux [1/nanomaggies**2]
    :param flux_err: flux error [nanomaggies]
    :param dm: magnitude shift

    :return: caclulated magnitudes
    """
    Preconditions.check((flux_err is not None) ^ (flux_ivar is not None), 'Specify only flux_err or flux_ivar.')

    f = flux / 1e9 * np.power(10, 0.4 * dm)
    if flux_ivar is not None:
        b = np.power(flux_ivar, -0.5) / 1e9 * np.power(10, 0.4 * dm)
    else:
        b = flux_err / 1e9 * np.power(10, 0.4 * dm)

    f, b = f.astype(np.float64), b.astype(np.float64)  # otherwise type error like
    # TypeError: loop of ufunc does not support argument 0
    # of type numpy.float64 which has no callable arcsinh method

    return (np.arcsinh(f / (2 * b)) + np.log(b)) * (-2.5 / np.log(10))


# divide panstarrs fluxes and errors by 3621e-9


def calc_ls_mags_ab(features: pd.DataFrame) -> pd.DataFrame:
    features["ls_mag_g"] = mag_ab_from_flux_nanomagies(features["ls_flux_g"])
    features["ls_mag_r"] = mag_ab_from_flux_nanomagies(features["ls_flux_r"])
    features["ls_mag_z"] = mag_ab_from_flux_nanomagies(features["ls_flux_z"])
    features["ls_mag_w1"] = mag_ab_from_flux_nanomagies(features["ls_flux_w1"])
    features["ls_mag_w2"] = mag_ab_from_flux_nanomagies(features["ls_flux_w2"])
    return features


def calc_ls_mags(features: pd.DataFrame) -> pd.DataFrame:
    features["ls_mag_g"] = asinhmag_dm(features["ls_flux_g"], flux_ivar=features["ls_flux_ivar_g"])
    features["ls_mag_r"] = asinhmag_dm(features["ls_flux_r"], flux_ivar=features["ls_flux_ivar_r"])
    features["ls_mag_z"] = asinhmag_dm(features["ls_flux_z"], flux_ivar=features["ls_flux_ivar_z"])
    features["ls_mag_w1"] = asinhmag_dm(features["ls_flux_w1"], flux_ivar=features["ls_flux_ivar_w1"])
    features["ls_mag_w2"] = asinhmag_dm(features["ls_flux_w2"], flux_ivar=features["ls_flux_ivar_w2"])
    return features


def cals_sdss_mags_ab(features: pd.DataFrame) -> pd.DataFrame:
    features["sdss_cModelMag_u"] = mag_ab_from_flux_nanomagies(features["sdss_cModelFlux_u"])
    features["sdss_cModelMag_g"] = mag_ab_from_flux_nanomagies(features["sdss_cModelFlux_g"])
    features["sdss_cModelMag_r"] = mag_ab_from_flux_nanomagies(features["sdss_cModelFlux_r"])
    features["sdss_cModelMag_i"] = mag_ab_from_flux_nanomagies(features["sdss_cModelFlux_i"])
    features["sdss_cModelMag_z"] = mag_ab_from_flux_nanomagies(features["sdss_cModelFlux_z"])
    return features


def cals_sdss_mags(features: pd.DataFrame) -> pd.DataFrame:
    features["sdss_cModelMag_u"] = asinhmag_dm(features["sdss_cModelFlux_u"], flux_ivar=features["sdss_cModelFluxIvar_u"])
    features["sdss_cModelMag_g"] = asinhmag_dm(features["sdss_cModelFlux_g"], flux_ivar=features["sdss_cModelFluxIvar_g"])
    features["sdss_cModelMag_r"] = asinhmag_dm(features["sdss_cModelFlux_r"], flux_ivar=features["sdss_cModelFluxIvar_r"])
    features["sdss_cModelMag_i"] = asinhmag_dm(features["sdss_cModelFlux_i"], flux_ivar=features["sdss_cModelFluxIvar_i"])
    features["sdss_cModelMag_z"] = asinhmag_dm(features["sdss_cModelFlux_z"], flux_ivar=features["sdss_cModelFluxIvar_z"])
    return features


def calc_ps_mags_ab(features: pd.DataFrame) -> pd.DataFrame:
    features["ps_gKronMag"] = mag_ab_from_flux_jy(features["ps_gKronFlux"])
    features["ps_rKronMag"] = mag_ab_from_flux_jy(features["ps_rKronFlux"])
    features["ps_iKronMag"] = mag_ab_from_flux_jy(features["ps_iKronFlux"])
    features["ps_zKronMag"] = mag_ab_from_flux_jy(features["ps_zKronFlux"])
    features["ps_yKronMag"] = mag_ab_from_flux_jy(features["ps_yKronFlux"])
    return features


def calc_ps_mags(features: pd.DataFrame) -> pd.DataFrame:
    features["ps_gKronMag"] = asinhmag_dm(features["ps_gKronFlux"] / 3621e-9, flux_err=features["ps_gKronFluxErr"] / 3621e-9)
    features["ps_rKronMag"] = asinhmag_dm(features["ps_rKronFlux"] / 3621e-9, flux_err=features["ps_rKronFluxErr"] / 3621e-9)
    features["ps_iKronMag"] = asinhmag_dm(features["ps_iKronFlux"] / 3621e-9, flux_err=features["ps_iKronFluxErr"] / 3621e-9)
    features["ps_zKronMag"] = asinhmag_dm(features["ps_zKronFlux"] / 3621e-9, flux_err=features["ps_zKronFluxErr"] / 3621e-9)
    features["ps_yKronMag"] = asinhmag_dm(features["ps_yKronFlux"] / 3621e-9, flux_err=features["ps_yKronFluxErr"] / 3621e-9)
    return features


def calc_ls_colors(features: pd.DataFrame) -> pd.DataFrame:
    features["ls_g-r_color"] = features["ls_mag_g"] - features["ls_mag_r"]
    features["ls_g-z_color"] = features["ls_mag_g"] - features["ls_mag_z"]
    features["ls_g-w1_color"] = features["ls_mag_g"] - features["ls_mag_w1"]
    features["ls_g-w2_color"] = features["ls_mag_g"] - features["ls_mag_w2"]
    features["ls_r-z_color"] = features["ls_mag_r"] - features["ls_mag_z"]
    features["ls_r-w1_color"] = features["ls_mag_r"] - features["ls_mag_w1"]
    features["ls_r-w2_color"] = features["ls_mag_r"] - features["ls_mag_w2"]
    features["ls_z-w1_color"] = features["ls_mag_z"] - features["ls_mag_w1"]
    features["ls_z-w2_color"] = features["ls_mag_z"] - features["ls_mag_w2"]
    features["ls_w1-w2_color"] = features["ls_mag_w1"] - features["ls_mag_w2"]
    return features


def calc_ps_colors(features: pd.DataFrame) -> pd.DataFrame:
    pass  # TODO


def calc_sdss_colors(features: pd.DataFrame) -> pd.DataFrame:
    pass  # TODO

In [None]:
# read data and rename columns

def get_column_name_mapper(prefix: str):
        return lambda column_name: f"{prefix}_{{}}".format(column_name)


meta_object_data = pd.read_parquet(SRG_DATA_PATH / "surveys_metaobject.parquet")
erosita_data = (
        pd.read_parquet(SRG_DATA_PATH / "surveys_erosita.parquet")
        .rename(get_column_name_mapper("ero"), axis=1)
)
ls_data = (
        pd.read_parquet(SRG_DATA_PATH / "surveys_ls.parquet")
        .rename(get_column_name_mapper("ls"), axis=1)
)
ps_data = (
        pd.read_parquet(SRG_DATA_PATH / "surveys_ps.parquet")
        .rename(get_column_name_mapper("ps"), axis=1)
)
sdss_data = (
        pd.read_parquet(SRG_DATA_PATH / "surveys_sdss.parquet")
        .rename(get_column_name_mapper("sdss"), axis=1)
)
gaia_data = (
        pd.read_parquet(SRG_DATA_PATH / "surveys_gaia.parquet")
        .rename(get_column_name_mapper("gaia"), axis=1)
)

In [None]:
meta_object_data.shape

In [None]:
# merge tables

print(erosita_data.shape)
erosita_data = pd.merge(left=erosita_data, right=ls_data, left_on="ero_ls_dup", right_on="ls_id", how="left")
erosita_data = pd.merge(left=erosita_data, right=ps_data, left_on="ero_ps_dup", right_on="ps_id", how="left")
erosita_data = pd.merge(left=erosita_data, right=sdss_data, left_on="ero_sdss_dup", right_on="sdss_id", how="left")
erosita_data = pd.merge(left=erosita_data, right=gaia_data, left_on="ero_gaia_dup", right_on="gaia_id", how="left")
print(erosita_data.shape)


meta_object_to_erosita_relation = pd.read_parquet(SRG_DATA_PATH / "surveys_erosita_meta_objects.parquet")
print(meta_object_to_erosita_relation.shape)
meta_object_to_erosita_relation = pd.merge(left=meta_object_data, right=meta_object_to_erosita_relation,
                                           left_on="id", right_on="metaobject_id")
meta_object_to_erosita_relation = pd.merge(left=meta_object_to_erosita_relation, right=erosita_data,
                                           left_on="erosita_id", right_on="ero_id")
print(meta_object_to_erosita_relation.shape)

In [None]:
# as we do not have master survey connection, get source with max x-ray flux for every meta object
# and save the result

group_max_xflux = list()

for _, group in tqdm(meta_object_to_erosita_relation.groupby("metaobject_id")):
        group = group.loc[group["ero_flux_05_20"] == group["ero_flux_05_20"].max()]
        group_max_xflux.append(group)

assembled_srg_data = pd.concat(group_max_xflux, axis=0)

assembled_srg_data = calc_ls_mags(assembled_srg_data)
assembled_srg_data = cals_sdss_mags(assembled_srg_data)
assembled_srg_data = calc_ps_mags(assembled_srg_data)

assembled_srg_data.to_parquet(ASSEMBLED_DATA_PATH / "srg_data.parquet", compression="GZIP")
assembled_srg_data.shape

In [None]:
assembled_srg_data.head()

# Подготовка данных спектральных наблюдений

## Чтение данных с признаками и подготовка разметки

В этих данных содержатся комментарии наблюдателей, из которых класс источника нужно вычленить в отдельный столбец

В первом приближении в качестве класса возьмем программу наблюдений

In [None]:
triton_with_photometry_path = TRITON_DATA_PATH / "x1a" / "part-00000.features.gz_pkl"
triton_data = pd.read_pickle(triton_with_photometry_path, compression="gzip")
triton_data = triton_data.loc[triton_data["Prog"].isin(TRANSIENT_SURVEY._member_names_)]

triton_data = calc_ls_mags(triton_data)
triton_data = cals_sdss_mags(triton_data)
triton_data = calc_ps_mags(triton_data)

triton_data.to_parquet(ASSEMBLED_DATA_PATH / "triton_data.parquet", compression="GZIP")

triton_data.shape

In [None]:
triton_data.head()

# Анализ с кластеризацией TBD

**План**

Кластеризация все-равно делается перебором алгоритмов, поэтому делаем так:
- Нагенерить фичей (можно начать с дефолтных -- величины и цвета)
- Применить понижение размерности / feature selection.
- Применяем кластеризации и смотрим на резальтаты в разметке <z, class>

___TODO___

- Нужно привести датафреймы к одинаковым названиям столбцов + чтобы это было воспроизводимо
- Т.е. доработать предобработку
- Есть проблема, связанная с пропусками
- Сделать гиперболические величины
- Исследуем на источниках, для которых есть DESI LIS

In [None]:
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA, SparsePCA

from dummy_str_enum import DummyStrEnum

LS_MAG_COLUMNS = DummyStrEnum.from_list(
    "LS_MAG_COLUMNS", 'ls_mag_r', 'ls_mag_g', 'ls_mag_z', 'ls_mag_w1', 'ls_mag_w2')
LS_COLOR_COLUMNS = DummyStrEnum.from_list(
    "LS_COLOR_COLUMNS",
    'ls_color_g-r', 'ls_color_g-z', 'ls_color_g-w1', 'ls_color_g-w2',
    'ls_color_r-z', 'ls_color_r-w1', 'ls_color_r-w2',
    'ls_color_z-w1', 'ls_color_z-w2',
    'ls_color_w1-w2',
)

PS_MAG_COLUMNS = DummyStrEnum.from_list(
    "PS_MAG_COLUMNS", 'ps_gKronMag', 'ps_rKronMag', 'ps_iKronMag', 'ps_zKronMag', 'ps_yKronMag')
SDSS_MAG_COLUMNS = DummyStrEnum.from_list(
    "SDSS_MAG_COLUMNS", 'sdss_cModelMag_u', 'sdss_cModelMag_g', 'sdss_cModelMag_r',
    'sdss_cModelMag_i', 'sdss_cModelMag_z')
WISE_FORCED_MAG_COLUMNS = DummyStrEnum("WISE_FORCED_MAG_COLUMNS", w1mag="ps_w1mag", w2mag="ps_w2mag")
XRAY_COLUMNS = DummyStrEnum.from_list("XRAY_COLUMNS", "ln_flux_05_20")

ALL_FEATURES = LS_MAG_COLUMNS + PS_MAG_COLUMNS + SDSS_MAG_COLUMNS + WISE_FORCED_MAG_COLUMNS + XRAY_COLUMNS

In [None]:
srg_data = pd.read_parquet(ASSEMBLED_DATA_PATH / "srg_data.parquet")
srg_data[XRAY_COLUMNS.ln_flux_05_20] = np.log10(srg_data["ero_flux_05_20"])
srg_data.head()

In [None]:
triton_data = pd.read_parquet(ASSEMBLED_DATA_PATH / "triton_data.parquet")
# triton_data[XRAY_COLUMNS.ln_flux_05_20] = np.log10(triton_data["ero_flux_05_20"])
triton_data.head()

In [None]:
# calculate features

def calculate_colors(data: pd.DataFrame, passbands: list[str], mag_column_template: str, color_column_template: str):
    for pb1, pb2 in itertools.combinations(passbands, 2):
        # print(color_column_template.format(pb1, pb2))
        data[color_column_template.format(pb1, pb2)] = (
                data[mag_column_template.format(pb1)] - data[mag_column_template.format(pb2)]
        )

    return data


def calculate_ls_colors(data: pd.DataFrame, mag_colunm_template="ls_mag_{}", color_column_template="ls_color_{}-{}"):
    return calculate_colors(data, ["g", "r", "z", "w1", "w2"], mag_colunm_template, color_column_template)


srg_data = calculate_ls_colors(srg_data)
triton_data = calculate_ls_colors(triton_data)

In [None]:
def nans_anal(df: pd.DataFrame) -> pd.Series:
    """Analyse, how many nans are in magnitudes.

    :param df:
    :return: mask specified in result_mask variable (see code lol)
    """
    df = df.replace([-np.inf, np.inf], np.nan)

    notna_mask_ps = (
            df[PS_MAG_COLUMNS.elements.values()].notna()
            & (df[PS_MAG_COLUMNS.elements.values()] >= 0)
    ).all(axis=1)
    notna_mask_ls = df[LS_MAG_COLUMNS.elements.values()].notna().all(axis=1)
    notna_mask_sdss = df[SDSS_MAG_COLUMNS.elements.values()].notna().all(axis=1)
    notna_mask_wf = (
            df[WISE_FORCED_MAG_COLUMNS.elements.values()].notna()
            & (df[WISE_FORCED_MAG_COLUMNS.elements.values()] >= 0)
    ).all(axis=1)

    print("Total objects:", df.shape[0])
    print("Has all LS:", notna_mask_ls.sum())
    print("Has all PS:", notna_mask_ps.sum())
    print("Has all SDSS:", notna_mask_sdss.sum())
    print("Has all WISE FORCED:", notna_mask_wf.sum())

    print("Has all LS and PS:", (notna_mask_ls & notna_mask_ps).sum())
    print("Has all SDSS and PS:", (notna_mask_sdss & notna_mask_ps).sum())
    print("Has all LS and SDSS:", (notna_mask_ls & notna_mask_sdss).sum())

    print("Has all LS and PS and SDSS", (notna_mask_ps & notna_mask_sdss & notna_mask_ls).sum())

    result_mask = notna_mask_ls
    print(result_mask.sum())
    return result_mask


print("SRG Data:")
srg_data = srg_data.loc[nans_anal(srg_data)]
print("\n\nTriton Data:")
triton_data = triton_data.loc[nans_anal(triton_data)]

In [None]:
# Expert features by mesch
EXPERT_FEATURES_LIST = ['ls_star_zw2_rz_score', 'ls_X/w1_h', 'ls_Fx/Fz']


def calculate_expert_features_with_fx(df: pd.DataFrame) -> pd.DataFrame:
    """Вычисление экспертных признаков, хорошо разделяющих
    спектральные классы источников (звезды и квазары, галактики)
    по фотометрии DESI LIS и рентгеновскому потоку 0.5--2.0 кэв.

    Признаки:
    - ls_X/w1_h = (w1 - 2.699) - (-1.625 * lgFx - 8.8)
    - ls_Fx/Fz = z - (-2.5 * lgFx - 18)

    :param df: датафрейм с величинами
    :return: исходный датафрейм с добавленными признаками
    """

    Preconditions.check("ero_flux_05_20" in df.columns, "There must be ero_flux_05_20 column")
    Preconditions.check("ls_mag_w1" in df.columns, "There must be ls_mag_w1 column")
    Preconditions.check("ls_mag_z" in df.columns, "There must be ls_mag_z column")

    df['lg(Fx)'] = np.log10(df['ero_flux_05_20'])
    df['ls_X/w1_h']= (df['ls_mag_w1'] - 2.699) - (-1.625 * df['lg(Fx)'] - 8.8)

    df['-2.5lg(Fx)-18'] = -2.5*df['lg(Fx)'] - 18
    df['ls_Fx/Fz'] = df['ls_mag_z'] - df['-2.5lg(Fx)-18']
    return df


def calculate_expert_features(df: pd.DataFrame) -> pd.DataFrame:
    """Вычисление экспертных признаков, хорошо разделяющих
    спектральные классы источников (звезды и квазары, галактики)
    по фотометрии DESI LIS.

    Признаки:
    - ls_star_zw2_rz_score = z-w1 - (0.4 * r-z - 1.35)

    :param df: датафрейм с величинами
    :return: исходный датафрейм с добавленными признаками
    """

    Preconditions.check("ls_color_z-w2" in df.columns, "There must be ls_color_z-w2 column")
    Preconditions.check("ls_color_r-z" in df.columns, "There must be ls_color_r-z column")

    a_zw2_rz = 0.40
    b_zw2_rz = -1.35
    df['ls_star_zw2_rz_score'] = df["ls_color_z-w2"] - (a_zw2_rz*(df['ls_color_r-z']) + b_zw2_rz)

    return df


srg_data = calculate_expert_features_with_fx(srg_data)
srg_data = calculate_expert_features(srg_data)
triton_data = calculate_expert_features(triton_data)

### Графики с экспертными признаками и разделяющими плоскостями

In [None]:
# clustering

from sklearn.preprocessing import StandardScaler

features_array = srg_data[EXPERT_FEATURES_LIST + list((LS_MAG_COLUMNS).elements.values()) + [XRAY_COLUMNS.ln_flux_05_20]]
features_array = StandardScaler().fit_transform(features_array)

labels = KMeans(n_clusters=6, algorithm="full").fit_predict(features_array)
srg_data["clust_id"] = labels

In [None]:
pca = PCA(n_components=features_array.shape[1]).fit(features_array)
pca.explained_variance_ratio_

In [None]:
# Y: 'ls_Fx/Fz'
# X: 'decals8tr_Lw1-Lw2'

def line3lin(_x:np.array, _x1:float, _y1:float, _x2:float, _y2:float, _a1:float, _a2:float, _a3:float) -> np.array:
    '''
    Line
    x: np.array
    x<=x1 : y=a1*x + (y - a1*x)
    x> x1 : y=a2*x + (y - a1*x)
    '''
    _b1 = _y1 - _a1*_x1
    _b2 = _y1 - _a2*_x1
    _b3 = _y2 - _a3*_x2

    _y = _a1*_x + _b1
    _y[_x>_x1] =_a2*_x[_x>_x1] + _b2
    _y[_x>_x2] =_a3*_x[_x>_x2] + _b3

    return _y


def chart3lin(df: pd.DataFrame, *, labels: ArrayOrSeries = None,
              label_colors: list[str] = None, cmap="rainbow", marker_size=1, **axes_kws):
    x1=-0.7; y1=3; x2=0.3; y2=-1; a1=0; a2=-4; a3=0
    x = np.arange(-3,3,0.1)
    y = line3lin(x, x1, y1, x2, y2, a1, a2, a3)

    Preconditions.check_pre(
        labels is not None and label_colors is not None,
        labels is not None and label_colors is not None and np.unique(labels).shape[0] == len(label_colors),
        "Number of label colors must be equal to number of labels"
    )

    markers = itertools.cycle(["x", "+", ".", "v", "^"])

    if labels is None:
        labels = np.zeros(df.shape[0])
        unique_labels = [0]
        label_colors = ["blue"]
    else:
        unique_labels = np.unique(labels)
        if label_colors is None:
            cmap = mpl.cm.get_cmap(cmap)
            label_colors = [cmap(koef) for koef in np.linspace(0, 1, len(unique_labels))]

    fig, ax = plt.subplots(1, 1, figsize=(17, 10))

    for label, label_color in zip(unique_labels, label_colors):
        row_indices = np.where(labels == label)
        df_l = df.iloc[row_indices]
        marker = next(markers)
        ax.scatter(df_l["ls_color_w1-w2"], df_l['ls_Fx/Fz'], s=marker_size, color=label_color, marker=marker)

    ax.plot(x, y, ls="--", color="black")
    ax.set(xlabel="decals8tr_Lw1-Lw2", ylabel="ls_Fx/Fz")
    ax.set(**axes_kws)
    ax.grid(ls=":")

    fig.tight_layout()
    fig.show()


chart3lin(srg_data, labels=labels,
          xlim=(-2, 2), ylim=(-9, 9), marker_size=6)
chart3lin(srg_data, labels=labels,
          xlim=(-5, 5), ylim=(-10, 15), xticks=np.linspace(-5, 5, 21), yticks=np.linspace(-10, 14, 13), title="Full", marker_size=4)

In [None]:
# X: 'ls_star_zw2_rz_score'
# Y: 'ls_X/w1_h'

def line2lin(_x:np.array, _x1:float, _y1:float, _a1:float, _a2:float) -> np.array:
    '''
    Line
    x: np.array
    x<=x1 : y=a1*x + (y - a1*x)
    x> x1 : y=a2*x + (y - a1*x)
    '''
    _b1 = _y1 - _a1*_x1
    _b2 = _y1 - _a2*_x1

    _y = _a1*_x + _b1
    _y[_x>_x1] =_a2*_x[_x>_x1] + _b2

    return _y


def chart2lin(df: pd.DataFrame, *, labels: ArrayOrSeries = None,
              label_colors: list[str] = None, cmap="rainbow", marker_size=1, **axes_kws):
    x1=0.6; y1=-0.2; a1=-100; a2=-0.25
    x = np.arange(0,10,0.1)
    y = line2lin(x, x1, y1, a1, a2)

    Preconditions.check_pre(
        labels is not None and label_colors is not None,
        labels is not None and label_colors is not None and np.unique(labels).shape[0] == len(label_colors),
        "Number of label colors must be equal to number of labels"
    )

    markers = itertools.cycle(["x", "+", ".", "v", "^"])

    if labels is None:
        labels = np.zeros(df.shape[0])
        unique_labels = [0]
        label_colors = ["blue"]
    else:
        unique_labels = np.unique(labels)
        if label_colors is None:
            cmap = mpl.cm.get_cmap(cmap)
            label_colors = [cmap(koef) for koef in np.linspace(0, 1, len(unique_labels))]

    fig, ax = plt.subplots(1, 1, figsize=(17, 10))

    for label, label_color in zip(unique_labels, label_colors):
        row_indices = np.where(labels == label)
        df_l = df.iloc[row_indices]
        marker = next(markers)
        ax.scatter(df_l['ls_star_zw2_rz_score'], df_l['ls_X/w1_h'], s=marker_size, color=label_color, marker=marker)

    ax.plot(x, y, ls="--", color="black")
    ax.set(xlabel='ls_star_zw2_rz_score', ylabel='ls_X/w1_h')
    ax.set(**axes_kws)
    ax.grid(ls=":")

    fig.tight_layout()
    fig.show()


chart2lin(srg_data, labels=labels, xlim=(-4, 10), ylim=(-9, 9), marker_size=6)
chart2lin(srg_data, labels=labels, xlim=(-4, 10), ylim=(-9, 15), yticks=np.linspace(-8, 14, 12), title="Full", marker_size=4)

In [None]:
# KDE plots
import scipy.stats as st


def calculate_kde(x: ArrayOrSeries, y: ArrayOrSeries, *,
             xlim: tuple[float, float], ylim: tuple[float, float], grid_step: float = 0.1
             ) -> (np.array, np.array, np.array):
    xmin, xmax = xlim
    ymin, ymax = ylim
    xx, yy = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
    positions = np.vstack([xx.ravel(), yy.ravel()])
    values = np.vstack([x, y])
    kernel = st.gaussian_kde(values)
    f = np.reshape(kernel(positions).T, xx.shape)
    return xx, yy, f


def CustomCmap(from_rgba,to_rgba):
    r1,g1,b1,_ = from_rgba
    r2,g2,b2,_ = to_rgba
    cdict = {'red': ((0, r1, r1),
                     (1, r2, r2)),
             'green': ((0, g1, g1),
                       (1, g2, g2)),
             'blue': ((0, b1, b1),
                      (1, b2, b2))}

    cmap = mpl.colors.LinearSegmentedColormap('custom_cmap', cdict)
    return cmap


def chart_lin_kde(df: pd.DataFrame, xcol: str, ycol: str, line: Optional[tuple[np.array, np.array]], *,
                  nrowscols: tuple[int, int], xlim: tuple[int, int], ylim: tuple[int, int], axsize=(5, 5),
                  labels: ArrayOrSeries = None, label_colors: list[str] = None, cmap="rainbow", marker_size=1, **axes_kws):

    Preconditions.check_pre(
        labels is not None and label_colors is not None,
        labels is not None and label_colors is not None and np.unique(labels).shape[0] == len(label_colors),
        "Number of label colors must be equal to number of labels"
    )

    print(f"Hint: xcol={xcol}\t min={df[xcol].min():.5f}\t max={df[xcol].max():.5f}")
    print(f"Hint: ycol={ycol}\t min={df[ycol].min():.5f}\t max={df[ycol].max():.5f}")

    if labels is None:
        labels = np.zeros(df.shape[0])
        unique_labels = [0]
        label_colors = ["blue"]
    else:
        unique_labels = np.unique(labels)
        if label_colors is None:
            cmap = mpl.cm.get_cmap(cmap)
            label_colors = [cmap(koef) for koef in np.linspace(0, 1, len(unique_labels))]

    Preconditions.check(nrowscols[0] * nrowscols[1] >= len(unique_labels), "Not enough axes!")

    fig, axs = plt.subplots(nrowscols[0], nrowscols[1], figsize=(axsize[1] * nrowscols[1], axsize[0] * nrowscols[0]))

    clusters_kdes = dict()

    for ax_i in range(len(unique_labels)):
        ax = axs.flat[ax_i]
        for cluster_i, (label, label_color) in enumerate(zip(unique_labels, label_colors)):
            row_indices = np.where(labels == label)
            df_l = df.iloc[row_indices]

            if ax_i == cluster_i:
                marker = "."
                ax.scatter(df_l[xcol], df_l[ycol], s=marker_size, color=label_color, marker=marker, label=f"Cluster No. {label}")
                ax.legend()
                continue

            try:
                kde = clusters_kdes[label]
            except KeyError:
                kde = calculate_kde(df_l[xcol], df_l[ycol], xlim=xlim, ylim=ylim)
                clusters_kdes[label] = kde

            cset = ax.contour(*kde, levels=[0.01, 0.05, 0.1, 0.2, 0.5],cmap=CustomCmap(label_color, label_color), linewidths=1)
            ax.clabel(cset, inline=1, fontsize=10)

        if line is not None:
            ax.plot(*line, ls="--", color="black")

        ax.set(xlabel=xcol, ylabel=ycol)
        ax.set(xlim=xlim, ylim=ylim, **axes_kws)
        ax.grid(ls=":")

    for i, axs_row in enumerate(axs):
        for j, ax in enumerate(axs_row):
            if i < axs.shape[0] - 1:
                ax.set(xticklabels=[], xlabel="")

            if j > 0:
                ax.set(yticklabels=[], ylabel="")

    fig.tight_layout()
    return fig

In [None]:
x1=-0.7; y1=3; x2=0.3; y2=-1; a1=0; a2=-4; a3=0
x = np.arange(-3,3,0.1)
y = line3lin(x, x1, y1, x2, y2, a1, a2, a3)

chart_lin_kde(srg_data, "ls_color_w1-w2", 'ls_Fx/Fz', (x, y),
              labels=labels, nrowscols=(2, 3), xlim=(-4, 4), ylim=(-9, 15), yticks=np.linspace(-8, 14, 12), marker_size=4).show()

In [None]:
x1=0.6; y1=-0.2; a1=-100; a2=-0.25
x = np.arange(0,10,0.1)
y = line2lin(x, x1, y1, a1, a2)

chart_lin_kde(srg_data, 'ls_star_zw2_rz_score', 'ls_X/w1_h', (x, y),
              labels=labels, nrowscols=(2, 3), xlim=(-4, 10), ylim=(-9, 15), yticks=np.linspace(-8, 14, 12), marker_size=4).show()

In [None]:
chart_lin_kde(srg_data, 'ls_mag_g', 'ls_mag_w1', None,
              labels=labels, nrowscols=(2, 3), xlim=(5, 30), ylim=(5, 30), marker_size=4).show()

In [None]:
chart_lin_kde(srg_data, 'ls_color_w1-w2', 'ls_color_g-z', None,
              labels=labels, nrowscols=(2, 3), xlim=(-3, 3), ylim=(-4, 5), marker_size=4).show()

# Последовательное сравнение алгоритмов

In [None]:
from datetime import datetime
import os

from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans, AffinityPropagation, MeanShift, SpectralClustering, DBSCAN, OPTICS
from tqdm.auto import tqdm

In [None]:
def line2lin(_x:np.array, _x1:float, _y1:float, _a1:float, _a2:float) -> np.array:
    '''
    Line
    x: np.array
    x<=x1 : y=a1*x + (y - a1*x)
    x> x1 : y=a2*x + (y - a1*x)
    '''
    _b1 = _y1 - _a1*_x1
    _b2 = _y1 - _a2*_x1

    _y = _a1*_x + _b1
    _y[_x>_x1] =_a2*_x[_x>_x1] + _b2

    return _y


def line3lin(_x:np.array, _x1:float, _y1:float, _x2:float, _y2:float, _a1:float, _a2:float, _a3:float) -> np.array:
    '''
    Line
    x: np.array
    x<=x1 : y=a1*x + (y - a1*x)
    x> x1 : y=a2*x + (y - a1*x)
    '''
    _b1 = _y1 - _a1*_x1
    _b2 = _y1 - _a2*_x1
    _b3 = _y2 - _a3*_x2

    _y = _a1*_x + _b1
    _y[_x>_x1] =_a2*_x[_x>_x1] + _b2
    _y[_x>_x2] =_a3*_x[_x>_x2] + _b3

    return _y

In [None]:
# TODO

DRAW = True
CHARTS_PATH = Path(f"charts") / f"{datetime.now().strftime('%Y-%m-%d %H-%M-%S')}"

features_array = srg_data[EXPERT_FEATURES_LIST + list((LS_MAG_COLUMNS).elements.values()) + [XRAY_COLUMNS.ln_flux_05_20]]
features_array = StandardScaler().fit_transform(features_array)

nrowscols=(2, 4)

models = [
    KMeans(n_clusters=4, algorithm="full"),
    KMeans(n_clusters=6, algorithm="full"),
    KMeans(n_clusters=8, algorithm="full"),
    DBSCAN(),
]

for i, model in tqdm(enumerate(models)):
    print(f"=== {i}\t{model} ===")

    MODEL_CHARTS_PATH = CHARTS_PATH / f"{i:02d}"
    os.makedirs(MODEL_CHARTS_PATH)

    with open(MODEL_CHARTS_PATH / "model_description.txt", "w") as fout:
        fout.write(f"{model}\n{model.__class__.__name__}\n{model.get_params()}")

    labels = model.fit_predict(features_array)
    number_of_labels = np.unique(labels).shape[0]
    nrowscols = (number_of_labels // 4 + 1, 4)
    print(nrowscols)

    if not DRAW:
        continue

    x1=-0.7; y1=3; x2=0.3; y2=-1; a1=0; a2=-4; a3=0
    x = np.arange(-3,3,0.1)
    y = line3lin(x, x1, y1, x2, y2, a1, a2, a3)

    xcol, ycol = "ls_color_w1-w2", 'ls_Fx/Fz'
    FILENAME = f"{xcol}-x-{ycol}.png".replace("/", "").replace(":", "")
    fig = chart_lin_kde(srg_data, xcol, ycol, (x, y),
                  labels=labels, nrowscols=nrowscols, xlim=(-4, 4), ylim=(-9, 15), yticks=np.linspace(-8, 14, 12), marker_size=4)
    fig.savefig(MODEL_CHARTS_PATH / FILENAME)
    plt.close()

    x1=0.6; y1=-0.2; a1=-100; a2=-0.25
    x = np.arange(0,10,0.1)
    y = line2lin(x, x1, y1, a1, a2)

    xcol, ycol = 'ls_star_zw2_rz_score', 'ls_X/w1_h'
    FILENAME = f"{xcol}-x-{ycol}.png".replace("/", "").replace(":", "")
    fig = chart_lin_kde(srg_data, xcol, ycol, (x, y),
              labels=labels, nrowscols=nrowscols, xlim=(-4, 10), ylim=(-9, 15), yticks=np.linspace(-8, 14, 12), marker_size=4)
    fig.savefig(MODEL_CHARTS_PATH / FILENAME)
    plt.close()

    xcol, ycol = 'ls_mag_g', 'ls_mag_w1'
    FILENAME = f"{xcol}-x-{ycol}.png".replace("/", "").replace(":", "")
    fig = chart_lin_kde(srg_data, xcol, ycol, None,
                        labels=labels, nrowscols=nrowscols, xlim=(5, 30), ylim=(5, 30), marker_size=4)
    fig.savefig(MODEL_CHARTS_PATH / FILENAME)
    plt.close()

    xcol, ycol = 'ls_color_w1-w2', 'ls_color_g-z'
    FILENAME = f"{xcol}-x-{ycol}.png".replace("/", "").replace(":", "")
    fig = chart_lin_kde(srg_data, xcol, ycol, None,
                        labels=labels, nrowscols=nrowscols, xlim=(-3, 3), ylim=(-4, 5), marker_size=4)
    fig.savefig(MODEL_CHARTS_PATH / FILENAME)
    plt.close()
