#### Classify your own X-ray source

---

To classify your own X-ray source (or multiple sources), ensure that you have the following attributes with valid values:

`'hard_hm', 'hard_hs', 'hard_ms', 'powlaw_gamma', 'bb_kt', 'var_prob_*', 'var_mean_*', 'var_sigma_*', 'var_max_b', 'var_min_b'`

Here, the asterisk (*) represents 'b', 'h', or 's'.

**Note:** Use data from individual detections, not aggregated master properties. The more detections you have, the more robust and insightful your classification will be.

For detailed descriptions of each feature and how to obtain them, consult the [Chandra Source Catalog documentation](https://cxc.harvard.edu/csc/).

Proceed by running the cells below. They contain all the necessary code and modified functions to classify a new X-ray source based on the original dataset and methodology presented in our paper.

In [1]:
import numpy as np
import pandas as pd
import pickle

from umlcaxs_lib import votable_to_pandas, mahal_classifier_all
from sklearn.preprocessing import MinMaxScaler

from collections import Counter
import itertools


# Feature definitions
features = ['hard_hm', 'hard_hs', 'hard_ms', 'powlaw_gamma', 'bb_kt', 'var_prob_b', 'var_ratio_b', 'var_prob_h', 'var_ratio_h', 'var_prob_s', 'var_ratio_s', 'var_newq_b']
features_lognorm = ['bb_kt', 'var_ratio_h', 'var_ratio_b', 'var_ratio_s', 'var_newq_b']
features_norm = ['powlaw_gamma']

In [2]:
def transform_target(original_data, target_data, features, features_norm, features_lognorm):
    transformed_target = target_data.copy()
    min_max_scaler = MinMaxScaler(feature_range=(0, 1))
    
    for feature in features:
        if feature in original_data.columns and feature in transformed_target:
            
            if feature in features_lognorm:
                min_val = np.min(original_data[feature][original_data[feature] != 0]) / 10
                transformed_data = np.log(original_data[feature] + min_val)
            elif feature in features_norm:
                transformed_data = original_data[feature]
            else:
                continue
            
            scaler = min_max_scaler.fit(transformed_data.values.reshape(-1, 1))
            
            target_feature_data = np.array([transformed_target[feature]])
            
            if feature in features_lognorm:
                target_feature_data = np.log(target_feature_data + min_val)
            
            scaled_target_feature_data = scaler.transform(target_feature_data.reshape(-1, 1))
            transformed_target[feature] = scaled_target_feature_data.flatten()[0]
    
    return transformed_target


def load_original_data(file_path, features, features_lognorm, features_norm):
    data = votable_to_pandas(file_path)
    
    # Define new measurement columns
    data['var_ratio_b'] = data['var_sigma_b'] / data['var_mean_b']
    data['var_ratio_h'] = data['var_sigma_h'] / data['var_mean_h']
    data['var_ratio_s'] = data['var_sigma_s'] / data['var_mean_s']
    data['var_newq_b'] = ((data['var_max_b'] + data['var_min_b']) / 2) / data['var_mean_b']
    
    # Drop data with missing feature values
    data_out = data.dropna(subset=features)
    
    # Normalize or log-normalize features
    return data_out

def load_your_xray(file_path, original_data, features, features_lognorm, features_norm, original):
    data = votable_to_pandas(file_path)
    
    # Calculate medians of the original dataset for each feature
    feature_medians = original[features].median()
    
    # Define new measurement columns
    data['var_ratio_b'] = data['var_sigma_b'] / data['var_mean_b']
    data['var_ratio_h'] = data['var_sigma_h'] / data['var_mean_h']
    data['var_ratio_s'] = data['var_sigma_s'] / data['var_mean_s']
    data['var_newq_b'] = ((data['var_max_b'] + data['var_min_b']) / 2) / data['var_mean_b']
    
    # Fill missing values with medians from the original dataset
    for feature in features:
        data[feature].fillna(feature_medians[feature], inplace=True)
    
    X_df = data[features]
    
    # Normalize or log-normalize features
    return transform_target(original_data, X_df, features, features_norm, features_lognorm).to_numpy(), data


---

In this example, we will classify a detection for the source '2CXO J004307.5-092302', inspired by a query from my colleague Andrea Sacchi. 

Ensure that the detection data for your source(s) is available in VOTable format.

Modify line 2 in the input argument, replacing `"data/your_xray_example.vot"` with the path to your own data file.

If your data is correctly formatted, running the subsequent cells should be good.



#### 1. Load the data and give your source a cluster.

In [15]:
cscresults = load_original_data("data/cscresults.vot", features, features_lognorm, features_norm)
X_your_xray, data_out_your_xray = load_your_xray("data/your_xray_example.vot", cscresults, features, features_lognorm, features_norm, cscresults)

# Load the GMM
with open("gmm_model.pkl", "rb") as file:
    loaded_gmm = pickle.load(file)


data_out_your_xray['main_type'] = 'NaN'
data_out_your_xray['cluster'] = loaded_gmm.predict(X_your_xray)
print("Data loaded.")

Data loaded.


#### 2. Classify your source(s).

In [4]:
ltypes = ['QSO', 'AGN', 'Seyfert_1', 'Seyfert_2', 'HMXB', 'LMXB', 'XB', 'YSO', 'TTau*', 'Orion_V*']
uks = ['Star', 'X', 'Radio', 'IR', 'Blue', 'UV', 'gamma', 'PartofG', '**']

# Load data
df_csc_simbad = pd.read_csv('out_data/cluster_csc_simbad.csv', index_col=0)
df_csc_simbad.fillna({'main_type': 'NaN'}, inplace=True)

# Preprocess data
df_csc_out = df_csc_simbad.dropna(subset=features)

df_csc_out_with_target = pd.concat([df_csc_simbad, data_out_your_xray], ignore_index=True)
df_csc_lognorm = transform_target(df_csc_out, df_csc_out_with_target, features, features_norm, features_lognorm)

# Classification
classified_df = mahal_classifier_all(df_csc_lognorm, df_csc_out_with_target, features, ltypes, uks=uks)
print("Classification done.")

  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,


***Cluster 0***
***Cluster 1***
***Cluster 2***
***Cluster 3***
***Cluster 4***
***Cluster 5***
Classification done.


#### 3. Review your classification results.

In this example, we have classified a single detection for one source. The same approach can be extended to multiple detections and multiple sources. Here, we will focus on examining the classification of just one source. If you wish to explore the classification of multiple sources, you can modify the `name_source` variable accordingly.


In [37]:
name_source = '2CXO J004307.5-092302'

In [38]:
classified_df.query(f"name == '{name_source}'")

Unnamed: 0,name,obsid,cluster,ra,dec,main_type,QSO,AGN,Seyfert_1,Seyfert_2,...,hard_ms,powlaw_gamma,bb_kt,var_prob_b,var_ratio_b,var_prob_h,var_ratio_h,var_prob_s,var_ratio_s,var_newq_b
1344,2CXO J004307.5-092302,4884,5,10.781298,-9.383954,XB,0.002228,0.263051,0.00796,0.173319,...,-0.136165,2.07377,0.563729,0.501655,0.18304,0.508149,0.137342,0.562971,0.129812,1.369809


In [39]:
prob_agn = classified_df.query(f"name == '{name_source}'").QSO + classified_df.query(f"name == '{name_source}'").AGN + classified_df.query(f"name == '{name_source}'").Seyfert_1 + classified_df.query(f"name == '{name_source}'").Seyfert_2
print("P(AGN)\n", prob_agn.to_numpy())

P(AGN)
 [0.44655752]


In [40]:
prob_xb = classified_df.query(f"name == '{name_source}'").XB + classified_df.query(f"name == '{name_source}'").LMXB + classified_df.query(f"name == '{name_source}'").HMXB
print("P(XB)\n", prob_xb.to_numpy())

P(XB)
 [0.55211735]


In [41]:
prob_yso = classified_df.query(f"name == '{name_source}'").YSO + classified_df.query(f"name == '{name_source}'")['TTau*'] + classified_df.query(f"name == '{name_source}'")['Orion_V*']
print("P(YSO)\n", prob_yso.to_numpy())

P(YSO)
 [0.00132512]


#### 4. Save your classification results.

In [19]:
# Save your classification
classified_df.query(f"name == '{name_source}'").to_csv('your_source_classified.csv')

#### 5. In the case of multiple detections, create a master hard and soft classification.

In this specific example, both classifications will be identical because we are working with only one detection. However, if you have multiple detections for your source, soft and hard classifications can be different.

In [42]:
# Helper functions
def mean_std_round(x):
    return f"{round(np.mean(x), 3)}±{round(np.std(x), 3)}"

def most_common(x): 
    return Counter(x).most_common(1)[0][0]

# Soft voting
summ_table = classified_df.groupby('name')[ltypes].agg(mean_std_round)
summ_table['detection_count'] = classified_df.groupby(['name']).size()
ra_dec_df = classified_df.groupby('name')[['ra', 'dec']].first()
features_df = classified_df.groupby('name')[features].first()
summ_table = summ_table.join([ra_dec_df, features_df])
summ_table_prov = classified_df.groupby('name')[ltypes].agg(['mean', 'std'])
class_mean_names = [list(tup) for tup in itertools.product(ltypes, ['mean'], repeat=1)]
names_comp = summ_table_prov[class_mean_names].idxmax(axis=1).to_list()
master_names = [name[0] for name in names_comp]
summ_table['soft_master_class'] = master_names
summ_table.sort_values('detection_count', ascending=False, inplace=True)

# Hard voting
summ_table_hard = classified_df.groupby('name').size().to_frame('detection_count')
summ_table_hard['hard_master_class'] = classified_df.groupby('name')['main_type'].agg(most_common)
ra_dec_df = classified_df.groupby('name')[['ra', 'dec']].first()
summ_table_hard = summ_table_hard.join(ra_dec_df)
summ_table_hard.sort_values('detection_count', ascending=False, inplace=True)

In [43]:
print("Hard classification:")
print(summ_table_hard.query("name == '2CXO J004307.5-092302'")['hard_master_class'])

Hard classification:
name
2CXO J004307.5-092302    XB
Name: hard_master_class, dtype: object


In [44]:
print("Soft classification:")
print(summ_table.query("name == '2CXO J004307.5-092302'")['soft_master_class'])

Soft classification:
name
2CXO J004307.5-092302    XB
Name: soft_master_class, dtype: object
