In [None]:
import torch
from dataloader import TreeDL
from varnet import VARnet
from autoencoder import Morphologic
import pandas as pd
import xgboost as xgb
import pickle as pkl
from sklearn.metrics import confusion_matrix, classification_report, roc_auc_score, roc_curve, f1_score, precision_score, recall_score, accuracy_score, ConfusionMatrixDisplay
import numpy as np
import matplotlib.pyplot as plt
import tqdm
import joblib
from math import ceil

# dtrain = xgb.DMatrix('dtrain.dmatrix')
# dtest = xgb.DMatrix('dtest.dmatrix')
# dvalid = xgb.DMatrix('dvalid.dmatrix')
# Load the three feature tables and pool them so the train/valid/test
# partition can be redrawn class by class below.
train = pd.read_parquet('train_featuretbl.parquet')
valid = pd.read_parquet('valid_featuretbl.parquet')
test = pd.read_parquet('test_featuretbl.parquet')

combined = pd.concat([train, valid, test])
objs = combined.groupby("cluster_id")
types = combined["type"].unique()

# Index labels of the rows placed in each split.
trainidxs = []
valididxs = []
testidxs = []

# The rows themselves, one frame per cluster. They are collected straight from
# get_group() rather than looked up again with combined.loc[labels]: after the
# concat above the index labels are not guaranteed to be unique, and a
# label-based lookup returns every row that carries a requested label, which
# would pull duplicated or foreign rows into a split.
train_parts = []
valid_parts = []
test_parts = []

# Per class, clusters are taken in group-key order: the first ceil(0.7*n) go to
# train, up to ceil(0.85*n) to valid, and the remainder to test. Splitting by
# cluster keeps all rows of a cluster (within a class) in the same split.
for type_ in types:
    groups = combined[combined["type"] == type_].groupby("cluster_id")
    cids = list(groups.groups)
    n = len(cids)
    train_end = ceil(0.7*n)
    valid_end = ceil(0.85*n)
    assignments = (
        (cids[:train_end], trainidxs, train_parts),
        (cids[train_end:valid_end], valididxs, valid_parts),
        (cids[valid_end:], testidxs, test_parts),
    )
    for split_cids, split_idxs, split_parts in assignments:
        for cid in split_cids:
            rows = groups.get_group(cid)
            split_idxs.extend(list(rows.index))
            split_parts.append(rows)

# combined.iloc[:0] keeps the column layout when a split received no cluster.
train = pd.concat(train_parts) if train_parts else combined.iloc[:0]
valid = pd.concat(valid_parts) if valid_parts else combined.iloc[:0]
test = pd.concat(test_parts) if test_parts else combined.iloc[:0]

# Validation and test are reduced to one row per cluster.
# NOTE(review): the sort is ascending, so the row with the lowest
# period_significance comes first — confirm that is the intended pick.
# NOTE(review): GroupBy.first() takes the first non-null value per column, so
# if feature columns contain NaN the result can mix values from several rows
# of a cluster — confirm this is acceptable.
valid = valid.sort_values("period_significance").groupby("cluster_id").first()
test = test.sort_values("period_significance").groupby("cluster_id").first()

print(train.value_counts("type"))
print(valid.value_counts("type"))
print(test.value_counts("type"))

# Build the Morphologic network and restore its trained weights.
# map_location="cpu" lets a checkpoint that was saved from a CUDA device be
# deserialised on a host without a GPU; load_state_dict then copies the values
# into the parameters of the freshly constructed module.
# NOTE(review): the checkpoint path is absolute and machine-specific.
model = Morphologic(64, 7, features=2)
model.load_state_dict(torch.load("/home/mpaz/neovar/secondary/subclassifier/model/morpho.pth", map_location="cpu"))

In [None]:
# Turn off XGBoost's global logging for the duration of the grid search.
xgb.set_config(verbosity=0)

# Shared progress bar, advanced by one for every evaluated parameter
# combination. `total` is maintained by hand and should equal the number of
# combinations in the grid.
progbar = tqdm.tqdm(total=400)

def evaluate_params(n_est, max_depth):
    """Train one XGBoost classifier and score it on the validation split.

    Reads the module-level ``train`` and ``valid`` frames (the ``"type"``
    column is the target, every other column is a feature) and advances the
    module-level ``progbar`` by one step.

    Parameters
    ----------
    n_est : int
        Number of boosting rounds (``n_estimators``).
    max_depth : int
        Maximum depth of each tree.

    Returns
    -------
    tuple of float
        ``(macro F1, macro precision, macro recall, accuracy)`` computed on
        the validation split.
    """
    train_X, train_y = train.drop("type", axis=1), train["type"].values
    valid_X, true_y = valid.drop("type", axis=1), valid["type"].values

    # num_class=7 matches the final model and the seven class names used for
    # the reports elsewhere in this notebook.
    tree = xgb.XGBClassifier(n_estimators=n_est, max_depth=max_depth, learning_rate=0.1, n_jobs=20, objective="multi:softmax", num_class=7, verbosity=0)
    # verbose=False: fit() would otherwise print one evaluation line per
    # boosting round, flooding the output underneath the progress bar.
    tree.fit(train_X, train_y, eval_set=[(valid_X, true_y)], verbose=False)

    pred_y = tree.predict(valid_X)
    f1 = f1_score(true_y, pred_y, average="macro")
    precision = precision_score(true_y, pred_y, average="macro")
    recall = recall_score(true_y, pred_y, average="macro")
    ascore = accuracy_score(true_y, pred_y)

    progbar.update(1)

    return (f1, precision, recall, ascore)

def evaluate_params_many(pairs):
    """Score a batch of hyper-parameter combinations one after another.

    ``pairs`` is an iterable of argument tuples; each tuple is unpacked into a
    call to ``evaluate_params``. The results are returned as a list in the
    same order as ``pairs``.
    """
    scores = []
    for combo in pairs:
        scores.append(evaluate_params(*combo))
    return scores
    

# Hyper-parameter grid: every (n_estimators, max_depth) combination is scored once.
estimator_counts = [5, 15, 30, 45, 60, 75, 90, 105, 120, 135]
depths = [1, 5, 9, 13, 17, 25]

# Placeholder array sized from the grid axes (estimator counts x depths).
# It is allocated here but not filled in this cell.
results = np.zeros((len(estimator_counts), len(depths)))
pairs = [(estimator_count, depth) for estimator_count in estimator_counts for depth in depths]

pairs = np.random.permutation(pairs).tolist() # randomize the order of the pairs for better parallelization and time estimation
chunksize=24

# The bar was created with a hand-written total; reset it to the real number of
# combinations so the percentage and the time estimate are meaningful.
progbar.reset(total=len(pairs))

# res[i] holds the (f1, precision, recall, accuracy) tuple for pairs[i].
res = [evaluate_params(*pair) for pair in pairs]
progbar.close()



In [None]:
# Class names in label order: integer label i in the "type" column is shown as
# class_names[i] in the report and on the confusion-matrix axes.
class_names = ["ea", "ew", "lpv", "rot", "rr", "cep", "yso"]
class_ids = np.arange(len(class_names))

# Fit the final classifier on the training split; the validation split is only
# passed as an evaluation set.
tree = xgb.XGBClassifier(n_estimators=100, max_depth=8, learning_rate=0.2, n_jobs=24, verbosity=2, objective="multi:softmax", num_class=7)
tree.fit(train.drop("type", axis=1), train["type"].values, eval_set=[(valid.drop("type", axis=1), valid["type"].values)])

# Score on the held-out test split.
pred_y = tree.predict(test.drop("type", axis=1))
true_y = np.array(test["type"].values, dtype=int)

# Pinning `labels` keeps the report rows and the matrix shape fixed at one
# entry per class even if a class happens to be missing from both the test
# labels and the predictions; otherwise the matrix could come out smaller than
# the list of display labels.
a = classification_report(true_y, pred_y, labels=class_ids, target_names=class_names)
disp = ConfusionMatrixDisplay(confusion_matrix(true_y, pred_y, labels=class_ids, normalize='true'), display_labels=class_names)
disp.plot()
print(a)

In [None]:
# Write the fitted classifier to model/best.tree.
# NOTE(review): presumably the model/ directory must already exist — confirm.
# NOTE(review): the ".tree" extension is neither ".json" nor ".ubj"; which
# on-disk format XGBoost chooses for it appears to be version-dependent — confirm.
tree.save_model("model/best.tree")

In [None]:
def confusion_matrix_plot(true_y, pred_y, labels, fname):
    """Draw, save and show a row-normalised confusion matrix.

    Parameters
    ----------
    true_y, pred_y : array-like
        Ground-truth and predicted labels, forwarded to ``confusion_matrix``.
    labels : sequence of str
        Tick labels for the matrix axes.
    fname : str
        Path the figure is written to (500 dpi).
    """
    # normalize='true' scales every row (true class) to sum to one.
    normalised = confusion_matrix(true_y, pred_y, normalize='true')
    display = ConfusionMatrixDisplay(normalised, display_labels=labels)
    display.plot(cmap='Blues')
    # Drop the colour bar that plot() attaches to the image.
    display.im_.colorbar.remove()

    # Resize the current figure to 7.5 x 7.5 inches before writing it out.
    figure = plt.gcf()
    figure.set_size_inches(7.5, 7.5)
    plt.savefig(fname, dpi=500)
    plt.show()

    
def precision_recall_graph(true_y, pred_y, labels, fname):
    """Polar bar chart of per-class precision and recall with an F1 step line.

    Every class owns an angular sector centred on its tick label. The
    precision bar sits on one side of the centre, the recall bar on the other,
    and the red line is held at the class's F1 score across the whole sector.

    Parameters
    ----------
    true_y, pred_y : array-like
        Ground-truth and predicted labels, forwarded to
        ``classification_report``.
    labels : sequence of str
        Class names; passed as ``target_names`` and used as tick labels. The
        number of sectors is ``len(labels)``.
    fname : str
        Path the figure is written to (500 dpi).
    """
    n_classes = len(labels)
    u = np.pi / (2*n_classes)  # a quarter of one class sector
    stats = classification_report(true_y, pred_y, target_names=labels, output_dict=True)
    centres = np.linspace(0, 2*np.pi, n_classes, endpoint=False)
    theta1 = centres - u
    theta2 = centres + u

    ax = plt.subplot(polar=True)
    precision_bars = ax.bar(theta1, [stats[type_]['precision'] for type_ in labels], width=2*u - 0.025, align='center', alpha=1, edgecolor='k', color="mediumblue", linewidth=1)
    recall_bars = ax.bar(theta2, [stats[type_]['recall'] for type_ in labels], width=2*u - 0.025, align='center', alpha=1, edgecolor='k', color="cornflowerblue", linewidth=1)

    r = (np.linspace(0,2*np.pi, 10000, endpoint=False) + 0.001) % (2*np.pi)
    f1_scores = np.array([stats[type_]["f1-score"] for type_ in labels])
    # Class i spans [centre_i - 2u, centre_i + 2u). Shifting the angle by half
    # a sector (2u) before dividing by the sector width (4u) keeps the F1 step
    # over the same class as the two bars it summarises; the modulo wraps the
    # final half sector back to class 0.
    sector = np.floor((r + 2*u) / (4*u)).astype(int) % n_classes
    f1_line, = ax.plot(r, f1_scores[sector], color="red", linewidth=2)

    ax.set_xticks(centres)
    ax.set_xticklabels(labels)
    # Explicit handles tie each legend entry to its artist instead of relying
    # on the order in which the artists were added to the axes.
    ax.legend([f1_line, precision_bars, recall_bars], ["F1 Score", "Precision", "Recall"], loc=(0.9, 0.9))
    ax.set_yticks([0.5, 0.75, 0.9])
    ax.xaxis.grid(True)
    plt.gcf().set_size_inches(7.5, 7.5)
    plt.savefig(fname, dpi=500)
    plt.show()

def support_pie_chart(dset, labels, fname):
    """Donut chart of how many rows of ``dset`` belong to each class.

    Parameters
    ----------
    dset : pandas.DataFrame
        Frame with a ``"type"`` column holding integer class labels; label
        ``i`` is reported under ``labels[i]``.
    labels : sequence of str
        Class names in label order.
    fname : str
        Path the figure is written to (400 dpi).
    """
    # Count once, and give classes that do not occur in `dset` a count of 0
    # instead of failing on the missing key.
    type_counts = dset["type"].value_counts().reindex(range(len(labels)), fill_value=0)
    counts = type_counts.tolist()
    labels = list(labels)
    # Reorder as even-position classes followed by odd-position classes, which
    # changes which wedges end up next to each other.
    counts = counts[::2] + counts[1::2]
    labels = labels[::2] + labels[1::2]

    # 5 x 3.75 inches is the size the figure is saved at.
    fig, ax = plt.subplots(figsize=(5, 3.75))

    recipe = [l + " " + str(c) for l, c in zip(labels, counts)]

    # width=0.5 leaves a hole in the middle, turning the pie into a donut.
    wedges, texts = ax.pie(counts, wedgeprops=dict(width=0.5), startangle=0, colors=["mediumblue", "cornflowerblue", "darkblue", "royalblue", "navy", "blue", "dodgerblue", "skyblue"])

    bbox_props = dict(boxstyle="square,pad=0.15", fc="w", ec="k", lw=0.62)

    for i, p in enumerate(wedges):
        # Annotate each wedge from the point on the unit circle at its mid-angle.
        ang = (p.theta2 - p.theta1)/2. + p.theta1
        y = np.sin(np.deg2rad(ang))
        x = np.cos(np.deg2rad(ang))
        # Labels go to the right of the chart for x >= 0 and to the left
        # otherwise; x == 0 is treated as the right-hand side.
        side = 1 if x >= 0 else -1
        ax.annotate(recipe[i], xy=(x, y), xytext=(1.5*side, 1.8*y),
                    horizontalalignment="left" if side == 1 else "right",
                    arrowprops=dict(arrowstyle="-", connectionstyle=f"angle,angleA=0,angleB={ang}"),
                    bbox=bbox_props, zorder=0, va="center")

    fig.savefig(fname, dpi=400)
    plt.show()


In [None]:
# Uses true_y and pred_y left in the kernel by the cell that fits the final tree.
labels = ["ea", "ew", "lpv", "rot", "rr", "cep", "yso"]
categories = labels
n_classes = len(labels)
gap = 0.025                       # angular offset / gap between neighbouring bars
half_bar = np.pi/(2*n_classes)    # a quarter of one class sector

# Row-normalised confusion matrix without the colour bar.
cm = confusion_matrix(true_y, pred_y, normalize='true')
disp = ConfusionMatrixDisplay(cm, display_labels=labels)
disp.plot(cmap='Blues')
disp.im_.colorbar.remove()
plt.show()

# Polar chart: per class, a precision bar and a recall bar side by side.
stats = classification_report(true_y, pred_y, target_names=labels, output_dict=True)
centres = np.linspace(0, 2*np.pi, n_classes, endpoint=False)
theta1 = centres - half_bar + gap
theta2 = centres + half_bar + gap
bar_width = 2*np.pi/n_classes - np.pi/n_classes - gap

ax = plt.subplot(polar=True)
ax.bar(theta1, [stats[type_]['precision'] for type_ in categories], width=bar_width, align='center', alpha=1, edgecolor='k', color="mediumblue", linewidth=1, label="Precision")
ax.bar(theta2, [stats[type_]['recall'] for type_ in categories], width=bar_width, align='center', alpha=1, edgecolor='k', color="cornflowerblue", linewidth=1, label="Recall")
ax.set_xticks(theta1 + half_bar, minor=False)
ax.set_xticklabels(categories)
# make the x tick marks invisible but keep the labels
ax.tick_params(axis='x', which='both', length=0)
# draw a dashed spoke on the boundary between neighbouring class sectors
for i in range(n_classes):
    ax.plot([theta1[i] - half_bar, theta1[i] - half_bar], [0, 1], color="grey", linestyle="--", label=None)


def get_current_f1(r):
    """Return the F1 score of the class whose sector contains angle ``r``."""
    # Undo the bar offset and shift by half a sector so the step changes on
    # the spokes; the modulo wraps the last half sector back to the first class.
    sector = int(np.floor(n_classes*(r - gap + np.pi/n_classes) / (2*np.pi))) % n_classes
    return stats[labels[sector]]["f1-score"]


r = (np.linspace(0,2*np.pi, 10000, endpoint=False) + 0.1) % (2*np.pi)
y = np.vectorize(get_current_f1)(r)
ax.plot(r, y, color="red", linewidth=1.25)

# Legend built from the labelled artists only (the two bar sets), placed at the upper right.
ax.legend(loc=(0.9, 0.9))

ax.set_yticks([0.5, 0.9, 1])
ax.set_ylim(0, 1)
ax.yaxis.grid(True, linestyle='--', color='black')
ax.xaxis.grid(False)
plt.savefig("precision.png", dpi=500)
plt.show()

In [None]:
# Class names in label order, shared by every report and figure in this cell.
class_names = ["ea", "ew", "lpv", "rot", "rr", "cep", "yso"]

# Raw per-class margins from the booster for the test split.
bst = tree.get_booster()
p = bst.inplace_predict(test.drop("type", axis=1).values, predict_type="margin")
pred = np.argmax(p, axis=1)

# Softmax probability of the predicted class. Subtracting the row maximum
# before exponentiating leaves the result unchanged mathematically but
# prevents overflow for large margins.
exp_p = np.exp(p - np.max(p, axis=1, keepdims=True))
prob = np.max(exp_p / np.sum(exp_p, axis=1, keepdims=True), axis=1)

true_y = np.array(test["type"].values, dtype=int)
# Report on `pred`, the predictions computed in this cell, so the text report
# and the figures below describe the same predictions and the cell does not
# depend on a `pred_y` left over from an earlier cell.
a = classification_report(true_y, pred)
disp = ConfusionMatrixDisplay(confusion_matrix(true_y, pred), display_labels=class_names)
disp.plot()
print(a)

confusion_matrix_plot(true_y, pred, class_names, "confusion_matrix.png")
precision_recall_graph(true_y, pred, class_names, "precision_recall.png")
# Class support is counted per cluster (one row per cluster_id).
support_pie_chart(test.groupby("cluster_id").first(), class_names, "test_support.png")
support_pie_chart(train.groupby("cluster_id").first(), class_names, "train_support.png")